Redis源码剖析 集合对象t_set实现
目录
从之前的章节“Redis源码剖析–对象Object” 可以知道,redis中的SET(集合)有两种可能的数据存储方式。分别是整数集合REDIS_ENCODING_INTSET和哈希表REDIS_ENCODING_HT。
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
}
第一个添加到集合的元素决定了初始的存储方式,如果第一个原始是一个整数,初始的编码就会是REDIS_ENCODING_INTSET,否则就是REDIS_ENCODING_HT,所以集合中的数据操作的时候基本都涉及到两个存储方式的判断。
SET的结构
typedef struct redisObject {
unsigned type:4; // 此字段为OBJ_SET
unsigned encoding:4; // 如果是set结构,编码为OBJ_ENCODING_INTSET或OBJ_ENCODING_HT
unsigned lru:LRU_BITS;
int refcount;
void *ptr;
} robj;
同其他的对象一样, set结构也是存储在redisObject结构体中,通过指定 type=OBJ_SET 来确定这是一个集合对象,当是一个集合对象的时候,配套的endoding只能有对应的两种取值。
SET命令
命令 | 说明 |
---|---|
sadd | 将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略 |
scard | 返回集合 key 的基数(集合中元素的数量) |
spop | 移除并返回集合中的一个随机元素 |
smembers | 返回集合 key 中的所有成员 |
sismember | 判断 member 元素是否集合 key 的成员 |
sinter | 返回多个集合的交集,多个集合由 keys 指定 |
srandmember | 返回集合中的一个随机元素 |
srandmember | 返回集合中的 count 个随机元素 |
srem | 移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略 |
sunion | 返回多个集合的并集,多个集合由 keys 指定 |
sdiff | 返回一个集合的全部成员,该集合是所有给定集合之间的差集 |
SET命令实现
SET编码转换
如果一个集合使用 REDIS_ENCODING_INTSET 编码, 那么当以下任何一个条件被满足时, 这个集合会被转换成 REDIS_ENCODING_HT 编码:
- intset 保存的整数值个数超过
- server.set_max_intset_entries (默认值为 512 )。 试图往集合里添加一个新元素,并且这个元素不能被表示为 long long 类型(也即是,它不是一个整数)。
/* Convert the set to specified encoding. The resulting dict (when converting
* to a hash table) is presized to hold the number of elements in the original
* set. */
void setTypeConvert(robj *setobj, int enc) {
setTypeIterator *si;
serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
setobj->encoding == OBJ_ENCODING_INTSET);
if (enc == OBJ_ENCODING_HT) {
int64_t intele;
dict *d = dictCreate(&setDictType,NULL);
sds element;
/* 提前扩张字典结构的大小,避免出现rehash过程 */
dictExpand(d,intsetLen(setobj->ptr));
/* To add the elements we extract integers and create redis objects */
si = setTypeInitIterator(setobj); // 初始化迭代器
// 循环遍历intset,以及加入到dict中
while (setTypeNext(si,&element,&intele) != -1) {
element = sdsfromlonglong(intele);
serverAssert(dictAdd(d,element,NULL) == DICT_OK);
}
setTypeReleaseIterator(si); // 释放迭代器
// 设定转换后的编码类型
setobj->encoding = OBJ_ENCODING_HT;
zfree(setobj->ptr);
setobj->ptr = d; // 指针指向新创建的dict
} else {
// 如果要转换的类型不是OBJ_ENCODING_HT,退出
serverPanic("Unsupported set conversion");
}
}
当SET用HT编码的时候,所有元素其实是保存在字典的键里边,而字典的值全都设置成了NULL,这样就能保证集合中所有元素的唯一性。
SADD接口实现
/* Add the specified value into a set.
*
* If the value was already member of the set, nothing is done and 0 is
* returned, otherwise the new element is added and 1 is returned. */
// 插入元素到集合,如果元素已存在,返回0, 插入成功返回1
int setTypeAdd(robj *subject, sds value) {
long long llval;
// 如果是HT编码结构,在dict中填加元素
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
// 如果是整数集合编码结构,判断新加的元素是否能够用long long表示
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
// 如果是整数结构,在intset中插入整数
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* Convert to regular set when the intset contains
* too many entries. */
// 插入成功之后,判断是否超出范围,如果超过了set_max_intset_entries,需要做结构转换
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
// 如果要插入的不是整数,先将原先的intset做结构转换,然后执行插入
/* Failed to get integer from object, convert to regular set. */
setTypeConvert(subject,OBJ_ENCODING_HT);
/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
往SET中插入的时候,需要先判断当前的SET的底层结构,如果是HT格式,直接调用dict的插入命令;如果是intset结构,需要先判断要插入的新元素是否也是整数,如果不是整数,需要将SET的编码格式转换为HT结构体,再执行插入,如果是整数,正常插入之后也要检查一遍是否超过了intset的最大设置。
其他的比如移除元素之类的操作,也是类似,需要考虑在不同的编码格式下的处理方案。
集合命令的实现
SET相比较其他格式的结构,多了集合命令的实现,就是我们所熟知的集合的交差并集。这边讲一下求交集的算法。
求交集算法
/* Iterate all the elements of the first (smallest) set, and test
* the element against all the other sets, if at least one set does
* not include the element it is discarded */
// 获取第一个(基数最小的)集合,循环遍历
si = setTypeInitIterator(sets[0]);
while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
for (j = 1; j < setnum; j++) {
if (sets[j] == sets[0]) continue;
// 如果底层编码是INTSET
if (encoding == OBJ_ENCODING_INTSET) {
/* intset with intset is simple... and fast */
if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,intobj))
{
break;
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
} else if (sets[j]->encoding == OBJ_ENCODING_HT) {
elesds = sdsfromlonglong(intobj);
if (!setTypeIsMember(sets[j],elesds)) {
sdsfree(elesds);
break;
}
sdsfree(elesds);
}
} else if (encoding == OBJ_ENCODING_HT) {
// 如果底层编码HT
if (!setTypeIsMember(sets[j],elesds)) {
break;
}
}
}
/* Only take action when all sets contain the member */
// 如果j==setNum,说明已经检查到了最后,所有的set都有这个元素
if (j == setnum) {
if (!dstkey) {
if (encoding == OBJ_ENCODING_HT)
addReplyBulkCBuffer(c,elesds,sdslen(elesds));
else
addReplyBulkLongLong(c,intobj);
cardinality++;
} else {
if (encoding == OBJ_ENCODING_INTSET) {
elesds = sdsfromlonglong(intobj);
setTypeAdd(dstset,elesds);
sdsfree(elesds);
} else {
setTypeAdd(dstset,elesds);
}
}
}
}
setTypeReleaseIterator(si);
// 如果目标key存在,保存结果
if (dstkey) {
/* Store the resulting set into the target, if the intersection
* is not an empty set. */
int deleted = dbDelete(c->db,dstkey);
// 如果最终的结果set非空, 将最终set结果放到dstkey中
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
dstkey,c->db->id);
} else {
// 如果结果集合为空,删除
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
setDeferredSetLen(c,replylen,cardinality);
}
zfree(sets);
}
通俗的来讲,求交集主要分为以下几个步骤:
- 将所有的集合按照基数进行排序
- 使用基数最小的集合作为结果集
- 遍历这个基数最小集合中的每个元素,检查这个元素在剩余的其他集合中是否存在
- 如果都存在,就将这个元素加入到最终的结果集中
算法的复杂度是O(N2)。