diff --git a/cmd/pyroscope/help-all.txt.tmpl b/cmd/pyroscope/help-all.txt.tmpl index 8edd84c0df..31d67fb139 100644 --- a/cmd/pyroscope/help-all.txt.tmpl +++ b/cmd/pyroscope/help-all.txt.tmpl @@ -899,5 +899,9 @@ Usage of ./pyroscope: Maximum number of samples in a profile. 0 to disable. (default 16000) -validation.max-profile-symbol-value-length int Maximum length of a profile symbol value (labels, function names and filenames, etc...). Profiles are not rejected instead symbol values are truncated. 0 to disable. (default 65535) + -validation.reject-newer-than duration + This limits how far into the future profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 10m. (default 10m) + -validation.reject-older-than duration + This limits how far into the past profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 1h. (default 1h) -version Show the version of pyroscope and exit diff --git a/cmd/pyroscope/help.txt.tmpl b/cmd/pyroscope/help.txt.tmpl index 666b70cff8..b3160d21a3 100644 --- a/cmd/pyroscope/help.txt.tmpl +++ b/cmd/pyroscope/help.txt.tmpl @@ -335,6 +335,10 @@ Usage of ./pyroscope: Maximum number of samples in a profile. 0 to disable. (default 16000) -validation.max-profile-symbol-value-length int Maximum length of a profile symbol value (labels, function names and filenames, etc...). Profiles are not rejected instead symbol values are truncated. 0 to disable. (default 65535) + -validation.reject-newer-than duration + This limits how far into the future profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 10m. (default 10m) + -validation.reject-older-than duration + This limits how far into the past profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 1h. (default 1h) -version Show the version of pyroscope and exit diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d52afb26c5..c09ac2ba77 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "go.uber.org/atomic" googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" @@ -118,6 +119,7 @@ type Limits interface { MaxProfileStacktraceSampleLabels(userID string) int MaxProfileStacktraceDepth(userID string) int MaxProfileSymbolValueLength(userID string) int + validation.ProfileValidationLimits } func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, limits Limits, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Distributor, error) { @@ -230,6 +232,7 @@ func (d *Distributor) Push(ctx context.Context, grpcReq *connect.Request[pushv1. } func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error) { + now := model.Now() tenantID, err := tenant.ExtractTenantIDFromContext(ctx) if err != nil { return nil, connect.NewError(connect.CodeUnauthenticated, err) @@ -283,7 +286,7 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push d.metrics.receivedSamples.WithLabelValues(profName, tenantID).Observe(float64(len(p.Sample))) totalPushUncompressedBytes += int64(decompressedSize) - if err := validation.ValidateProfile(d.limits, tenantID, p.Profile, decompressedSize, series.Labels); err != nil { + if err := validation.ValidateProfile(d.limits, tenantID, p.Profile, decompressedSize, series.Labels, now); err != nil { // todo this actually discards more if multiple Samples in a Series request validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalProfiles)) validation.DiscardedBytes.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalPushUncompressedBytes)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 3893f24ec3..8d6ac09eff 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -248,8 +248,6 @@ func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushReq validation.DiscardedProfiles.WithLabelValues(string(reason), instance.tenantID).Add(float64(1)) validation.DiscardedBytes.WithLabelValues(string(reason), instance.tenantID).Add(float64(size)) switch validation.ReasonOf(err) { - case validation.OutOfOrder: - return connect.NewError(connect.CodeInvalidArgument, err) case validation.SeriesLimit: return connect.NewError(connect.CodeResourceExhausted, err) } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index f3688ff96a..3802f0dac9 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -41,8 +41,7 @@ type limiter struct { replicationFactor int tenantID string - activeSeries map[model.Fingerprint]int64 - lastTimestamp map[model.Fingerprint]int64 + activeSeries map[model.Fingerprint]int64 mtx sync.Mutex // todo: may be shard the lock to avoid latency spikes. @@ -60,7 +59,6 @@ func NewLimiter(tenantID string, limits Limits, ring RingCount, replicationFacto ring: ring, replicationFactor: replicationFactor, activeSeries: map[model.Fingerprint]int64{}, - lastTimestamp: map[model.Fingerprint]int64{}, cancel: cancel, ctx: ctx, } @@ -108,26 +106,9 @@ func (l *limiter) cleanup() { func (l *limiter) AllowProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error { l.mtx.Lock() defer l.mtx.Unlock() - if err := l.allowNewProfile(fp, lbs, tsNano); err != nil { - return err - } return l.allowNewSeries(fp) } -func (l *limiter) allowNewProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error { - max, ok := l.lastTimestamp[fp] - if ok { - // profile is before the last timestamp - if tsNano < max { - return validation.NewErrorf(validation.OutOfOrder, "profile for series %s out of order (received %s last %s)", phlaremodel.LabelPairsString(lbs), time.Unix(0, tsNano), time.Unix(0, max)) - } - } - - // set the last timestamp - l.lastTimestamp[fp] = tsNano - return nil -} - func (l *limiter) allowNewSeries(fp model.Fingerprint) error { _, ok := l.activeSeries[fp] series := len(l.activeSeries) diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 36ed6f310b..f27f5070de 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -39,22 +39,6 @@ func (f *fakeRingCount) HealthyInstancesCount() int { return f.healthyInstancesCount } -func TestOutOfOrder(t *testing.T) { - limiter := NewLimiter("foo", &fakeLimits{}, &fakeRingCount{1}, 1) - defer limiter.Stop() - - // First push should be allowed. - err := limiter.AllowProfile(1, phlaremodel.LabelsFromStrings("foo", "bar"), 5) - require.NoError(t, err) - - // different stream should be allowed. - err = limiter.AllowProfile(2, phlaremodel.LabelsFromStrings("foo", "baz"), 1) - require.NoError(t, err) - - err = limiter.AllowProfile(1, phlaremodel.LabelsFromStrings("foo", "baz"), 1) - require.Error(t, err) -} - func TestGlobalMaxSeries(t *testing.T) { // 5 series per user, 2 ingesters, replication factor 3. // We should be able to push 7.5 series. (5 / 2 * 3 = 7.5) diff --git a/pkg/phlaredb/profile_test.go b/pkg/phlaredb/profile_test.go index 222010a67d..19f309f7a3 100644 --- a/pkg/phlaredb/profile_test.go +++ b/pkg/phlaredb/profile_test.go @@ -18,6 +18,7 @@ import ( phlaremodel "github.com/grafana/pyroscope/pkg/model" v1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" + "github.com/grafana/pyroscope/pkg/pprof/testhelper" ) func TestIndex(t *testing.T) { @@ -245,3 +246,36 @@ func Test_rowRangesIter(t *testing.T) { }) } } + +func TestProfileIndex_Add_OutOfOrder(t *testing.T) { + head := newTestHead(t) + ctx := context.Background() + + for idx, ts := range []int64{100, 80, 20, 50, 110} { + p := testhelper.NewProfileBuilder(ts*1e9). + CPUProfile(). + WithLabels( + "job", "a", + ).ForStacktraceString("foo", "bar", "baz", fmt.Sprintf("iteration%d", idx)).AddSamples(1) + + require.NoError(t, head.Ingest(ctx, p.Profile, uuid.New())) + } + + index := head.profiles.index + + // check that the profiles are in the correct order + var tsOrder []int64 + + require.Len(t, index.profilesPerFP, 1) + for _, profiles := range index.profilesPerFP { + for _, p := range profiles.profiles { + tsOrder = append(tsOrder, p.TimeNanos/1e9) + } + + // check if min/max time is correct + require.Equal(t, int64(20e9), profiles.minTime) + require.Equal(t, int64(110e9), profiles.maxTime) + } + require.Equal(t, []int64{20, 50, 80, 100, 110}, tsOrder) + +} diff --git a/pkg/phlaredb/profiles.go b/pkg/phlaredb/profiles.go index a242636826..4aba669d2b 100644 --- a/pkg/phlaredb/profiles.go +++ b/pkg/phlaredb/profiles.go @@ -181,12 +181,27 @@ func (pi *profilesIndex) Add(ps *schemav1.InMemoryProfile, lbs phlaremodel.Label pi.metrics.seriesCreated.WithLabelValues(profileName).Inc() } - profiles.profiles = append(profiles.profiles, ps) - if ps.TimeNanos < profiles.minTime { - profiles.minTime = ps.TimeNanos - } + // profile is latest in this series, use a shortcut if ps.TimeNanos > profiles.maxTime { + // update max timeNanos profiles.maxTime = ps.TimeNanos + + // add profile to in memory slice + profiles.profiles = append(profiles.profiles, ps) + } else { + // use binary search to find position + i := sort.Search(len(profiles.profiles), func(i int) bool { + return profiles.profiles[i].TimeNanos > ps.TimeNanos + }) + + // insert into slice at correct position + profiles.profiles = append(profiles.profiles, &schemav1.InMemoryProfile{}) + copy(profiles.profiles[i+1:], profiles.profiles[i:]) + profiles.profiles[i] = ps + } + + if ps.TimeNanos < profiles.minTime { + profiles.minTime = ps.TimeNanos } pi.metrics.profiles.Set(float64(pi.totalProfiles.Inc())) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 73adbf51c3..92170cec2c 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -52,6 +52,10 @@ type Limits struct { // Query frontend. QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` + + // Ensure profiles are dated within the IngestionWindow of the distributor. + RejectOlderThan model.Duration `yaml:"reject_older_than" json:"reject_older_than"` + RejectNewerThan model.Duration `yaml:"reject_newer_than" json:"reject_newer_than"` } // LimitError are errors that do not comply with the limits specified. @@ -93,6 +97,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxProfileStacktraceSampleLabels, "validation.max-profile-stacktrace-sample-labels", 100, "Maximum number of labels in a profile sample. 0 to disable.") f.IntVar(&l.MaxProfileStacktraceDepth, "validation.max-profile-stacktrace-depth", 1000, "Maximum depth of a profile stacktrace. Profiles are not rejected instead stacktraces are truncated. 0 to disable.") f.IntVar(&l.MaxProfileSymbolValueLength, "validation.max-profile-symbol-value-length", 65535, "Maximum length of a profile symbol value (labels, function names and filenames, etc...). Profiles are not rejected instead symbol values are truncated. 0 to disable.") + + _ = l.RejectNewerThan.Set("10m") + f.Var(&l.RejectNewerThan, "validation.reject-newer-than", "This limits how far into the future profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 10m.") + + _ = l.RejectOlderThan.Set("1h") + f.Var(&l.RejectOlderThan, "validation.reject-older-than", "This limits how far into the past profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 1h.") + } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -262,6 +273,16 @@ func (o *Overrides) QuerySplitDuration(tenantID string) time.Duration { // 0 means no limit. Currently disabled. func (o *Overrides) MaxQueriersPerTenant(tenant string) int { return 0 } +// RejectNewerThan will ensure that profiles are further than the return value into the future are reject. +func (o *Overrides) RejectNewerThan(tenantID string) time.Duration { + return time.Duration(o.getOverridesForTenant(tenantID).RejectNewerThan) +} + +// RejectOlderThan will ensure that profiles that are older than the return value are rejected. +func (o *Overrides) RejectOlderThan(tenantID string) time.Duration { + return time.Duration(o.getOverridesForTenant(tenantID).RejectOlderThan) +} + func (o *Overrides) DefaultLimits() *Limits { return o.defaultLimits } diff --git a/pkg/validation/limits_mock.go b/pkg/validation/limits_mock.go index 3c62233390..5e1247a7e5 100644 --- a/pkg/validation/limits_mock.go +++ b/pkg/validation/limits_mock.go @@ -45,6 +45,8 @@ func MockOverrides(customize func(defaults *Limits, tenantLimits map[string]*Lim func MockDefaultLimits() *Limits { defaults := Limits{} flagext.DefaultValues(&defaults) + defaults.RejectNewerThan = 0 + defaults.RejectOlderThan = 0 return &defaults } diff --git a/pkg/validation/testutil.go b/pkg/validation/testutil.go index 3c7186045b..495fdc0b8b 100644 --- a/pkg/validation/testutil.go +++ b/pkg/validation/testutil.go @@ -11,6 +11,9 @@ type MockLimits struct { MaxLabelValueLengthValue int MaxLabelNamesPerSeriesValue int + RejectOlderThanValue time.Duration + RejectNewerThanValue time.Duration + MaxProfileSizeBytesValue int MaxProfileStacktraceSamplesValue int MaxProfileStacktraceDepthValue int @@ -41,3 +44,11 @@ func (m MockLimits) MaxProfileStacktraceSampleLabels(userID string) int { func (m MockLimits) MaxProfileSymbolValueLength(userID string) int { return m.MaxProfileSymbolValueLengthValue } + +func (m MockLimits) RejectOlderThan(userID string) time.Duration { + return m.RejectOlderThanValue +} + +func (m MockLimits) RejectNewerThan(userID string) time.Duration { + return m.RejectNewerThanValue +} diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index 69e91dfd23..38e7ff130a 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -30,9 +30,11 @@ const ( MissingLabels Reason = "missing_labels" // RateLimited is one of the values for the reason to discard samples. RateLimited Reason = "rate_limited" - // OutOfOrder is a reason for discarding profiles when Pyroscope doesn't accept out - // of order profiles. - OutOfOrder Reason = "out_of_order" + + // NotInIngestionWindow is a reason for discarding profiles when Pyroscope doesn't accept profiles + // that are outside of the ingestion window. + NotInIngestionWindow Reason = "not_in_ingestion_window" + // MaxLabelNamesPerSeries is a reason for discarding a request which has too many label names MaxLabelNamesPerSeries Reason = "max_label_names_per_series" // LabelNameTooLong is a reason for discarding a request which has a label name too long @@ -61,6 +63,7 @@ const ( ProfileTooBigErrorMsg = "the profile with labels '%s' exceeds the size limit (max_profile_size_byte, actual: %d, limit: %d)" ProfileTooManySamplesErrorMsg = "the profile with labels '%s' exceeds the samples count limit (max_profile_stacktrace_samples, actual: %d, limit: %d)" ProfileTooManySampleLabelsErrorMsg = "the profile with labels '%s' exceeds the sample labels limit (max_profile_stacktrace_sample_labels, actual: %d, limit: %d)" + NotInIngestionWindowErrorMsg = "profile with labels '%s' is outside of ingestion window (profile timestamp: %s, %s)" ) var ( @@ -86,19 +89,19 @@ var ( ) type LabelValidationLimits interface { - MaxLabelNameLength(userID string) int - MaxLabelValueLength(userID string) int - MaxLabelNamesPerSeries(userID string) int + MaxLabelNameLength(tenantID string) int + MaxLabelValueLength(tenantID string) int + MaxLabelNamesPerSeries(tenantID string) int } // ValidateLabels validates the labels of a profile. -func ValidateLabels(limits LabelValidationLimits, userID string, ls []*typesv1.LabelPair) error { +func ValidateLabels(limits LabelValidationLimits, tenantID string, ls []*typesv1.LabelPair) error { if len(ls) == 0 { return NewErrorf(MissingLabels, MissingLabelsErrorMsg) } sort.Sort(phlaremodel.Labels(ls)) numLabelNames := len(ls) - maxLabels := limits.MaxLabelNamesPerSeries(userID) + maxLabels := limits.MaxLabelNamesPerSeries(tenantID) if numLabelNames > maxLabels { return NewErrorf(MaxLabelNamesPerSeries, MaxLabelNamesPerSeriesErrorMsg, phlaremodel.LabelPairsString(ls), numLabelNames, maxLabels) } @@ -113,9 +116,9 @@ func ValidateLabels(limits LabelValidationLimits, userID string, ls []*typesv1.L lastLabelName := "" for _, l := range ls { - if len(l.Name) > limits.MaxLabelNameLength(userID) { + if len(l.Name) > limits.MaxLabelNameLength(tenantID) { return NewErrorf(LabelNameTooLong, LabelNameTooLongErrorMsg, phlaremodel.LabelPairsString(ls), l.Name) - } else if len(l.Value) > limits.MaxLabelValueLength(userID) { + } else if len(l.Value) > limits.MaxLabelValueLength(tenantID) { return NewErrorf(LabelValueTooLong, LabelValueTooLongErrorMsg, phlaremodel.LabelPairsString(ls), l.Value) } else if !model.LabelName(l.Name).IsValid() { return NewErrorf(InvalidLabels, InvalidLabelsErrorMsg, phlaremodel.LabelPairsString(ls), "invalid label name '"+l.Name+"'") @@ -131,27 +134,69 @@ func ValidateLabels(limits LabelValidationLimits, userID string, ls []*typesv1.L } type ProfileValidationLimits interface { - MaxProfileSizeBytes(userID string) int - MaxProfileStacktraceSamples(userID string) int - MaxProfileStacktraceSampleLabels(userID string) int - MaxProfileStacktraceDepth(userID string) int - MaxProfileSymbolValueLength(userID string) int + MaxProfileSizeBytes(tenantID string) int + MaxProfileStacktraceSamples(tenantID string) int + MaxProfileStacktraceSampleLabels(tenantID string) int + MaxProfileStacktraceDepth(tenantID string) int + MaxProfileSymbolValueLength(tenantID string) int + RejectNewerThan(tenantID string) time.Duration + RejectOlderThan(tenantID string) time.Duration +} + +type ingestionWindow struct { + from, to model.Time } -func ValidateProfile(limits ProfileValidationLimits, userID string, prof *googlev1.Profile, uncompressedSize int, ls phlaremodel.Labels) error { +func newIngestionWindow(limits ProfileValidationLimits, tenantID string, now model.Time) *ingestionWindow { + var iw ingestionWindow + if d := limits.RejectNewerThan(tenantID); d != 0 { + iw.to = now.Add(d) + } + if d := limits.RejectOlderThan(tenantID); d != 0 { + iw.from = now.Add(-d) + } + return &iw +} + +func (iw *ingestionWindow) errorDetail() string { + if iw.to == 0 { + return fmt.Sprintf("the ingestion window starts at %s", util.FormatTimeMillis(int64(iw.from))) + } + if iw.from == 0 { + return fmt.Sprintf("the ingestion window ends at %s", util.FormatTimeMillis(int64(iw.to))) + } + return fmt.Sprintf("the ingestion window starts at %s and ends at %s", util.FormatTimeMillis(int64(iw.from)), util.FormatTimeMillis(int64(iw.to))) + +} + +func (iw *ingestionWindow) valid(t model.Time, ls phlaremodel.Labels) error { + if (iw.from == 0 || t.After(iw.from)) && (iw.to == 0 || t.Before(iw.to)) { + return nil + } + + return NewErrorf(NotInIngestionWindow, NotInIngestionWindowErrorMsg, phlaremodel.LabelPairsString(ls), util.FormatTimeMillis(int64(t)), iw.errorDetail()) +} + +func ValidateProfile(limits ProfileValidationLimits, tenantID string, prof *googlev1.Profile, uncompressedSize int, ls phlaremodel.Labels, now model.Time) error { if prof == nil { return nil } - if limit := limits.MaxProfileSizeBytes(userID); limit != 0 && uncompressedSize > limit { + + // check profile timestamp within ingestion window + if err := newIngestionWindow(limits, tenantID, now).valid(model.TimeFromUnixNano(prof.TimeNanos), ls); err != nil { + return err + } + + if limit := limits.MaxProfileSizeBytes(tenantID); limit != 0 && uncompressedSize > limit { return NewErrorf(ProfileSizeLimit, ProfileTooBigErrorMsg, phlaremodel.LabelPairsString(ls), uncompressedSize, limit) } - if limit, size := limits.MaxProfileStacktraceSamples(userID), len(prof.Sample); limit != 0 && size > limit { + if limit, size := limits.MaxProfileStacktraceSamples(tenantID), len(prof.Sample); limit != 0 && size > limit { return NewErrorf(SamplesLimit, ProfileTooManySamplesErrorMsg, phlaremodel.LabelPairsString(ls), size, limit) } var ( - depthLimit = limits.MaxProfileStacktraceDepth(userID) - labelsLimit = limits.MaxProfileStacktraceSampleLabels(userID) - symbolLengthLimit = limits.MaxProfileSymbolValueLength(userID) + depthLimit = limits.MaxProfileStacktraceDepth(tenantID) + labelsLimit = limits.MaxProfileStacktraceSampleLabels(tenantID) + symbolLengthLimit = limits.MaxProfileSymbolValueLength(tenantID) ) for _, s := range prof.Sample { if depthLimit != 0 && len(s.LocationId) > depthLimit { diff --git a/pkg/validation/validate_test.go b/pkg/validation/validate_test.go index a9e2ee9cba..10da80f3d0 100644 --- a/pkg/validation/validate_test.go +++ b/pkg/validation/validate_test.go @@ -201,6 +201,8 @@ func Test_ValidateRangeRequest(t *testing.T) { } func TestValidateProfile(t *testing.T) { + now := model.TimeFromUnixNano(1_676_635_994_000_000_000) + for _, tc := range []struct { name string profile *googlev1.Profile @@ -277,10 +279,46 @@ func TestValidateProfile(t *testing.T) { require.Equal(t, []uint64{4, 5}, profile.Sample[0].LocationId) }, }, + { + name: "newer than ingestion window", + profile: &googlev1.Profile{ + TimeNanos: now.Add(1 * time.Hour).UnixNano(), + }, + limits: MockLimits{ + RejectNewerThanValue: 10 * time.Minute, + }, + expectedErr: &Error{ + Reason: NotInIngestionWindow, + msg: "profile with labels '{foo=\"bar\"}' is outside of ingestion window (profile timestamp: 2023-02-17 13:13:14 +0000 UTC, the ingestion window ends at 2023-02-17 12:23:14 +0000 UTC)", + }, + }, + { + name: "older than ingestion window", + profile: &googlev1.Profile{ + TimeNanos: now.Add(-61 * time.Minute).UnixNano(), + }, + limits: MockLimits{ + RejectOlderThanValue: time.Hour, + }, + expectedErr: &Error{ + Reason: NotInIngestionWindow, + msg: "profile with labels '{foo=\"bar\"}' is outside of ingestion window (profile timestamp: 2023-02-17 11:12:14 +0000 UTC, the ingestion window starts at 2023-02-17 11:13:14 +0000 UTC)", + }, + }, + { + name: "just in the ingestion window", + profile: &googlev1.Profile{ + TimeNanos: now.Add(-1 * time.Minute).UnixNano(), + }, + limits: MockLimits{ + RejectOlderThanValue: time.Hour, + RejectNewerThanValue: 10 * time.Minute, + }, + }, } { tc := tc t.Run(tc.name, func(t *testing.T) { - err := ValidateProfile(tc.limits, "foo", tc.profile, tc.size, phlaremodel.LabelsFromStrings("foo", "bar")) + err := ValidateProfile(tc.limits, "foo", tc.profile, tc.size, phlaremodel.LabelsFromStrings("foo", "bar"), now) if tc.expectedErr != nil { require.Error(t, err) require.Equal(t, tc.expectedErr, err)