Skip to content

Commit 52e393f

Browse files
authored
chore: Use in-memory connection for scheduler tests (#4233)
1 parent 800f605 commit 52e393f

File tree

2 files changed

+59
-29
lines changed

2 files changed

+59
-29
lines changed

pkg/scheduler/scheduler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ type Config struct {
9797
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"`
9898
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
9999
ServiceDiscovery schedulerdiscovery.Config `yaml:",inline"`
100+
101+
// Dial options used to initiate outgoing gRPC connections.
102+
// Intended to be used by tests to use in-memory network connections.
103+
DialOpts []grpc.DialOption `yaml:"-"`
100104
}
101105

102106
func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
@@ -536,6 +540,7 @@ func (s *Scheduler) forwardErrorToFrontend(ctx context.Context, req *schedulerRe
536540
return
537541
}
538542

543+
opts = append(opts, s.cfg.DialOpts...)
539544
conn, err := grpc.DialContext(ctx, req.frontendAddress, opts...)
540545
if err != nil {
541546
level.Warn(s.log).Log("msg", "failed to create gRPC connection to frontend to report error", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr)

pkg/scheduler/scheduler_test.go

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"net"
1313
"net/http"
1414
"net/http/httptest"
15-
"net/url"
1615
"strings"
1716
"sync"
1817
"testing"
@@ -33,6 +32,7 @@ import (
3332
"golang.org/x/net/http2/h2c"
3433
"google.golang.org/grpc"
3534
"google.golang.org/grpc/credentials/insecure"
35+
"google.golang.org/grpc/test/bufconn"
3636

3737
"github.com/grafana/pyroscope/pkg/frontend/frontendpb"
3838
"github.com/grafana/pyroscope/pkg/scheduler/schedulerpb"
@@ -44,30 +44,50 @@ import (
4444

4545
const testMaxOutstandingPerTenant = 5
4646

47-
func setupScheduler(t *testing.T, reg prometheus.Registerer, opts ...connect.HandlerOption) (*Scheduler, schedulerpb.SchedulerForFrontendClient, schedulerpb.SchedulerForQuerierClient) {
48-
cfg := Config{}
47+
type schedulerArgs struct {
48+
reg prometheus.Registerer
49+
handlerOpts []connect.HandlerOption
50+
dialOpts []grpc.DialOption
51+
}
52+
53+
func setupScheduler(t *testing.T, args schedulerArgs) (*Scheduler, schedulerpb.SchedulerForFrontendClient, schedulerpb.SchedulerForQuerierClient) {
54+
cfg := Config{
55+
DialOpts: args.dialOpts,
56+
}
4957
flagext.DefaultValues(&cfg)
5058
cfg.MaxOutstandingPerTenant = testMaxOutstandingPerTenant
5159

52-
s, err := NewScheduler(cfg, &limits{queriers: 2}, log.NewNopLogger(), reg)
60+
s, err := NewScheduler(cfg, &limits{queriers: 2}, log.NewNopLogger(), args.reg)
61+
5362
require.NoError(t, err)
5463

5564
server := httptest.NewUnstartedServer(nil)
5665
mux := mux.NewRouter()
5766
server.Config.Handler = h2c.NewHandler(mux, &http2.Server{})
67+
68+
// Use an in-memory network connection to avoid test flake from network access.
69+
listener := bufconn.Listen(256 << 10)
70+
server.Listener = listener
71+
5872
server.Start()
59-
u, err := url.Parse(server.URL)
6073
require.NoError(t, err)
61-
schedulerpbconnect.RegisterSchedulerForFrontendHandler(mux, s, opts...)
62-
schedulerpbconnect.RegisterSchedulerForQuerierHandler(mux, s, opts...)
74+
schedulerpbconnect.RegisterSchedulerForFrontendHandler(mux, s, args.handlerOpts...)
75+
schedulerpbconnect.RegisterSchedulerForQuerierHandler(mux, s, args.handlerOpts...)
6376

6477
require.NoError(t, services.StartAndAwaitRunning(context.Background(), s))
6578
t.Cleanup(func() {
6679
_ = services.StopAndAwaitTerminated(context.Background(), s)
6780
server.Close()
6881
})
6982

70-
c, err := grpc.Dial(u.Hostname()+":"+u.Port(), grpc.WithTransportCredentials(insecure.NewCredentials()))
83+
c, err := grpc.NewClient(
84+
// Target address is irrelevant as we're using an in-memory connection.
85+
// We simply need the DNS resolution to succeed.
86+
"localhost:3030",
87+
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return listener.Dial() }),
88+
grpc.WithTransportCredentials(insecure.NewCredentials()),
89+
)
90+
7191
require.NoError(t, err)
7292

7393
t.Cleanup(func() {
@@ -77,9 +97,14 @@ func setupScheduler(t *testing.T, reg prometheus.Registerer, opts ...connect.Han
7797
return s, schedulerpb.NewSchedulerForFrontendClient(c), schedulerpb.NewSchedulerForQuerierClient(c)
7898
}
7999

80-
func Test_Timeout(t *testing.T) {
81-
s, _, querierClient := setupScheduler(t, nil, connect.WithInterceptors(util.WithTimeout(1*time.Second)))
100+
func setupSchedulerWithHandlerOpts(t *testing.T, handlerOpts ...connect.HandlerOption) (*Scheduler, schedulerpb.SchedulerForFrontendClient, schedulerpb.SchedulerForQuerierClient) {
101+
return setupScheduler(t, schedulerArgs{
102+
handlerOpts: handlerOpts,
103+
})
104+
}
82105

106+
func Test_Timeout(t *testing.T) {
107+
s, _, querierClient := setupSchedulerWithHandlerOpts(t, connect.WithInterceptors(util.WithTimeout(1*time.Second)))
83108
ql, err := querierClient.QuerierLoop(context.Background())
84109
require.NoError(t, err)
85110
require.NoError(t, ql.Send(&schedulerpb.QuerierToScheduler{QuerierID: "querier-1"}))
@@ -88,7 +113,7 @@ func Test_Timeout(t *testing.T) {
88113
}
89114

90115
func TestSchedulerBasicEnqueue(t *testing.T) {
91-
scheduler, frontendClient, querierClient := setupScheduler(t, nil)
116+
scheduler, frontendClient, querierClient := setupScheduler(t, schedulerArgs{})
92117

93118
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
94119
frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
@@ -116,7 +141,7 @@ func TestSchedulerBasicEnqueue(t *testing.T) {
116141
}
117142

118143
func TestSchedulerEnqueueWithCancel(t *testing.T) {
119-
scheduler, frontendClient, querierClient := setupScheduler(t, nil)
144+
scheduler, frontendClient, querierClient := setupScheduler(t, schedulerArgs{})
120145

121146
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
122147
frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
@@ -146,7 +171,7 @@ func initQuerierLoop(t *testing.T, querierClient schedulerpb.SchedulerForQuerier
146171
}
147172

148173
func TestSchedulerEnqueueByMultipleFrontendsWithCancel(t *testing.T) {
149-
scheduler, frontendClient, querierClient := setupScheduler(t, nil)
174+
scheduler, frontendClient, querierClient := setupScheduler(t, schedulerArgs{})
150175

151176
frontendLoop1 := initFrontendLoop(t, frontendClient, "frontend-1")
152177
frontendLoop2 := initFrontendLoop(t, frontendClient, "frontend-2")
@@ -187,7 +212,7 @@ func TestSchedulerEnqueueByMultipleFrontendsWithCancel(t *testing.T) {
187212
}
188213

189214
func TestSchedulerEnqueueWithFrontendDisconnect(t *testing.T) {
190-
scheduler, frontendClient, querierClient := setupScheduler(t, nil)
215+
scheduler, frontendClient, querierClient := setupScheduler(t, schedulerArgs{})
191216

192217
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
193218
frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
@@ -217,7 +242,7 @@ func TestSchedulerEnqueueWithFrontendDisconnect(t *testing.T) {
217242
}
218243

219244
func TestCancelRequestInProgress(t *testing.T) {
220-
scheduler, frontendClient, querierClient := setupScheduler(t, nil)
245+
scheduler, frontendClient, querierClient := setupScheduler(t, schedulerArgs{})
221246

222247
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
223248
frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
@@ -249,7 +274,7 @@ func TestCancelRequestInProgress(t *testing.T) {
249274
}
250275

251276
func TestTracingContext(t *testing.T) {
252-
scheduler, frontendClient, _ := setupScheduler(t, nil)
277+
scheduler, frontendClient, _ := setupScheduler(t, schedulerArgs{})
253278

254279
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
255280

@@ -280,7 +305,7 @@ func TestTracingContext(t *testing.T) {
280305
}
281306

282307
func TestSchedulerShutdown_FrontendLoop(t *testing.T) {
283-
scheduler, frontendClient, _ := setupScheduler(t, nil)
308+
scheduler, frontendClient, _ := setupScheduler(t, schedulerArgs{})
284309

285310
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
286311

@@ -301,7 +326,7 @@ func TestSchedulerShutdown_FrontendLoop(t *testing.T) {
301326
}
302327

303328
func TestSchedulerShutdown_QuerierLoop(t *testing.T) {
304-
scheduler, frontendClient, querierClient := setupScheduler(t, nil)
329+
scheduler, frontendClient, querierClient := setupScheduler(t, schedulerArgs{})
305330

306331
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
307332
frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
@@ -333,7 +358,7 @@ func TestSchedulerShutdown_QuerierLoop(t *testing.T) {
333358
}
334359

335360
func TestSchedulerMaxOutstandingRequests(t *testing.T) {
336-
_, frontendClient, _ := setupScheduler(t, nil)
361+
_, frontendClient, _ := setupScheduler(t, schedulerArgs{})
337362

338363
for i := 0; i < testMaxOutstandingPerTenant; i++ {
339364
// coming from different frontends
@@ -365,32 +390,32 @@ func TestSchedulerMaxOutstandingRequests(t *testing.T) {
365390
}
366391

367392
func TestSchedulerForwardsErrorToFrontend(t *testing.T) {
368-
_, frontendClient, querierClient := setupScheduler(t, nil)
393+
394+
l := bufconn.Listen(256 << 10)
395+
_, frontendClient, querierClient := setupScheduler(t, schedulerArgs{
396+
// Have the scheduler use the in-memory connection to call back into the frontend.
397+
dialOpts: []grpc.DialOption{
398+
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return l.Dial() }),
399+
},
400+
})
369401

370402
fm := &frontendMock{resp: map[uint64]*httpgrpc.HTTPResponse{}}
371-
frontendAddress := ""
372403

373404
// Setup frontend grpc server
374405
{
375406
frontendGrpcServer := grpc.NewServer()
376407
frontendpb.RegisterFrontendForQuerierServer(frontendGrpcServer, fm)
377408

378-
l, err := net.Listen("tcp", "127.0.0.1:")
379-
require.NoError(t, err)
380-
381-
frontendAddress = l.Addr().String()
382-
383409
go func() {
384410
_ = frontendGrpcServer.Serve(l)
385411
}()
386-
387412
t.Cleanup(func() {
388413
_ = l.Close()
389414
})
390415
}
391416

392417
// After preparations, start frontend and querier.
393-
frontendLoop := initFrontendLoop(t, frontendClient, frontendAddress)
418+
frontendLoop := initFrontendLoop(t, frontendClient, "irrelevant://because-we-use-in-memory-connection")
394419
frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
395420
Type: schedulerpb.FrontendToSchedulerType_ENQUEUE,
396421
QueryID: 100,
@@ -427,7 +452,7 @@ func TestSchedulerForwardsErrorToFrontend(t *testing.T) {
427452
func TestSchedulerMetrics(t *testing.T) {
428453
reg := prometheus.NewPedanticRegistry()
429454

430-
scheduler, frontendClient, _ := setupScheduler(t, reg)
455+
scheduler, frontendClient, _ := setupScheduler(t, schedulerArgs{reg: reg})
431456

432457
frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
433458
frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{

0 commit comments

Comments
 (0)