Skip to content

Commit de04640

Browse files
committed
Fixed hot cache update on calls to Set()
1 parent cf916be commit de04640

File tree

7 files changed

+123
-41
lines changed

7 files changed

+123
-41
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
*~
22
.idea/
33
.DS_Store
4-
vendor
4+
vendor
5+
.aider*
6+
.env

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ $(GOLANGCI_LINT): ## Download Go linter
66

77
.PHONY: ci
88
ci: tidy lint test
9-
go mod tidy && git diff --exit-code
9+
@echo
10+
@echo "\033[32mEVERYTHING PASSED!\033[0m"
1011

1112
.PHONY: lint
1213
lint: $(GOLANGCI_LINT) ## Run Go linter

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ func ExampleUsage() {
6161
defer cancel()
6262

6363
// ListenAndServe is a convenience function which Starts an instance of groupcache
64-
// with the provided transport and listens for groupcache HTTP requests on the address provided.
64+
// with the provided transport and listens for groupcache HTTP requests on
65+
// the address provided.
6566
d, err := groupcache.ListenAndServe(ctx, "192.168.1.1:8080", groupcache.Options{})
6667
if err != nil {
6768
log.Fatal("while starting server on 192.168.1.1:8080")

group.go

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import (
3232

3333
// Group is the user facing interface for a group
3434
type Group interface {
35+
// TODO: deprecate the hotCache boolean in Set(). It is not needed
36+
3537
Set(context.Context, string, []byte, time.Time, bool) error
3638
Get(context.Context, string, transport.Sink) error
3739
Remove(context.Context, string) error
@@ -60,10 +62,10 @@ func (f GetterFunc) Get(ctx context.Context, key string, dest transport.Sink) er
6062
// A Group is a cache namespace and associated data loaded spread over
6163
// a group of 1 or more machines.
6264
type group struct {
63-
name string
64-
getter Getter
65-
instance *Instance
66-
cacheBytes int64 // limit for sum of mainCache and hotCache size
65+
name string
66+
getter Getter
67+
instance *Instance
68+
maxCacheBytes int64 // max size of both mainCache and hotCache
6769

6870
// mainCache is a cache of the keys for which this process
6971
// (amongst its peers) is authoritative. That is, this cache
@@ -135,27 +137,56 @@ func (g *group) Get(ctx context.Context, key string, dest transport.Sink) error
135137
return transport.SetSinkView(dest, value)
136138
}
137139

138-
func (g *group) Set(ctx context.Context, key string, value []byte, expire time.Time, hotCache bool) error {
140+
func (g *group) Set(ctx context.Context, key string, value []byte, expire time.Time, _ bool) error {
139141
if key == "" {
140142
return errors.New("empty Set() key not allowed")
141143
}
142144

145+
if g.maxCacheBytes <= 0 {
146+
return nil
147+
}
148+
143149
_, err := g.setGroup.Do(key, func() (interface{}, error) {
144150
// If remote peer owns this key
145151
owner, isRemote := g.instance.PickPeer(key)
146152
if isRemote {
147-
if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
153+
// Set the key/value on the remote peer
154+
if err := g.setPeer(ctx, owner, key, value, expire); err != nil {
148155
return nil, err
149156
}
150-
// TODO(thrawn01): Not sure if this is useful outside of tests...
151-
// maybe we should ALWAYS update the local cache?
152-
if hotCache {
153-
g.localSet(key, value, expire, g.hotCache)
157+
}
158+
// Update the local caches
159+
bv := transport.ByteViewWithExpire(value, expire)
160+
g.loadGroup.Lock(func() {
161+
g.mainCache.Add(key, bv)
162+
g.hotCache.Remove(key)
163+
})
164+
165+
// Update all peers in the cluster
166+
var wg sync.WaitGroup
167+
for _, p := range g.instance.getAllPeers() {
168+
if p.PeerInfo().IsSelf {
169+
continue // Skip self
170+
}
171+
172+
// Do not update the owner again, we already updated them
173+
if p.HashKey() == owner.HashKey() {
174+
continue
154175
}
155-
return nil, nil
176+
177+
wg.Add(1)
178+
go func(p peer.Client) {
179+
if err := g.setPeer(ctx, p, key, value, expire); err != nil {
180+
g.instance.opts.Logger.Error("Failed to update peer",
181+
"peer", p.PeerInfo().Address,
182+
"key", key,
183+
"err", err)
184+
}
185+
wg.Done()
186+
}(p)
156187
}
157-
// We own this key
158-
g.localSet(key, value, expire, g.mainCache)
188+
wg.Wait()
189+
159190
return nil, nil
160191
})
161192
return err
@@ -340,7 +371,7 @@ func (g *group) getFromPeer(ctx context.Context, peer peer.Client, key string) (
340371
return value, nil
341372
}
342373

343-
func (g *group) setFromPeer(ctx context.Context, peer peer.Client, k string, v []byte, e time.Time) error {
374+
func (g *group) setPeer(ctx context.Context, peer peer.Client, k string, v []byte, e time.Time) error {
344375
var expire int64
345376
if !e.IsZero() {
346377
expire = e.UnixNano()
@@ -363,7 +394,7 @@ func (g *group) removeFromPeer(ctx context.Context, peer peer.Client, key string
363394
}
364395

365396
func (g *group) lookupCache(key string) (value transport.ByteView, ok bool) {
366-
if g.cacheBytes <= 0 {
397+
if g.maxCacheBytes <= 0 {
367398
return
368399
}
369400
value, ok = g.mainCache.Get(key)
@@ -374,26 +405,30 @@ func (g *group) lookupCache(key string) (value transport.ByteView, ok bool) {
374405
return
375406
}
376407

377-
func (g *group) LocalSet(key string, value []byte, expire time.Time) {
378-
g.localSet(key, value, expire, g.mainCache)
379-
}
380-
381-
func (g *group) localSet(key string, value []byte, expire time.Time, cache Cache) {
382-
if g.cacheBytes <= 0 {
408+
// RemoteSet is called by the transport to set values in the local and hot caches when
409+
// a remote peer sends us a pb.SetRequest
410+
func (g *group) RemoteSet(key string, value []byte, expire time.Time) {
411+
if g.maxCacheBytes <= 0 {
383412
return
384413
}
385414

386-
bv := transport.ByteViewWithExpire(value, expire)
387-
388-
// Ensure no requests are in flight
415+
// Lock all load operations until this function returns
389416
g.loadGroup.Lock(func() {
390-
g.populateCache(key, bv, cache)
417+
// This instance could take over ownership of this key at any moment after
418+
// the set is made. In order to avoid accidental propagation of the previous
419+
// value should this instance become owner of the key, we always set key in
420+
// the main cache.
421+
bv := transport.ByteViewWithExpire(value, expire)
422+
g.mainCache.Add(key, bv)
423+
424+
// It's possible the value could be in the hot cache.
425+
g.hotCache.Remove(key)
391426
})
392427
}
393428

394429
func (g *group) LocalRemove(key string) {
395430
// Clear key from our local cache
396-
if g.cacheBytes <= 0 {
431+
if g.maxCacheBytes <= 0 {
397432
return
398433
}
399434

@@ -405,7 +440,7 @@ func (g *group) LocalRemove(key string) {
405440
}
406441

407442
func (g *group) populateCache(key string, value transport.ByteView, cache Cache) {
408-
if g.cacheBytes <= 0 {
443+
if g.maxCacheBytes <= 0 {
409444
return
410445
}
411446
cache.Add(key, value)
@@ -440,7 +475,7 @@ func (g *group) CacheStats(which CacheType) CacheStats {
440475
// ResetCacheSize changes the maxBytes allowed and resets both the main and hot caches.
441476
// It is mostly intended for testing and is not thread safe.
442477
func (g *group) ResetCacheSize(maxBytes int64) error {
443-
g.cacheBytes = maxBytes
478+
g.maxCacheBytes = maxBytes
444479
var (
445480
hotCache int64
446481
mainCache int64

instance.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,13 @@ func (i *Instance) NewGroup(name string, cacheBytes int64, getter Getter) (Group
169169
return nil, fmt.Errorf("duplicate registration of group '%s'", name)
170170
}
171171
g := &group{
172-
instance: i,
173-
name: name,
174-
getter: getter,
175-
cacheBytes: cacheBytes,
176-
loadGroup: &singleflight.Group{},
177-
setGroup: &singleflight.Group{},
178-
removeGroup: &singleflight.Group{},
172+
instance: i,
173+
name: name,
174+
getter: getter,
175+
maxCacheBytes: cacheBytes,
176+
loadGroup: &singleflight.Group{},
177+
setGroup: &singleflight.Group{},
178+
removeGroup: &singleflight.Group{},
179179
}
180180
if err := g.ResetCacheSize(cacheBytes); err != nil {
181181
return nil, err

instance_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/groupcache/groupcache-go/v3/cluster"
3434
"github.com/groupcache/groupcache-go/v3/transport"
3535
"github.com/groupcache/groupcache-go/v3/transport/pb/testpb"
36+
"github.com/stretchr/testify/assert"
3637
"github.com/stretchr/testify/require"
3738
"google.golang.org/protobuf/proto"
3839
)
@@ -451,3 +452,46 @@ func TestNoDeDup(t *testing.T) {
451452
t.Errorf("cache has %d bytes, want %d", used, wantBytes)
452453
}
453454
}
455+
456+
func TestSetValueOnAllPeers(t *testing.T) {
457+
ctx := context.Background()
458+
err := cluster.Start(ctx, 3, groupcache.Options{
459+
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
460+
})
461+
require.NoError(t, err)
462+
defer func() { _ = cluster.Shutdown(context.Background()) }()
463+
464+
// Create a group for each instance in the cluster
465+
var groups []groupcache.Group
466+
for _, d := range cluster.ListDaemons() {
467+
g, err := d.GetInstance().NewGroup("group", 1<<20, groupcache.GetterFunc(func(ctx context.Context, key string, dest transport.Sink) error {
468+
return dest.SetString("original-value", time.Time{})
469+
}))
470+
require.NoError(t, err)
471+
groups = append(groups, g)
472+
}
473+
474+
// Set the value on the first group
475+
err = groups[0].Set(ctx, "key", []byte("value"), time.Time{}, false)
476+
require.NoError(t, err)
477+
478+
// Verify the value exists on all peers
479+
for i, g := range groups {
480+
var result string
481+
err := g.Get(ctx, "key", transport.StringSink(&result))
482+
require.NoError(t, err, "Failed to get value from peer %d", i)
483+
assert.Equal(t, "value", result, "Unexpected value from peer %d", i)
484+
}
485+
486+
// Update the value on the second group
487+
err = groups[1].Set(ctx, "key", []byte("foo"), time.Time{}, false)
488+
require.NoError(t, err)
489+
490+
// Verify the value was updated
491+
for i, g := range groups {
492+
var result string
493+
err := g.Get(ctx, "key", transport.StringSink(&result))
494+
require.NoError(t, err, "Failed to get value from peer %d", i)
495+
assert.Equal(t, "foo", result, "Unexpected value from peer %d", i)
496+
}
497+
}

transport/http_transport.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (t *HttpTransport) ServeHTTP(w http.ResponseWriter, r *http.Request) {
263263

264264
type transportMethods interface {
265265
Get(ctx context.Context, key string, dest Sink) error
266-
LocalSet(string, []byte, time.Time)
266+
RemoteSet(string, []byte, time.Time)
267267
LocalRemove(string)
268268
}
269269

@@ -309,8 +309,7 @@ func (t *HttpTransport) ServeHTTP(w http.ResponseWriter, r *http.Request) {
309309
if out.Expire != nil && *out.Expire != 0 {
310310
expire = time.Unix(*out.Expire/int64(time.Second), *out.Expire%int64(time.Second))
311311
}
312-
313-
group.LocalSet(*out.Key, out.Value, expire)
312+
group.RemoteSet(*out.Key, out.Value, expire)
314313
return
315314
}
316315

0 commit comments

Comments
 (0)