From f94e0de43eaea8ff452dde32649327721d76109c Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 4 Jun 2025 09:16:32 +0100 Subject: [PATCH 1/5] feat: add metric that tracks unanswered streams --- pkg/limits/frontend/frontend.go | 2 +- pkg/limits/frontend/ring.go | 15 +++++++++++++-- pkg/limits/frontend/ring_test.go | 9 +++++---- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index d7b6aa98b4d26..378fbc9cc5e57 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -55,7 +55,7 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge } else { assignedPartitionsCache = newTTLCache[string, *proto.GetAssignedPartitionsResponse](cfg.AssignedPartitionsCacheTTL) } - gatherer := newRingGatherer(limitsRing, clientPool, cfg.NumPartitions, assignedPartitionsCache, logger) + gatherer := newRingGatherer(limitsRing, clientPool, cfg.NumPartitions, assignedPartitionsCache, logger, reg) f := &Frontend{ cfg: cfg, diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go index e26d8b5ad43f3..6aace797a5abb 100644 --- a/pkg/limits/frontend/ring.go +++ b/pkg/limits/frontend/ring.go @@ -10,6 +10,8 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/sync/errgroup" "github.com/grafana/loki/v3/pkg/limits/proto" @@ -37,6 +39,9 @@ type ringGatherer struct { numPartitions int assignedPartitionsCache cache[string, *proto.GetAssignedPartitionsResponse] zoneCmp func(a, b string) int + + // Metrics. + unansweredStreams prometheus.Counter } // newRingGatherer returns a new ringGatherer. @@ -46,6 +51,7 @@ func newRingGatherer( numPartitions int, assignedPartitionsCache cache[string, *proto.GetAssignedPartitionsResponse], logger log.Logger, + reg prometheus.Registerer, ) *ringGatherer { return &ringGatherer{ logger: logger, @@ -54,6 +60,12 @@ func newRingGatherer( numPartitions: numPartitions, assignedPartitionsCache: assignedPartitionsCache, zoneCmp: defaultZoneCmp, + unansweredStreams: promauto.With(reg).NewCounter( + prometheus.CounterOpts{ + Name: "loki_ingest_limits_frontend_unanswered_streams_total", + Help: "The total number of unanswered streams.", + }, + ), } } @@ -115,8 +127,7 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi break } } - // TODO(grobinson): In a subsequent change, I will figure out what to do - // about unanswered streams. + g.unansweredStreams.Add(float64(len(streams))) return responses, nil } diff --git a/pkg/limits/frontend/ring_test.go b/pkg/limits/frontend/ring_test.go index 3fedf680f7756..45c30de1795de 100644 --- a/pkg/limits/frontend/ring_test.go +++ b/pkg/limits/frontend/ring_test.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/ring" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/limits" @@ -418,7 +419,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) { } readRing, clientPool := newMockRingWithClientPool(t, "test", mockClients, test.instances) cache := newNopCache[string, *proto.GetAssignedPartitionsResponse]() - g := newRingGatherer(readRing, clientPool, test.numPartitions, cache, log.NewNopLogger()) + g := newRingGatherer(readRing, clientPool, test.numPartitions, cache, log.NewNopLogger(), prometheus.NewRegistry()) // Set a maximum upper bound on the test execution time. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -584,7 +585,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) { // Set up the mocked ring and client pool for the tests. readRing, clientPool := newMockRingWithClientPool(t, "test", mockClients, test.instances) cache := newNopCache[string, *proto.GetAssignedPartitionsResponse]() - g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger()) + g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger(), prometheus.NewRegistry()) // Set a maximum upper bound on the test execution time. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -721,7 +722,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { // Set up the mocked ring and client pool for the tests. readRing, clientPool := newMockRingWithClientPool(t, "test", mockClients, test.instances) cache := newNopCache[string, *proto.GetAssignedPartitionsResponse]() - g := newRingGatherer(readRing, clientPool, 1, cache, log.NewNopLogger()) + g := newRingGatherer(readRing, clientPool, 1, cache, log.NewNopLogger(), prometheus.NewRegistry()) // Set a maximum upper bound on the test execution time. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -771,7 +772,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers_Caching(t *testing.T) { // Set the cache TTL large enough that entries cannot expire (flake) // during slow test runs. cache := newTTLCache[string, *proto.GetAssignedPartitionsResponse](time.Minute) - g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger()) + g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger(), prometheus.NewRegistry()) // Set a maximum upper bound on the test execution time. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) From 8375f4d2c576976ff31012b8dfd63e461ae43ff1 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 4 Jun 2025 09:21:09 +0100 Subject: [PATCH 2/5] feat: add streams_total and unanswered_streams_total --- pkg/limits/frontend/ring.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go index 6aace797a5abb..8080652fbacdd 100644 --- a/pkg/limits/frontend/ring.go +++ b/pkg/limits/frontend/ring.go @@ -41,7 +41,8 @@ type ringGatherer struct { zoneCmp func(a, b string) int // Metrics. - unansweredStreams prometheus.Counter + streams prometheus.Counter + streamsUnanswered prometheus.Counter } // newRingGatherer returns a new ringGatherer. @@ -60,10 +61,16 @@ func newRingGatherer( numPartitions: numPartitions, assignedPartitionsCache: assignedPartitionsCache, zoneCmp: defaultZoneCmp, - unansweredStreams: promauto.With(reg).NewCounter( + streams: promauto.With(reg).NewCounter( + prometheus.CounterOpts{ + Name: "loki_ingest_limits_frontend_streams_total", + Help: "The total number of received streams.", + }, + ), + streamsUnanswered: promauto.With(reg).NewCounter( prometheus.CounterOpts{ Name: "loki_ingest_limits_frontend_unanswered_streams_total", - Help: "The total number of unanswered streams.", + Help: "The total number of received streams that could not be checked.", }, ), } @@ -74,6 +81,7 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi if len(req.Streams) == 0 { return nil, nil } + g.streams.Add(float64(len(req.Streams))) rs, err := g.ring.GetAllHealthy(LimitsRead) if err != nil { return nil, err @@ -127,7 +135,7 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi break } } - g.unansweredStreams.Add(float64(len(streams))) + g.streamsUnanswered.Add(float64(len(streams))) return responses, nil } From 44b3291e3295695a242cc532a97a145f3eb42b42 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 4 Jun 2025 09:22:08 +0100 Subject: [PATCH 3/5] Rename unanswered to failed --- pkg/limits/frontend/ring.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go index 8080652fbacdd..b7963d358d3e0 100644 --- a/pkg/limits/frontend/ring.go +++ b/pkg/limits/frontend/ring.go @@ -67,9 +67,9 @@ func newRingGatherer( Help: "The total number of received streams.", }, ), - streamsUnanswered: promauto.With(reg).NewCounter( + streamsFailed: promauto.With(reg).NewCounter( prometheus.CounterOpts{ - Name: "loki_ingest_limits_frontend_unanswered_streams_total", + Name: "loki_ingest_limits_frontend_streams_failed_total", Help: "The total number of received streams that could not be checked.", }, ), @@ -135,7 +135,7 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi break } } - g.streamsUnanswered.Add(float64(len(streams))) + g.streamsFailed.Add(float64(len(streams))) return responses, nil } From 0a90343031c76955a91fa38a2528983ff10c97f7 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 4 Jun 2025 09:26:08 +0100 Subject: [PATCH 4/5] Fix lint --- pkg/limits/frontend/ring.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go index b7963d358d3e0..beb29599ea91e 100644 --- a/pkg/limits/frontend/ring.go +++ b/pkg/limits/frontend/ring.go @@ -41,8 +41,8 @@ type ringGatherer struct { zoneCmp func(a, b string) int // Metrics. - streams prometheus.Counter - streamsUnanswered prometheus.Counter + streams prometheus.Counter + streamsFailed prometheus.Counter } // newRingGatherer returns a new ringGatherer. From 8fc782664e1aa3c4016266c682d99db884155a63 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 4 Jun 2025 09:33:37 +0100 Subject: [PATCH 5/5] Move metric increment to Line 106 --- pkg/limits/frontend/ring.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go index beb29599ea91e..f81cffdf813af 100644 --- a/pkg/limits/frontend/ring.go +++ b/pkg/limits/frontend/ring.go @@ -81,7 +81,6 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi if len(req.Streams) == 0 { return nil, nil } - g.streams.Add(float64(len(req.Streams))) rs, err := g.ring.GetAllHealthy(LimitsRead) if err != nil { return nil, err @@ -104,6 +103,7 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi // each time we receive the responses from a zone. streams := make([]*proto.StreamMetadata, 0, len(req.Streams)) streams = append(streams, req.Streams...) + g.streams.Add(float64(len(streams))) // Query each zone as ordered in zonesToQuery. If a zone answers all // streams, the request is satisfied and there is no need to query // subsequent zones. If a zone answers just a subset of streams