cache中的每个元素既在一个双向的lru链表中,同时又在一个hash表中。 lru链表的作用是:当cache超过容量时淘汰掉最旧的元素, hash表的作用是:对cache中的元素进行快速定位。
下面我们来看看cache中的元素是长啥样的,leveldb用LRUHandle结构体代表cache中的元素。
struct LRUHandle { //node void* value; //这个存储的是cache的数据; void (*deleter)(const Slice&, void* value);//这个是数据从Cache中清除时执行的清理函数; //后面的三个成员事关LRUCache的数据的组织结构 /*指向节点在hash table链表中的下一个hash(key)相同的元素,在有碰撞时Leveldb采用的是链表法。*/ LRUHandle* next_hash; // 节点在双向链表中的前驱后继节点指针,所有的cache数据都是存储在一个双向list中 LRUHandle* next; LRUHandle* prev; size_t charge; // 所占的cache容量 size_t key_length; bool in_cache; //是否在cache中 uint32_t refs; // 该元素引用计数 uint32_t hash; //key的hash值 char key_data[1]; // key的起始位置 Slice key() const { // For cheaper lookups, we allow a temporary Handle object // to store a pointer to a key in "value". if (next == this) { return *(reinterpret_cast<Slice*>(value)); } else { return Slice(key_data, key_length); } } };Leveldb自己实现了一个hash table:HandleTable,而不是使用系统提供的hash table。这个类就是基本的hash操作:Lookup、Insert和Delete。Hash table的作用是根据key快速查找元素是否在cache中,并返回LRUHandle节点指针,由此就能快速定位节点在hash表和双向链表中的位置。 HandleTable使用LRUHandle **list_存储所有的hash节点,其实就是一个二维数组,一维是各个哈希槽(不同的hash(key)),另一维则是相同hash(key)的碰撞list(用链表组织起来的)。
每次当hash节点数超过当前一维数组的长度后,都会做Resize操作: LRUHandle** new_list = new LRUHandle*[new_length]; 然后复制list_到new_list中,并删除旧的list_。这个过程很费时间,因为要对每个元素进行重新哈希并加入到对应哈希槽。
//注意,我们这里说的元素或者值指的是LRUHandle* class HandleTable { public: HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); } ~HandleTable() { delete[] list_; } LRUHandle* Lookup(const Slice& key, uint32_t hash) { return *FindPointer(key, hash); } LRUHandle* Insert(LRUHandle* h) { LRUHandle** ptr = FindPointer(h->key(), h->hash);//旧值的地址 LRUHandle* old = *ptr; h->next_hash = (old == NULL ? NULL : old->next_hash); *ptr = h;//赋上新值 if (old == NULL) {//进入该分支说明插入的是新元素,而不是修改老元素 ++elems_; if (elems_ > length_) { Resize();//扩张hash表 } } return old; //返回旧元素 } LRUHandle* Remove(const Slice& key, uint32_t hash) { LRUHandle** ptr = FindPointer(key, hash); LRUHandle* result = *ptr; if (result != NULL) { *ptr = result->next_hash; --elems_; } return result;//also don't delete node, just return } private: uint32_t length_; //有多少个不同的哈希槽,即上面所说的一维数组的大小,list_数组的长度 uint32_t elems_; //表中现在有多少个元素 LRUHandle** list_; //表 //该函数找到符合条件的LRUHandle*元素的地址 LRUHandle** FindPointer(const Slice& key, uint32_t hash) { LRUHandle** ptr = &list_[hash & (length_ - 1)]; //我一直好奇这个ptr到底指在哪里,其实是前一个LRUHandle元素里next_hash的地址(&next_hash) while (*ptr != NULL && ((*ptr)->hash != hash || key != (*ptr)->key())) { ptr = &(*ptr)->next_hash; } return ptr; } void Resize() { uint32_t new_length = 4; while (new_length < elems_) { new_length *= 2; } LRUHandle** new_list = new LRUHandle*[new_length];//以上是重新分配空间扩大哈希槽数 memset(new_list, 0, sizeof(new_list[0]) * new_length); uint32_t count = 0; //下面的for循环将旧表中的元素依次加入新表的对应位置 for (uint32_t i = 0; i < length_; i++) { LRUHandle* h = list_[i]; while (h != NULL) { LRUHandle* next = h->next_hash; uint32_t hash = h->hash; LRUHandle** ptr = &new_list[hash & (new_length - 1)]; h->next_hash = *ptr; *ptr = h; h = next; count++; } } assert(elems_ == count); //删除旧表空间,赋值新表 delete[] list_; list_ = new_list; length_ = new_length; } };上述这个hash表看上去很简单,但自己写很容易出错,比如我在写时把FindPointer写成如下:
LRUHandle** FindPointer(const Slice& key, uint32_t hash) { LRUHandle* p = list_[hash & (length_ - 1)];//p是值不是值的地址,p是个局部变量 while (p != NULL) { if (p->key() == key && p->hash == hash) return &p; else p = p->next_hash; } }后来和原版一比对,表面上我的代码没问题,其实是有很严重bug的。我们FindPointer返回的其实是我们想要元素的地址,其实也就是该元素前一个元素的next_hash的地址,(有点绕2333),但我返回的却是局部变量p的地址,当该哈希槽为空时,&list_[hash & (length_ - 1)]也是有意义的,因为这块地址空间是我们在Resize时new出来的。
LRUCache有两个双向链表,一个是lru_,另一个就是in_use_,从名字上就能理解,in_use_上的元素目前正在被外界使用,元素的引用计数往往大于1。而lru_的元素仅仅是在cache中,目前没有被外界使用,可随时进行删除。提供了如下两个函数对元素的引用计数做增加或减少操作,并且它们自动完成所属链表的转换。还有lru_和in_use_这两个头部只是个傀儡,并不存有key和value,它只是指向链表中的第一个元素,起到头的作用。
void LRUCache::Ref(LRUHandle* e) { if (e->refs == 1 && e->in_cache) { / //当lru_上的元素增加引用计数,说明该元素正在被使用,因此将它挂入in_use_链表中,并从lru_中移除它 LRU_Remove(e); LRU_Append(&in_use_, e); } e->refs++; } void LRUCache::Unref(LRUHandle* e) { assert(e->refs > 0); e->refs--; if (e->refs == 0) { //释放该元素,可能是cache空间不够了,或者cache析构了 assert(!e->in_cache); (*e->deleter)(e->key(), e->value);//调用元素自带的清除函数 free(e);//释放元素所占的内存 } else if (e->in_cache && e->refs == 1) { //引用计数为1,说明外界没有正在使用该元素,因此回到lru_链表中 LRU_Remove(e); LRU_Append(&lru_, e); } } Cache::Handle* LRUCache::Insert( const Slice& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const Slice& key, void* value)) { MutexLock l(&mutex_); //Insert,Release,Lookup,Prune,Erase all need lock //对新元素进行初始化 LRUHandle* e = reinterpret_cast<LRUHandle*>( malloc(sizeof(LRUHandle)-1 + key.size()));//clever key.size,char key_data[1]; e->value = value; e->deleter = deleter; e->charge = charge; e->key_length = key.size(); e->hash = hash; e->in_cache = false; e->refs = 1; // 初始时引用计数都为1 memcpy(e->key_data, key.data(), key.size()); if (capacity_ > 0) { e->refs++; // 因为最后会返回这个元素e给外面,所以增加它的引用计数 e->in_cache = true; //插入lru链表中 LRU_Append(&in_use_, e);//because the e's refs > 1,so insert in in_use_; usage_ += charge; //插入哈希表中并删除旧值 FinishErase(table_.Insert(e)); } // else don't cache. (Tests use capacity_==0 to turn off caching.) while (usage_ > capacity_ && lru_.next != &lru_) { // 如果超过cache容量了,则删除lru_链表上最老的数据,最老的在头部,新的在尾部 LRUHandle* old = lru_.next; bool erased = FinishErase(table_.Remove(old->key(), old->hash)); } return reinterpret_cast<Cache::Handle*>(e);//返回刚插入的元素 }1.这个lrucache的实现跟我们一般的实现不太一样,因为它的元素涉及到引用计数,可能是因为涉及到资源的释放。 如果让我们设计一般的lru淘汰链表,我们一般会设计成list加hashmap的形式,hashmap的作用是快速查某元素是不是已经在lru链表上了,如果不是则把它插入表头,代表是最近被使用,如果是,则把它从链表原来的位置上移除并加入到表头,代表最近被使用。 我们这个lru比较特殊,因为我们基于引用计数,所以设计成两个lru链表,一个叫lru_,使用计数都为1代表没有被外界正在使用,因为淘汰都是淘汰这个链表上最旧的,另一个叫in_use_,这个链表上存的都是引用计数大于1的元素,代表被外界正在使用,当lru_上的某个元素被使用了并不是把它移到表头代表最近被使用,而是增加他的引用计数并挂到in_use_上,空间不够时淘汰的总是lru_上的元素。 2.还要注意的是lrucache的锁竞争情况。Erase,Prune,Insert , Lookup,Release都需要加锁。个人感觉这个加锁的粒度有点粗,还可以优化。
为了多线程访问,尽可能快速,减少锁开销,ShardedLRUCache内部有16个LRUCache,查找Key时首先计算key属于哪一个分片,分片的计算方法是取32位hash值的高4位,然后在相应的LRUCache中进行查找,这样就大大减少了多线程的访问锁的开销。 LRUCache shard_[kNumShards] 它就是一个包装类,实现都在LRUCache类中。比较简单我就不分析了。