Skip to content

Commit 0afb56d

Browse files
authored
fix(ingest-limits): Query all assigned partitions (#17473)
1 parent 970a886 commit 0afb56d

File tree

2 files changed

+49
-20
lines changed

2 files changed

+49
-20
lines changed

pkg/limits/frontend/ring.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, rs rin
7171
return nil, err
7272
}
7373

74-
instancesToQuery := make(map[string][]uint64)
74+
ownedStreamHeashes := make(map[string][]uint64)
7575
for _, hash := range r.StreamHashes {
7676
partitionID := int32(hash % uint64(g.numPartitions))
7777
addr, ok := partitionConsumers[partitionID]
@@ -80,34 +80,36 @@ func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, rs rin
8080
level.Warn(g.logger).Log("msg", "no instance found for partition", "partition", partitionID)
8181
continue
8282
}
83-
instancesToQuery[addr] = append(instancesToQuery[addr], hash)
83+
ownedStreamHeashes[addr] = append(ownedStreamHeashes[addr], hash)
8484
}
8585

8686
errg, ctx := errgroup.WithContext(ctx)
87-
responses := make([]GetStreamUsageResponse, len(instancesToQuery))
87+
responses := make([]GetStreamUsageResponse, len(rs.Instances))
8888

89-
// Query each instance for stream usage
90-
i := 0
91-
for addr, hashes := range instancesToQuery {
92-
j := i
93-
i++
89+
// Query all healthy instances in parallel,
90+
// send requested stream hahes only to owning instances.
91+
for i, instance := range rs.Instances {
9492
errg.Go(func() error {
95-
client, err := g.pool.GetClientFor(addr)
93+
client, err := g.pool.GetClientFor(instance.Addr)
9694
if err != nil {
9795
return err
9896
}
9997

100-
protoReq := &logproto.GetStreamUsageRequest{
101-
Tenant: r.Tenant,
102-
StreamHashes: hashes,
98+
protoReq := &logproto.GetStreamUsageRequest{Tenant: r.Tenant}
99+
100+
// Only send stream hashes to the instance that owns them.
101+
// This eliminates the need in downstream callers to filter
102+
// duplicate unknown streams.
103+
if hashes, ok := ownedStreamHeashes[instance.Addr]; ok {
104+
protoReq.StreamHashes = hashes
103105
}
104106

105107
resp, err := client.(logproto.IngestLimitsClient).GetStreamUsage(ctx, protoReq)
106108
if err != nil {
107109
return err
108110
}
109111

110-
responses[j] = GetStreamUsageResponse{Addr: addr, Response: resp}
112+
responses[i] = GetStreamUsageResponse{Addr: instance.Addr, Response: resp}
111113
return nil
112114
})
113115
}

pkg/limits/frontend/ring_test.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
8181
}},
8282
}, {
8383
// When there is one stream, and two instances each owning separate
84-
// partitions, only the instance owning the partition for the stream hash
85-
// should be queried.
84+
// partitions, all instances should be queried. However, only the
85+
// instance owning the partition for the stream hash should be queried
86+
// for the requested stream hashes.
8687
name: "one stream two instances",
8788
getStreamUsageRequest: GetStreamUsageRequest{
8889
Tenant: "test",
@@ -104,16 +105,29 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
104105
1: time.Now().UnixNano(),
105106
},
106107
}},
107-
expectedStreamUsageRequests: []*logproto.GetStreamUsageRequest{nil, {
108+
expectedStreamUsageRequests: []*logproto.GetStreamUsageRequest{{
109+
Tenant: "test",
110+
}, {
108111
Tenant: "test",
109112
StreamHashes: []uint64{0x1},
110113
}},
111-
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{nil, {
114+
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{
115+
Tenant: "test",
116+
ActiveStreams: 1,
117+
Rate: 10,
118+
}, {
112119
Tenant: "test",
113120
ActiveStreams: 1,
114121
Rate: 10,
115122
}},
116123
expectedResponses: []GetStreamUsageResponse{{
124+
Addr: "instance-0",
125+
Response: &logproto.GetStreamUsageResponse{
126+
Tenant: "test",
127+
ActiveStreams: 1,
128+
Rate: 10,
129+
},
130+
}, {
117131
Addr: "instance-1",
118132
Response: &logproto.GetStreamUsageResponse{
119133
Tenant: "test",
@@ -124,7 +138,7 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
124138
}, {
125139
// When there is one stream, and two instances owning overlapping
126140
// partitions, only the instance with the latest timestamp for the relevant
127-
// partition should be queried.
141+
// partition should be queried with the requested stream hash.
128142
name: "one stream two instances, overlapping partition ownership",
129143
getStreamUsageRequest: GetStreamUsageRequest{
130144
Tenant: "test",
@@ -146,16 +160,29 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
146160
1: time.Now().UnixNano(),
147161
},
148162
}},
149-
expectedStreamUsageRequests: []*logproto.GetStreamUsageRequest{nil, {
163+
expectedStreamUsageRequests: []*logproto.GetStreamUsageRequest{{
164+
Tenant: "test",
165+
}, {
150166
Tenant: "test",
151167
StreamHashes: []uint64{0x1},
152168
}},
153-
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{nil, {
169+
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{
170+
Tenant: "test",
171+
ActiveStreams: 1,
172+
Rate: 10,
173+
}, {
154174
Tenant: "test",
155175
ActiveStreams: 1,
156176
Rate: 10,
157177
}},
158178
expectedResponses: []GetStreamUsageResponse{{
179+
Addr: "instance-0",
180+
Response: &logproto.GetStreamUsageResponse{
181+
Tenant: "test",
182+
ActiveStreams: 1,
183+
Rate: 10,
184+
},
185+
}, {
159186
Addr: "instance-1",
160187
Response: &logproto.GetStreamUsageResponse{
161188
Tenant: "test",

0 commit comments

Comments
 (0)