Skip to content

Commit c3ea532

Browse files
bduffany and toddbaert authored
fix: fix goroutine leaks around shutdown (#716)
Signed-off-by: Brandon Duffany <[email protected]> Co-authored-by: Todd Baert <[email protected]>
1 parent 629a535 commit c3ea532

File tree

5 files changed

+383
-16
lines changed

5 files changed

+383
-16
lines changed

providers/flagd/pkg/service/in_process/service.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type InProcess struct {
3232
serviceMetadata model.Metadata
3333
sync sync.ISync
3434
syncEnd context.CancelFunc
35+
wg parallel.WaitGroup
3536
}
3637

3738
type Configuration struct {
@@ -93,15 +94,19 @@ func (i *InProcess) Init() error {
9394
syncChan := make(chan sync.DataSync, 1)
9495

9596
// start data sync
97+
i.wg.Add(1)
9698
go func() {
99+
defer i.wg.Done()
97100
err := i.sync.Sync(ctx, syncChan)
98101
if err != nil {
99102
syncInitErr <- err
100103
}
101104
}()
102105

103106
// start data sync listener and listen to listener shutdown hook
107+
i.wg.Add(1)
104108
go func() {
109+
defer i.wg.Done()
105110
for {
106111
select {
107112
case data := <-syncChan:
@@ -144,6 +149,7 @@ func (i *InProcess) Init() error {
144149
func (i *InProcess) Shutdown() {
145150
i.syncEnd()
146151
close(i.listenerShutdown)
152+
i.wg.Wait()
147153
}
148154

149155
func (i *InProcess) ResolveBoolean(ctx context.Context, key string, defaultValue bool,
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package process
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
"runtime"
8+
"testing"
9+
"time"
10+
11+
of "github.com/open-feature/go-sdk/openfeature"
12+
)
13+
14+
func TestInProcessServiceShutdownCleansUpGoroutines(t *testing.T) {
15+
checkGoroutineLeaks(t)
16+
17+
flagFile := "config.json"
18+
offlinePath := filepath.Join(t.TempDir(), flagFile)
19+
if err := os.WriteFile(offlinePath, []byte(flagRsp), 0644); err != nil {
20+
t.Fatal(err)
21+
}
22+
service := NewInProcessService(Configuration{OfflineFlagSource: offlinePath})
23+
if err := service.Init(); err != nil {
24+
t.Fatal(err)
25+
}
26+
27+
// Wait for provider to be ready.
28+
channel := service.EventChannel()
29+
select {
30+
case event := <-channel:
31+
if event.EventType != of.ProviderReady {
32+
t.Fatalf("Provider initialization failed. Got event type %s with message %s", event.EventType, event.Message)
33+
}
34+
case <-time.After(2 * time.Second):
35+
t.Fatal("Provider initialization did not complete within acceptable timeframe")
36+
}
37+
38+
// Service is ready - now shut down.
39+
service.Shutdown()
40+
}
41+
42+
func TestInProcessServiceImmediateShutdownCleansUpGoroutines(t *testing.T) {
43+
checkGoroutineLeaks(t)
44+
45+
flagFile := "config.json"
46+
offlinePath := filepath.Join(t.TempDir(), flagFile)
47+
if err := os.WriteFile(offlinePath, []byte(flagRsp), 0644); err != nil {
48+
t.Fatal(err)
49+
}
50+
service := NewInProcessService(Configuration{OfflineFlagSource: offlinePath})
51+
if err := service.Init(); err != nil {
52+
t.Fatal(err)
53+
}
54+
55+
// Immediately shut down - don't wait for the provider to be ready.
56+
service.Shutdown()
57+
}
58+
59+
// checkGoroutineLeaks records the current goroutine count and registers a
// cleanup that runs at the end of the test. If no other failures have
// occurred and more goroutines are running than at the start, the test is
// failed and the stacks of all goroutines are written to stderr.
//
// Call it first in a test, before starting anything that spawns goroutines.
func checkGoroutineLeaks(t *testing.T) {
	t.Helper()
	startingGoroutineCount := runtime.NumGoroutine()
	t.Cleanup(func() {
		t.Helper()
		if t.Failed() {
			return
		}

		// A goroutine that has already signalled completion (for example via
		// a deferred WaitGroup.Done) can still be counted for a moment until
		// it has fully exited. Poll briefly so that window is not reported as
		// a leak; genuine leaks are still reported once the deadline passes.
		deadline := time.Now().Add(time.Second)
		numGoroutinesAfter := runtime.NumGoroutine()
		for numGoroutinesAfter > startingGoroutineCount && time.Now().Before(deadline) {
			time.Sleep(time.Millisecond)
			numGoroutinesAfter = runtime.NumGoroutine()
		}
		if numGoroutinesAfter <= startingGoroutineCount {
			return
		}

		t.Errorf("Goroutines leaked: %d goroutines before, %d goroutines after", startingGoroutineCount, numGoroutinesAfter)

		// Dump all goroutine stacks only when reporting a leak. runtime.Stack
		// fills the buffer completely when the dump does not fit, so grow the
		// buffer until there is room to spare.
		buf := make([]byte, 1<<20)
		for {
			stacklen := runtime.Stack(buf, true)
			if stacklen < len(buf) {
				buf = buf[:stacklen]
				break
			}
			buf = make([]byte, 2*len(buf))
		}
		fmt.Fprintf(os.Stderr, "%s\n", buf)
	})
}

providers/flagd/pkg/service/rpc/service.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net"
99
"net/http"
1010
"os"
11+
"sync"
1112
"time"
1213

1314
schemaConnectV1 "buf.build/gen/go/open-feature/flagd/connectrpc/go/flagd/evaluation/v1/evaluationv1connect"
@@ -51,6 +52,7 @@ type Service struct {
5152

5253
client schemaConnectV1.ServiceClient
5354
cancelHook context.CancelFunc
55+
wg sync.WaitGroup
5456
}
5557

5658
func NewService(cfg Configuration, cache *cache.Service, logger logr.Logger, retries int) *Service {
@@ -86,7 +88,9 @@ func (s *Service) Init() error {
8688
ctx, cancelFunc := context.WithCancel(context.Background())
8789
s.cancelHook = cancelFunc
8890

91+
s.wg.Add(1)
8992
go func() {
93+
defer s.wg.Done()
9094
s.startEventStream(ctx)
9195
}()
9296

@@ -97,6 +101,7 @@ func (s *Service) Shutdown() {
97101
if s.cancelHook != nil {
98102
s.cancelHook()
99103
}
104+
s.wg.Wait()
100105
}
101106

102107
// ResolveBoolean handles the flag evaluation response from the flagd ResolveBoolean rpc
@@ -450,24 +455,28 @@ func (s *Service) startEventStream(ctx context.Context) {
450455
}
451456

452457
// error in stream handler, purge cache if available and retry
453-
s.logger.V(logger.Warn).Info("connection to event stream failed, attempting again")
458+
s.logger.V(logger.Warn).Info(fmt.Sprintf("connection to event stream failed (%q), attempting again", err))
454459
if s.cache.IsEnabled() {
455460
s.cache.GetCache().Purge()
456461
}
457462
}
458463

459-
time.Sleep(s.retryCounter.sleep())
464+
select {
465+
case <-ctx.Done():
466+
return
467+
case <-time.After(s.retryCounter.sleep()):
468+
}
460469
}
461470

462471
// retry attempts exhausted. Disable cache and emit error event
463472
s.cache.Disable()
464-
s.events <- of.Event{
473+
s.sendEvent(ctx, of.Event{
465474
ProviderName: "flagd",
466475
EventType: of.ProviderError,
467476
ProviderEventDetails: of.ProviderEventDetails{
468477
Message: "grpc connection establishment failed",
469478
},
470-
}
479+
})
471480
}
472481

473482
// streamClient opens the event stream and distribute streams to appropriate handlers.
@@ -485,9 +494,9 @@ func (s *Service) streamClient(ctx context.Context) error {
485494

486495
switch stream.Msg().Type {
487496
case string(flagdService.ConfigurationChange):
488-
s.handleConfigurationChangeEvent(stream.Msg())
497+
s.handleConfigurationChangeEvent(ctx, stream.Msg())
489498
case string(flagdService.ProviderReady):
490-
s.handleReadyEvent()
499+
s.handleReadyEvent(ctx)
491500
case string(flagdService.Shutdown):
492501
// this is considered as a non-error
493502
return nil
@@ -504,7 +513,7 @@ func (s *Service) streamClient(ctx context.Context) error {
504513
return nil
505514
}
506515

507-
func (s *Service) handleConfigurationChangeEvent(event *schemaV1.EventStreamResponse) {
516+
func (s *Service) handleConfigurationChangeEvent(ctx context.Context, event *schemaV1.EventStreamResponse) {
508517
if !s.cache.IsEnabled() {
509518
return
510519
}
@@ -536,20 +545,28 @@ func (s *Service) handleConfigurationChangeEvent(event *schemaV1.EventStreamResp
536545
keys = append(keys, flagKey)
537546
}
538547

539-
s.events <- of.Event{
548+
s.sendEvent(ctx, of.Event{
540549
ProviderName: "flagd",
541550
EventType: of.ProviderConfigChange,
542551
ProviderEventDetails: of.ProviderEventDetails{
543552
Message: "flags changed",
544553
FlagChanges: keys,
545554
},
546-
}
555+
})
547556
}
548557

549-
func (s *Service) handleReadyEvent() {
550-
s.events <- of.Event{
558+
func (s *Service) handleReadyEvent(ctx context.Context) {
559+
s.sendEvent(ctx, of.Event{
551560
ProviderName: "flagd",
552561
EventType: of.ProviderReady,
562+
})
563+
}
564+
565+
func (s *Service) sendEvent(ctx context.Context, event of.Event) {
566+
select {
567+
case <-ctx.Done():
568+
return
569+
case s.events <- event:
553570
}
554571
}
555572

providers/flagd/pkg/service/rpc/service_eventing_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package rpc
22

33
import (
4-
schemaV1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/evaluation/v1"
54
"context"
65
"errors"
6+
"testing"
7+
"time"
8+
9+
schemaV1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/evaluation/v1"
710
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
811
of "github.com/open-feature/go-sdk/openfeature"
912
"google.golang.org/protobuf/types/known/structpb"
10-
"testing"
11-
"time"
1213
)
1314

1415
func TestRetries(t *testing.T) {
@@ -69,7 +70,8 @@ func TestConfigChange(t *testing.T) {
6970

7071
// when
7172
go func() {
72-
service.handleConfigurationChangeEvent(&schemaV1.EventStreamResponse{
73+
ctx := context.Background()
74+
service.handleConfigurationChangeEvent(ctx, &schemaV1.EventStreamResponse{
7375
Data: stData,
7476
})
7577
}()
@@ -93,7 +95,8 @@ func TestConfigChange(t *testing.T) {
9395

9496
// when
9597
go func() {
96-
service.handleConfigurationChangeEvent(&schemaV1.EventStreamResponse{
98+
ctx := context.Background()
99+
service.handleConfigurationChangeEvent(ctx, &schemaV1.EventStreamResponse{
97100
Data: stData,
98101
})
99102
}()

0 commit comments

Comments
 (0)