golang中cache组件有很多, 比如groupcache、bigcache等。 本文介绍groupcache的使用。
groupcache简介
groupcache是memcache的作者开源的一个项目,在许多情况下,它是memcached的替代品。
对比memcache,相似处:
- 按键选择哪个对等体负责该键
不同处:
- 不需要运行单独的一组服务器,从而大大减少了部署/配置的痛苦。Groupcache是一个客户端库,也是一个服务器。它连接到自己的对等点,形成分布式缓存。 可以与存储服务集成,不需要单独部署。
- 带有缓存填充机制。memcached只是说“Sorry, cache miss”,通常会导致无限数量的客户端加载大量数据,而groupcache坐标缓存填充使得一个key只会有一个加载,加载完成的数据共享给其他调用者。
- 不支持版本控制的值。如果键"foo"是值"bar",键"foo"必须总是"bar"。既没有缓存过期时间,也没有显式的缓存收回。因此也没有CAS,也没有递增/递减.
- 支持将热数据自动镜像到当前进程。这可以就近获取并且减少加载时间。 比如从peer1中获取 key2,key2存在与peer2中, key2可以作为热热数据镜像到peer1中。
- 当前仅适用于Go。
groupcache不支持更新,也不支持删除,数据的载入是通过 GetterFunc
函数来操作的。
groupcache适合有高性能要求,数据无更新的场景。
使用
使用NewGroup初始化一个缓存组,需要指定一个GetterFunc
。
通过dest.SetString
设置字符串类型的数据。
通过dest.SetProto
设置proto(如grpc请求的返回值)类型的数据。
通过dest.SetBytes
设置butes类型的数据。
stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error {
// 数据获取逻辑, 通过dest.SetString方法设值
return dest.SetString("ECHO:")
}))
使用Group.Get
方法获取一个缓存值
var s string
err := stringGroup.Get(dummyCtx, fromChan, StringSink(&s))
如果未命中, groupcache会调用初始化时声明的GetterFunc
来做数据载入
初始化
创建group时, 会添加一个冲突锁。作者预留了初始化函数callInitPeerServer
,但是并为做实现。
mu.Lock()
defer mu.Unlock()
initPeerServerOnce.Do(callInitPeerServer)
使用 singleflight.Group
保证同一key只获取一次,共享结果。这个函数后面解析。
name
缓存组的名字
getter
数据未命中时运行的数据加载函数
peers
其他的groupcache地址
cacheBytes
限制热缓存和主缓存的总大小
g := &Group{
name: name,
getter: getter,
peers: peers,
cacheBytes: cacheBytes,
loadGroup: &singleflight.Group{},
}
Group结构的定义源码
mainCache
主缓存
hotCache
热缓存
_
强制Stats在32位平台上以8字节对齐
Stats
计数统计
type Group struct {
name string
getter Getter
peersOnce sync.Once
peers PeerPicker
cacheBytes int64
mainCache cache
hotCache cache
loadGroup flightGroup
_ int32
Stats Stats
}
源码
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
return newGroup(name, cacheBytes, getter, nil)
}
// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
initPeerServerOnce.Do(callInitPeerServer)
if _, dup := groups[name]; dup {
panic("duplicate registration of group " + name)
}
g := &Group{
name: name,
getter: getter,
peers: peers,
cacheBytes: cacheBytes,
loadGroup: &singleflight.Group{},
}
if fn := newGroupHook; fn != nil {
fn(g)
}
groups[name] = g
return g
}
get value
获取时会首先初始化一次所有的peers(仅初始化一次),然后更新计数
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
先从主缓存中获取数据,然后是热缓存。如果命中直接返回
value, cacheHit := g.lookupCache(key)
if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
未命中会使用loadGroup
加载数据, 首先从其他peer中加载,未命中则调用getter
加载,然后添加到本地cache
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// Check the cache again because singleflight can only dedup calls
// that overlap concurrently. It's possible for 2 concurrent
// requests to miss the cache, resulting in 2 load() calls. An
// unfortunate goroutine scheduling would result in this callback
// being run twice, serially. If we don't check the cache again,
// cache.nbytes would be incremented below even though there will
// be only one entry for this key.
//
// Consider the following serialized event ordering for two
// goroutines in which this callback gets called twice for the
// same key:
// 1: Get("key")
// 2: Get("key")
// 1: lookupCache("key")
// 2: lookupCache("key")
// 1: load("key")
// 2: load("key")
// 1: loadGroup.Do("key", fn)
// 1: fn()
// 2: loadGroup.Do("key", fn)
// 2: fn()
if value, cacheHit := g.lookupCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
g.Stats.LoadsDeduped.Add(1)
var value ByteView
var err error
if peer, ok := g.peers.PickPeer(key); ok {
value, err = g.getFromPeer(ctx, peer, key)
if err == nil {
g.Stats.PeerLoads.Add(1)
return value, nil
}
g.Stats.PeerErrors.Add(1)
// TODO(bradfitz): log the peer's error? keep
// log of the past few for /groupcachez? It's
// probably boring (normal task movement), so not
// worth logging I imagine.
}
value, err = g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.LocalLoadErrs.Add(1)
return nil, err
}
g.Stats.LocalLoads.Add(1)
destPopulated = true // only one caller of load gets this return value
g.populateCache(key, value, &g.mainCache)
return value, nil
})
if err == nil {
value = viewi.(ByteView)
}
return
}
源码
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
if dest == nil {
return errors.New("groupcache: nil dest Sink")
}
value, cacheHit := g.lookupCache(key)
if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
// Optimization to avoid double unmarshalling or copying: keep
// track of whether the dest was already populated. One caller
// (if local) will set this; the losers will not. The common
// case will likely be one caller.
destPopulated := false
value, destPopulated, err := g.load(ctx, key, dest)
if err != nil {
return err
}
if destPopulated {
return nil
}
return setSinkView(dest, value)
}
缓存填充以及加载抑制的实现
上篇有提到load
函数的实现, 缓存填充的逻辑也体现在这里。
groupcache尽量避免从源中获取数据,当本地数据缺失时会先从peer中获取,peer中命中则直接填充到本地,未命中才会从源中加载,这正是缓存填充的实现逻辑。
而加载抑制,避免重复加载的功能是依靠 singleflight
包实现的。
这个包中主要有两个结构体:
call
用来存放获取结果(val)和错误(err), 每个key对应一个call
实例。wg
用来控制请求的等待。
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
Group
用来存放所有的call
,记录所有的请求。
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Group.Do
是功能的实现。
当接到一个请求时, 会首先加锁, 并初始化用来记录请求的map
。map
的键为请求的key
, 值为call
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
如果当前的key已经在请求加载的过程中,那么解除上一步定义的冲突锁,并等待已经存在的加载请求结束后返回。
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
如果当前的key没有已经存在的加载过程,那么创建一个call
实例, 加入到map
记录中,并向call.wg
中加入一个记录,以阻塞其他请求,解除上一步定义的冲突锁。
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
调用传入的函数(作者并没有将这个功能局限于数据获取,通过传入的func
可以实现不同功能的控制),将结果赋值给call
,获取完成后wg.done
结束阻塞。
c.val, c.err = fn()
c.wg.Done()
然后删除map
记录
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
这个功能的实现主要是依靠sync.WaitGroup
的阻塞实现, 这里也是对初学者最难理解的地方。
可以想象一个场景:
大学寝室中,你和你的室友都要到食堂买午饭,你对室友说:“你自己去就行,给我带一份”。然后你就在宿舍中等待舍友回来。
在这个场景中,你和室友就是请求,你在等待就是阻塞。
cache(lru)
上篇提到的主缓存和热缓存均是依靠cache实现。
cache的实现依靠双向链表。
MaxEntries
最大的存储量
OnEvicted
当发生驱逐时(即到达MaxEntries)执行的操作
ll
双向链表本体
cache
key对应链表中的元素
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specifies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
}
添加时会先进行初始化map
,如果key
已存在,那么会将key
的index
提到首位(这里的链表不存在index,仅为方便理解),并更新其value。
如果不存在则直接插入到首位。
如果插入后的长度超过限制, 会执行清理操作
func (c *Cache) Add(key Key, value interface{}) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
清理时会删除尾部元素, 这里就解释了为什么每次操作时会把元素提到首位。
func (c *Cache) RemoveOldest() {
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}