Skip to content

Commit 6aa159e

Browse files
authored
chore: Recording rules overrides (#3973)
* Recording rules overrides
* use settingsv1.RecordingRule for limits config
* tests
* moving flags to v2 scope
1 parent cc28636 commit 6aa159e

File tree

8 files changed

+249
-123
lines changed

8 files changed

+249
-123
lines changed

pkg/experiment/metrics/ruler.go

Lines changed: 14 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,14 @@
11
package metrics
22

33
import (
4-
"encoding/json"
5-
"fmt"
6-
"os"
74
"sync"
85
"time"
96

107
"github.com/go-kit/log"
118
"github.com/go-kit/log/level"
129

13-
settingsv1 "github.com/grafana/pyroscope/api/gen/proto/go/settings/v1"
1410
"github.com/grafana/pyroscope/pkg/model"
11+
"github.com/grafana/pyroscope/pkg/validation"
1512
)
1613

1714
const (
@@ -20,39 +17,24 @@ const (
2017
)
2118

2219
type StaticRuler struct {
23-
rules map[string][]*model.RecordingRule
24-
logger log.Logger
20+
overrides *validation.Overrides
2521
}
2622

27-
func NewStaticRulerFromEnvVars(logger log.Logger) (Ruler, error) {
28-
jsonRules := os.Getenv(envVarRecordingRules)
29-
30-
var rulesByTenant map[string][]*settingsv1.RecordingRule
31-
if err := json.Unmarshal([]byte(jsonRules), &rulesByTenant); err != nil {
32-
return nil, fmt.Errorf("failed to unmarshal recording rules: %w", err)
33-
}
34-
35-
ruler := &StaticRuler{
36-
rules: make(map[string][]*model.RecordingRule, len(rulesByTenant)),
37-
logger: logger,
23+
func NewStaticRulerFromOverrides(overrides *validation.Overrides) Ruler {
24+
return &StaticRuler{
25+
overrides: overrides,
3826
}
39-
for tenant, rules := range rulesByTenant {
40-
rs := make([]*model.RecordingRule, 0, len(rules))
41-
for _, rule := range rules {
42-
r, err := model.NewRecordingRule(rule)
43-
if err == nil {
44-
rs = append(rs, r)
45-
} else {
46-
level.Error(logger).Log("msg", "failed to parse recording rule", "rule", rule, "err", err)
47-
}
48-
}
49-
ruler.rules[tenant] = rs
50-
}
51-
return ruler, nil
5227
}
5328

54-
func (r StaticRuler) RecordingRules(tenant string) []*model.RecordingRule {
55-
return r.rules[tenant]
29+
func (ruler StaticRuler) RecordingRules(tenant string) []*model.RecordingRule {
30+
rules := ruler.overrides.RecordingRules(tenant)
31+
rs := make([]*model.RecordingRule, 0, len(rules))
32+
for _, rule := range rules {
33+
// should never fail, overrides already validated
34+
r, _ := model.NewRecordingRule(rule)
35+
rs = append(rs, r)
36+
}
37+
return rs
5638
}
5739

5840
// CachedRemoteRuler is a thread-safe ruler that retrieves rules from an external service.

pkg/experiment/metrics/ruler_test.go

Lines changed: 52 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -1,95 +1,73 @@
11
package metrics
22

33
import (
4-
"bytes"
5-
"os"
6-
"strings"
74
"testing"
85

9-
"github.com/go-kit/log"
106
"github.com/prometheus/prometheus/model/labels"
117
"github.com/stretchr/testify/assert"
128

9+
settingsv1 "github.com/grafana/pyroscope/api/gen/proto/go/settings/v1"
10+
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
1311
"github.com/grafana/pyroscope/pkg/model"
12+
"github.com/grafana/pyroscope/pkg/validation"
1413
)
1514

16-
func Test_Ruler_missconfigured(t *testing.T) {
17-
_, err := NewStaticRulerFromEnvVars(log.NewNopLogger())
18-
assert.EqualError(t, err, "failed to unmarshal recording rules: unexpected end of JSON input", "Empty env var should result in error at creating ruler")
19-
}
20-
21-
func Test_Ruler_happyPath(t *testing.T) {
22-
jsonRecordingRules :=
23-
`{
24-
"tenant1": [{
25-
"metric_name": "metric1",
26-
"matchers": ["{__profile_type__=\"profile_type\", pod=\"foo\"}"],
27-
"group_by": ["bar"],
28-
"external_labels": [{"name":"__name__", "value":"metric1"}]
29-
}],
30-
"tenant2": [{
31-
"metric_name": "metric2",
32-
"matchers": ["{__profile_type__=\"profile_type\"}"],
33-
"group_by": [],
34-
"external_labels": [{"name":"__name__", "value":"should be ignored"}]
35-
},
36-
{
37-
"metric_name": "no_profile_type",
38-
"matchers": ["{}"],
39-
"group_by": [],
40-
"external_labels": []
41-
},
42-
{
43-
"metric_name": "Wrong metric name",
44-
"matchers": ["{__profile_type__=\"profile_type\"}"],
45-
"group_by": [],
46-
"external_labels": []
47-
},
48-
{
49-
"metric_name": "wrong_matcher",
50-
"matchers": ["{foo==\"bar\"}"],
51-
"group_by": [],
52-
"external_labels": []
53-
}]
54-
}`
55-
_ = os.Setenv(envVarRecordingRules, jsonRecordingRules)
56-
loggerBuffer := &bytes.Buffer{}
57-
logger := log.NewJSONLogger(loggerBuffer)
58-
ruler, err := NewStaticRulerFromEnvVars(logger)
59-
assert.NoError(t, err)
15+
var (
16+
defaultRecordingRulesProto = []*settingsv1.RecordingRule{{
17+
MetricName: "default_recording_rule",
18+
Matchers: []string{"{__profile_type__=\"any-profile-type\"}"},
19+
}}
6020

61-
rules := ruler.RecordingRules("tenant1")
62-
assert.Equal(t, []*model.RecordingRule{{
63-
Matchers: []*labels.Matcher{{
64-
Type: labels.MatchEqual,
65-
Name: "__profile_type__",
66-
Value: "profile_type",
67-
}, {
68-
Type: labels.MatchEqual,
69-
Name: "pod",
70-
Value: "foo",
21+
defaultRecordingRules = []*model.RecordingRule{{
22+
ExternalLabels: labels.Labels{{
23+
Name: "__name__",
24+
Value: "default_recording_rule",
7125
}},
72-
GroupBy: []string{"bar"},
73-
ExternalLabels: labels.Labels{
74-
{Name: "__name__", Value: "metric1"},
75-
},
76-
}}, rules)
77-
78-
rules = ruler.RecordingRules("tenant2")
79-
assert.Equal(t, []*model.RecordingRule{{
8026
Matchers: []*labels.Matcher{{
8127
Type: labels.MatchEqual,
8228
Name: "__profile_type__",
83-
Value: "profile_type",
29+
Value: "any-profile-type",
8430
}},
85-
GroupBy: []string{},
31+
}}
32+
33+
overriddenRecordingRulesProto = []*settingsv1.RecordingRule{{
34+
MetricName: "rule",
35+
Matchers: []string{"{__profile_type__=\"any-profile-type\", matcher1!=\"value\"}"},
36+
GroupBy: []string{"group_by_label"},
37+
ExternalLabels: []*typesv1.LabelPair{{Name: "foo", Value: "bar"}},
38+
}}
39+
40+
overriddenRecordingRules = []*model.RecordingRule{{
41+
Matchers: []*labels.Matcher{
42+
{Type: labels.MatchEqual, Name: "__profile_type__", Value: "any-profile-type"},
43+
{Type: labels.MatchNotEqual, Name: "matcher1", Value: "value"},
44+
},
45+
GroupBy: []string{"group_by_label"},
8646
ExternalLabels: labels.Labels{
87-
{Name: "__name__", Value: "metric2"},
47+
{Name: "foo", Value: "bar"},
48+
{Name: "__name__", Value: "rule"},
8849
},
89-
}}, rules)
50+
}}
51+
)
52+
53+
func Test_Ruler_happyPath(t *testing.T) {
54+
overrides := newOverrides(t)
55+
56+
ruler := NewStaticRulerFromOverrides(overrides)
57+
58+
rules := ruler.RecordingRules("non-configured-tenant")
59+
assert.Equal(t, defaultRecordingRules, rules)
60+
61+
rules = ruler.RecordingRules("tenant-override")
62+
assert.Equal(t, overriddenRecordingRules, rules)
63+
}
9064

91-
loggedLines := strings.Split(loggerBuffer.String(), "\n")
92-
assert.True(t, strings.Contains(loggedLines[0], "no __profile_type__ matcher present"))
93-
assert.True(t, strings.Contains(loggedLines[1], "invalid metric name: Wrong metric name"))
94-
assert.True(t, strings.Contains(loggedLines[2], "failed to parse matchers: 1:6: parse error: unexpected \\\"=\\\" in label matching, expected string"))
65+
func newOverrides(t *testing.T) *validation.Overrides {
66+
t.Helper()
67+
return validation.MockOverrides(func(defaults *validation.Limits, tenantLimits map[string]*validation.Limits) {
68+
defaults.RecordingRules = defaultRecordingRulesProto
69+
l := validation.MockDefaultLimits()
70+
l.RecordingRules = overriddenRecordingRulesProto
71+
tenantLimits["tenant-override"] = l
72+
})
9573
}

pkg/phlare/modules_experimental.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -105,11 +105,11 @@ func (f *Phlare) initCompactionWorker() (svc services.Service, err error) {
105105
if f.Cfg.CompactionWorker.MetricsExporter.Enabled {
106106
if f.recordingRulesClient != nil {
107107
ruler, err = metrics.NewCachedRemoteRuler(f.recordingRulesClient, f.logger)
108+
if err != nil {
109+
return nil, err
110+
}
108111
} else {
109-
ruler, err = metrics.NewStaticRulerFromEnvVars(f.logger)
110-
}
111-
if err != nil {
112-
return nil, err
112+
ruler = metrics.NewStaticRulerFromOverrides(f.Overrides)
113113
}
114114

115115
exporter, err = metrics.NewStaticExporterFromEnvVars(f.logger, f.reg)

pkg/phlare/phlare.go

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -225,14 +225,15 @@ func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
225225
c.v2Experiment = os.Getenv("PYROSCOPE_V2_EXPERIMENT") != ""
226226
if c.v2Experiment {
227227
for k, v := range map[string]string{
228-
"server.grpc-max-recv-msg-size-bytes": "104857600",
229-
"server.grpc-max-send-msg-size-bytes": "104857600",
230-
"server.grpc.keepalive.min-time-between-pings": "1s",
231-
"segment-writer.grpc-client-config.connect-timeout": "1s",
232-
"segment-writer.num-tokens": "4",
233-
"segment-writer.heartbeat-timeout": "1m",
234-
"segment-writer.unregister-on-shutdown": "false",
235-
"segment-writer.min-ready-duration": "30s",
228+
"server.grpc-max-recv-msg-size-bytes": "104857600",
229+
"server.grpc-max-send-msg-size-bytes": "104857600",
230+
"server.grpc.keepalive.min-time-between-pings": "1s",
231+
"segment-writer.grpc-client-config.connect-timeout": "1s",
232+
"segment-writer.num-tokens": "4",
233+
"segment-writer.heartbeat-timeout": "1m",
234+
"segment-writer.unregister-on-shutdown": "false",
235+
"segment-writer.min-ready-duration": "30s",
236+
"compaction-worker.metrics-exporter.rules-source.static": "[]",
236237
} {
237238
overrides[k] = v
238239
}
@@ -245,6 +246,7 @@ func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
245246
c.LimitsConfig.WritePathOverrides.RegisterFlags(throwaway)
246247
c.LimitsConfig.ReadPathOverrides.RegisterFlags(throwaway)
247248
c.LimitsConfig.AdaptivePlacementLimits.RegisterFlags(throwaway)
249+
c.LimitsConfig.RecordingRules.RegisterFlags(throwaway)
248250
}
249251

250252
throwaway.VisitAll(func(f *flag.Flag) {

pkg/validation/limits.go

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ import (
1414
writepath "github.com/grafana/pyroscope/pkg/distributor/write_path"
1515
"github.com/grafana/pyroscope/pkg/experiment/distributor/placement/adaptive_placement"
1616
readpath "github.com/grafana/pyroscope/pkg/frontend/read_path"
17+
phlaremodel "github.com/grafana/pyroscope/pkg/model"
1718
"github.com/grafana/pyroscope/pkg/phlaredb/block"
1819
)
1920

@@ -113,6 +114,10 @@ type Limits struct {
113114
// Distributors use these limits to determine how many shards to allocate
114115
// to a tenant dataset by default, if no placement rules defined.
115116
AdaptivePlacementLimits adaptive_placement.PlacementLimits `yaml:",inline" json:",inline"`
117+
118+
// RecordingRules allows specifying static recording rules. This is not compatible with recording rules
119+
// coming from a RecordingRulesClient, which will replace any static rules defined.
120+
RecordingRules RecordingRules `yaml:"recording_rules" json:"recording_rules" category:"experimental" doc:"hidden"`
116121
}
117122

118123
// LimitError are errors that do not comply with the limits specified.
@@ -209,13 +214,19 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
209214

210215
// Validate validates that this limits config is valid.
211216
func (l *Limits) Validate() error {
212-
213217
if l.IngestionRelabelingDefaultRulesPosition != "" {
214218
if err := l.IngestionRelabelingDefaultRulesPosition.Set(string(l.IngestionRelabelingDefaultRulesPosition)); err != nil {
215219
return err
216220
}
217221
}
218222

223+
for idx, rule := range l.RecordingRules {
224+
_, err := phlaremodel.NewRecordingRule(rule)
225+
if err != nil {
226+
return fmt.Errorf("rule at pos %d is not valid: %v", idx, err)
227+
}
228+
}
229+
219230
return nil
220231
}
221232

pkg/validation/limits_test.go

Lines changed: 24 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -75,6 +75,13 @@ shard_streams:
7575
blocked_queries:
7676
- pattern: ".*foo.*"
7777
regex: true
78+
recording_rules:
79+
- metric_name: 'any-name'
80+
matchers: ['any-matcher']
81+
group_by: ['any-group-by']
82+
external_labels:
83+
- name: 'any-label-name'
84+
value: 'any-label-value'
7885
`
7986
inputJSON := `
8087
{
@@ -120,10 +127,23 @@ blocked_queries:
120127
"logging_enabled": true
121128
},
122129
"blocked_queries": [
123-
{
124-
"pattern": ".*foo.*",
125-
"regex": true
126-
}
130+
{
131+
"pattern": ".*foo.*",
132+
"regex": true
133+
}
134+
],
135+
"recording_rules": [
136+
{
137+
"metric_name": "any-name",
138+
"matchers": ["any-matcher"],
139+
"group_by": ["any-group-by"],
140+
"external_labels": [
141+
{
142+
"name" : "any-label-name",
143+
"value" : "any-label-value"
144+
}
145+
]
146+
}
127147
]
128148
}
129149
`

0 commit comments

Comments
 (0)