目录

从之前的章节“Redis源码剖析–对象Object” 可以知道,redis中的SET(集合)有两种可能的数据存储方式。分别是整数集合REDIS_ENCODING_INTSET和哈希表REDIS_ENCODING_HT。

robj *setTypeCreate(sds value) {
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        return createIntsetObject();
    return createSetObject();
}

第一个添加到集合的元素决定了初始的存储方式,如果第一个原始是一个整数,初始的编码就会是REDIS_ENCODING_INTSET,否则就是REDIS_ENCODING_HT,所以集合中的数据操作的时候基本都涉及到两个存储方式的判断。

SET的结构

typedef struct redisObject {
    unsigned type:4;  // 此字段为OBJ_SET
    unsigned encoding:4;  // 如果是set结构,编码为OBJ_ENCODING_INTSET或OBJ_ENCODING_HT
    unsigned lru:LRU_BITS; 
    int refcount;
    void *ptr;
} robj;

同其他的对象一样, set结构也是存储在redisObject结构体中,通过指定 type=OBJ_SET 来确定这是一个集合对象,当是一个集合对象的时候,配套的endoding只能有对应的两种取值。

SET命令

命令 说明
sadd 将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略
scard 返回集合 key 的基数(集合中元素的数量)
spop 移除并返回集合中的一个随机元素
smembers 返回集合 key 中的所有成员
sismember 判断 member 元素是否集合 key 的成员
sinter 返回多个集合的交集,多个集合由 keys 指定
srandmember 返回集合中的一个随机元素
srandmember 返回集合中的 count 个随机元素
srem 移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略
sunion 返回多个集合的并集,多个集合由 keys 指定
sdiff 返回一个集合的全部成员,该集合是所有给定集合之间的差集

SET命令实现

SET编码转换

如果一个集合使用 REDIS_ENCODING_INTSET 编码, 那么当以下任何一个条件被满足时, 这个集合会被转换成 REDIS_ENCODING_HT 编码:

  • intset 保存的整数值个数超过
  • server.set_max_intset_entries (默认值为 512 )。 试图往集合里添加一个新元素,并且这个元素不能被表示为 long long 类型(也即是,它不是一个整数)。
/* Convert the set to specified encoding. The resulting dict (when converting
 * to a hash table) is presized to hold the number of elements in the original
 * set. */
void setTypeConvert(robj *setobj, int enc) {
    setTypeIterator *si;
    serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
                             setobj->encoding == OBJ_ENCODING_INTSET);

    if (enc == OBJ_ENCODING_HT) {
        int64_t intele;
        dict *d = dictCreate(&setDictType,NULL);
        sds element;

        /*  提前扩张字典结构的大小,避免出现rehash过程 */
        dictExpand(d,intsetLen(setobj->ptr));

        /* To add the elements we extract integers and create redis objects */
        si = setTypeInitIterator(setobj); // 初始化迭代器
        // 循环遍历intset,以及加入到dict中
        while (setTypeNext(si,&element,&intele) != -1) {
            element = sdsfromlonglong(intele);
            serverAssert(dictAdd(d,element,NULL) == DICT_OK);
        }
        setTypeReleaseIterator(si); // 释放迭代器

        // 设定转换后的编码类型
        setobj->encoding = OBJ_ENCODING_HT;
        zfree(setobj->ptr);
        setobj->ptr = d; // 指针指向新创建的dict
    } else {
        // 如果要转换的类型不是OBJ_ENCODING_HT,退出
        serverPanic("Unsupported set conversion");
    }
}

当SET用HT编码的时候,所有元素其实是保存在字典的键里边,而字典的值全都设置成了NULL,这样就能保证集合中所有元素的唯一性。

SADD接口实现

/* Add the specified value into a set.
 *
 * If the value was already member of the set, nothing is done and 0 is
 * returned, otherwise the new element is added and 1 is returned. */
// 插入元素到集合,如果元素已存在,返回0, 插入成功返回1
int setTypeAdd(robj *subject, sds value) {
    long long llval;
    // 如果是HT编码结构,在dict中填加元素
    if (subject->encoding == OBJ_ENCODING_HT) {
        dict *ht = subject->ptr;
        dictEntry *de = dictAddRaw(ht,value,NULL);
        if (de) {
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,de,NULL);
            return 1;
        }
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
        // 如果是整数集合编码结构,判断新加的元素是否能够用long long表示
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            // 如果是整数结构,在intset中插入整数
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                /* Convert to regular set when the intset contains
                 * too many entries. */
                // 插入成功之后,判断是否超出范围,如果超过了set_max_intset_entries,需要做结构转换
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            // 如果要插入的不是整数,先将原先的intset做结构转换,然后执行插入
            /* Failed to get integer from object, convert to regular set. */
            setTypeConvert(subject,OBJ_ENCODING_HT);

            /* The set *was* an intset and this value is not integer
             * encodable, so dictAdd should always work. */
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
            return 1;
        }
    } else {
        serverPanic("Unknown set encoding");
    }
    return 0;
}

往SET中插入的时候,需要先判断当前的SET的底层结构,如果是HT格式,直接调用dict的插入命令;如果是intset结构,需要先判断要插入的新元素是否也是整数,如果不是整数,需要将SET的编码格式转换为HT结构体,再执行插入,如果是整数,正常插入之后也要检查一遍是否超过了intset的最大设置。

其他的比如移除元素之类的操作,也是类似,需要考虑在不同的编码格式下的处理方案。

集合命令的实现

SET相比较其他格式的结构,多了集合命令的实现,就是我们所熟知的集合的交差并集。这边讲一下求交集的算法。

求交集算法

/* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    // 获取第一个(基数最小的)集合,循环遍历
    si = setTypeInitIterator(sets[0]);
    while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
        for (j = 1; j < setnum; j++) {
            if (sets[j] == sets[0]) continue;
            // 如果底层编码是INTSET
            if (encoding == OBJ_ENCODING_INTSET) {
                /* intset with intset is simple... and fast */
                if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,intobj))
                {
                    break;
                /* in order to compare an integer with an object we
                 * have to use the generic function, creating an object
                 * for this */
                } else if (sets[j]->encoding == OBJ_ENCODING_HT) {
                    elesds = sdsfromlonglong(intobj);
                    if (!setTypeIsMember(sets[j],elesds)) {
                        sdsfree(elesds);
                        break;
                    }
                    sdsfree(elesds);
                }
            } else if (encoding == OBJ_ENCODING_HT) {
                // 如果底层编码HT
                if (!setTypeIsMember(sets[j],elesds)) {
                    break;
                }
            }
        }

        /* Only take action when all sets contain the member */
        // 如果j==setNum,说明已经检查到了最后,所有的set都有这个元素
        if (j == setnum) {
            if (!dstkey) {
                if (encoding == OBJ_ENCODING_HT)
                    addReplyBulkCBuffer(c,elesds,sdslen(elesds));
                else
                    addReplyBulkLongLong(c,intobj);
                cardinality++;
            } else {
                if (encoding == OBJ_ENCODING_INTSET) {
                    elesds = sdsfromlonglong(intobj);
                    setTypeAdd(dstset,elesds);
                    sdsfree(elesds);
                } else {
                    setTypeAdd(dstset,elesds);
                }
            }
        }
    }
    setTypeReleaseIterator(si);

    // 如果目标key存在,保存结果
    if (dstkey) {
        /* Store the resulting set into the target, if the intersection
         * is not an empty set. */
        int deleted = dbDelete(c->db,dstkey);
        // 如果最终的结果set非空, 将最终set结果放到dstkey中
        if (setTypeSize(dstset) > 0) {
            dbAdd(c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
                dstkey,c->db->id);
        } else {
            // 如果结果集合为空,删除
            decrRefCount(dstset);
            addReply(c,shared.czero);
            if (deleted)
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                    dstkey,c->db->id);
        }
        signalModifiedKey(c->db,dstkey);
        server.dirty++;
    } else {
        setDeferredSetLen(c,replylen,cardinality);
    }
    zfree(sets);
}

通俗的来讲,求交集主要分为以下几个步骤:

  1. 将所有的集合按照基数进行排序
  2. 使用基数最小的集合作为结果集
  3. 遍历这个基数最小集合中的每个元素,检查这个元素在剩余的其他集合中是否存在
  4. 如果都存在,就将这个元素加入到最终的结果集中

算法的复杂度是O(N2)。