@@ -6,12 +6,16 @@ import (
 	"flag"
 	"fmt"
 	"hash/fnv"
+	"net/http"
 	"strconv"
 	"time"
 
 	"github.com/bufbuild/connect-go"
+	"github.com/dustin/go-humanize"
 	"github.com/go-kit/log"
 	"github.com/google/uuid"
+	"github.com/grafana/dskit/kv"
+	"github.com/grafana/dskit/limiter"
 	"github.com/grafana/dskit/ring"
 	ring_client "github.com/grafana/dskit/ring/client"
 	"github.com/grafana/dskit/services"
@@ -29,12 +33,23 @@ import (
 	"github.com/grafana/phlare/pkg/pprof"
 	"github.com/grafana/phlare/pkg/tenant"
 	"github.com/grafana/phlare/pkg/usagestats"
+	"github.com/grafana/phlare/pkg/util"
+	"github.com/grafana/phlare/pkg/validation"
 )
 
 type PushClient interface {
 	Push(context.Context, *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error)
 }
 
+const (
+	// distributorRingKey is the key under which we store the distributors ring in the KVStore.
+	distributorRingKey = "distributor"
+
+	// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
+	// in the ring will be automatically removed after.
+	ringAutoForgetUnhealthyPeriods = 10
+)
+
 // todo: move to non global metrics.
 var (
 	clients = promauto.NewGauge(prometheus.GaugeOpts{
@@ -52,12 +67,16 @@ var (
 type Config struct {
 	PushTimeout time.Duration
 	PoolConfig  clientpool.PoolConfig `yaml:"pool_config,omitempty"`
+
+	// Distributors ring
+	DistributorRing RingConfig `yaml:"ring" doc:"hidden"`
 }
 
 // RegisterFlags registers distributor-related flags.
 func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 	cfg.PoolConfig.RegisterFlagsWithPrefix("distributor", fs)
 	fs.DurationVar(&cfg.PushTimeout, "distributor.push.timeout", 5*time.Second, "Timeout when pushing data to ingester.")
+	cfg.DistributorRing.RegisterFlags(fs)
 }
 
 // Distributor coordinates replicates and distribution of log streams.
@@ -66,30 +85,64 @@ type Distributor struct {
 	logger log.Logger
 
 	cfg           Config
+	limits        Limits
 	ingestersRing ring.ReadRing
 	pool          *ring_client.Pool
 
+	// The global rate limiter requires a distributors ring to count
+	// the number of healthy instances
+	distributorsLifecycler *ring.BasicLifecycler
+	distributorsRing       *ring.Ring
+	healthyInstancesCount  *atomic.Uint32
+	ingestionRateLimiter   *limiter.RateLimiter
+
 	subservices        *services.Manager
 	subservicesWatcher *services.FailureWatcher
 
 	metrics *metrics
 }
 
-func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Distributor, error) {
+type Limits interface {
+	IngestionRateBytes(tenantID string) float64
+	IngestionBurstSizeBytes(tenantID string) int
+	MaxLabelNameLength(userID string) int
+	MaxLabelValueLength(userID string) int
+	MaxLabelNamesPerSeries(userID string) int
+}
+
+func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, limits Limits, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Distributor, error) {
 	d := &Distributor{
-		cfg:           cfg,
-		logger:        logger,
-		ingestersRing: ingestersRing,
-		pool:          clientpool.NewPool(cfg.PoolConfig, ingestersRing, factory, clients, logger, clientsOptions...),
-		metrics:       newMetrics(reg),
+		cfg:                   cfg,
+		logger:                logger,
+		ingestersRing:         ingestersRing,
+		pool:                  clientpool.NewPool(cfg.PoolConfig, ingestersRing, factory, clients, logger, clientsOptions...),
+		metrics:               newMetrics(reg),
+		healthyInstancesCount: atomic.NewUint32(0),
+		limits:                limits,
 	}
 	var err error
-	d.subservices, err = services.NewManager(d.pool)
+
+	subservices := []services.Service(nil)
+	subservices = append(subservices, d.pool)
+
+	distributorsRing, distributorsLifecycler, err := newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, logger, reg)
+	if err != nil {
+		return nil, err
+	}
+
+	subservices = append(subservices, distributorsLifecycler, distributorsRing)
+
+	d.ingestionRateLimiter = limiter.NewRateLimiter(newGlobalRateStrategy(newIngestionRateStrategy(limits), d), 10*time.Second)
+	d.distributorsLifecycler = distributorsLifecycler
+	d.distributorsRing = distributorsRing
+
+	d.subservices, err = services.NewManager(subservices...)
 	if err != nil {
 		return nil, errors.Wrap(err, "services manager")
 	}
 	d.subservicesWatcher = services.NewFailureWatcher()
 	d.subservicesWatcher.WatchManager(d.subservices)
+
 	d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
 	rfStats.Set(int64(ingestersRing.ReplicationFactor()))
 	return d, nil
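
Note: newIngestionRateStrategy and newGlobalRateStrategy are referenced in New above but are defined outside this file's diff. For orientation only, a minimal sketch of the per-tenant half, assuming it satisfies dskit's limiter.RateLimiterStrategy (per-tenant Limit/Burst) by reading the tenant overrides from the Limits interface introduced in this diff:

// Sketch only (not part of this diff): a per-tenant strategy satisfying
// dskit's limiter.RateLimiterStrategy by reading the tenant overrides.
// The actual newIngestionRateStrategy in the PR may differ.
type ingestionRateStrategySketch struct {
	limits Limits
}

// Limit returns the tenant's configured ingestion rate in bytes per second.
func (s *ingestionRateStrategySketch) Limit(tenantID string) float64 {
	return s.limits.IngestionRateBytes(tenantID)
}

// Burst returns the tenant's configured ingestion burst size in bytes.
func (s *ingestionRateStrategySketch) Burst(tenantID string) int {
	return s.limits.IngestionBurstSizeBytes(tenantID)
}
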
@@ -115,29 +168,37 @@ func (d *Distributor) stopping(_ error) error {
 func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
 	tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
 	if err != nil {
-		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+		return nil, connect.NewError(connect.CodeUnauthenticated, err)
 	}
 	var (
-		keys     = make([]uint32, 0, len(req.Msg.Series))
-		profiles = make([]*profileTracker, 0, len(req.Msg.Series))
+		keys                       = make([]uint32, 0, len(req.Msg.Series))
+		profiles                   = make([]*profileTracker, 0, len(req.Msg.Series))
+		totalPushUncompressedBytes int64
+		totalProfiles              int64
 	)
 
 	for _, series := range req.Msg.Series {
+		// include the labels in the size calculation
+		for _, lbs := range series.Labels {
+			totalPushUncompressedBytes += int64(len(lbs.Name))
+			totalPushUncompressedBytes += int64(len(lbs.Value))
+		}
 		keys = append(keys, TokenFor(tenantID, labelsString(series.Labels)))
 		profName := phlaremodel.Labels(series.Labels).Get(scrape.ProfileName)
 		for _, raw := range series.Samples {
 			usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1)
 			profileReceivedStats.Inc(1)
 			bytesReceivedTotalStats.Inc(int64(len(raw.RawProfile)))
 			bytesReceivedStats.Record(float64(len(raw.RawProfile)))
-			d.metrics.receivedCompressedBytes.WithLabelValues(profName).Observe(float64(len(raw.RawProfile)))
+			totalProfiles++
+			d.metrics.receivedCompressedBytes.WithLabelValues(profName, tenantID).Observe(float64(len(raw.RawProfile)))
 			p, err := pprof.RawFromBytes(raw.RawProfile)
 			if err != nil {
-				return nil, err
+				return nil, connect.NewError(connect.CodeInvalidArgument, err)
 			}
-			d.metrics.receivedDecompressedBytes.WithLabelValues(profName).Observe(float64(p.SizeBytes()))
-			d.metrics.receivedSamples.WithLabelValues(profName).Observe(float64(len(p.Sample)))
-
+			d.metrics.receivedDecompressedBytes.WithLabelValues(profName, tenantID).Observe(float64(p.SizeBytes()))
+			d.metrics.receivedSamples.WithLabelValues(profName, tenantID).Observe(float64(len(p.Sample)))
+			totalPushUncompressedBytes += int64(p.SizeBytes())
 			p.Normalize()
 
 			// zip the data back into the buffer
@@ -153,6 +214,24 @@ func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.Push
 		profiles = append(profiles, &profileTracker{profile: series})
 	}
 
+	// validate the request
+	for _, series := range req.Msg.Series {
+		if err := validation.ValidateLabels(d.limits, tenantID, series.Labels); err != nil {
+			validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalProfiles))
+			validation.DiscardedBytes.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalPushUncompressedBytes))
+			return nil, connect.NewError(connect.CodeInvalidArgument, err)
+		}
+	}
+
+	// rate limit the request
+	if !d.ingestionRateLimiter.AllowN(time.Now(), tenantID, int(totalPushUncompressedBytes)) {
+		validation.DiscardedProfiles.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(totalProfiles))
+		validation.DiscardedBytes.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(totalPushUncompressedBytes))
+		return nil, connect.NewError(connect.CodeResourceExhausted,
+			fmt.Errorf("push rate limit (%s) exceeded while adding %s", humanize.Bytes(uint64(d.limits.IngestionRateBytes(tenantID))), humanize.Bytes(uint64(totalPushUncompressedBytes))),
+		)
+	}
+
 	const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
 	var descs [maxExpectedReplicationSet]ring.InstanceDesc
@@ -245,6 +324,35 @@ func (d *Distributor) sendProfilesErr(ctx context.Context, ingester ring.Instanc
 	return err
 }
 
+func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	if d.distributorsRing != nil {
+		d.distributorsRing.ServeHTTP(w, req)
+	} else {
+		ringNotEnabledPage := `
+			<!DOCTYPE html>
+			<html>
+				<head>
+					<meta charset="UTF-8">
+					<title>Distributor Status</title>
+				</head>
+				<body>
+					<h1>Distributor Status</h1>
+					<p>Distributor is not running with global limits enabled</p>
+				</body>
+			</html>`
+		util.WriteHTMLResponse(w, ringNotEnabledPage)
+	}
+}
+
+// HealthyInstancesCount implements the ReadLifecycler interface
+//
+// We use a ring lifecycler delegate to count the number of members of the
+// ring. The count is then used to enforce rate limiting correctly for each
+// distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES
+func (d *Distributor) HealthyInstancesCount() int {
+	return int(d.healthyInstancesCount.Load())
+}
+
 type profileTracker struct {
 	profile    *pushv1.RawProfileSeries
 	minSuccess int
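
The HealthyInstancesCount comment above states the intent: each distributor enforces the global per-tenant limit divided by the number of healthy distributors. The wrapping strategy (newGlobalRateStrategy) is not shown in this diff; a minimal sketch of that division, assuming the same limiter.RateLimiterStrategy interface as above, could look like this:

// Sketch only (not part of this diff): divides the per-tenant limit across
// healthy distributors, following the formula in the HealthyInstancesCount comment.
type globalRateStrategySketch struct {
	inner limiter.RateLimiterStrategy
	ring  interface{ HealthyInstancesCount() int }
}

func (s *globalRateStrategySketch) Limit(tenantID string) float64 {
	if n := s.ring.HealthyInstancesCount(); n > 0 {
		// effective rate limit = global rate limit / number of healthy distributors
		return s.inner.Limit(tenantID) / float64(n)
	}
	return s.inner.Limit(tenantID)
}

func (s *globalRateStrategySketch) Burst(tenantID string) int {
	// The burst is left undivided here; each distributor allows the full burst.
	return s.inner.Burst(tenantID)
}
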
@@ -283,3 +391,35 @@ func TokenFor(tenantID, labels string) uint32 {
 	_, _ = h.Write([]byte(labels))
 	return h.Sum32()
 }
+
+// newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
+func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
+	reg = prometheus.WrapRegistererWithPrefix("phlare_", reg)
+	kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
+	}
+
+	lifecyclerCfg, err := cfg.ToBasicLifecyclerConfig(logger)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to build distributors' lifecycler config")
+	}
+
+	var delegate ring.BasicLifecyclerDelegate
+	delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, lifecyclerCfg.NumTokens)
+	delegate = newHealthyInstanceDelegate(instanceCount, cfg.HeartbeatTimeout, delegate)
+	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
+	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)
+
+	distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, reg)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
+	}
+
+	distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, reg)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
+	}
+
+	return distributorsRing, distributorsLifecycler, nil
+}
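
newHealthyInstanceDelegate, used in the delegate chain above, is defined elsewhere in the PR. As an illustration only, a delegate in that position typically recounts healthy ACTIVE ring members on every heartbeat and publishes the result to the shared atomic counter read by HealthyInstancesCount; a rough sketch (names and details assumed, not taken from this diff):

// Sketch only: recount healthy ACTIVE instances on each heartbeat and
// store the count in the counter shared with the Distributor.
type healthyInstanceDelegateSketch struct {
	count            *atomic.Uint32
	heartbeatTimeout time.Duration
	next             ring.BasicLifecyclerDelegate
}

func (d *healthyInstanceDelegateSketch) OnRingInstanceHeartbeat(lc *ring.BasicLifecycler, desc *ring.Desc, inst *ring.InstanceDesc) {
	healthy := uint32(0)
	now := time.Now()
	for _, instance := range desc.GetIngesters() {
		if instance.State == ring.ACTIVE && instance.IsHeartbeatHealthy(d.heartbeatTimeout, now) {
			healthy++
		}
	}
	d.count.Store(healthy)
	d.next.OnRingInstanceHeartbeat(lc, desc, inst)
}

// The remaining ring.BasicLifecyclerDelegate methods would simply forward to d.next.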