Skip to content

Commit 104c457

Browse files
feat: check for failed reason in distributors (#18128)
1 parent 16ccf9b commit 104c457

File tree

4 files changed

+92
-94
lines changed

4 files changed

+92
-94
lines changed

pkg/distributor/distributor.go

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -718,25 +718,6 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
718718
return &logproto.PushResponse{}, validationErr
719719
}
720720

721-
if d.cfg.IngestLimitsEnabled {
722-
streamsAfterLimits, reasonsForHashes, err := d.ingestLimits.enforceLimits(ctx, tenantID, streams)
723-
if err != nil {
724-
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
725-
} else if len(streamsAfterLimits) == 0 {
726-
// All streams have been dropped.
727-
level.Debug(d.logger).Log("msg", "request exceeded limits, all streams will be dropped", "tenant", tenantID)
728-
if !d.cfg.IngestLimitsDryRunEnabled {
729-
return nil, httpgrpc.Error(http.StatusTooManyRequests, "request exceeded limits: "+firstReasonForHashes(reasonsForHashes))
730-
}
731-
} else if len(streamsAfterLimits) < len(streams) {
732-
// Some streams have been dropped.
733-
level.Debug(d.logger).Log("msg", "request exceeded limits, some streams will be dropped", "tenant", tenantID)
734-
if !d.cfg.IngestLimitsDryRunEnabled {
735-
streams = streamsAfterLimits
736-
}
737-
}
738-
}
739-
740721
if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
741722
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited, streamResolver)
742723

@@ -746,6 +727,19 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
746727
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
747728
}
748729

730+
// These limits are checked after the ingestion rate limit as this
731+
// is how it works in ingesters.
732+
if d.cfg.IngestLimitsEnabled {
733+
accepted, err := d.ingestLimits.EnforceLimits(ctx, tenantID, streams)
734+
if err == nil && !d.cfg.IngestLimitsDryRunEnabled {
735+
if len(accepted) == 0 {
736+
// All streams were rejected, the request should be failed.
737+
return nil, httpgrpc.Error(http.StatusTooManyRequests, "request exceeded limits")
738+
}
739+
streams = accepted
740+
}
741+
}
742+
749743
// Nil check for performance reasons, to avoid dynamic lookup and/or no-op
750744
// function calls that cannot be inlined.
751745
if d.tee != nil {

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2462,7 +2462,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
24622462
Reason: uint32(limits.ReasonMaxStreams),
24632463
}},
24642464
},
2465-
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams",
2465+
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits",
24662466
}, {
24672467
name: "one of two streams exceed max stream limit, request is accepted",
24682468
ingestLimitsEnabled: true,

pkg/distributor/ingest_limits.go

Lines changed: 58 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
// ingestLimitsFrontendClient is used for tests.
2020
type ingestLimitsFrontendClient interface {
21-
exceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
21+
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
2222
}
2323

2424
// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends.
@@ -35,7 +35,7 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
3535
}
3636

3737
// Implements the ingestLimitsFrontendClient interface.
38-
func (c *ingestLimitsFrontendRingClient) exceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
38+
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
3939
// We use an FNV-1 of all stream hashes in the request to load balance requests
4040
// to limits-frontends instances.
4141
h := fnv.New32()
@@ -78,64 +78,85 @@ func (c *ingestLimitsFrontendRingClient) exceedsLimits(ctx context.Context, req
7878

7979
type ingestLimits struct {
8080
client ingestLimitsFrontendClient
81-
limitsFailures prometheus.Counter
81+
requests prometheus.Counter
82+
requestsFailed prometheus.Counter
8283
}
8384

8485
func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer) *ingestLimits {
8586
return &ingestLimits{
8687
client: client,
87-
limitsFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
88-
Name: "loki_distributor_ingest_limits_failures_total",
89-
Help: "The total number of failures checking ingest limits.",
88+
requests: promauto.With(r).NewCounter(prometheus.CounterOpts{
89+
Name: "loki_distributor_ingest_limits_requests_total",
90+
Help: "The total number of requests.",
91+
}),
92+
requestsFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
93+
Name: "loki_distributor_ingest_limits_requests_failed_total",
94+
Help: "The total number of requests that failed.",
9095
}),
9196
}
9297
}
9398

94-
// enforceLimits returns a slice of streams that are within the per-tenant
95-
// limits, and in the case where one or more streams exceed per-tenant
96-
// limits, the reasons those streams were not included in the result.
97-
// An error is returned if per-tenant limits could not be enforced.
98-
func (l *ingestLimits) enforceLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]KeyedStream, map[uint64][]string, error) {
99-
exceedsLimits, reasons, err := l.exceedsLimits(ctx, tenant, streams)
100-
if !exceedsLimits || err != nil {
101-
return streams, nil, err
99+
// EnforceLimits checks all streams against the per-tenant limits and returns
100+
// a slice containing the streams that are accepted (within the per-tenant
101+
// limits). Any streams that could not have their limits checked are also
102+
// accepted.
103+
func (l *ingestLimits) EnforceLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]KeyedStream, error) {
104+
results, err := l.ExceedsLimits(ctx, tenant, streams)
105+
if err != nil {
106+
return streams, err
107+
}
108+
// Fast path. No results means all streams were accepted and there were
109+
// no failures, so we can return the input streams.
110+
if len(results) == 0 {
111+
return streams, nil
102112
}
103113
// We can do this without allocation if needed, but doing so will modify
104114
// the original backing array. See "Filtering without allocation" from
105115
// https://go.dev/wiki/SliceTricks.
106-
withinLimits := make([]KeyedStream, 0, len(streams))
116+
accepted := make([]KeyedStream, 0, len(streams))
107117
for _, s := range streams {
108-
if _, ok := reasons[s.HashKeyNoShard]; !ok {
109-
withinLimits = append(withinLimits, s)
118+
// Check each stream to see if it failed.
119+
// TODO(grobinson): We have an O(N*M) loop here. Need to benchmark if
120+
// it's faster to do this or if we should create a map instead.
121+
var (
122+
found bool
123+
reason uint32
124+
)
125+
for _, res := range results {
126+
if res.StreamHash == s.HashKeyNoShard {
127+
found = true
128+
reason = res.Reason
129+
break
130+
}
131+
}
132+
if !found || reason == uint32(limits.ReasonFailed) {
133+
accepted = append(accepted, s)
110134
}
111135
}
112-
return withinLimits, reasons, nil
136+
return accepted, nil
113137
}
114138

115-
// ExceedsLimits returns true if one or more streams exceeds per-tenant limits,
116-
// and false if no streams exceed per-tenant limits. In the case where one or
117-
// more streams exceeds per-tenant limits, it returns the reasons for each stream.
118-
// An error is returned if per-tenant limits could not be checked.
119-
func (l *ingestLimits) exceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) (bool, map[uint64][]string, error) {
139+
// ExceedsLimits checks all streams against the per-tenant limits. It returns
140+
// an error if the client failed to send the request or receive a response
141+
// from the server. Any streams that could not have their limits checked
142+
// are returned in the results with the reason "ReasonFailed".
143+
func (l *ingestLimits) ExceedsLimits(
144+
ctx context.Context,
145+
tenant string,
146+
streams []KeyedStream,
147+
) ([]*proto.ExceedsLimitsResult, error) {
148+
l.requests.Inc()
120149
req, err := newExceedsLimitsRequest(tenant, streams)
121150
if err != nil {
122-
return false, nil, err
151+
l.requestsFailed.Inc()
152+
return nil, err
123153
}
124-
resp, err := l.client.exceedsLimits(ctx, req)
154+
resp, err := l.client.ExceedsLimits(ctx, req)
125155
if err != nil {
126-
return false, nil, err
127-
}
128-
if len(resp.Results) == 0 {
129-
return false, nil, nil
156+
l.requestsFailed.Inc()
157+
return nil, err
130158
}
131-
reasonsForHashes := make(map[uint64][]string)
132-
for _, result := range resp.Results {
133-
reasons := reasonsForHashes[result.StreamHash]
134-
humanized := limits.Reason(result.Reason).String()
135-
reasons = append(reasons, humanized)
136-
reasonsForHashes[result.StreamHash] = reasons
137-
}
138-
return true, reasonsForHashes, nil
159+
return resp.Results, nil
139160
}
140161

141162
func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.ExceedsLimitsRequest, error) {
@@ -156,10 +177,3 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
156177
Streams: streamMetadata,
157178
}, nil
158179
}
159-
160-
func firstReasonForHashes(reasonsForHashes map[uint64][]string) string {
161-
for _, reasons := range reasonsForHashes {
162-
return reasons[0]
163-
}
164-
return "unknown reason"
165-
}

pkg/distributor/ingest_limits_test.go

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type mockIngestLimitsFrontendClient struct {
2626
}
2727

2828
// Implements the ingestLimitsFrontendClient interface.
29-
func (c *mockIngestLimitsFrontendClient) exceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
29+
func (c *mockIngestLimitsFrontendClient) ExceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
3030
c.calls.Add(1)
3131
if c.expectedRequest != nil {
3232
require.Equal(c.t, c.expectedRequest, r)
@@ -49,7 +49,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
4949
response *proto.ExceedsLimitsResponse
5050
responseErr error
5151
expectedStreams []KeyedStream
52-
expectedReasons map[uint64][]string
5352
expectedErr string
5453
}{{
5554
// This test also asserts that streams are returned unmodified.
@@ -116,7 +115,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
116115
}},
117116
},
118117
expectedStreams: []KeyedStream{},
119-
expectedReasons: map[uint64][]string{1: {"max streams"}},
120118
}, {
121119
name: "one of two streams exceeds limits",
122120
tenant: "test",
@@ -145,7 +143,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
145143
HashKey: 2000, // Should not be used.
146144
HashKeyNoShard: 2,
147145
}},
148-
expectedReasons: map[uint64][]string{1: {"max streams"}},
149146
}, {
150147
name: "does not exceed limits",
151148
tenant: "test",
@@ -174,7 +171,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
174171
HashKey: 2000, // Should not be used.
175172
HashKeyNoShard: 2,
176173
}},
177-
expectedReasons: nil,
178174
}}
179175

180176
for _, test := range tests {
@@ -188,35 +184,29 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
188184
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
189185
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
190186
defer cancel()
191-
streams, reasons, err := l.enforceLimits(ctx, test.tenant, test.streams)
187+
accepted, err := l.EnforceLimits(ctx, test.tenant, test.streams)
192188
if test.expectedErr != "" {
193189
require.EqualError(t, err, test.expectedErr)
194190
// The streams should be returned unmodified.
195-
require.Equal(t, test.streams, streams)
196-
require.Nil(t, reasons)
191+
require.Equal(t, test.streams, accepted)
197192
} else {
198193
require.Nil(t, err)
199-
require.Equal(t, test.expectedStreams, streams)
200-
require.Equal(t, test.expectedReasons, reasons)
194+
require.Equal(t, test.expectedStreams, accepted)
201195
}
202196
})
203197
}
204198
}
205199

206-
// This test asserts that when checking ingest limits the expected proto
207-
// message is sent, and that for a given response, the result contains the
208-
// expected streams each with their expected reasons.
209200
func TestIngestLimits_ExceedsLimits(t *testing.T) {
210201
tests := []struct {
211-
name string
212-
tenant string
213-
streams []KeyedStream
214-
expectedRequest *proto.ExceedsLimitsRequest
215-
response *proto.ExceedsLimitsResponse
216-
responseErr error
217-
expectedExceedsLimits bool
218-
expectedReasons map[uint64][]string
219-
expectedErr string
202+
name string
203+
tenant string
204+
streams []KeyedStream
205+
expectedRequest *proto.ExceedsLimitsRequest
206+
response *proto.ExceedsLimitsResponse
207+
responseErr error
208+
expectedResult []*proto.ExceedsLimitsResult
209+
expectedErr string
220210
}{{
221211
name: "error should be returned if limits cannot be checked",
222212
tenant: "test",
@@ -249,8 +239,10 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
249239
Reason: uint32(limits.ReasonMaxStreams),
250240
}},
251241
},
252-
expectedExceedsLimits: true,
253-
expectedReasons: map[uint64][]string{1: {"max streams"}},
242+
expectedResult: []*proto.ExceedsLimitsResult{{
243+
StreamHash: 1,
244+
Reason: uint32(limits.ReasonMaxStreams),
245+
}},
254246
}, {
255247
name: "does not exceed limits",
256248
tenant: "test",
@@ -266,7 +258,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
266258
response: &proto.ExceedsLimitsResponse{
267259
Results: []*proto.ExceedsLimitsResult{},
268260
},
269-
expectedReasons: nil,
261+
expectedResult: []*proto.ExceedsLimitsResult{},
270262
}}
271263

272264
for _, test := range tests {
@@ -280,15 +272,13 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
280272
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
281273
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
282274
defer cancel()
283-
exceedsLimits, reasons, err := l.exceedsLimits(ctx, test.tenant, test.streams)
275+
res, err := l.ExceedsLimits(ctx, test.tenant, test.streams)
284276
if test.expectedErr != "" {
285277
require.EqualError(t, err, test.expectedErr)
286-
require.False(t, exceedsLimits)
287-
require.Nil(t, reasons)
278+
require.Nil(t, res)
288279
} else {
289280
require.Nil(t, err)
290-
require.Equal(t, test.expectedExceedsLimits, exceedsLimits)
291-
require.Equal(t, test.expectedReasons, reasons)
281+
require.Equal(t, test.expectedResult, res)
292282
}
293283
})
294284
}

0 commit comments

Comments
 (0)