Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 49 additions & 5 deletions pkg/limits/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/limits"
limits_client "github.com/grafana/loki/v3/pkg/limits/client"
"github.com/grafana/loki/v3/pkg/limits/proto"
)
Expand All @@ -32,6 +34,11 @@ type Frontend struct {
subservicesWatcher *services.FailureWatcher
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher

// Metrics.
streams prometheus.Counter
streamsFailed prometheus.Counter
streamsRejected prometheus.Counter
}

// New returns a new Frontend.
Expand Down Expand Up @@ -62,6 +69,24 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge
logger: logger,
gatherer: gatherer,
assignedPartitionsCache: assignedPartitionsCache,
streams: promauto.With(reg).NewCounter(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to this I was able to move the metrics out of ring.go and into frontend.go.

prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_total",
Help: "The total number of received streams.",
},
),
streamsFailed: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_failed_total",
Help: "The total number of received streams that could not be checked.",
},
),
streamsRejected: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_rejected_total",
Help: "The total number of rejected streams.",
},
),
}

lifecycler, err := ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
Expand Down Expand Up @@ -89,13 +114,32 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge

// ExceedsLimits implements proto.IngestLimitsFrontendClient.
func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
f.streams.Add(float64(len(req.Streams)))
results := make([]*proto.ExceedsLimitsResult, 0, len(req.Streams))
resps, err := f.gatherer.ExceedsLimits(ctx, req)
if err != nil {
return nil, err
}
results := make([]*proto.ExceedsLimitsResult, 0, len(req.Streams))
for _, resp := range resps {
results = append(results, resp.Results...)
// If the entire call failed, then all streams failed.
for _, stream := range req.Streams {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

results = append(results, &proto.ExceedsLimitsResult{
StreamHash: stream.StreamHash,
Reason: uint32(limits.ReasonFailed),
})
}
f.streamsFailed.Add(float64(len(req.Streams)))
level.Error(f.logger).Log("msg", "failed to check request against limits", "err", err)
} else {
for _, resp := range resps {
for _, res := range resp.Results {
// Even if the call succeeded, some (or all) streams might
// still have failed.
if res.Reason == uint32(limits.ReasonFailed) {
f.streamsFailed.Inc()
} else {
f.streamsRejected.Inc()
}
}
results = append(results, resp.Results...)
}
}
return &proto.ExceedsLimitsResponse{Results: results}, nil
}
Expand Down
49 changes: 44 additions & 5 deletions pkg/limits/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package frontend

import (
"context"
"errors"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/limits"
Expand All @@ -16,6 +21,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
name string
exceedsLimitsRequest *proto.ExceedsLimitsRequest
exceedsLimitsResponses []*proto.ExceedsLimitsResponse
err error
expected *proto.ExceedsLimitsResponse
}{{
name: "no streams",
Expand Down Expand Up @@ -124,16 +130,49 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}, {
name: "unexpected error, response with failed reason",
exceedsLimitsRequest: &proto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 0x1,
TotalSize: 0x5,
}, {
StreamHash: 0x2,
TotalSize: 0x9,
}},
},
err: errors.New("an unexpected error occurred"),
expected: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonFailed),
}, {
StreamHash: 0x2,
Reason: uint32(limits.ReasonFailed),
}},
},
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
f := Frontend{
gatherer: &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.exceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
readRing, _ := newMockRingWithClientPool(t, "test", nil, nil)
f, err := New(Config{
LifecyclerConfig: ring.LifecyclerConfig{
RingConfig: ring.Config{
KVStore: kv.Config{
Store: "inmemory",
},
},
},
}, "test", readRing, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
// Replace with our mock.
f.gatherer = &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.exceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
err: test.err,
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
Expand Down
25 changes: 19 additions & 6 deletions pkg/limits/frontend/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"net/http/httptest"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/limits"
Expand Down Expand Up @@ -71,14 +75,23 @@ func TestFrontend_ServeHTTP(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
f := Frontend{
gatherer: &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.expectedExceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
readRing, _ := newMockRingWithClientPool(t, "test", nil, nil)
f, err := New(Config{
LifecyclerConfig: ring.LifecyclerConfig{
RingConfig: ring.Config{
KVStore: kv.Config{
Store: "inmemory",
},
},
},
}, "test", readRing, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
f.gatherer = &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.expectedExceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
}
ts := httptest.NewServer(&f)
ts := httptest.NewServer(f)
defer ts.Close()

b, err := json.Marshal(test.request)
Expand Down
9 changes: 5 additions & 4 deletions pkg/limits/frontend/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ type mockExceedsLimitsGatherer struct {

expectedExceedsLimitsRequest *proto.ExceedsLimitsRequest
exceedsLimitsResponses []*proto.ExceedsLimitsResponse
err error
}

func (g *mockExceedsLimitsGatherer) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
if expected := g.expectedExceedsLimitsRequest; expected != nil {
require.Equal(g.t, expected, req)
func (m *mockExceedsLimitsGatherer) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
if expected := m.expectedExceedsLimitsRequest; expected != nil {
require.Equal(m.t, expected, req)
}
return g.exceedsLimitsResponses, nil
return m.exceedsLimitsResponses, m.err
}

// mockIngestLimitsClient mocks proto.IngestLimitsClient.
Expand Down
Loading
Loading