diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go
index 378fbc9cc5e57..b84df429b879c 100644
--- a/pkg/limits/frontend/frontend.go
+++ b/pkg/limits/frontend/frontend.go
@@ -14,7 +14,9 @@ import (
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 
+	"github.com/grafana/loki/v3/pkg/limits"
 	limits_client "github.com/grafana/loki/v3/pkg/limits/client"
 	"github.com/grafana/loki/v3/pkg/limits/proto"
 )
@@ -32,6 +34,11 @@ type Frontend struct {
 	subservicesWatcher *services.FailureWatcher
 	lifecycler         *ring.Lifecycler
 	lifecyclerWatcher  *services.FailureWatcher
+
+	// Metrics.
+	streams         prometheus.Counter
+	streamsFailed   prometheus.Counter
+	streamsRejected prometheus.Counter
 }
 
 // New returns a new Frontend.
@@ -62,6 +69,24 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge
 		logger:                  logger,
 		gatherer:                gatherer,
 		assignedPartitionsCache: assignedPartitionsCache,
+		streams: promauto.With(reg).NewCounter(
+			prometheus.CounterOpts{
+				Name: "loki_ingest_limits_frontend_streams_total",
+				Help: "The total number of received streams.",
+			},
+		),
+		streamsFailed: promauto.With(reg).NewCounter(
+			prometheus.CounterOpts{
+				Name: "loki_ingest_limits_frontend_streams_failed_total",
+				Help: "The total number of received streams that could not be checked.",
+			},
+		),
+		streamsRejected: promauto.With(reg).NewCounter(
+			prometheus.CounterOpts{
+				Name: "loki_ingest_limits_frontend_streams_rejected_total",
+				Help: "The total number of rejected streams.",
+			},
+		),
 	}
 
 	lifecycler, err := ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
@@ -89,13 +114,32 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge
 
 // ExceedsLimits implements proto.IngestLimitsFrontendClient.
 func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
+	f.streams.Add(float64(len(req.Streams)))
+	results := make([]*proto.ExceedsLimitsResult, 0, len(req.Streams))
 	resps, err := f.gatherer.ExceedsLimits(ctx, req)
 	if err != nil {
-		return nil, err
-	}
-	results := make([]*proto.ExceedsLimitsResult, 0, len(req.Streams))
-	for _, resp := range resps {
-		results = append(results, resp.Results...)
+		// If the entire call failed, then all streams failed.
+		for _, stream := range req.Streams {
+			results = append(results, &proto.ExceedsLimitsResult{
+				StreamHash: stream.StreamHash,
+				Reason:     uint32(limits.ReasonFailed),
+			})
+		}
+		f.streamsFailed.Add(float64(len(req.Streams)))
+		level.Error(f.logger).Log("msg", "failed to check request against limits", "err", err)
+	} else {
+		for _, resp := range resps {
+			for _, res := range resp.Results {
+				// Even if the call succeeded, some (or all) streams might
+				// still have failed.
+				if res.Reason == uint32(limits.ReasonFailed) {
+					f.streamsFailed.Inc()
+				} else {
+					f.streamsRejected.Inc()
+				}
+			}
+			results = append(results, resp.Results...)
+		}
 	}
 	return &proto.ExceedsLimitsResponse{Results: results}, nil
 }
diff --git a/pkg/limits/frontend/frontend_test.go b/pkg/limits/frontend/frontend_test.go
index 5f7b12f414a58..c2e31c1ae40de 100644
--- a/pkg/limits/frontend/frontend_test.go
+++ b/pkg/limits/frontend/frontend_test.go
@@ -2,9 +2,14 @@ package frontend
 
 import (
 	"context"
+	"errors"
 	"testing"
 	"time"
 
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/kv"
+	"github.com/grafana/dskit/ring"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/limits"
@@ -16,6 +21,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
 		name                   string
 		exceedsLimitsRequest   *proto.ExceedsLimitsRequest
 		exceedsLimitsResponses []*proto.ExceedsLimitsResponse
+		err                    error
 		expected               *proto.ExceedsLimitsResponse
 	}{{
 		name: "no streams",
@@ -124,16 +130,49 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
 				Reason:     uint32(limits.ReasonMaxStreams),
 			}},
 		},
+	}, {
+		name: "unexpected error, response with failed reason",
+		exceedsLimitsRequest: &proto.ExceedsLimitsRequest{
+			Tenant: "test",
+			Streams: []*proto.StreamMetadata{{
+				StreamHash: 0x1,
+				TotalSize:  0x5,
+			}, {
+				StreamHash: 0x2,
+				TotalSize:  0x9,
+			}},
+		},
+		err: errors.New("an unexpected error occurred"),
+		expected: &proto.ExceedsLimitsResponse{
+			Results: []*proto.ExceedsLimitsResult{{
+				StreamHash: 0x1,
+				Reason:     uint32(limits.ReasonFailed),
+			}, {
+				StreamHash: 0x2,
+				Reason:     uint32(limits.ReasonFailed),
+			}},
+		},
 	}}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			f := Frontend{
-				gatherer: &mockExceedsLimitsGatherer{
-					t:                            t,
-					expectedExceedsLimitsRequest: test.exceedsLimitsRequest,
-					exceedsLimitsResponses:       test.exceedsLimitsResponses,
+			readRing, _ := newMockRingWithClientPool(t, "test", nil, nil)
+			f, err := New(Config{
+				LifecyclerConfig: ring.LifecyclerConfig{
+					RingConfig: ring.Config{
+						KVStore: kv.Config{
+							Store: "inmemory",
+						},
+					},
 				},
+			}, "test", readRing, log.NewNopLogger(), prometheus.NewRegistry())
+			require.NoError(t, err)
+			// Replace with our mock.
+			f.gatherer = &mockExceedsLimitsGatherer{
+				t:                            t,
+				expectedExceedsLimitsRequest: test.exceedsLimitsRequest,
+				exceedsLimitsResponses:       test.exceedsLimitsResponses,
+				err:                          test.err,
 			}
 			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 			defer cancel()
diff --git a/pkg/limits/frontend/http_test.go b/pkg/limits/frontend/http_test.go
index 43373ae314f0e..34516ec149728 100644
--- a/pkg/limits/frontend/http_test.go
+++ b/pkg/limits/frontend/http_test.go
@@ -8,6 +8,10 @@ import (
 	"net/http/httptest"
 	"testing"
 
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/kv"
+	"github.com/grafana/dskit/ring"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/limits"
@@ -71,14 +75,23 @@ func TestFrontend_ServeHTTP(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			f := Frontend{
-				gatherer: &mockExceedsLimitsGatherer{
-					t:                            t,
-					expectedExceedsLimitsRequest: test.expectedExceedsLimitsRequest,
-					exceedsLimitsResponses:       test.exceedsLimitsResponses,
+			readRing, _ := newMockRingWithClientPool(t, "test", nil, nil)
+			f, err := New(Config{
+				LifecyclerConfig: ring.LifecyclerConfig{
+					RingConfig: ring.Config{
+						KVStore: kv.Config{
+							Store: "inmemory",
+						},
+					},
 				},
+			}, "test", readRing, log.NewNopLogger(), prometheus.NewRegistry())
+			require.NoError(t, err)
+			f.gatherer = &mockExceedsLimitsGatherer{
+				t:                            t,
+				expectedExceedsLimitsRequest: test.expectedExceedsLimitsRequest,
+				exceedsLimitsResponses:       test.exceedsLimitsResponses,
 			}
-			ts := httptest.NewServer(&f)
+			ts := httptest.NewServer(f)
 			defer ts.Close()
 
 			b, err := json.Marshal(test.request)
diff --git a/pkg/limits/frontend/mock_test.go b/pkg/limits/frontend/mock_test.go
index e4fdde2dd0ece..040db2a2623b9 100644
--- a/pkg/limits/frontend/mock_test.go
+++ b/pkg/limits/frontend/mock_test.go
@@ -24,13 +24,14 @@ type mockExceedsLimitsGatherer struct {
 
 	expectedExceedsLimitsRequest *proto.ExceedsLimitsRequest
 	exceedsLimitsResponses       []*proto.ExceedsLimitsResponse
+	err                          error
 }
 
-func (g *mockExceedsLimitsGatherer) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
-	if expected := g.expectedExceedsLimitsRequest; expected != nil {
-		require.Equal(g.t, expected, req)
+func (m *mockExceedsLimitsGatherer) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
+	if expected := m.expectedExceedsLimitsRequest; expected != nil {
+		require.Equal(m.t, expected, req)
 	}
-	return g.exceedsLimitsResponses, nil
+	return m.exceedsLimitsResponses, m.err
 }
 
 // mockIngestLimitsClient mocks proto.IngestLimitsClient.
diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go
index 835a0716255b4..c66a8cf3a7007 100644
--- a/pkg/limits/frontend/ring.go
+++ b/pkg/limits/frontend/ring.go
@@ -14,6 +14,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/grafana/loki/v3/pkg/limits"
 	"github.com/grafana/loki/v3/pkg/limits/proto"
 )
 
@@ -41,9 +42,6 @@ type ringGatherer struct {
 	zoneCmp func(a, b string) int
 
 	// Metrics.
-	streams           prometheus.Counter
-	streamsFailed     prometheus.Counter
-	streamsRejected   prometheus.Counter
 	partitionsMissing *prometheus.CounterVec
 }
 
@@ -63,24 +61,6 @@ func newRingGatherer(
 		numPartitions:           numPartitions,
 		assignedPartitionsCache: assignedPartitionsCache,
 		zoneCmp:                 defaultZoneCmp,
-		streams: promauto.With(reg).NewCounter(
-			prometheus.CounterOpts{
-				Name: "loki_ingest_limits_frontend_streams_total",
-				Help: "The total number of received streams.",
-			},
-		),
-		streamsFailed: promauto.With(reg).NewCounter(
-			prometheus.CounterOpts{
-				Name: "loki_ingest_limits_frontend_streams_failed_total",
-				Help: "The total number of received streams that could not be checked.",
-			},
-		),
-		streamsRejected: promauto.With(reg).NewCounter(
-			prometheus.CounterOpts{
-				Name: "loki_ingest_limits_frontend_streams_rejected_total",
-				Help: "The total number of rejected streams.",
-			},
-		),
 		partitionsMissing: promauto.With(reg).NewCounterVec(
 			prometheus.CounterOpts{
 				Name: "loki_ingest_limits_frontend_partitions_missing_total",
@@ -92,16 +72,16 @@
 }
 
 // ExceedsLimits implements the [exceedsLimitsGatherer] interface.
-func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
+func (r *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
 	if len(req.Streams) == 0 {
 		return nil, nil
 	}
-	rs, err := g.ring.GetAllHealthy(LimitsRead)
+	rs, err := r.ring.GetAllHealthy(LimitsRead)
 	if err != nil {
 		return nil, err
 	}
 	// Get the partition consumers for each zone.
-	zonesPartitions, err := g.getZoneAwarePartitionConsumers(ctx, rs.Instances)
+	zonesPartitions, err := r.getZoneAwarePartitionConsumers(ctx, rs.Instances)
 	if err != nil {
 		return nil, err
 	}
@@ -113,12 +93,11 @@
 	for zone := range zonesPartitions {
 		zonesToQuery = append(zonesToQuery, zone)
 	}
-	slices.SortFunc(zonesToQuery, g.zoneCmp)
+	slices.SortFunc(zonesToQuery, r.zoneCmp)
 	// Make a copy of the streams from the request. We will prune this slice
 	// each time we receive the responses from a zone.
 	streams := make([]*proto.StreamMetadata, 0, len(req.Streams))
 	streams = append(streams, req.Streams...)
-	g.streams.Add(float64(len(streams)))
 	// Query each zone as ordered in zonesToQuery. If a zone answers all
 	// streams, the request is satisfied and there is no need to query
 	// subsequent zones. If a zone answers just a subset of streams
@@ -129,7 +108,11 @@
 	// zones.
 	responses := make([]*proto.ExceedsLimitsResponse, 0)
 	for _, zone := range zonesToQuery {
-		resps, answered, err := g.doExceedsLimitsRPCs(ctx, req.Tenant, streams, zonesPartitions[zone], zone)
+		// All streams been checked against per-tenant limits.
+		if len(streams) == 0 {
+			break
+		}
+		resps, answered, err := r.doExceedsLimitsRPCs(ctx, req.Tenant, streams, zonesPartitions[zone], zone)
 		if err != nil {
 			continue
 		}
@@ -145,26 +128,29 @@
 			})
 			return i < len(answered) && answered[i] == stream.StreamHash
 		})
-		// All streams been checked against per-tenant limits.
-		if len(streams) == 0 {
-			break
-		}
 	}
-	for _, resp := range responses {
-		g.streamsRejected.Add(float64(len(resp.Results)))
+	// Any unanswered streams after exhausting all zones must be failed.
+	if len(streams) > 0 {
+		failed := make([]*proto.ExceedsLimitsResult, 0, len(streams))
+		for _, stream := range streams {
+			failed = append(failed, &proto.ExceedsLimitsResult{
+				StreamHash: stream.StreamHash,
+				Reason:     uint32(limits.ReasonFailed),
+			})
+		}
+		responses = append(responses, &proto.ExceedsLimitsResponse{Results: failed})
 	}
-	g.streamsFailed.Add(float64(len(streams)))
 	return responses, nil
 }
 
-func (g *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, streams []*proto.StreamMetadata, partitions map[int32]string, zone string) ([]*proto.ExceedsLimitsResponse, []uint64, error) {
+func (r *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, streams []*proto.StreamMetadata, partitions map[int32]string, zone string) ([]*proto.ExceedsLimitsResponse, []uint64, error) {
 	// For each stream, figure out which instance consume its partition.
 	instancesForStreams := make(map[string][]*proto.StreamMetadata)
 	for _, stream := range streams {
-		partition := int32(stream.StreamHash % uint64(g.numPartitions))
+		partition := int32(stream.StreamHash % uint64(r.numPartitions))
 		addr, ok := partitions[partition]
 		if !ok {
-			g.partitionsMissing.WithLabelValues(zone).Inc()
+			r.partitionsMissing.WithLabelValues(zone).Inc()
 			continue
 		}
 		instancesForStreams[addr] = append(instancesForStreams[addr], stream)
@@ -174,9 +160,9 @@ func (g *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, s
 	answeredCh := make(chan uint64, len(streams))
 	for addr, streams := range instancesForStreams {
 		errg.Go(func() error {
-			client, err := g.pool.GetClientFor(addr)
+			client, err := r.pool.GetClientFor(addr)
 			if err != nil {
-				level.Error(g.logger).Log("msg", "failed to get client for instance", "instance", addr, "err", err.Error())
+				level.Error(r.logger).Log("msg", "failed to get client for instance", "instance", addr, "err", err.Error())
 				return nil
 			}
 			resp, err := client.(proto.IngestLimitsClient).ExceedsLimits(ctx, &proto.ExceedsLimitsRequest{
@@ -184,7 +170,7 @@ func (g *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, s
 				Streams: streams,
 			})
 			if err != nil {
-				level.Error(g.logger).Log("failed check execeed limits for instance", "instance", addr, "err", err.Error())
+				level.Error(r.logger).Log("failed check execeed limits for instance", "instance", addr, "err", err.Error())
 				return nil
 			}
 			responseCh <- resp
@@ -218,7 +204,7 @@ type zonePartitionConsumersResult struct {
 // zone will still be returned but its partition consumers will be nil.
 // If ZoneAwarenessEnabled is false, it returns all partition consumers under
 // a pseudo-zone ("").
-func (g *ringGatherer) getZoneAwarePartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[string]map[int32]string, error) {
+func (r *ringGatherer) getZoneAwarePartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[string]map[int32]string, error) {
 	zoneDescs := make(map[string][]ring.InstanceDesc)
 	for _, instance := range instances {
 		zoneDescs[instance.Zone] = append(zoneDescs[instance.Zone], instance)
 	}
@@ -228,9 +214,9 @@ func (g *ringGatherer) getZoneAwarePartitionConsumers(ctx context.Context, insta
 	errg, ctx := errgroup.WithContext(ctx)
 	for zone, instances := range zoneDescs {
 		errg.Go(func() error {
-			res, err := g.getPartitionConsumers(ctx, instances)
+			res, err := r.getPartitionConsumers(ctx, instances)
 			if err != nil {
-				level.Error(g.logger).Log("msg", "failed to get partition consumers for zone", "zone", zone, "err", err.Error())
+				level.Error(r.logger).Log("msg", "failed to get partition consumers for zone", "zone", zone, "err", err.Error())
 			}
 			// Even if the consumers could not be fetched for a zone, we
 			// should still return the zone.
@@ -271,7 +257,7 @@ type getAssignedPartitionsResponse struct {
 // should be called once for each zone, and instances should be filtered to
 // the respective zone. Alternatively, you can pass all instances for all zones
 // to find the most up to date consumer for each partition across all zones.
-func (g *ringGatherer) getPartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[int32]string, error) {
+func (r *ringGatherer) getPartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[int32]string, error) {
 	errg, ctx := errgroup.WithContext(ctx)
 	responseCh := make(chan getAssignedPartitionsResponse, len(instances))
 	for _, instance := range instances {
@@ -279,24 +265,24 @@ func (g *ringGatherer) getPartitionConsumers(ctx context.Context, instances []ri
 			// We use a cache to eliminate redundant gRPC requests for
 			// GetAssignedPartitions as the set of assigned partitions is
 			// expected to be stable outside consumer rebalances.
-			if resp, ok := g.assignedPartitionsCache.Get(instance.Addr); ok {
+			if resp, ok := r.assignedPartitionsCache.Get(instance.Addr); ok {
 				responseCh <- getAssignedPartitionsResponse{
 					addr:     instance.Addr,
 					response: resp,
 				}
 				return nil
 			}
-			client, err := g.pool.GetClientFor(instance.Addr)
+			client, err := r.pool.GetClientFor(instance.Addr)
 			if err != nil {
-				level.Error(g.logger).Log("failed to get client for instance", "instance", instance.Addr, "err", err.Error())
+				level.Error(r.logger).Log("failed to get client for instance", "instance", instance.Addr, "err", err.Error())
 				return nil
 			}
 			resp, err := client.(proto.IngestLimitsClient).GetAssignedPartitions(ctx, &proto.GetAssignedPartitionsRequest{})
 			if err != nil {
-				level.Error(g.logger).Log("failed to get assigned partitions for instance", "instance", instance.Addr, "err", err.Error())
+				level.Error(r.logger).Log("failed to get assigned partitions for instance", "instance", instance.Addr, "err", err.Error())
 				return nil
 			}
-			g.assignedPartitionsCache.Set(instance.Addr, resp)
+			r.assignedPartitionsCache.Set(instance.Addr, resp)
 			responseCh <- getAssignedPartitionsResponse{
 				addr:     instance.Addr,
 				response: resp,
diff --git a/pkg/limits/frontend/ring_test.go b/pkg/limits/frontend/ring_test.go
index 58686e2fd91a0..9367def3d29eb 100644
--- a/pkg/limits/frontend/ring_test.go
+++ b/pkg/limits/frontend/ring_test.go
@@ -283,7 +283,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
 		}},
 	}, {
 		// When one instance returns an error, the streams for that instance
-		// are permitted.
+		// are failed.
 		name: "two streams, two instances, one instance returns error",
 		request: &proto.ExceedsLimitsRequest{
 			Tenant: "test",
@@ -338,6 +338,11 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
 				StreamHash: 0x2,
 				Reason:     uint32(limits.ReasonMaxStreams),
 			}},
+		}, {
+			Results: []*proto.ExceedsLimitsResult{{
+				StreamHash: 0x1,
+				Reason:     uint32(limits.ReasonFailed),
+			}},
 		}},
 	}, {
 		// When one zone returns an error, the streams for that instance