Skip to content

Commit 9bbb6f4

Browse files
committed
Implement ingestion window
1 parent 3a47604 commit 9bbb6f4

File tree

8 files changed

+150
-20
lines changed

8 files changed

+150
-20
lines changed

cmd/pyroscope/help-all.txt.tmpl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,5 +899,9 @@ Usage of ./pyroscope:
899899
Maximum number of samples in a profile. 0 to disable. (default 16000)
900900
-validation.max-profile-symbol-value-length int
901901
Maximum length of a profile symbol value (labels, function names and filenames, etc...). Profiles are not rejected instead symbol values are truncated. 0 to disable. (default 65535)
902+
-validation.reject-newer-than duration
903+
This limits how far into the future profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 10m. (default 10m)
904+
-validation.reject-older-than duration
905+
This limits how far into the past profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 1h. (default 1h)
902906
-version
903907
Show the version of pyroscope and exit

cmd/pyroscope/help.txt.tmpl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,10 @@ Usage of ./pyroscope:
335335
Maximum number of samples in a profile. 0 to disable. (default 16000)
336336
-validation.max-profile-symbol-value-length int
337337
Maximum length of a profile symbol value (labels, function names and filenames, etc...). Profiles are not rejected instead symbol values are truncated. 0 to disable. (default 65535)
338+
-validation.reject-newer-than duration
339+
This limits how far into the future profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 10m. (default 10m)
340+
-validation.reject-older-than duration
341+
This limits how far into the past profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 1h. (default 1h)
338342
-version
339343
Show the version of pyroscope and exit
340344

pkg/distributor/distributor.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/pkg/errors"
2525
"github.com/prometheus/client_golang/prometheus"
2626
"github.com/prometheus/client_golang/prometheus/promauto"
27+
"github.com/prometheus/common/model"
2728
"go.uber.org/atomic"
2829

2930
googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
@@ -118,6 +119,7 @@ type Limits interface {
118119
MaxProfileStacktraceSampleLabels(userID string) int
119120
MaxProfileStacktraceDepth(userID string) int
120121
MaxProfileSymbolValueLength(userID string) int
122+
validation.ProfileValidationLimits
121123
}
122124

123125
func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, limits Limits, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Distributor, error) {
@@ -230,6 +232,7 @@ func (d *Distributor) Push(ctx context.Context, grpcReq *connect.Request[pushv1.
230232
}
231233

232234
func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error) {
235+
now := model.Now()
233236
tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
234237
if err != nil {
235238
return nil, connect.NewError(connect.CodeUnauthenticated, err)
@@ -283,7 +286,7 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push
283286
d.metrics.receivedSamples.WithLabelValues(profName, tenantID).Observe(float64(len(p.Sample)))
284287
totalPushUncompressedBytes += int64(decompressedSize)
285288

286-
if err := validation.ValidateProfile(d.limits, tenantID, p.Profile, decompressedSize, series.Labels); err != nil {
289+
if err := validation.ValidateProfile(d.limits, tenantID, p.Profile, decompressedSize, series.Labels, now); err != nil {
287290
// todo this actually discards more if multiple Samples in a Series request
288291
validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalProfiles))
289292
validation.DiscardedBytes.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalPushUncompressedBytes))

pkg/validation/limits.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ type Limits struct {
5252

5353
// Query frontend.
5454
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
55+
56+
// Ensure profiles are dated within the IngestionWindow of the distributor.
57+
RejectOlderThan model.Duration `yaml:"reject_older_than" json:"reject_older_than"`
58+
RejectNewerThan model.Duration `yaml:"reject_newer_than" json:"reject_newer_than"`
5559
}
5660

5761
// LimitError are errors that do not comply with the limits specified.
@@ -93,6 +97,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
9397
f.IntVar(&l.MaxProfileStacktraceSampleLabels, "validation.max-profile-stacktrace-sample-labels", 100, "Maximum number of labels in a profile sample. 0 to disable.")
9498
f.IntVar(&l.MaxProfileStacktraceDepth, "validation.max-profile-stacktrace-depth", 1000, "Maximum depth of a profile stacktrace. Profiles are not rejected instead stacktraces are truncated. 0 to disable.")
9599
f.IntVar(&l.MaxProfileSymbolValueLength, "validation.max-profile-symbol-value-length", 65535, "Maximum length of a profile symbol value (labels, function names and filenames, etc...). Profiles are not rejected instead symbol values are truncated. 0 to disable.")
100+
101+
_ = l.RejectNewerThan.Set("10m")
102+
f.Var(&l.RejectNewerThan, "validation.reject-newer-than", "This limits how far into the future profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 10m.")
103+
104+
_ = l.RejectOlderThan.Set("1h")
105+
f.Var(&l.RejectOlderThan, "validation.reject-older-than", "This limits how far into the past profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 1h.")
106+
96107
}
97108

98109
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@@ -262,6 +273,16 @@ func (o *Overrides) QuerySplitDuration(tenantID string) time.Duration {
262273
// 0 means no limit. Currently disabled.
263274
func (o *Overrides) MaxQueriersPerTenant(tenant string) int { return 0 }
264275

276+
// RejectNewerThan will ensure that profiles are further than the return value into the future are reject.
277+
func (o *Overrides) RejectNewerThan(tenantID string) time.Duration {
278+
return time.Duration(o.getOverridesForTenant(tenantID).RejectNewerThan)
279+
}
280+
281+
// RejectOlderThan will ensure that profiles that are older than the return value are rejected.
282+
func (o *Overrides) RejectOlderThan(tenantID string) time.Duration {
283+
return time.Duration(o.getOverridesForTenant(tenantID).RejectOlderThan)
284+
}
285+
265286
func (o *Overrides) DefaultLimits() *Limits {
266287
return o.defaultLimits
267288
}

pkg/validation/limits_mock.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func MockOverrides(customize func(defaults *Limits, tenantLimits map[string]*Lim
4545
func MockDefaultLimits() *Limits {
4646
defaults := Limits{}
4747
flagext.DefaultValues(&defaults)
48+
defaults.RejectNewerThan = 0
49+
defaults.RejectOlderThan = 0
4850
return &defaults
4951
}
5052

pkg/validation/testutil.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ type MockLimits struct {
1111
MaxLabelValueLengthValue int
1212
MaxLabelNamesPerSeriesValue int
1313

14+
RejectOlderThanValue time.Duration
15+
RejectNewerThanValue time.Duration
16+
1417
MaxProfileSizeBytesValue int
1518
MaxProfileStacktraceSamplesValue int
1619
MaxProfileStacktraceDepthValue int
@@ -41,3 +44,11 @@ func (m MockLimits) MaxProfileStacktraceSampleLabels(userID string) int {
4144
func (m MockLimits) MaxProfileSymbolValueLength(userID string) int {
4245
return m.MaxProfileSymbolValueLengthValue
4346
}
47+
48+
func (m MockLimits) RejectOlderThan(userID string) time.Duration {
49+
return m.RejectOlderThanValue
50+
}
51+
52+
func (m MockLimits) RejectNewerThan(userID string) time.Duration {
53+
return m.RejectNewerThanValue
54+
}

pkg/validation/validate.go

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ const (
3131
// RateLimited is one of the values for the reason to discard samples.
3232
RateLimited Reason = "rate_limited"
3333

34+
// NotInIngestionWindow is a reason for discarding profiles when Pyroscope doesn't accept profiles
35+
// that are outside of the ingestion window.
36+
NotInIngestionWindow Reason = "not_in_ingestion_window"
37+
3438
// MaxLabelNamesPerSeries is a reason for discarding a request which has too many label names
3539
MaxLabelNamesPerSeries Reason = "max_label_names_per_series"
3640
// LabelNameTooLong is a reason for discarding a request which has a label name too long
@@ -59,6 +63,7 @@ const (
5963
ProfileTooBigErrorMsg = "the profile with labels '%s' exceeds the size limit (max_profile_size_byte, actual: %d, limit: %d)"
6064
ProfileTooManySamplesErrorMsg = "the profile with labels '%s' exceeds the samples count limit (max_profile_stacktrace_samples, actual: %d, limit: %d)"
6165
ProfileTooManySampleLabelsErrorMsg = "the profile with labels '%s' exceeds the sample labels limit (max_profile_stacktrace_sample_labels, actual: %d, limit: %d)"
66+
NotInIngestionWindowErrorMsg = "profile with labels '%s' is outside of ingestion window (profile timestamp: %s, %s)"
6267
)
6368

6469
var (
@@ -84,19 +89,19 @@ var (
8489
)
8590

8691
type LabelValidationLimits interface {
87-
MaxLabelNameLength(userID string) int
88-
MaxLabelValueLength(userID string) int
89-
MaxLabelNamesPerSeries(userID string) int
92+
MaxLabelNameLength(tenantID string) int
93+
MaxLabelValueLength(tenantID string) int
94+
MaxLabelNamesPerSeries(tenantID string) int
9095
}
9196

9297
// ValidateLabels validates the labels of a profile.
93-
func ValidateLabels(limits LabelValidationLimits, userID string, ls []*typesv1.LabelPair) error {
98+
func ValidateLabels(limits LabelValidationLimits, tenantID string, ls []*typesv1.LabelPair) error {
9499
if len(ls) == 0 {
95100
return NewErrorf(MissingLabels, MissingLabelsErrorMsg)
96101
}
97102
sort.Sort(phlaremodel.Labels(ls))
98103
numLabelNames := len(ls)
99-
maxLabels := limits.MaxLabelNamesPerSeries(userID)
104+
maxLabels := limits.MaxLabelNamesPerSeries(tenantID)
100105
if numLabelNames > maxLabels {
101106
return NewErrorf(MaxLabelNamesPerSeries, MaxLabelNamesPerSeriesErrorMsg, phlaremodel.LabelPairsString(ls), numLabelNames, maxLabels)
102107
}
@@ -111,9 +116,9 @@ func ValidateLabels(limits LabelValidationLimits, userID string, ls []*typesv1.L
111116
lastLabelName := ""
112117

113118
for _, l := range ls {
114-
if len(l.Name) > limits.MaxLabelNameLength(userID) {
119+
if len(l.Name) > limits.MaxLabelNameLength(tenantID) {
115120
return NewErrorf(LabelNameTooLong, LabelNameTooLongErrorMsg, phlaremodel.LabelPairsString(ls), l.Name)
116-
} else if len(l.Value) > limits.MaxLabelValueLength(userID) {
121+
} else if len(l.Value) > limits.MaxLabelValueLength(tenantID) {
117122
return NewErrorf(LabelValueTooLong, LabelValueTooLongErrorMsg, phlaremodel.LabelPairsString(ls), l.Value)
118123
} else if !model.LabelName(l.Name).IsValid() {
119124
return NewErrorf(InvalidLabels, InvalidLabelsErrorMsg, phlaremodel.LabelPairsString(ls), "invalid label name '"+l.Name+"'")
@@ -129,27 +134,69 @@ func ValidateLabels(limits LabelValidationLimits, userID string, ls []*typesv1.L
129134
}
130135

131136
type ProfileValidationLimits interface {
132-
MaxProfileSizeBytes(userID string) int
133-
MaxProfileStacktraceSamples(userID string) int
134-
MaxProfileStacktraceSampleLabels(userID string) int
135-
MaxProfileStacktraceDepth(userID string) int
136-
MaxProfileSymbolValueLength(userID string) int
137+
MaxProfileSizeBytes(tenantID string) int
138+
MaxProfileStacktraceSamples(tenantID string) int
139+
MaxProfileStacktraceSampleLabels(tenantID string) int
140+
MaxProfileStacktraceDepth(tenantID string) int
141+
MaxProfileSymbolValueLength(tenantID string) int
142+
RejectNewerThan(tenantID string) time.Duration
143+
RejectOlderThan(tenantID string) time.Duration
144+
}
145+
146+
type ingestionWindow struct {
147+
from, to model.Time
148+
}
149+
150+
func newIngestionWindow(limits ProfileValidationLimits, tenantID string, now model.Time) *ingestionWindow {
151+
var iw ingestionWindow
152+
if d := limits.RejectNewerThan(tenantID); d != 0 {
153+
iw.to = now.Add(d)
154+
}
155+
if d := limits.RejectOlderThan(tenantID); d != 0 {
156+
iw.from = now.Add(-d)
157+
}
158+
return &iw
159+
}
160+
161+
func (iw *ingestionWindow) errorDetail() string {
162+
if iw.to == 0 {
163+
return fmt.Sprintf("the ingestion window starts at %s", util.FormatTimeMillis(int64(iw.from)))
164+
}
165+
if iw.from == 0 {
166+
return fmt.Sprintf("the ingestion window ends at %s", util.FormatTimeMillis(int64(iw.to)))
167+
}
168+
return fmt.Sprintf("the ingestion window starts at %s and ends at %s", util.FormatTimeMillis(int64(iw.from)), util.FormatTimeMillis(int64(iw.to)))
169+
170+
}
171+
172+
func (iw *ingestionWindow) valid(t model.Time, ls phlaremodel.Labels) error {
173+
if (iw.from == 0 || t.After(iw.from)) && (iw.to == 0 || t.Before(iw.to)) {
174+
return nil
175+
}
176+
177+
return NewErrorf(NotInIngestionWindow, NotInIngestionWindowErrorMsg, phlaremodel.LabelPairsString(ls), util.FormatTimeMillis(int64(t)), iw.errorDetail())
137178
}
138179

139-
func ValidateProfile(limits ProfileValidationLimits, userID string, prof *googlev1.Profile, uncompressedSize int, ls phlaremodel.Labels) error {
180+
func ValidateProfile(limits ProfileValidationLimits, tenantID string, prof *googlev1.Profile, uncompressedSize int, ls phlaremodel.Labels, now model.Time) error {
140181
if prof == nil {
141182
return nil
142183
}
143-
if limit := limits.MaxProfileSizeBytes(userID); limit != 0 && uncompressedSize > limit {
184+
185+
// check profile timestamp within ingestion window
186+
if err := newIngestionWindow(limits, tenantID, now).valid(model.TimeFromUnixNano(prof.TimeNanos), ls); err != nil {
187+
return err
188+
}
189+
190+
if limit := limits.MaxProfileSizeBytes(tenantID); limit != 0 && uncompressedSize > limit {
144191
return NewErrorf(ProfileSizeLimit, ProfileTooBigErrorMsg, phlaremodel.LabelPairsString(ls), uncompressedSize, limit)
145192
}
146-
if limit, size := limits.MaxProfileStacktraceSamples(userID), len(prof.Sample); limit != 0 && size > limit {
193+
if limit, size := limits.MaxProfileStacktraceSamples(tenantID), len(prof.Sample); limit != 0 && size > limit {
147194
return NewErrorf(SamplesLimit, ProfileTooManySamplesErrorMsg, phlaremodel.LabelPairsString(ls), size, limit)
148195
}
149196
var (
150-
depthLimit = limits.MaxProfileStacktraceDepth(userID)
151-
labelsLimit = limits.MaxProfileStacktraceSampleLabels(userID)
152-
symbolLengthLimit = limits.MaxProfileSymbolValueLength(userID)
197+
depthLimit = limits.MaxProfileStacktraceDepth(tenantID)
198+
labelsLimit = limits.MaxProfileStacktraceSampleLabels(tenantID)
199+
symbolLengthLimit = limits.MaxProfileSymbolValueLength(tenantID)
153200
)
154201
for _, s := range prof.Sample {
155202
if depthLimit != 0 && len(s.LocationId) > depthLimit {

pkg/validation/validate_test.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ func Test_ValidateRangeRequest(t *testing.T) {
201201
}
202202

203203
func TestValidateProfile(t *testing.T) {
204+
now := model.TimeFromUnixNano(1_676_635_994_000_000_000)
205+
204206
for _, tc := range []struct {
205207
name string
206208
profile *googlev1.Profile
@@ -277,10 +279,46 @@ func TestValidateProfile(t *testing.T) {
277279
require.Equal(t, []uint64{4, 5}, profile.Sample[0].LocationId)
278280
},
279281
},
282+
{
283+
name: "newer than ingestion window",
284+
profile: &googlev1.Profile{
285+
TimeNanos: now.Add(1 * time.Hour).UnixNano(),
286+
},
287+
limits: MockLimits{
288+
RejectNewerThanValue: 10 * time.Minute,
289+
},
290+
expectedErr: &Error{
291+
Reason: NotInIngestionWindow,
292+
msg: "profile with labels '{foo=\"bar\"}' is outside of ingestion window (profile timestamp: 2023-02-17 13:13:14 +0000 UTC, the ingestion window ends at 2023-02-17 12:23:14 +0000 UTC)",
293+
},
294+
},
295+
{
296+
name: "older than ingestion window",
297+
profile: &googlev1.Profile{
298+
TimeNanos: now.Add(-61 * time.Minute).UnixNano(),
299+
},
300+
limits: MockLimits{
301+
RejectOlderThanValue: time.Hour,
302+
},
303+
expectedErr: &Error{
304+
Reason: NotInIngestionWindow,
305+
msg: "profile with labels '{foo=\"bar\"}' is outside of ingestion window (profile timestamp: 2023-02-17 11:12:14 +0000 UTC, the ingestion window starts at 2023-02-17 11:13:14 +0000 UTC)",
306+
},
307+
},
308+
{
309+
name: "just in the ingestion window",
310+
profile: &googlev1.Profile{
311+
TimeNanos: now.Add(-1 * time.Minute).UnixNano(),
312+
},
313+
limits: MockLimits{
314+
RejectOlderThanValue: time.Hour,
315+
RejectNewerThanValue: 10 * time.Minute,
316+
},
317+
},
280318
} {
281319
tc := tc
282320
t.Run(tc.name, func(t *testing.T) {
283-
err := ValidateProfile(tc.limits, "foo", tc.profile, tc.size, phlaremodel.LabelsFromStrings("foo", "bar"))
321+
err := ValidateProfile(tc.limits, "foo", tc.profile, tc.size, phlaremodel.LabelsFromStrings("foo", "bar"), now)
284322
if tc.expectedErr != nil {
285323
require.Error(t, err)
286324
require.Equal(t, tc.expectedErr, err)

0 commit comments

Comments
 (0)