redis源码分析之有序集SortedSet

有序集SortedSet算是redis中一个很有特色的数据结构,通过这篇文章来总结一下这块知识点。

原文地址:www.jianshu.com/p/75ca5a359…

一、有序集SortedSet命令简介

redis中的有序集,允许用户使用指定值对放进去的元素进行排序,并且基于该已排序的集合提供了一系列丰富的操作集合的API。
举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码//添加元素,table1为有序集的名字,100为用于排序字段(redis把它叫做score),a为我们要存储的元素
127.0.0.1:6379> zadd table1 100 a
(integer) 1
127.0.0.1:6379> zadd table1 200 b
(integer) 1
127.0.0.1:6379> zadd table1 300 c
(integer) 1
//按照元素索引返回有序集中的元素,索引从0开始
127.0.0.1:6379> zrange table1 0 1
1) "a"
2) "b"
//按照元素排序范围返回有序集中的元素,这里用于排序的字段在redis中叫做score
127.0.0.1:6379> zrangebyscore table1 150 400
1) "b"
2) "c"
//删除元素
127.0.0.1:6379> zrem table1 b
(integer) 1

在有序集中,用于排序的值叫做score,实际存储的值叫做member。

由于有序集中提供的API较多,这里只举了几个常见的,具体可以参考redis文档。

关于有序集,我们有一个十分常见的使用场景就是用户评论。在APP或者网站上发布一条消息,下面会有很多评论,通常展示是按照发布时间倒序排列,这个需求就可以使用有序集,以发布评论的时间戳作为score,然后按照展示评论的数量倒序查找有序集。

二、有序集SortedSet命令源码分析

老规矩,我们还是从server.c文件中的命令表中找到相关命令的处理函数,然后一一分析。
依旧从添加元素开始,zaddCommand函数:

1
2
3
复制代码void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}

这里可以看到流程转向了zaddGenericCommand,并且传入了一个模式标记。
关于SortedSet的操作模式这里简单说明一下,先来看一条完整的zadd命令:

1
复制代码zadd key [NX|XX] [CH] [INCR] score member [score member ...]

其中的可选项我们依次看下:

  1. NX表示如果元素存在,则不执行替换操作直接返回。
  2. XX表示只操作已存在的元素。
  3. CH表示返回修改(包括添加,更新)元素的数量,只能被ZADD命令使用。
  4. INCR表示在原来的score基础上加上新的score,而不是替换。

上面代码片段中的ZADD_NONE表示普通操作。

接下来看下zaddGenericCommand函数的源码,很长,耐心一点点看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
复制代码void zaddGenericCommand(client *c, int flags) {
//一条错误提示信息
static char *nanerr = "resulting score is not a number (NaN)";
//有序集名字
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements;
int scoreidx = 0;
//记录元素操作个数
int added = 0;
int updated = 0;
int processed = 0;

//查找score的位置,默认score在位置2上,但由于有各种模式,所以需要判断
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
//判断命令中是否设置了各种模式
if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
else break;
scoreidx++;
}

//设置模式
int incr = (flags & ZADD_INCR) != 0;
int nx = (flags & ZADD_NX) != 0;
int xx = (flags & ZADD_XX) != 0;
int ch = (flags & ZADD_CH) != 0;

//通过上面的解析,scoreidx为真实的初始score的索引位置
//这里客户端参数数量减去scoreidx就是剩余所有元素的数量
elements = c->argc - scoreidx;
//由于有序集中score,member成对出现,所以加一层判断
if (elements % 2 || !elements) {
addReply(c,shared.syntaxerr);
return;
}
//这里计算score,member有多少对
elements /= 2;

//参数合法性校验
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}
//参数合法性校验
if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}

//这里开始解析score,先初始化scores数组
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
//填充数组,这里注意元素是成对出现,所以各个score之间要隔一个member
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}

//这里首先在client对应的db中查找该key,即有序集
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
//没有指定有序集且模式为XX(只操作已存在的元素),直接返回
if (xx) goto reply_to_client;
//根据元素数量选择不同的存储结构初始化有序集
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
//哈希表 + 跳表的组合模式
zobj = createZsetObject();
} else {
//ziplist(压缩链表)模式
zobj = createZsetZiplistObject();
}
//加入db中
dbAdd(c->db,key,zobj);
} else {
//如果ZADD操作的集合类型不对,则返回
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
//这里开始往有序集中添加元素
for (j = 0; j < elements; j++) {
double newscore;
//取出client传过来的score
score = scores[j];
int retflags = flags;
//取出与之对应的member
ele = c->argv[scoreidx+1+j*2]->ptr;
//向有序集中添加元素,参数依次是有序集,要添加的元素的score,要添加的元素,操作模式,新的score
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
//添加失败则返回
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
//记录操作
if (retflags & ZADD_ADDED) added++;
if (retflags & ZADD_UPDATED) updated++;
if (!(retflags & ZADD_NOP)) processed++;
//设置新score值
score = newscore;
}
//操作记录
server.dirty += (added+updated);

//返回逻辑
reply_to_client:
if (incr) {
if (processed)
addReplyDouble(c,score);
else
addReply(c,shared.nullbulk);
} else {
addReplyLongLong(c,ch ? added+updated : added);
}
//清理逻辑
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}

代码有点长,来张图看一下存储结构:

有序集存储结构

有序集存储结构

注:每个entry都是由score+member组成
有了上面的结构图以后,可以想到删除操作应该就是根据不同的存储结构进行,如果是ziplist就执行链表删除,如果是哈希表+跳表结构,那就要把两个集合都进行删除。真实逻辑是什么呢?
我们来看下删除函数zremCommand的源码,相对短一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码void zremCommand(client *c) {
//获取有序集名
robj *key = c->argv[1];
robj *zobj;
int deleted = 0, keyremoved = 0, j;
//做校验
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;

for (j = 2; j < c->argc; j++) {
//一次删除指定元素
if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
//如果有序集中全部元素都被删除,则回收有序表
if (zsetLength(zobj) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
break;
}
}
//同步操作
if (deleted) {
notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
signalModifiedKey(c->db,key);
server.dirty += deleted;
}
//返回
addReplyLongLong(c,deleted);
}

看下具体的删除操作源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
复制代码//参数zobj为有序集,ele为要删除的元素
int zsetDel(robj *zobj, sds ele) {
//与添加元素相同,根据不同的存储结构执行不同的删除逻辑
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
//ziplist是一个简单的链表删除节点操作
if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
double score;

de = dictUnlink(zs->dict,ele);
if (de != NULL) {
//查询该元素的score
score = *(double*)dictGetVal(de);
//从哈希表中删除元素
dictFreeUnlinkedEntry(zs->dict,de);

//从跳表中删除元素
int retval = zslDelete(zs->zsl,score,ele,NULL);
serverAssert(retval);
//如果有需要则对哈希表进行resize操作
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
//没有找到指定元素返回0
return 0;
}

最后看一个查询函数zrangeCommand源码,也是很长,汗~,不过放心,有了上面的基础,大致也能猜到查询逻辑应该是什么样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
复制代码void zrangeCommand(client *c) {
//第二个参数,0表示顺序,1表示倒序
zrangeGenericCommand(c,0);
}

void zrangeGenericCommand(client *c, int reverse) {
//有序集名
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
int llen;
int rangelen;
//参数校验
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

//根据参数附加信息判断是否需要返回score
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
withscores = 1;
} else if (c->argc >= 5) {
addReply(c,shared.syntaxerr);
return;
}
//有序集校验
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
|| checkType(c,zobj,OBJ_ZSET)) return;

//索引值重置
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
//返回空集
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;

//返回给客户端结果长度
addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
//同样是根据有序集的不同结构执行不同的查询逻辑
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
//根据正序还是倒序计算起始索引
if (reverse)
eptr = ziplistIndex(zl,-2-(2*start));
else
eptr = ziplistIndex(zl,2*start);

serverAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);

while (rangelen--) {
serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
//注意嵌套的ziplistGet方法就是把eptr索引的值读出来保存在后面三个参数中
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
//返回value
if (vstr == NULL)
addReplyBulkLongLong(c,vlong);
else
addReplyBulkCBuffer(c,vstr,vlen);
//如果需要则返回score
if (withscores)
addReplyDouble(c,zzlGetScore(sptr));
//倒序从后往前,正序从前往后
if (reverse)
zzlPrev(zl,&eptr,&sptr);
else
zzlNext(zl,&eptr,&sptr);
}

} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele;

//找到起始节点
if (reverse) {
ln = zsl->tail;
if (start > 0)
ln = zslGetElementByRank(zsl,llen-start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl,start+1);
}
//遍历并返回给客户端
while(rangelen--) {
serverAssertWithInfo(c,zobj,ln != NULL);
ele = ln->ele;
addReplyBulkCBuffer(c,ele,sdslen(ele));
if (withscores)
addReplyDouble(c,ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}

上面就是关于有序集SortedSet的添加,删除,查找的源码。可以看出SortedSet会根据存放元素的数量选择ziplist或者哈希表+跳表两种数据结构进行实现,之所以源码看上去很长,主要原因也就是要根据不同的数据结构进行不同的代码实现。只要掌握了这个核心思路,再看源码就不会太难。

三、有序集SortedSet命令总结

有序集的逻辑不难,就是代码有点长,涉及到ziplist,skiplist,dict三套数据结构,其中除了常规的dict之外,另外两个数据结构内容都不少,准备专门写文章进行总结,就不在这里赘述了。本文主要目的是总结一下有序集SortedSet的实现原理。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%