Skip to content

Commit 4223fe6

Browse files
authored
feat(v2): configurable write-path compression (#4006)
* feat(v2): configurable write-path compression * feat(v2): configurable write-path compression
1 parent 1420455 commit 4223fe6

File tree

2 files changed

+45
-12
lines changed

2 files changed

+45
-12
lines changed

pkg/distributor/write_path/router.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ func (m *Router) Send(ctx context.Context, req *distributormodel.PushRequest) er
101101
config := m.overrides.WritePathOverrides(req.TenantID)
102102
switch config.WritePath {
103103
case SegmentWriterPath:
104-
return m.send(m.segwriterRoute(true))(ctx, req)
104+
return m.send(m.segwriterRoute(true, &config))(ctx, req)
105105
case CombinedPath:
106-
return m.sendToBoth(ctx, req, config)
106+
return m.sendToBoth(ctx, req, &config)
107107
default:
108108
return m.send(m.ingesterRoute())(ctx, req)
109109
}
@@ -120,17 +120,17 @@ func (m *Router) ingesterRoute() *route {
120120
}
121121
}
122122

123-
func (m *Router) segwriterRoute(primary bool) *route {
123+
func (m *Router) segwriterRoute(primary bool, config *Config) *route {
124124
return &route{
125125
path: SegmentWriterPath,
126126
primary: primary,
127127
send: func(ctx context.Context, req *distributormodel.PushRequest) error {
128-
return m.sendRequestsToSegmentWriter(ctx, convertRequest(req))
128+
return m.sendRequestsToSegmentWriter(ctx, convertRequest(req, config.Compression))
129129
},
130130
}
131131
}
132132

133-
func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushRequest, config Config) error {
133+
func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushRequest, config *Config) error {
134134
r := rand.Float64() // [0.0, 1.0)
135135
shouldIngester := config.IngesterWeight > 0.0 && config.IngesterWeight >= r
136136
shouldSegwriter := config.SegmentWriterWeight > 0.0 && config.SegmentWriterWeight >= r
@@ -149,7 +149,7 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque
149149
}
150150
}
151151
if shouldSegwriter {
152-
segwriter = m.segwriterRoute(!shouldIngester)
152+
segwriter = m.segwriterRoute(!shouldIngester, config)
153153
if segwriter.primary && !config.AsyncIngest {
154154
// The request is sent to segment-writer exclusively, and the client
155155
// must block until the response returns.
@@ -272,11 +272,11 @@ func (m *Router) sendRequestsToSegmentWriter(ctx context.Context, requests []*se
272272
return g.Wait()
273273
}
274274

275-
func convertRequest(req *distributormodel.PushRequest) []*segmentwriterv1.PushRequest {
275+
func convertRequest(req *distributormodel.PushRequest, compression Compression) []*segmentwriterv1.PushRequest {
276276
r := make([]*segmentwriterv1.PushRequest, 0, len(req.Series)*2)
277277
for _, s := range req.Series {
278278
for _, p := range s.Samples {
279-
r = append(r, convertProfile(p, s.Labels, req.TenantID))
279+
r = append(r, convertProfile(p, s.Labels, req.TenantID, compression))
280280
}
281281
}
282282
return r
@@ -286,8 +286,9 @@ func convertProfile(
286286
sample *distributormodel.ProfileSample,
287287
labels []*typesv1.LabelPair,
288288
tenantID string,
289+
compression Compression,
289290
) *segmentwriterv1.PushRequest {
290-
buf, err := pprof.Marshal(sample.Profile.Profile, true)
291+
buf, err := pprof.Marshal(sample.Profile.Profile, compression == CompressionGzip)
291292
if err != nil {
292293
panic(fmt.Sprintf("failed to marshal profile: %v", err))
293294
}

pkg/distributor/write_path/write_path.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var paths = []WritePath{
3838
CombinedPath,
3939
}
4040

41-
const validOptionsString = "valid options: ingester, segment-writer, combined"
41+
const validWritePathOptionsString = "valid options: ingester, segment-writer, combined"
4242

4343
func (m *WritePath) Set(text string) error {
4444
x := WritePath(text)
@@ -48,26 +48,58 @@ func (m *WritePath) Set(text string) error {
4848
return nil
4949
}
5050
}
51-
return fmt.Errorf("%w: %s; %s", ErrInvalidWritePath, x, validOptionsString)
51+
return fmt.Errorf("%w: %s; %s", ErrInvalidWritePath, x, validWritePathOptionsString)
5252
}
5353

5454
func (m *WritePath) String() string { return string(*m) }
5555

56+
type Compression string
57+
58+
const (
59+
CompressionNone Compression = "none"
60+
CompressionGzip Compression = "gzip"
61+
)
62+
63+
var ErrInvalidCompression = errors.New("invalid write path compression")
64+
65+
var compressions = []Compression{
66+
CompressionNone,
67+
CompressionGzip,
68+
}
69+
70+
const validCompressionOptionsString = "valid compression options: none, gzip"
71+
72+
func (m *Compression) Set(text string) error {
73+
x := Compression(text)
74+
for _, name := range compressions {
75+
if x == name {
76+
*m = x
77+
return nil
78+
}
79+
}
80+
return fmt.Errorf("%w: %s; %s", ErrInvalidCompression, x, validCompressionOptionsString)
81+
}
82+
83+
func (m *Compression) String() string { return string(*m) }
84+
5685
type Config struct {
5786
WritePath WritePath `yaml:"write_path" json:"write_path" doc:"hidden"`
5887
IngesterWeight float64 `yaml:"write_path_ingester_weight" json:"write_path_ingester_weight" doc:"hidden"`
5988
SegmentWriterWeight float64 `yaml:"write_path_segment_writer_weight" json:"write_path_segment_writer_weight" doc:"hidden"`
6089
SegmentWriterTimeout time.Duration `yaml:"write_path_segment_writer_timeout" json:"write_path_segment_writer_timeout" doc:"hidden"`
90+
Compression Compression `yaml:"write_path_compression" json:"write_path_compression" doc:"hidden"`
6191
AsyncIngest bool `yaml:"async_ingest" json:"async_ingest" doc:"hidden"`
6292
}
6393

6494
func (o *Config) RegisterFlags(f *flag.FlagSet) {
6595
o.WritePath = IngesterPath
66-
f.Var(&o.WritePath, "write-path", "Controls the write path route; "+validOptionsString+".")
96+
o.Compression = CompressionNone
97+
f.Var(&o.WritePath, "write-path", "Controls the write path route; "+validWritePathOptionsString+".")
6798
f.Float64Var(&o.IngesterWeight, "write-path.ingester-weight", 1,
6899
"Specifies the fraction [0:1] that should be send to ingester in combined mode. 0 means no traffics is sent to ingester. 1 means 100% of requests are sent to ingester.")
69100
f.Float64Var(&o.SegmentWriterWeight, "write-path.segment-writer-weight", 0,
70101
"Specifies the fraction [0:1] that should be send to segment-writer in combined mode. 0 means no traffics is sent to segment-writer. 1 means 100% of requests are sent to segment-writer.")
71102
f.DurationVar(&o.SegmentWriterTimeout, "write-path.segment-writer-timeout", 5*time.Second, "Timeout for segment writer requests.")
103+
f.Var(&o.Compression, "write-path.compression", "Compression algorithm to use for segment writer requests; "+validCompressionOptionsString+".")
72104
f.BoolVar(&o.AsyncIngest, "async-ingest", false, "If true, the write path will not wait for the segment-writer to finish processing the request. Writes to ingester always synchronous.")
73105
}

0 commit comments

Comments
 (0)