Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2459,10 +2459,10 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
limitsResponse: &limitsproto.ExceedsLimitsResponse{
Results: []*limitsproto.ExceedsLimitsResult{{
StreamHash: 0x90eb45def17f924,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams exceeded",
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams",
}, {
name: "one of two streams exceed max stream limit, request is accepted",
ingestLimitsEnabled: true,
Expand Down Expand Up @@ -2496,7 +2496,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
limitsResponse: &limitsproto.ExceedsLimitsResponse{
Results: []*limitsproto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}, {
Expand Down Expand Up @@ -2524,7 +2524,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
limitsResponse: &limitsproto.ExceedsLimitsResponse{
Results: []*limitsproto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}, {
Expand Down
12 changes: 6 additions & 6 deletions pkg/distributor/ingest_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
response: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
expectedStreams: []KeyedStream{},
expectedReasons: map[uint64][]string{1: {"max streams exceeded"}},
expectedReasons: map[uint64][]string{1: {"max streams"}},
}, {
name: "one of two streams exceeds limits",
tenant: "test",
Expand All @@ -138,14 +138,14 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
response: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
expectedStreams: []KeyedStream{{
HashKey: 2000, // Should not be used.
HashKeyNoShard: 2,
}},
expectedReasons: map[uint64][]string{1: {"max streams exceeded"}},
expectedReasons: map[uint64][]string{1: {"max streams"}},
}, {
name: "does not exceed limits",
tenant: "test",
Expand Down Expand Up @@ -246,11 +246,11 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
response: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
expectedExceedsLimits: true,
expectedReasons: map[uint64][]string{1: {"max streams exceeded"}},
expectedReasons: map[uint64][]string{1: {"max streams"}},
}, {
name: "does not exceed limits",
tenant: "test",
Expand Down
20 changes: 10 additions & 10 deletions pkg/limits/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
exceedsLimitsResponses: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
expected: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}, {
Expand Down Expand Up @@ -77,19 +77,19 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
exceedsLimitsResponses: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}, {
StreamHash: 0x4,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
expected: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}, {
StreamHash: 0x4,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}, {
Expand All @@ -107,21 +107,21 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
exceedsLimitsResponses: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}, {
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x4,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
expected: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}, {
StreamHash: 0x4,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}}
Expand Down
4 changes: 2 additions & 2 deletions pkg/limits/frontend/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestFrontend_ServeHTTP(t *testing.T) {
exceedsLimitsResponses: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
request: httpExceedsLimitsRequest{
Expand All @@ -64,7 +64,7 @@ func TestFrontend_ServeHTTP(t *testing.T) {
expected: httpExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}}
Expand Down
28 changes: 14 additions & 14 deletions pkg/limits/frontend/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
exceedsLimitsResponses: [][]*proto.ExceedsLimitsResponse{{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}}},
exceedsLimitsResponseErrs: [][]error{{nil}},
expected: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
}, {
Expand Down Expand Up @@ -145,15 +145,15 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
nil, {{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
},
exceedsLimitsResponseErrs: [][]error{{nil}, {nil}},
expected: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
}, {
Expand Down Expand Up @@ -203,15 +203,15 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
nil, {{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
},
exceedsLimitsResponseErrs: [][]error{{nil}, {nil}},
expected: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
}, {
Expand Down Expand Up @@ -261,24 +261,24 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
exceedsLimitsResponses: [][]*proto.ExceedsLimitsResponse{{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x2,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}}, {{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}}},
exceedsLimitsResponseErrs: [][]error{{nil}, {nil}},
expected: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}, {
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x2,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
}, {
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
exceedsLimitsResponses: [][]*proto.ExceedsLimitsResponse{{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x2,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}}, nil},
exceedsLimitsResponseErrs: [][]error{{nil}, {
Expand All @@ -336,7 +336,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
expected: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x2,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
}, {
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
exceedsLimitsResponses: [][]*proto.ExceedsLimitsResponse{{nil}, {{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}}},
exceedsLimitsResponseErrs: [][]error{{
Expand All @@ -397,7 +397,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
expected: []*proto.ExceedsLimitsResponse{{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonExceedsMaxStreams),
Reason: uint32(limits.ReasonMaxStreams),
}},
}},
}}
Expand Down
2 changes: 1 addition & 1 deletion pkg/limits/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *limitsChecker) ExceedsLimits(ctx context.Context, req *proto.ExceedsLim
for _, stream := range rejected {
results = append(results, &proto.ExceedsLimitsResult{
StreamHash: stream.StreamHash,
Reason: uint32(ReasonExceedsMaxStreams),
Reason: uint32(ReasonMaxStreams),
})
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/limits/reason.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package limits
type Reason int

const (
// ReasonExceedsMaxStreams is returned when a tenant exceeds the maximum
// number of active streams as per their per-tenant limit.
ReasonExceedsMaxStreams Reason = iota
// ReasonFailed is the reason returned when a stream cannot be checked
// against limits due to an error.
ReasonFailed Reason = iota + 1

// ReasonMaxStreams is returned when a stream cannot be accepted because
// the tenant has either reached or exceeded their maximum stream limit.
ReasonMaxStreams
)

func (r Reason) String() string {
switch r {
case ReasonExceedsMaxStreams:
return "max streams exceeded"
case ReasonFailed:
return "failed"
case ReasonMaxStreams:
return "max streams"
default:
return "unknown reason"
}
Expand Down
Loading