Skip to content

Commit b686397

Browse files
authored
chore(v2): add segment-writer metastore client options (#4081)
1 parent 6941e42 commit b686397

File tree

3 files changed

+60
-68
lines changed

3 files changed

+60
-68
lines changed

pkg/experiment/ingester/segment.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ import (
3636
"github.com/grafana/pyroscope/pkg/validation"
3737
)
3838

39-
var ErrMetastoreDLQFailed = fmt.Errorf("failed to store block metadata in DLQ")
40-
4139
type shardKey uint32
4240

4341
type segmentsWriter struct {
@@ -132,7 +130,7 @@ func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetri
132130
shards: make(map[shardKey]*shard),
133131
metastore: metastoreClient,
134132
}
135-
sw.retryLimiter = retry.NewRateLimiter(sw.config.Upload.HedgeRateMax, int(sw.config.Upload.HedgeRateBurst))
133+
sw.retryLimiter = retry.NewRateLimiter(sw.config.UploadHedgeRateMax, int(sw.config.UploadHedgeRateBurst))
136134
sw.ctx, sw.cancel = context.WithCancel(context.Background())
137135
flushWorkers := runtime.GOMAXPROCS(-1)
138136
if config.FlushConcurrency > 0 {
@@ -587,9 +585,9 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
587585
// are included into the call duration.
588586
uploadWithRetry := func(ctx context.Context, hedge bool) (any, error) {
589587
retryConfig := backoff.Config{
590-
MinBackoff: sw.config.Upload.MinBackoff,
591-
MaxBackoff: sw.config.Upload.MaxBackoff,
592-
MaxRetries: sw.config.Upload.MaxRetries,
588+
MinBackoff: sw.config.UploadMinBackoff,
589+
MaxBackoff: sw.config.UploadMaxBackoff,
590+
MaxRetries: sw.config.UploadMaxRetries,
593591
}
594592
var attemptErr error
595593
if hedge {
@@ -615,7 +613,7 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
615613

616614
hedgedUpload := retry.Hedged[any]{
617615
Call: uploadWithRetry,
618-
Trigger: time.After(sw.config.Upload.HedgeUploadAfter),
616+
Trigger: time.After(sw.config.UploadHedgeAfter),
619617
Throttler: sw.retryLimiter,
620618
FailFast: false,
621619
}
@@ -629,12 +627,21 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
629627
}
630628

631629
func (sw *segmentsWriter) uploadWithTimeout(ctx context.Context, path string, r io.Reader) error {
632-
ctx, cancel := context.WithTimeout(ctx, sw.config.Upload.Timeout)
633-
defer cancel()
630+
if sw.config.UploadTimeout > 0 {
631+
var cancel context.CancelFunc
632+
ctx, cancel = context.WithTimeout(ctx, sw.config.UploadTimeout)
633+
defer cancel()
634+
}
634635
return sw.bucket.Upload(ctx, path, r)
635636
}
636637

637638
func (sw *segmentsWriter) storeMetadata(ctx context.Context, meta *metastorev1.BlockMeta, s *segment) error {
639+
if sw.config.MetadataUpdateTimeout > 0 {
640+
var cancel context.CancelFunc
641+
ctx, cancel = context.WithTimeout(ctx, sw.config.MetadataUpdateTimeout)
642+
defer cancel()
643+
}
644+
638645
start := time.Now()
639646
var err error
640647
defer func() {
@@ -643,11 +650,16 @@ func (sw *segmentsWriter) storeMetadata(ctx context.Context, meta *metastorev1.B
643650
Observe(time.Since(start).Seconds())
644651
s.debuginfo.storeMetaDuration = time.Since(start)
645652
}()
653+
646654
if _, err = sw.metastore.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta}); err == nil {
647655
return nil
648656
}
649657

650658
level.Error(s.logger).Log("msg", "failed to store meta in metastore", "err", err)
659+
if !sw.config.MetadataDLQEnabled {
660+
return err
661+
}
662+
651663
defer func() {
652664
sw.metrics.storeMetadataDLQ.WithLabelValues(statusLabelValue(err)).Inc()
653665
}()

pkg/experiment/ingester/segment_test.go

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,7 @@ func ingestWithDLQ(t *testing.T, chunks []inputChunk) {
121121
}
122122

123123
func TestIngestWait(t *testing.T) {
124-
sw := newTestSegmentWriter(t, Config{
125-
SegmentDuration: 100 * time.Millisecond,
126-
})
124+
sw := newTestSegmentWriter(t, defaultTestConfig())
127125

128126
defer sw.stop()
129127
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
@@ -143,9 +141,7 @@ func TestIngestWait(t *testing.T) {
143141

144142
func TestBusyIngestLoop(t *testing.T) {
145143

146-
sw := newTestSegmentWriter(t, Config{
147-
SegmentDuration: 100 * time.Millisecond,
148-
})
144+
sw := newTestSegmentWriter(t, defaultTestConfig())
149145
defer sw.stop()
150146

151147
writeCtx, writeCancel := context.WithCancel(context.Background())
@@ -244,9 +240,7 @@ func TestDLQFail(t *testing.T) {
244240
l,
245241
newSegmentMetrics(nil),
246242
memdb.NewHeadMetricsWithPrefix(nil, ""),
247-
Config{
248-
SegmentDuration: 100 * time.Millisecond,
249-
},
243+
defaultTestConfig(),
250244
validation.MockDefaultOverrides(),
251245
bucket,
252246
client,
@@ -289,9 +283,7 @@ func TestDatasetMinMaxTime(t *testing.T) {
289283
l,
290284
newSegmentMetrics(nil),
291285
memdb.NewHeadMetricsWithPrefix(nil, ""),
292-
Config{
293-
SegmentDuration: 100 * time.Millisecond,
294-
},
286+
defaultTestConfig(),
295287
validation.MockDefaultOverrides(),
296288
bucket,
297289
client,
@@ -330,9 +322,7 @@ func TestDatasetMinMaxTime(t *testing.T) {
330322
func TestQueryMultipleSeriesSingleTenant(t *testing.T) {
331323
metas := make(chan *metastorev1.BlockMeta, 1)
332324

333-
sw := newTestSegmentWriter(t, Config{
334-
SegmentDuration: 100 * time.Millisecond,
335-
})
325+
sw := newTestSegmentWriter(t, defaultTestConfig())
336326
defer sw.stop()
337327
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
338328
Run(func(args mock.Arguments) {
@@ -379,9 +369,7 @@ func TestDLQRecoveryMock(t *testing.T) {
379369
{shard: 1, tenant: "tb", profile: cpuProfile(42, 239, "svc1", "kek", "foo", "bar")},
380370
})
381371

382-
sw := newTestSegmentWriter(t, Config{
383-
SegmentDuration: 100 * time.Millisecond,
384-
})
372+
sw := newTestSegmentWriter(t, defaultTestConfig())
385373
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
386374
Return(nil, fmt.Errorf("mock metastore unavailable"))
387375

@@ -419,9 +407,7 @@ func TestDLQRecovery(t *testing.T) {
419407
{shard: 1, tenant: tenant, profile: cpuProfile(42, int(ts), "svc1", "kek", "foo", "bar")},
420408
})
421409

422-
sw := newTestSegmentWriter(t, Config{
423-
SegmentDuration: 100 * time.Millisecond,
424-
})
410+
sw := newTestSegmentWriter(t, defaultTestConfig())
425411
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
426412
Return(nil, fmt.Errorf("mock metastore unavailable"))
427413

@@ -491,7 +477,10 @@ func newTestSegmentWriter(t *testing.T, cfg Config) sw {
491477

492478
func defaultTestConfig() Config {
493479
return Config{
494-
SegmentDuration: 1 * time.Second,
480+
SegmentDuration: 100 * time.Millisecond,
481+
UploadTimeout: time.Second,
482+
MetadataUpdateTimeout: time.Second,
483+
MetadataDLQEnabled: true,
495484
}
496485
}
497486

pkg/experiment/ingester/service.go

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -42,48 +42,44 @@ const (
4242
)
4343

4444
type Config struct {
45-
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."`
46-
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
47-
SegmentDuration time.Duration `yaml:"segment_duration,omitempty" category:"advanced"`
48-
FlushConcurrency uint `yaml:"flush_concurrency,omitempty" category:"advanced"`
49-
Upload UploadConfig `yaml:"upload,omitempty" category:"advanced"`
45+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."`
46+
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
47+
SegmentDuration time.Duration `yaml:"segment_duration,omitempty" category:"advanced"`
48+
FlushConcurrency uint `yaml:"flush_concurrency,omitempty" category:"advanced"`
49+
UploadTimeout time.Duration `yaml:"upload-timeout,omitempty" category:"advanced"`
50+
UploadMaxRetries int `yaml:"upload-retry_max_retries,omitempty" category:"advanced"`
51+
UploadMinBackoff time.Duration `yaml:"upload-retry_min_period,omitempty" category:"advanced"`
52+
UploadMaxBackoff time.Duration `yaml:"upload-retry_max_period,omitempty" category:"advanced"`
53+
UploadHedgeAfter time.Duration `yaml:"upload-hedge_upload_after,omitempty" category:"advanced"`
54+
UploadHedgeRateMax float64 `yaml:"upload-hedge_rate_max,omitempty" category:"advanced"`
55+
UploadHedgeRateBurst uint `yaml:"upload-hedge_rate_burst,omitempty" category:"advanced"`
56+
MetadataDLQEnabled bool `yaml:"metadata_dlq_enabled,omitempty" category:"advanced"`
57+
MetadataUpdateTimeout time.Duration `yaml:"metadata_update_timeout,omitempty" category:"advanced"`
5058
}
5159

52-
type UploadConfig struct {
53-
Timeout time.Duration `yaml:"timeout,omitempty" category:"advanced"`
54-
MaxRetries int `yaml:"retry_max_retries,omitempty" category:"advanced"`
55-
MinBackoff time.Duration `yaml:"retry_min_period,omitempty" category:"advanced"`
56-
MaxBackoff time.Duration `yaml:"retry_max_period,omitempty" category:"advanced"`
57-
HedgeUploadAfter time.Duration `yaml:"hedge_upload_after,omitempty" category:"advanced"`
58-
HedgeRateMax float64 `yaml:"hedge_rate_max,omitempty" category:"advanced"`
59-
HedgeRateBurst uint `yaml:"hedge_rate_burst,omitempty" category:"advanced"`
60+
func (cfg *Config) Validate() error {
61+
// TODO(kolesnikovae): implement.
62+
if err := cfg.LifecyclerConfig.Validate(); err != nil {
63+
return err
64+
}
65+
return cfg.GRPCClientConfig.Validate()
6066
}
6167

6268
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6369
const prefix = "segment-writer"
6470
cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f)
6571
cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix+".", f, util.Logger)
66-
cfg.Upload.RegisterFlagsWithPrefix(prefix+".upload.", f)
6772
f.DurationVar(&cfg.SegmentDuration, prefix+".segment-duration", defaultSegmentDuration, "Timeout when flushing segments to bucket.")
6873
f.UintVar(&cfg.FlushConcurrency, prefix+".flush-concurrency", 0, "Number of concurrent flushes. Defaults to the number of CPUs, but not less than 8.")
69-
}
70-
71-
func (cfg *UploadConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
72-
f.DurationVar(&cfg.Timeout, prefix+".timeout", time.Second, "Timeout for upload requests.")
73-
f.IntVar(&cfg.MaxRetries, prefix+".max-retries", 3, "Number of times to backoff and retry before failing.")
74-
f.DurationVar(&cfg.MinBackoff, prefix+".retry-min-period", 50*time.Millisecond, "Minimum delay when backing off.")
75-
f.DurationVar(&cfg.MaxBackoff, prefix+".retry-max-period", defaultSegmentDuration, "Maximum delay when backing off.")
76-
f.DurationVar(&cfg.HedgeUploadAfter, prefix+".hedge-upload-after", defaultSegmentDuration, "Time after which to hedge the upload request.")
77-
f.Float64Var(&cfg.HedgeRateMax, prefix+".hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second.")
78-
f.UintVar(&cfg.HedgeRateBurst, prefix+".hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
79-
}
80-
81-
func (cfg *Config) Validate() error {
82-
// TODO(kolesnikovae): implement.
83-
if err := cfg.LifecyclerConfig.Validate(); err != nil {
84-
return err
85-
}
86-
return cfg.GRPCClientConfig.Validate()
74+
f.DurationVar(&cfg.UploadTimeout, prefix+".upload-timeout", time.Second, "Timeout for upload requests.")
75+
f.IntVar(&cfg.UploadMaxRetries, prefix+".upload-max-retries", 3, "Number of times to backoff and retry before failing.")
76+
f.DurationVar(&cfg.UploadMinBackoff, prefix+".upload-retry-min-period", 50*time.Millisecond, "Minimum delay when backing off.")
77+
f.DurationVar(&cfg.UploadMaxBackoff, prefix+".upload-retry-max-period", defaultSegmentDuration, "Maximum delay when backing off.")
78+
f.DurationVar(&cfg.UploadHedgeAfter, prefix+".upload-hedge-after", defaultSegmentDuration, "Time after which to hedge the upload request.")
79+
f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second.")
80+
f.UintVar(&cfg.UploadHedgeRateBurst, prefix+".upload-hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
81+
f.BoolVar(&cfg.MetadataDLQEnabled, prefix+".metadata-dlq-enabled", true, "Enables dead letter queue (DLQ) for metadata. If the metadata update fails, it will be stored and updated asynchronously.")
82+
f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", time.Second, "Timeout for metadata update requests.")
8783
}
8884

8985
type Limits interface {
@@ -238,11 +234,6 @@ func (i *SegmentWriterService) Push(ctx context.Context, req *segmentwriterv1.Pu
238234
level.Error(i.logger).Log("msg", "flush timeout", "err", err)
239235
return nil, status.FromContextError(err).Err()
240236

241-
case errors.Is(err, ErrMetastoreDLQFailed):
242-
// This error will cause retry.
243-
level.Error(i.logger).Log("msg", "failed to store metadata", "err", err)
244-
return nil, status.Error(codes.Unavailable, err.Error())
245-
246237
default:
247238
level.Error(i.logger).Log("msg", "flush err", "err", err)
248239
return nil, status.Error(codes.Unknown, err.Error())

0 commit comments

Comments
 (0)