Groupcache源码解析

golang中cache组件有很多, 比如groupcache、bigcache等。 本文介绍groupcache的使用。

groupcache简介

groupcache是memcache的作者开源的一个项目,在许多情况下,它是memcached的替代品。

对比memcache,相似处:

  • 按键选择哪个对等体负责该键

不同处:

  • 不需要运行单独的一组服务器,从而大大减少了部署/配置的痛苦。Groupcache是一个客户端库,也是一个服务器。它连接到自己的对等点,形成分布式缓存。 可以与存储服务集成,不需要单独部署。
  • 带有缓存填充机制。memcached只是说“Sorry, cache miss”,通常会导致无限数量的客户端加载大量数据,而groupcache坐标缓存填充使得一个key只会有一个加载,加载完成的数据共享给其他调用者。
  • 不支持版本控制的值。如果键"foo"是值"bar",键"foo"必须总是"bar"。既没有缓存过期时间,也没有显式的缓存收回。因此也没有CAS,也没有递增/递减.
  • 支持将热数据自动镜像到当前进程。这可以就近获取并且减少加载时间。 比如从peer1中获取 key2,key2存在与peer2中, key2可以作为热热数据镜像到peer1中。
  • 当前仅适用于Go。

groupcache不支持更新,也不支持删除,数据的载入是通过 GetterFunc函数来操作的。 groupcache适合有高性能要求,数据无更新的场景。

使用

使用NewGroup初始化一个缓存组,需要指定一个GetterFunc。 通过dest.SetString设置字符串类型的数据。 通过dest.SetProto设置proto(如grpc请求的返回值)类型的数据。 通过dest.SetBytes设置butes类型的数据。


	stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error {
		// 数据获取逻辑, 通过dest.SetString方法设值
		return dest.SetString("ECHO:")
	}))

使用Group.Get方法获取一个缓存值

var s string
err := stringGroup.Get(dummyCtx, fromChan, StringSink(&s))

如果未命中, groupcache会调用初始化时声明的GetterFunc来做数据载入

初始化

创建group时, 会添加一个冲突锁。作者预留了初始化函数callInitPeerServer,但是并为做实现。

mu.Lock()
defer mu.Unlock()
initPeerServerOnce.Do(callInitPeerServer)

使用 singleflight.Group保证同一key只获取一次,共享结果。这个函数后面解析。 name 缓存组的名字 getter 数据未命中时运行的数据加载函数 peers 其他的groupcache地址 cacheBytes 限制热缓存和主缓存的总大小

	g := &Group{
		name:       name,
		getter:     getter,
		peers:      peers,
		cacheBytes: cacheBytes,
		loadGroup:  &singleflight.Group{},
	}

Group结构的定义源码 mainCache 主缓存 hotCache 热缓存 _ 强制Stats在32位平台上以8字节对齐 Stats 计数统计

type Group struct {
	name       string
	getter     Getter
	peersOnce  sync.Once
	peers      PeerPicker
	cacheBytes int64 
	mainCache cache
	hotCache cache
	loadGroup flightGroup

	_ int32 
	Stats Stats
}

源码

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	return newGroup(name, cacheBytes, getter, nil)
}

// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	if getter == nil {
		panic("nil Getter")
	}
	mu.Lock()
	defer mu.Unlock()
	initPeerServerOnce.Do(callInitPeerServer)
	if _, dup := groups[name]; dup {
		panic("duplicate registration of group " + name)
	}
	g := &Group{
		name:       name,
		getter:     getter,
		peers:      peers,
		cacheBytes: cacheBytes,
		loadGroup:  &singleflight.Group{},
	}
	if fn := newGroupHook; fn != nil {
		fn(g)
	}
	groups[name] = g
	return g
}

get value

获取时会首先初始化一次所有的peers(仅初始化一次),然后更新计数

g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)

先从主缓存中获取数据,然后是热缓存。如果命中直接返回

value, cacheHit := g.lookupCache(key)

if cacheHit {
	g.Stats.CacheHits.Add(1)
	return setSinkView(dest, value)
}

未命中会使用loadGroup加载数据, 首先从其他peer中加载,未命中则调用getter加载,然后添加到本地cache

func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently.  It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls.  An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially.  If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez?  It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

源码

func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	value, cacheHit := g.lookupCache(key)

	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	destPopulated := false
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

缓存填充以及加载抑制的实现

上篇有提到load函数的实现, 缓存填充的逻辑也体现在这里。 groupcache尽量避免从源中获取数据,当本地数据缺失时会先从peer中获取,peer中命中则直接填充到本地,未命中才会从源中加载,这正是缓存填充的实现逻辑。 而加载抑制,避免重复加载的功能是依靠 singleflight包实现的。 这个包中主要有两个结构体:

  • call用来存放获取结果(val)和错误(err), 每个key对应一个call实例。wg用来控制请求的等待。
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}
  • Group用来存放所有的call,记录所有的请求。
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

Group.Do是功能的实现。 当接到一个请求时, 会首先加锁, 并初始化用来记录请求的mapmap的键为请求的key, 值为call

g.mu.Lock()
if g.m == nil {
	g.m = make(map[string]*call)
}

如果当前的key已经在请求加载的过程中,那么解除上一步定义的冲突锁,并等待已经存在的加载请求结束后返回。

if c, ok := g.m[key]; ok {
	g.mu.Unlock()
	c.wg.Wait()
	return c.val, c.err
}

如果当前的key没有已经存在的加载过程,那么创建一个call实例, 加入到map记录中,并向call.wg中加入一个记录,以阻塞其他请求,解除上一步定义的冲突锁。

c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

调用传入的函数(作者并没有将这个功能局限于数据获取,通过传入的func可以实现不同功能的控制),将结果赋值给call,获取完成后wg.done结束阻塞。

c.val, c.err = fn()
c.wg.Done()

然后删除map记录

g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()

这个功能的实现主要是依靠sync.WaitGroup的阻塞实现, 这里也是对初学者最难理解的地方。 可以想象一个场景: 大学寝室中,你和你的室友都要到食堂买午饭,你对室友说:“你自己去就行,给我带一份”。然后你就在宿舍中等待舍友回来。 在这个场景中,你和室友就是请求,你在等待就是阻塞

cache(lru)

上篇提到的主缓存和热缓存均是依靠cache实现。 cache的实现依靠双向链表。 MaxEntries 最大的存储量 OnEvicted当发生驱逐时(即到达MaxEntries)执行的操作 ll双向链表本体 cache key对应链表中的元素

type Cache struct {
	// MaxEntries is the maximum number of cache entries before
	// an item is evicted. Zero means no limit.
	MaxEntries int

	// OnEvicted optionally specifies a callback function to be
	// executed when an entry is purged from the cache.
	OnEvicted func(key Key, value interface{})

	ll    *list.List
	cache map[interface{}]*list.Element
}

添加时会先进行初始化map,如果key已存在,那么会将keyindex提到首位(这里的链表不存在index,仅为方便理解),并更新其value。 如果不存在则直接插入到首位。 如果插入后的长度超过限制, 会执行清理操作

func (c *Cache) Add(key Key, value interface{}) {
	if c.cache == nil {
		c.cache = make(map[interface{}]*list.Element)
		c.ll = list.New()
	}
	if ee, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ee)
		ee.Value.(*entry).value = value
		return
	}
	ele := c.ll.PushFront(&entry{key, value})
	c.cache[key] = ele
	if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
		c.RemoveOldest()
	}
}

清理时会删除尾部元素, 这里就解释了为什么每次操作时会把元素提到首位。

func (c *Cache) RemoveOldest() {
	if c.cache == nil {
		return
	}
	ele := c.ll.Back()
	if ele != nil {
		c.removeElement(ele)
	}
}