Skip to content

Commit b2e42a8

Browse files
authored
feat: tenant settings ruler (#3945)
* feat: tenant settings ruler * doc * use rlock for cache * tenant settings client usage * config opt * flags refactor * intermediate config object rules_source
1 parent a941d1c commit b2e42a8

File tree

8 files changed

+232
-35
lines changed

8 files changed

+232
-35
lines changed

pkg/experiment/compactor/compaction_worker.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ type Worker struct {
5757
}
5858

5959
type Config struct {
60-
JobConcurrency int `yaml:"job_capacity"`
61-
JobPollInterval time.Duration `yaml:"job_poll_interval"`
62-
SmallObjectSize int `yaml:"small_object_size_bytes"`
63-
TempDir string `yaml:"temp_dir"`
64-
RequestTimeout time.Duration `yaml:"request_timeout"`
65-
MetricsExporterEnabled bool `yaml:"metrics_exporter_enabled"`
60+
JobConcurrency int `yaml:"job_capacity"`
61+
JobPollInterval time.Duration `yaml:"job_poll_interval"`
62+
SmallObjectSize int `yaml:"small_object_size_bytes"`
63+
TempDir string `yaml:"temp_dir"`
64+
RequestTimeout time.Duration `yaml:"request_timeout"`
65+
MetricsExporterConfig metrics.Config `yaml:"metrics_exporter_config"`
6666
}
6767

6868
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -72,7 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
7272
f.DurationVar(&cfg.RequestTimeout, prefix+"request-timeout", 5*time.Second, "Job request timeout.")
7373
f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
7474
f.StringVar(&cfg.TempDir, prefix+"temp-dir", os.TempDir(), "Temporary directory for compaction jobs.")
75-
f.BoolVar(&cfg.MetricsExporterEnabled, prefix+"metrics-exporter.enabled", false, "This parameter specifies whether the metrics exporter is enabled.")
75+
cfg.MetricsExporterConfig.RegisterFlags(f)
7676
}
7777

7878
type compactionJob struct {
@@ -486,7 +486,7 @@ func (w *Worker) runCompaction(job *compactionJob) {
486486
}
487487

488488
func (w *Worker) buildSampleObserver(md *metastorev1.BlockMeta) *metrics.SampleObserver {
489-
if !w.config.MetricsExporterEnabled || md.CompactionLevel > 0 {
489+
if !w.config.MetricsExporterConfig.Enabled || md.CompactionLevel > 0 {
490490
return nil
491491
}
492492
recordingTime := int64(ulid.MustParse(md.Id).Time())

pkg/experiment/metrics/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package metrics
2+
3+
import (
4+
"flag"
5+
)
6+
7+
// Config holds the settings of the metrics exporter. All of its flags are
// registered under the "compaction-worker.metrics-exporter." prefix.
type Config struct {
8+
// Enabled specifies whether the metrics exporter is enabled.
Enabled bool `yaml:"enabled"`
9+
// RulesSource groups the settings of the recording rules source.
RulesSource struct {
10+
// ClientAddress is the address to use for the recording rules client connection.
ClientAddress string `yaml:"client_address"`
11+
} `yaml:"rules_source"`
12+
}
13+
14+
// Validate checks the configuration. It currently performs no checks and
// always returns nil.
func (c *Config) Validate() error {
15+
return nil
16+
}
17+
18+
func (c *Config) RegisterFlags(f *flag.FlagSet) {
19+
const prefix = "compaction-worker.metrics-exporter."
20+
21+
f.BoolVar(&c.Enabled, prefix+"enabled", false, "This parameter specifies whether the metrics exporter is enabled.")
22+
f.StringVar(&c.RulesSource.ClientAddress, prefix+"rules-source.client-address", "", "The address to use for the recording rules client connection.")
23+
}

pkg/experiment/metrics/ruler.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"encoding/json"
55
"fmt"
66
"os"
7+
"sync"
8+
"time"
79

810
"github.com/go-kit/log"
911
"github.com/go-kit/log/level"
@@ -14,6 +16,7 @@ import (
1416

1517
const (
1618
envVarRecordingRules = "PYROSCOPE_RECORDING_RULES"
19+
rulesExpiryTime = time.Minute
1720
)
1821

1922
type StaticRuler struct {
@@ -51,3 +54,92 @@ func NewStaticRulerFromEnvVars(logger log.Logger) (Ruler, error) {
5154
// RecordingRules returns the entry stored under tenant in r.rules.
func (r StaticRuler) RecordingRules(tenant string) []*model.RecordingRule {
5255
return r.rules[tenant]
5356
}
57+
58+
// CachedRemoteRuler is a thread-safe ruler that retrieves rules through a RecordingRulesClient.
59+
// Rules are cached per tenant in rulesPerTenant; entries are created lazily on first lookup.
60+
type CachedRemoteRuler struct {
61+
// rulesPerTenant maps a tenant to its cache entry. Guarded by mu.
rulesPerTenant map[string]*tenantCache
62+
// mu guards rulesPerTenant.
mu sync.RWMutex
63+
64+
// client is what each tenant cache calls to fetch that tenant's rules.
client RecordingRulesClient
65+
66+
// logger is handed to every tenant cache created by this ruler.
logger log.Logger
67+
}
68+
69+
// RecordingRulesClient is the source CachedRemoteRuler fetches rules from.
type RecordingRulesClient interface {
70+
// RecordingRules returns the recording rules of the given tenant, or an error.
RecordingRules(tenant string) ([]*model.RecordingRule, error)
71+
}
72+
73+
func NewCachedRemoteRuler(client RecordingRulesClient, logger log.Logger) (Ruler, error) {
74+
return &CachedRemoteRuler{
75+
rulesPerTenant: make(map[string]*tenantCache),
76+
client: client,
77+
logger: logger,
78+
}, nil
79+
}
80+
81+
func (r *CachedRemoteRuler) RecordingRules(tenant string) []*model.RecordingRule {
82+
// get the per-tenant cache
83+
r.mu.RLock()
84+
cache, ok := r.rulesPerTenant[tenant]
85+
r.mu.RUnlock()
86+
87+
// There's no cache for given tenant: init it
88+
if !ok {
89+
r.mu.Lock()
90+
defer r.mu.Unlock()
91+
92+
// only race-winner will initialize the per-tenant cache
93+
cache, ok = r.rulesPerTenant[tenant]
94+
if !ok {
95+
cache = &tenantCache{
96+
initFunc: func() ([]*model.RecordingRule, error) {
97+
return r.client.RecordingRules(tenant)
98+
},
99+
logger: r.logger,
100+
}
101+
r.rulesPerTenant[tenant] = cache
102+
}
103+
}
104+
105+
// get data from cache:
106+
return cache.get()
107+
}
108+
109+
// tenantCache is a thread-safe cache that holds one expirable list of rules.
110+
type tenantCache struct {
111+
// value is the last list of rules stored after a successful initFunc call. Guarded by mu.
value []*model.RecordingRule
112+
// ttl is the instant at which value expires (a point in time, not a duration). Guarded by mu.
ttl time.Time
113+
// initFunc fetches a fresh list of rules; it is called when value is missing or expired.
initFunc func() ([]*model.RecordingRule, error)
114+
// mu guards value and ttl.
mu sync.RWMutex
115+
// logger receives an error entry when initFunc fails.
logger log.Logger
116+
}
117+
118+
// get returns the stored value if present and not expired.
119+
// Otherwise, a single call to initFunc will be performed to retrieve the value and hold it for future calls within
120+
// the ttl.
121+
func (c *tenantCache) get() []*model.RecordingRule {
122+
c.mu.RLock()
123+
if c.value != nil && time.Now().Before(c.ttl) {
124+
defer c.mu.RUnlock()
125+
// value exists and didn't expired
126+
return c.value
127+
}
128+
c.mu.RUnlock()
129+
130+
c.mu.Lock()
131+
defer c.mu.Unlock()
132+
133+
// only race-winner will fetch the data
134+
if c.value == nil || time.Now().After(c.ttl) {
135+
value, err := c.initFunc()
136+
if err != nil {
137+
// keep old value and ttl, just log an error
138+
level.Error(c.logger).Log("msg", "failed to fetch recording rules", "err", err)
139+
} else {
140+
c.value = value
141+
c.ttl = time.Now().Add(rulesExpiryTime)
142+
}
143+
}
144+
return c.value
145+
}

pkg/phlare/modules.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,19 @@ const (
9292

9393
// Experimental modules
9494

95-
Metastore string = "metastore"
96-
MetastoreClient string = "metastore-client"
97-
MetastoreAdmin string = "metastore-admin"
98-
SegmentWriter string = "segment-writer"
99-
SegmentWriterRing string = "segment-writer-ring"
100-
SegmentWriterClient string = "segment-writer-client"
101-
QueryBackend string = "query-backend"
102-
QueryBackendClient string = "query-backend-client"
103-
CompactionWorker string = "compaction-worker"
104-
PlacementAgent string = "placement-agent"
105-
PlacementManager string = "placement-manager"
106-
HealthServer string = "health-server"
95+
Metastore string = "metastore"
96+
MetastoreClient string = "metastore-client"
97+
MetastoreAdmin string = "metastore-admin"
98+
SegmentWriter string = "segment-writer"
99+
SegmentWriterRing string = "segment-writer-ring"
100+
SegmentWriterClient string = "segment-writer-client"
101+
QueryBackend string = "query-backend"
102+
QueryBackendClient string = "query-backend-client"
103+
CompactionWorker string = "compaction-worker"
104+
PlacementAgent string = "placement-agent"
105+
PlacementManager string = "placement-manager"
106+
HealthServer string = "health-server"
107+
RecordingRulesClient string = "recording-rules-client"
107108
)
108109

109110
var objectStoreTypeStats = usagestats.NewString("store_object_type")

pkg/phlare/modules_experimental.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/grafana/pyroscope/pkg/experiment/metrics"
2828
querybackend "github.com/grafana/pyroscope/pkg/experiment/query_backend"
2929
querybackendclient "github.com/grafana/pyroscope/pkg/experiment/query_backend/client"
30+
recordingrulesclient "github.com/grafana/pyroscope/pkg/settings/recording/client"
3031
"github.com/grafana/pyroscope/pkg/util"
3132
"github.com/grafana/pyroscope/pkg/util/health"
3233
)
@@ -101,11 +102,16 @@ func (f *Phlare) initCompactionWorker() (svc services.Service, err error) {
101102

102103
var ruler metrics.Ruler
103104
var exporter metrics.Exporter
104-
if f.Cfg.CompactionWorker.MetricsExporterEnabled {
105-
ruler, err = metrics.NewStaticRulerFromEnvVars(f.logger)
105+
if f.Cfg.CompactionWorker.MetricsExporterConfig.Enabled {
106+
if f.recordingRulesClient != nil {
107+
ruler, err = metrics.NewCachedRemoteRuler(f.recordingRulesClient, f.logger)
108+
} else {
109+
ruler, err = metrics.NewStaticRulerFromEnvVars(f.logger)
110+
}
106111
if err != nil {
107112
return nil, err
108113
}
114+
109115
exporter, err = metrics.NewStaticExporterFromEnvVars(f.logger, f.reg)
110116
if err != nil {
111117
return nil, err
@@ -224,6 +230,22 @@ func (f *Phlare) initQueryBackendClient() (services.Service, error) {
224230
return c.Service(), nil
225231
}
226232

233+
func (f *Phlare) initRecordingRulesClient() (services.Service, error) {
234+
if err := f.Cfg.CompactionWorker.MetricsExporterConfig.Validate(); err != nil {
235+
return nil, err
236+
}
237+
if !f.Cfg.CompactionWorker.MetricsExporterConfig.Enabled ||
238+
f.Cfg.CompactionWorker.MetricsExporterConfig.RulesSource.ClientAddress == "" {
239+
return nil, nil
240+
}
241+
c, err := recordingrulesclient.NewClient(f.Cfg.CompactionWorker.MetricsExporterConfig.RulesSource.ClientAddress, f.logger, f.auth)
242+
if err != nil {
243+
return nil, err
244+
}
245+
f.recordingRulesClient = c
246+
return c.Service(), nil
247+
}
248+
227249
func (f *Phlare) initPlacementAgent() (services.Service, error) {
228250
f.placementAgent = adaptiveplacement.NewAgent(
229251
f.logger,

pkg/phlare/phlare.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import (
6868
"github.com/grafana/pyroscope/pkg/scheduler"
6969
"github.com/grafana/pyroscope/pkg/scheduler/schedulerdiscovery"
7070
"github.com/grafana/pyroscope/pkg/settings"
71+
recordingrulesclient "github.com/grafana/pyroscope/pkg/settings/recording/client"
7172
"github.com/grafana/pyroscope/pkg/storegateway"
7273
"github.com/grafana/pyroscope/pkg/tenant"
7374
"github.com/grafana/pyroscope/pkg/tracing"
@@ -359,17 +360,18 @@ type Phlare struct {
359360
frontend *frontend.Frontend
360361

361362
// Experimental modules.
362-
segmentWriter *segmentwriter.SegmentWriterService
363-
segmentWriterClient *segmentwriterclient.Client
364-
segmentWriterRing *ring.Ring
365-
placementAgent *adaptiveplacement.Agent
366-
placementManager *adaptiveplacement.Manager
367-
metastore *metastore.Metastore
368-
metastoreClient *metastoreclient.Client
369-
metastoreAdmin *metastoreadmin.Admin
370-
queryBackendClient *querybackendclient.Client
371-
compactionWorker *compactionworker.Worker
372-
healthServer *health.Server
363+
segmentWriter *segmentwriter.SegmentWriterService
364+
segmentWriterClient *segmentwriterclient.Client
365+
segmentWriterRing *ring.Ring
366+
placementAgent *adaptiveplacement.Agent
367+
placementManager *adaptiveplacement.Manager
368+
metastore *metastore.Metastore
369+
metastoreClient *metastoreclient.Client
370+
metastoreAdmin *metastoreadmin.Admin
371+
queryBackendClient *querybackendclient.Client
372+
compactionWorker *compactionworker.Worker
373+
healthServer *health.Server
374+
recordingRulesClient *recordingrulesclient.Client
373375
}
374376

375377
func New(cfg Config) (*Phlare, error) {
@@ -485,7 +487,7 @@ func (f *Phlare) setupModuleManager() error {
485487
SegmentWriter: {Overrides, API, MemberlistKV, Storage, UsageReport, MetastoreClient},
486488
Metastore: {Overrides, API, MetastoreClient, Storage, PlacementManager},
487489
MetastoreAdmin: {API, MetastoreClient},
488-
CompactionWorker: {Overrides, API, Storage, MetastoreClient},
490+
CompactionWorker: {Overrides, API, Storage, MetastoreClient, RecordingRulesClient},
489491
QueryBackend: {Overrides, API, Storage, QueryBackendClient},
490492
SegmentWriterRing: {Overrides, API, MemberlistKV},
491493
SegmentWriterClient: {Overrides, API, SegmentWriterRing, PlacementAgent},
@@ -515,6 +517,7 @@ func (f *Phlare) setupModuleManager() error {
515517
mm.RegisterModule(PlacementAgent, f.initPlacementAgent, modules.UserInvisibleModule)
516518
mm.RegisterModule(PlacementManager, f.initPlacementManager, modules.UserInvisibleModule)
517519
mm.RegisterModule(HealthServer, f.initHealthServer, modules.UserInvisibleModule)
520+
mm.RegisterModule(RecordingRulesClient, f.initRecordingRulesClient, modules.UserInvisibleModule)
518521
}
519522

520523
for mod, targets := range deps {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package recording
2+
3+
import (
4+
"context"
5+
6+
"connectrpc.com/connect"
7+
"github.com/go-kit/log"
8+
"github.com/go-kit/log/level"
9+
"github.com/grafana/dskit/services"
10+
11+
settingsv1 "github.com/grafana/pyroscope/api/gen/proto/go/settings/v1"
12+
"github.com/grafana/pyroscope/api/gen/proto/go/settings/v1/settingsv1connect"
13+
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
14+
phlaremodel "github.com/grafana/pyroscope/pkg/model"
15+
"github.com/grafana/pyroscope/pkg/tenant"
16+
"github.com/grafana/pyroscope/pkg/util"
17+
)
18+
19+
// Client fetches recording rules through a RecordingRulesServiceClient.
type Client struct {
20+
// service is the value returned by Service.
service services.Service
21+
// client is the underlying client used to list recording rules.
client settingsv1connect.RecordingRulesServiceClient
22+
// logger receives an error entry for every rule that fails to parse.
logger log.Logger
23+
}
24+
25+
func NewClient(address string, logger log.Logger, auth connect.Option) (*Client, error) {
26+
httpClient := util.InstrumentedDefaultHTTPClient()
27+
opts := connectapi.DefaultClientOptions()
28+
opts = append(opts, auth)
29+
c := Client{
30+
client: settingsv1connect.NewRecordingRulesServiceClient(httpClient, "http://"+address, opts...),
31+
logger: logger,
32+
}
33+
c.service = services.NewIdleService(c.starting, c.stopping)
34+
return &c, nil
35+
}
36+
37+
func (b *Client) RecordingRules(tenantId string) ([]*phlaremodel.RecordingRule, error) {
38+
ctx := tenant.InjectTenantID(context.Background(), tenantId)
39+
resp, err := b.client.ListRecordingRules(ctx, connect.NewRequest(&settingsv1.ListRecordingRulesRequest{}))
40+
if err != nil {
41+
return nil, err
42+
}
43+
rules := make([]*phlaremodel.RecordingRule, 0, len(resp.Msg.Rules))
44+
for _, rule := range resp.Msg.Rules {
45+
r, err := phlaremodel.NewRecordingRule(rule)
46+
if err == nil {
47+
rules = append(rules, r)
48+
} else {
49+
level.Error(b.logger).Log("msg", "failed to parse recording rule", "rule", rule, "err", err)
50+
}
51+
}
52+
return rules, nil
53+
}
54+
55+
// Service returns the service stored in the client.
func (b *Client) Service() services.Service { return b.service }
56+
// starting does nothing and always returns nil.
func (b *Client) starting(context.Context) error { return nil }
57+
// stopping does nothing and always returns nil.
func (b *Client) stopping(error) error { return nil }

pkg/settings/recording/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,5 @@ func (cfg *Config) Validate() error {
2626
if !cfg.Enabled {
2727
return nil
2828
}
29-
3029
return nil
3130
}

0 commit comments

Comments
 (0)