diff --git a/core/collection/cache.go b/core/collection/cache.go index 83a4115b..f0a8d9bc 100644 --- a/core/collection/cache.go +++ b/core/collection/cache.go @@ -34,7 +34,7 @@ type ( expire time.Duration timingWheel *TimingWheel lruCache lru - barrier syncx.SharedCalls + barrier syncx.SingleFlight unstableExpiry mathx.Unstable stats *cacheStat } @@ -46,7 +46,7 @@ func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) { data: make(map[string]interface{}), expire: expire, lruCache: emptyLruCache, - barrier: syncx.NewSharedCalls(), + barrier: syncx.NewSingleFlight(), unstableExpiry: mathx.NewUnstable(expiryDeviation), } diff --git a/core/stores/cache/cache.go b/core/stores/cache/cache.go index 4ac13956..3c5f472d 100644 --- a/core/stores/cache/cache.go +++ b/core/stores/cache/cache.go @@ -29,7 +29,7 @@ type ( ) // New returns a Cache. -func New(c ClusterConf, barrier syncx.SharedCalls, st *Stat, errNotFound error, +func New(c ClusterConf, barrier syncx.SingleFlight, st *Stat, errNotFound error, opts ...Option) Cache { if len(c) == 0 || TotalWeights(c) <= 0 { log.Fatal("no cache nodes") diff --git a/core/stores/cache/cache_test.go b/core/stores/cache/cache_test.go index 65b710ff..55ad297b 100644 --- a/core/stores/cache/cache_test.go +++ b/core/stores/cache/cache_test.go @@ -104,7 +104,7 @@ func TestCache_SetDel(t *testing.T) { Weight: 100, }, } - c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder) + c := New(conf, syncx.NewSingleFlight(), NewStat("mock"), errPlaceholder) for i := 0; i < total; i++ { if i%2 == 0 { assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i)) @@ -142,7 +142,7 @@ func TestCache_OneNode(t *testing.T) { Weight: 100, }, } - c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder) + c := New(conf, syncx.NewSingleFlight(), NewStat("mock"), errPlaceholder) for i := 0; i < total; i++ { if i%2 == 0 { assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i)) diff --git a/core/stores/cache/cachenode.go b/core/stores/cache/cachenode.go index 9a39c355..35ea205f 100644 --- a/core/stores/cache/cachenode.go +++ b/core/stores/cache/cachenode.go @@ -29,7 +29,7 @@ type cacheNode struct { rds *redis.Redis expiry time.Duration notFoundExpiry time.Duration - barrier syncx.SharedCalls + barrier syncx.SingleFlight r *rand.Rand lock *sync.Mutex unstableExpiry mathx.Unstable @@ -43,7 +43,7 @@ type cacheNode struct { // st is used to stat the cache. // errNotFound defines the error that returned on cache not found. // opts are the options that customize the cacheNode. -func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *Stat, +func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat, errNotFound error, opts ...Option) Cache { o := newOptions(opts...) return cacheNode{ diff --git a/core/stores/cache/cachenode_test.go b/core/stores/cache/cachenode_test.go index 13f83d9e..2e3dd6a9 100644 --- a/core/stores/cache/cachenode_test.go +++ b/core/stores/cache/cachenode_test.go @@ -96,7 +96,7 @@ func TestCacheNode_Take(t *testing.T) { cn := cacheNode{ rds: store, r: rand.New(rand.NewSource(time.Now().UnixNano())), - barrier: syncx.NewSharedCalls(), + barrier: syncx.NewSingleFlight(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), stat: NewStat("any"), @@ -123,7 +123,7 @@ func TestCacheNode_TakeNotFound(t *testing.T) { cn := cacheNode{ rds: store, r: rand.New(rand.NewSource(time.Now().UnixNano())), - barrier: syncx.NewSharedCalls(), + barrier: syncx.NewSingleFlight(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), stat: NewStat("any"), @@ -162,7 +162,7 @@ func TestCacheNode_TakeWithExpire(t *testing.T) { cn := cacheNode{ rds: store, r: rand.New(rand.NewSource(time.Now().UnixNano())), - barrier: syncx.NewSharedCalls(), + barrier: syncx.NewSingleFlight(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), stat: NewStat("any"), @@ -189,7 +189,7 @@ func TestCacheNode_String(t *testing.T) { cn := cacheNode{ rds: store, r: rand.New(rand.NewSource(time.Now().UnixNano())), - barrier: syncx.NewSharedCalls(), + barrier: syncx.NewSingleFlight(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), stat: NewStat("any"), @@ -206,7 +206,7 @@ func TestCacheValueWithBigInt(t *testing.T) { cn := cacheNode{ rds: store, r: rand.New(rand.NewSource(time.Now().UnixNano())), - barrier: syncx.NewSharedCalls(), + barrier: syncx.NewSingleFlight(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), stat: NewStat("any"), diff --git a/core/stores/mongoc/cachedcollection.go b/core/stores/mongoc/cachedcollection.go index dc14fe68..b8494987 100644 --- a/core/stores/mongoc/cachedcollection.go +++ b/core/stores/mongoc/cachedcollection.go @@ -11,8 +11,8 @@ var ( // ErrNotFound is an alias of mgo.ErrNotFound. ErrNotFound = mgo.ErrNotFound - // can't use one SharedCalls per conn, because multiple conns may share the same cache key. - sharedCalls = syncx.NewSharedCalls() + // can't use one SingleFlight per conn, because multiple conns may share the same cache key. + sharedCalls = syncx.NewSingleFlight() stats = cache.NewStat("mongoc") ) diff --git a/core/stores/sqlc/cachedsql.go b/core/stores/sqlc/cachedsql.go index 2e3618d4..cd715920 100644 --- a/core/stores/sqlc/cachedsql.go +++ b/core/stores/sqlc/cachedsql.go @@ -17,8 +17,8 @@ var ( // ErrNotFound is an alias of sqlx.ErrNotFound. ErrNotFound = sqlx.ErrNotFound - // can't use one SharedCalls per conn, because multiple conns may share the same cache key. - exclusiveCalls = syncx.NewSharedCalls() + // can't use one SingleFlight per conn, because multiple conns may share the same cache key. + exclusiveCalls = syncx.NewSingleFlight() stats = cache.NewStat("sqlc") ) diff --git a/core/syncx/resourcemanager.go b/core/syncx/resourcemanager.go index 04e03418..39e14561 100644 --- a/core/syncx/resourcemanager.go +++ b/core/syncx/resourcemanager.go @@ -9,16 +9,16 @@ import ( // A ResourceManager is a manager that used to manage resources. type ResourceManager struct { - resources map[string]io.Closer - sharedCalls SharedCalls - lock sync.RWMutex + resources map[string]io.Closer + singleFlight SingleFlight + lock sync.RWMutex } // NewResourceManager returns a ResourceManager. func NewResourceManager() *ResourceManager { return &ResourceManager{ - resources: make(map[string]io.Closer), - sharedCalls: NewSharedCalls(), + resources: make(map[string]io.Closer), + singleFlight: NewSingleFlight(), } } @@ -39,7 +39,7 @@ func (manager *ResourceManager) Close() error { // GetResource returns the resource associated with given key. func (manager *ResourceManager) GetResource(key string, create func() (io.Closer, error)) (io.Closer, error) { - val, err := manager.sharedCalls.Do(key, func() (interface{}, error) { + val, err := manager.singleFlight.Do(key, func() (interface{}, error) { manager.lock.RLock() resource, ok := manager.resources[key] manager.lock.RUnlock() diff --git a/core/syncx/sharedcalls.go b/core/syncx/singleflight.go similarity index 64% rename from core/syncx/sharedcalls.go rename to core/syncx/singleflight.go index 4df1730d..bcdb450e 100644 --- a/core/syncx/sharedcalls.go +++ b/core/syncx/singleflight.go @@ -3,13 +3,17 @@ package syncx import "sync" type ( - // SharedCalls lets the concurrent calls with the same key to share the call result. + // SharedCalls is an alias of SingleFlight. + // Deprecated: use SingleFlight. + SharedCalls = SingleFlight + + // SingleFlight lets the concurrent calls with the same key to share the call result. // For example, A called F, before it's done, B called F. Then B would not execute F, // and shared the result returned by F which called by A. // The calls with the same key are dependent, concurrent calls share the returned values. // A ------->calls F with key<------------------->returns val // B --------------------->calls F with key------>returns val - SharedCalls interface { + SingleFlight interface { Do(key string, fn func() (interface{}, error)) (interface{}, error) DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error) } @@ -20,20 +24,26 @@ type ( err error } - sharedGroup struct { + flightGroup struct { calls map[string]*call lock sync.Mutex } ) -// NewSharedCalls returns a SharedCalls. -func NewSharedCalls() SharedCalls { - return &sharedGroup{ +// NewSingleFlight returns a SingleFlight. +func NewSingleFlight() SingleFlight { + return &flightGroup{ calls: make(map[string]*call), } } -func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { +// NewSharedCalls returns a SingleFlight. +// Deprecated: use NewSingleFlight. +func NewSharedCalls() SingleFlight { + return NewSingleFlight() +} + +func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { c, done := g.createCall(key) if done { return c.val, c.err @@ -43,7 +53,7 @@ func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{ return c.val, c.err } -func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) { +func (g *flightGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) { c, done := g.createCall(key) if done { return c.val, false, c.err @@ -53,7 +63,7 @@ func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val inte return c.val, true, c.err } -func (g *sharedGroup) createCall(key string) (c *call, done bool) { +func (g *flightGroup) createCall(key string) (c *call, done bool) { g.lock.Lock() if c, ok := g.calls[key]; ok { g.lock.Unlock() @@ -69,7 +79,7 @@ func (g *sharedGroup) createCall(key string) (c *call, done bool) { return c, false } -func (g *sharedGroup) makeCall(c *call, key string, fn func() (interface{}, error)) { +func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) { defer func() { g.lock.Lock() delete(g.calls, key) diff --git a/core/syncx/sharedcalls_test.go b/core/syncx/singleflight_test.go similarity index 96% rename from core/syncx/sharedcalls_test.go rename to core/syncx/singleflight_test.go index d6b52fb9..b1ffdf69 100644 --- a/core/syncx/sharedcalls_test.go +++ b/core/syncx/singleflight_test.go @@ -10,7 +10,7 @@ import ( ) func TestExclusiveCallDo(t *testing.T) { - g := NewSharedCalls() + g := NewSingleFlight() v, err := g.Do("key", func() (interface{}, error) { return "bar", nil }) @@ -23,7 +23,7 @@ func TestExclusiveCallDo(t *testing.T) { } func TestExclusiveCallDoErr(t *testing.T) { - g := NewSharedCalls() + g := NewSingleFlight() someErr := errors.New("some error") v, err := g.Do("key", func() (interface{}, error) { return nil, someErr @@ -37,7 +37,7 @@ func TestExclusiveCallDoErr(t *testing.T) { } func TestExclusiveCallDoDupSuppress(t *testing.T) { - g := NewSharedCalls() + g := NewSingleFlight() c := make(chan string) var calls int32 fn := func() (interface{}, error) { @@ -69,7 +69,7 @@ func TestExclusiveCallDoDupSuppress(t *testing.T) { } func TestExclusiveCallDoDiffDupSuppress(t *testing.T) { - g := NewSharedCalls() + g := NewSingleFlight() broadcast := make(chan struct{}) var calls int32 tests := []string{"e", "a", "e", "a", "b", "c", "b", "a", "c", "d", "b", "c", "d"} @@ -102,7 +102,7 @@ func TestExclusiveCallDoDiffDupSuppress(t *testing.T) { } func TestExclusiveCallDoExDupSuppress(t *testing.T) { - g := NewSharedCalls() + g := NewSingleFlight() c := make(chan string) var calls int32 fn := func() (interface{}, error) { diff --git a/zrpc/proxy.go b/zrpc/proxy.go index 6bcb7dab..455b8e8a 100644 --- a/zrpc/proxy.go +++ b/zrpc/proxy.go @@ -15,7 +15,7 @@ type RpcProxy struct { backend string clients map[string]Client options []internal.ClientOption - sharedCalls syncx.SharedCalls + sharedCalls syncx.SingleFlight lock sync.Mutex } @@ -25,7 +25,7 @@ func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy { backend: backend, clients: make(map[string]Client), options: opts, - sharedCalls: syncx.NewSharedCalls(), + sharedCalls: syncx.NewSingleFlight(), } }