Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/limits/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge
} else {
assignedPartitionsCache = newTTLCache[string, *proto.GetAssignedPartitionsResponse](cfg.AssignedPartitionsCacheTTL)
}
gatherer := newRingGatherer(limitsRing, clientPool, cfg.NumPartitions, assignedPartitionsCache, logger)
gatherer := newRingGatherer(limitsRing, clientPool, cfg.NumPartitions, assignedPartitionsCache, logger, reg)

f := &Frontend{
cfg: cfg,
Expand Down
23 changes: 21 additions & 2 deletions pkg/limits/frontend/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/limits/proto"
Expand Down Expand Up @@ -37,6 +39,10 @@ type ringGatherer struct {
numPartitions int
assignedPartitionsCache cache[string, *proto.GetAssignedPartitionsResponse]
zoneCmp func(a, b string) int

// Metrics.
streams prometheus.Counter
streamsUnanswered prometheus.Counter
}

// newRingGatherer returns a new ringGatherer.
Expand All @@ -46,6 +52,7 @@ func newRingGatherer(
numPartitions int,
assignedPartitionsCache cache[string, *proto.GetAssignedPartitionsResponse],
logger log.Logger,
reg prometheus.Registerer,
) *ringGatherer {
return &ringGatherer{
logger: logger,
Expand All @@ -54,6 +61,18 @@ func newRingGatherer(
numPartitions: numPartitions,
assignedPartitionsCache: assignedPartitionsCache,
zoneCmp: defaultZoneCmp,
streams: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_total",
Help: "The total number of received streams.",
},
),
streamsFailed: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_failed_total",
Help: "The total number of received streams that could not be checked.",
},
),
}
}

Expand All @@ -62,6 +81,7 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi
if len(req.Streams) == 0 {
return nil, nil
}
g.streams.Add(float64(len(req.Streams)))
rs, err := g.ring.GetAllHealthy(LimitsRead)
if err != nil {
return nil, err
Expand Down Expand Up @@ -115,8 +135,7 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi
break
}
}
// TODO(grobinson): In a subsequent change, I will figure out what to do
// about unanswered streams.
g.streamsFailed.Add(float64(len(streams)))
return responses, nil
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/limits/frontend/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/limits"
Expand Down Expand Up @@ -418,7 +419,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
}
readRing, clientPool := newMockRingWithClientPool(t, "test", mockClients, test.instances)
cache := newNopCache[string, *proto.GetAssignedPartitionsResponse]()
g := newRingGatherer(readRing, clientPool, test.numPartitions, cache, log.NewNopLogger())
g := newRingGatherer(readRing, clientPool, test.numPartitions, cache, log.NewNopLogger(), prometheus.NewRegistry())

// Set a maximum upper bound on the test execution time.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
Expand Down Expand Up @@ -584,7 +585,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
// Set up the mocked ring and client pool for the tests.
readRing, clientPool := newMockRingWithClientPool(t, "test", mockClients, test.instances)
cache := newNopCache[string, *proto.GetAssignedPartitionsResponse]()
g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger())
g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger(), prometheus.NewRegistry())

// Set a maximum upper bound on the test execution time.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
Expand Down Expand Up @@ -721,7 +722,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
// Set up the mocked ring and client pool for the tests.
readRing, clientPool := newMockRingWithClientPool(t, "test", mockClients, test.instances)
cache := newNopCache[string, *proto.GetAssignedPartitionsResponse]()
g := newRingGatherer(readRing, clientPool, 1, cache, log.NewNopLogger())
g := newRingGatherer(readRing, clientPool, 1, cache, log.NewNopLogger(), prometheus.NewRegistry())

// Set a maximum upper bound on the test execution time.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
Expand Down Expand Up @@ -771,7 +772,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers_Caching(t *testing.T) {
// Set the cache TTL large enough that entries cannot expire (flake)
// during slow test runs.
cache := newTTLCache[string, *proto.GetAssignedPartitionsResponse](time.Minute)
g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger())
g := newRingGatherer(readRing, clientPool, 2, cache, log.NewNopLogger(), prometheus.NewRegistry())

// Set a maximum upper bound on the test execution time.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
Expand Down
Loading