Skip to content

Commit 701f887

Browse files
authored
chore: Use in-memory network connection for metastore client tests (#4235)
1 parent 67af8d5 commit 701f887

File tree

3 files changed

+45
-38
lines changed

3 files changed

+45
-38
lines changed

pkg/experiment/metastore/client/client.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type Client struct {
3131
stopped bool
3232
logger log.Logger
3333
grpcClientConfig grpcclient.Config
34+
dialOpts []grpc.DialOption
3435
}
3536

3637
type client struct {
@@ -53,14 +54,16 @@ type instance interface {
5354
raftnodepb.RaftNodeServiceClient
5455
}
5556

56-
func New(logger log.Logger, grpcClientConfig grpcclient.Config, d discovery.Discovery) *Client {
57+
func New(logger log.Logger, grpcClientConfig grpcclient.Config, d discovery.Discovery, dialOpts ...grpc.DialOption) *Client {
5758
var c Client
5859
logger = log.With(logger, "component", "metastore-client")
5960
c.service = services.NewIdleService(c.starting, c.stopping)
6061
c.logger = logger
6162
c.grpcClientConfig = grpcClientConfig
6263
c.servers = make(map[raft.ServerID]*client)
6364
c.discovery = d
65+
c.dialOpts = dialOpts
66+
6467
c.discovery.Subscribe(discovery.UpdateFunc(func(servers []discovery.Server) {
6568
c.updateServers(servers)
6669
}))
@@ -117,7 +120,7 @@ func (c *Client) updateServers(servers []discovery.Server) {
117120
continue
118121
}
119122
}
120-
cl, err := newClient(s[0], c.grpcClientConfig, c.logger)
123+
cl, err := newClient(s[0], c.grpcClientConfig, c.dialOpts...)
121124
if err != nil {
122125
level.Error(c.logger).Log("msg", "failed to create client", "err", err)
123126
continue
@@ -139,12 +142,12 @@ func (c *Client) updateServers(servers []discovery.Server) {
139142
c.servers = newServers
140143
}
141144

142-
func newClient(s discovery.Server, config grpcclient.Config, logger log.Logger) (*client, error) {
145+
func newClient(s discovery.Server, config grpcclient.Config, dialOpts ...grpc.DialOption) (*client, error) {
143146
address := s.Raft.Address
144147
if s.ResolvedAddress != "" {
145148
address = raft.ServerAddress(s.ResolvedAddress)
146149
}
147-
conn, err := dial(string(address), config, logger)
150+
conn, err := dial(string(address), config, dialOpts...)
148151
if err != nil {
149152
return nil, err
150153
}
@@ -159,13 +162,14 @@ func newClient(s discovery.Server, config grpcclient.Config, logger log.Logger)
159162
}, nil
160163
}
161164

162-
func dial(address string, grpcClientConfig grpcclient.Config, _ log.Logger) (*grpc.ClientConn, error) {
165+
func dial(address string, grpcClientConfig grpcclient.Config, dialOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
163166
options, err := grpcClientConfig.DialOption(nil, nil)
164167
if err != nil {
165168
return nil, err
166169
}
167170
// TODO: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
168171
options = append(options, grpc.WithDefaultServiceConfig(grpcServiceConfig))
172+
options = append(options, dialOpts...)
169173
return grpc.Dial(address, options...)
170174
}
171175

pkg/experiment/metastore/client/client_test.go

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/grafana/dskit/flagext"
99
"github.com/grafana/dskit/grpcclient"
10-
"github.com/stretchr/testify/assert"
1110
"github.com/stretchr/testify/mock"
1211
"github.com/stretchr/testify/require"
1312

@@ -16,20 +15,16 @@ import (
1615
"github.com/grafana/pyroscope/pkg/test/mocks/mockdiscovery"
1716
)
1817

19-
const nServers = 3
20-
2118
func TestUnavailable(t *testing.T) {
2219
d := mockdiscovery.NewMockDiscovery(t)
2320
d.On("Subscribe", mock.Anything).Return()
2421
l := test.NewTestingLogger(t)
2522
c := New(l, grpcclient.Config{}, d)
26-
ports, err := test.GetFreePorts(nServers)
27-
assert.NoError(t, err)
2823

2924
d.On("Rediscover").Run(func(args mock.Arguments) {
3025
}).Return()
3126

32-
c.updateServers(createServers(ports))
27+
c.updateServers(createServers([]int{30030, 30031, 30032}))
3328
res, err := c.AddBlock(context.Background(), &metastorev1.AddBlockRequest{})
3429
require.Error(t, err)
3530
require.Nil(t, res)
@@ -73,32 +68,30 @@ func testRediscoverWrongLeader(t *testing.T, f func(c *Client)) {
7368
l := test.NewTestingLogger(t)
7469
config := &grpcclient.Config{}
7570
flagext.DefaultValues(config)
76-
c := New(l, *config, d)
77-
ports, err := test.GetFreePorts(nServers * 2)
78-
assert.NoError(t, err)
7971

80-
p1 := ports[:nServers]
81-
p2 := ports[nServers:]
82-
m := sync.Mutex{}
83-
var servers *mockServers
84-
defer servers.Close()
72+
dServers1 := createServers([]int{30031, 30032, 30033})
73+
74+
dServers2 := createServers([]int{40031, 40032, 40033})
75+
mockServers2, dialOpts := createMockServers(t, l, dServers2)
76+
defer mockServers2.Close()
8577

78+
c := New(l, *config, d, dialOpts...)
79+
m := sync.Mutex{}
8680
verify := func() {}
81+
initWrongLeaderCalled := false
8782
d.On("Rediscover", mock.Anything).Run(func(args mock.Arguments) {
8883
m.Lock()
8984
defer m.Unlock()
90-
if servers == nil {
91-
srvInfo := createServers(p2)
92-
servers = createMockServers(t, l, p2)
93-
verify = servers.InitWrongLeader()
94-
85+
if !initWrongLeaderCalled {
86+
initWrongLeaderCalled = true
87+
verify = mockServers2.InitWrongLeader()
9588
// call updateServers twice
96-
c.updateServers(srvInfo)
97-
c.updateServers(srvInfo)
89+
c.updateServers(dServers2)
90+
c.updateServers(dServers2)
9891
}
9992
}).Return()
10093

101-
c.updateServers(createServers(p1))
94+
c.updateServers(dServers1)
10295
f(c)
10396
verify()
10497
}

pkg/experiment/metastore/client/server_mock_test.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/grpc"
1515
"google.golang.org/grpc/codes"
1616
"google.golang.org/grpc/status"
17+
"google.golang.org/grpc/test/bufconn"
1718

1819
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
1920
"github.com/grafana/pyroscope/pkg/experiment/metastore/discovery"
@@ -95,13 +96,13 @@ func (m *mockServer) PromoteToLeader(ctx context.Context, request *raftnodepb.Pr
9596

9697
func createServers(ports []int) []discovery.Server {
9798
var servers []discovery.Server
98-
for i := 0; i < nServers; i++ {
99+
for i, p := range ports {
99100
servers = append(servers, discovery.Server{
100101
Raft: raft.Server{
101102
ID: testServerId(i),
102103
Address: raft.ServerAddress(fmt.Sprintf("server-%d", i)),
103104
},
104-
ResolvedAddress: fmt.Sprintf("127.0.0.1:%d", ports[i]),
105+
ResolvedAddress: fmt.Sprintf("127.0.0.1:%d", p),
105106
})
106107
}
107108
return servers
@@ -139,6 +140,7 @@ func (m *mockServers) InitWrongLeader() func() {
139140
s := new(wrongLeaderState)
140141
s.leaderIndex = -1
141142

143+
nServers := len(m.servers)
142144
for _, srv := range m.servers {
143145
srv := srv
144146
errf := func() error {
@@ -196,30 +198,38 @@ func errOrT[T any](t *T, f func() error) (*T, error) {
196198
return t, nil
197199
}
198200

199-
func createMockServers(t *testing.T, l log.Logger, ports []int) *mockServers {
201+
// Returns the grpc.DialOptions needed for a client connection to the created mock servers.
202+
func createMockServers(t *testing.T, l log.Logger, dServers []discovery.Server) (*mockServers, []grpc.DialOption) {
200203
var servers []*mockServer
201-
for idx, port := range ports {
204+
listeners := make(map[string]*bufconn.Listener)
205+
for idx, dserv := range dServers {
202206
s := newMockServer(t)
203207
s.index = idx
204208
s.id = testServerId(idx)
205-
s.address = fmt.Sprintf(":%d", port)
206-
lis, err := net.Listen("tcp", s.address)
207-
if err != nil {
208-
assert.NoError(t, err)
209-
}
209+
s.address = dserv.ResolvedAddress
210+
lis := bufconn.Listen(256 << 10)
211+
listeners[s.address] = lis
210212
go func() {
211213
if err := s.srv.Serve(lis); err != nil {
212214
assert.NoError(t, err)
213215
}
214216
}()
215217
servers = append(servers, s)
216218
}
217-
return &mockServers{
219+
220+
ms := &mockServers{
218221
servers: servers,
219222
t: t,
220223
l: l,
221224
}
222-
225+
dialer := func(_ context.Context, address string) (net.Conn, error) {
226+
el := listeners[address]
227+
if el != nil {
228+
return el.Dial()
229+
}
230+
return net.Dial("tcp", address)
231+
}
232+
return ms, []grpc.DialOption{grpc.WithContextDialer(dialer)}
223233
}
224234

225235
func newMockServer(t *testing.T) *mockServer {

0 commit comments

Comments
 (0)