Skip to content

Commit 1a3a8a1

Browse files
authored
Fix bug in query frontend diff handling (#3315)
This also adds some test coverage to the query-frontend diff handler.
1 parent cf55c09 commit 1a3a8a1

File tree

3 files changed

+199
-2
lines changed

3 files changed

+199
-2
lines changed

pkg/frontend/frontend.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/grafana/pyroscope/pkg/frontend/frontendpb"
3333
"github.com/grafana/pyroscope/pkg/querier/stats"
3434
"github.com/grafana/pyroscope/pkg/scheduler/schedulerdiscovery"
35+
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
3536
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
3637
"github.com/grafana/pyroscope/pkg/util/httpgrpcutil"
3738
"github.com/grafana/pyroscope/pkg/validation"
@@ -78,6 +79,7 @@ func (cfg *Config) Validate() error {
7879
// dispatches them to backends via gRPC, and handles retries for requests which failed.
7980
type Frontend struct {
8081
services.Service
82+
connectgrpc.GRPCRoundTripper
8183

8284
cfg Config
8385
log log.Logger
@@ -149,6 +151,7 @@ func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Regis
149151
schedulerWorkersWatcher: services.NewFailureWatcher(),
150152
requests: newRequestsInProgress(),
151153
}
154+
f.GRPCRoundTripper = &realFrontendRoundTripper{frontend: f}
152155
// Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results
153156
// between different queries. Note that frontend verifies the user, so it cannot leak results between tenants.
154157
// This isn't perfect, but better than nothing.
@@ -191,8 +194,15 @@ func (f *Frontend) stopping(_ error) error {
191194
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), f.schedulerWorkers), "failed to stop frontend scheduler workers")
192195
}
193196

197+
// allow to test the frontend without the need of a real roundertripper
198+
type realFrontendRoundTripper struct {
199+
frontend *Frontend
200+
}
201+
194202
// RoundTripGRPC round trips a proto (instead of an HTTP request).
195-
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
203+
func (rt *realFrontendRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
204+
f := rt.frontend
205+
196206
if s := f.State(); s != services.Running {
197207
return nil, fmt.Errorf("frontend not running: %v", s)
198208
}

pkg/frontend/frontend_diff_test.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package frontend
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"connectrpc.com/connect"
9+
"github.com/grafana/dskit/user"
10+
"github.com/opentracing/opentracing-go"
11+
"github.com/pkg/errors"
12+
"github.com/stretchr/testify/require"
13+
14+
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
15+
"github.com/grafana/pyroscope/pkg/model"
16+
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
17+
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
18+
)
19+
20+
type mockLimits struct{}
21+
22+
func (m *mockLimits) QuerySplitDuration(_ string) time.Duration {
23+
return time.Hour
24+
}
25+
26+
func (m *mockLimits) MaxQueryParallelism(_ string) int {
27+
return 100
28+
}
29+
30+
func (m *mockLimits) MaxQueryLength(_ string) time.Duration {
31+
return time.Hour
32+
}
33+
34+
func (m *mockLimits) MaxQueryLookback(_ string) time.Duration {
35+
return time.Hour * 24
36+
}
37+
38+
func (m *mockLimits) QueryAnalysisEnabled(_ string) bool {
39+
return true
40+
}
41+
42+
func (m *mockLimits) MaxFlameGraphNodesDefault(_ string) int {
43+
return 10_000
44+
}
45+
46+
func (m *mockLimits) MaxFlameGraphNodesMax(_ string) int {
47+
return 100_000
48+
}
49+
50+
type mockRoundTripper struct {
51+
callback func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
52+
}
53+
54+
func (m *mockRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
55+
if m.callback != nil {
56+
return m.callback(ctx, req)
57+
}
58+
return &httpgrpc.HTTPResponse{}, errors.New("not implemented")
59+
}
60+
61+
func Test_Frontend_Diff(t *testing.T) {
62+
frontend := Frontend{
63+
limits: &mockLimits{},
64+
}
65+
66+
ctx := user.InjectOrgID(context.Background(), "test")
67+
_, ctx = opentracing.StartSpanFromContext(ctx, "test")
68+
now := time.Now().UnixMilli()
69+
70+
profileType := "memory:inuse_space:bytes:space:byte"
71+
72+
t.Run("Diff outside of the query window", func(t *testing.T) {
73+
resp, err := frontend.Diff(
74+
ctx,
75+
connect.NewRequest(&querierv1.DiffRequest{
76+
Left: &querierv1.SelectMergeStacktracesRequest{
77+
ProfileTypeID: profileType,
78+
LabelSelector: "{}",
79+
Start: 0000,
80+
End: 1000,
81+
},
82+
Right: &querierv1.SelectMergeStacktracesRequest{
83+
ProfileTypeID: profileType,
84+
LabelSelector: "{}",
85+
Start: 2000,
86+
End: 3000,
87+
},
88+
}),
89+
)
90+
require.NoError(t, err)
91+
require.NotNil(t, resp)
92+
})
93+
94+
t.Run("Failing left hand side", func(t *testing.T) {
95+
frontend.GRPCRoundTripper = &mockRoundTripper{callback: func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
96+
return connectgrpc.HandleUnary[querierv1.SelectMergeStacktracesRequest, querierv1.SelectMergeStacktracesResponse](ctx, req, func(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
97+
if req.Msg.Start == now {
98+
return nil, errors.New("left fails")
99+
}
100+
101+
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
102+
Flamegraph: &querierv1.FlameGraph{},
103+
}), nil
104+
})
105+
}}
106+
107+
_, err := frontend.Diff(
108+
ctx,
109+
connect.NewRequest(&querierv1.DiffRequest{
110+
Left: &querierv1.SelectMergeStacktracesRequest{
111+
ProfileTypeID: profileType,
112+
LabelSelector: "{}",
113+
Start: now + 0000,
114+
End: now + 1000,
115+
},
116+
Right: &querierv1.SelectMergeStacktracesRequest{
117+
ProfileTypeID: profileType,
118+
LabelSelector: "{}",
119+
Start: now + 2000,
120+
End: now + 3000,
121+
},
122+
}),
123+
)
124+
require.ErrorContains(t, err, "left fails")
125+
})
126+
127+
t.Run("simple diff", func(t *testing.T) {
128+
frontend.GRPCRoundTripper = &mockRoundTripper{callback: func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
129+
return connectgrpc.HandleUnary[querierv1.SelectMergeStacktracesRequest, querierv1.SelectMergeStacktracesResponse](ctx, req, func(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
130+
131+
s := new(model.Tree)
132+
s.InsertStack(1, "foo", "bar")
133+
134+
if req.Msg.Start == now {
135+
//left
136+
s.InsertStack(1, "foo", "bar", "baz")
137+
} else {
138+
//right
139+
s.InsertStack(2, "foo", "bar", "buz")
140+
}
141+
142+
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
143+
Flamegraph: model.NewFlameGraph(s, -1),
144+
}), nil
145+
})
146+
}}
147+
148+
resp, err := frontend.Diff(
149+
ctx,
150+
connect.NewRequest(&querierv1.DiffRequest{
151+
Left: &querierv1.SelectMergeStacktracesRequest{
152+
ProfileTypeID: profileType,
153+
LabelSelector: "{}",
154+
Start: now + 0000,
155+
End: now + 1000,
156+
},
157+
Right: &querierv1.SelectMergeStacktracesRequest{
158+
ProfileTypeID: profileType,
159+
LabelSelector: "{}",
160+
Start: now + 2000,
161+
End: now + 3000,
162+
},
163+
}),
164+
)
165+
require.NoError(t, err)
166+
require.Equal(
167+
t,
168+
&querierv1.FlameGraphDiff{
169+
Names: []string{"total", "foo", "bar", "buz", "baz"},
170+
Total: 5,
171+
Levels: []*querierv1.Level{
172+
{Values: []int64{0, 2, 0, 0, 3, 0, 0}},
173+
{Values: []int64{0, 2, 0, 0, 3, 0, 1}},
174+
{Values: []int64{0, 2, 1, 0, 3, 1, 2}},
175+
{Values: []int64{1, 1, 1, 1, 0, 0, 4, 0, 0, 0, 0, 2, 2, 3}},
176+
},
177+
LeftTicks: 2,
178+
RightTicks: 3,
179+
MaxSelf: 2,
180+
},
181+
resp.Msg.Flamegraph,
182+
)
183+
})
184+
185+
}

pkg/frontend/frontend_select_merge_stacktraces.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
4040
return nil, connect.NewError(connect.CodeInvalidArgument, err)
4141
}
4242
if validated.IsEmpty {
43-
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{}), nil
43+
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
44+
Flamegraph: &querierv1.FlameGraph{},
45+
}), nil
4446
}
4547
maxNodes, err := validation.ValidateMaxNodes(f.limits, tenantIDs, c.Msg.GetMaxNodes())
4648
if err != nil {

0 commit comments

Comments
 (0)