Skip to content

Commit 37657f9

Browse files
committed
Add metrics for block querying
This refactors the registrt and logger passing to use a context.
1 parent e6725f9 commit 37657f9

File tree

16 files changed

+325
-128
lines changed

16 files changed

+325
-128
lines changed

cmd/firetool/blocks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func blocksList(ctx context.Context) error {
3232
return err
3333
}
3434

35-
metas, err := firedb.NewBlockQuerier(logger, bucket).BlockMetas(ctx)
35+
metas, err := firedb.NewBlockQuerier(ctx, bucket).BlockMetas(ctx)
3636
if err != nil {
3737
return err
3838
}

cmd/firetool/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/go-kit/log/level"
1111
"github.com/prometheus/common/version"
1212
"gopkg.in/alecthomas/kingpin.v2"
13+
14+
firecontext "github.com/grafana/fire/pkg/fire/context"
1315
)
1416

1517
var cfg struct {
@@ -26,7 +28,8 @@ var (
2628
)
2729

2830
func main() {
29-
ctx := context.Background()
31+
ctx := firecontext.WithLogger(context.Background(), logger)
32+
3033
app := kingpin.New(filepath.Base(os.Args[0]), "Tooling for Grafana Fire, the continuous profiling aggregation system.").UsageWriter(os.Stdout)
3134
app.Version(version.Print("firetool"))
3235
app.HelpFlag.Short('h')

pkg/fire/context/context.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package context
2+
3+
import (
4+
"context"
5+
6+
"github.com/go-kit/log"
7+
"github.com/prometheus/client_golang/prometheus"
8+
)
9+
10+
type contextKey int
11+
12+
const (
13+
loggerKey contextKey = iota
14+
registryKey
15+
)
16+
17+
func WithLogger(ctx context.Context, logger log.Logger) context.Context {
18+
return context.WithValue(ctx, loggerKey, logger)
19+
}
20+
21+
func Logger(ctx context.Context) log.Logger {
22+
if logger, ok := ctx.Value(loggerKey).(log.Logger); ok {
23+
return logger
24+
}
25+
return log.NewNopLogger()
26+
}
27+
28+
func WithRegistry(ctx context.Context, registry prometheus.Registerer) context.Context {
29+
return context.WithValue(ctx, registryKey, registry)
30+
}
31+
32+
func Registry(ctx context.Context) prometheus.Registerer {
33+
if registry, ok := ctx.Value(registryKey).(prometheus.Registerer); ok {
34+
return registry
35+
}
36+
return prometheus.NewRegistry()
37+
}
38+
39+
func WrapTenant(ctx context.Context, tenantID string) context.Context {
40+
// wrap registry
41+
reg := Registry(ctx)
42+
ctx = WithRegistry(ctx, prometheus.WrapRegistererWith(
43+
prometheus.Labels{"tenant": tenantID},
44+
reg,
45+
))
46+
47+
// add field to logger
48+
logger := Logger(ctx)
49+
ctx = WithLogger(ctx,
50+
log.With(logger, "tenant", tenantID),
51+
)
52+
53+
return ctx
54+
}

pkg/fire/modules.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/grafana/fire/pkg/agent"
2727
"github.com/grafana/fire/pkg/distributor"
28+
firecontext "github.com/grafana/fire/pkg/fire/context"
2829
agentv1 "github.com/grafana/fire/pkg/gen/agent/v1"
2930
"github.com/grafana/fire/pkg/gen/agent/v1/agentv1connect"
3031
"github.com/grafana/fire/pkg/gen/ingester/v1/ingesterv1connect"
@@ -171,7 +172,11 @@ func (f *Fire) initStorage() (_ services.Service, err error) {
171172
func (f *Fire) initIngester() (_ services.Service, err error) {
172173
f.Cfg.Ingester.LifecyclerConfig.ListenPort = f.Cfg.Server.HTTPListenPort
173174

174-
ingester, err := ingester.New(f.Cfg.Ingester, f.Cfg.FireDB, f.logger, f.reg, f.storageBucket)
175+
// TODO: This should be passed to all other services and could also be used to signal shutdown
176+
firectx := firecontext.WithLogger(context.Background(), f.logger)
177+
firectx = firecontext.WithRegistry(firectx, f.reg)
178+
179+
ingester, err := ingester.New(firectx, f.Cfg.Ingester, f.Cfg.FireDB, f.storageBucket)
175180
if err != nil {
176181
return nil, err
177182
}

pkg/firedb/block_querier.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path/filepath"
88
"sort"
99
"sync"
10+
"time"
1011

1112
"github.com/go-kit/log"
1213
"github.com/go-kit/log/level"
@@ -26,6 +27,7 @@ import (
2627
"golang.org/x/sync/errgroup"
2728
"google.golang.org/grpc/codes"
2829

30+
firecontext "github.com/grafana/fire/pkg/fire/context"
2931
"github.com/grafana/fire/pkg/firedb/block"
3032
query "github.com/grafana/fire/pkg/firedb/query"
3133
schemav1 "github.com/grafana/fire/pkg/firedb/schemas/v1"
@@ -44,20 +46,22 @@ type tableReader interface {
4446
}
4547

4648
type BlockQuerier struct {
47-
logger log.Logger
49+
firectx context.Context
50+
logger log.Logger
4851

4952
bucketReader fireobjstore.BucketReader
5053

5154
queriers []*singleBlockQuerier
5255
queriersLock sync.RWMutex
5356
}
5457

55-
func NewBlockQuerier(logger log.Logger, bucketReader fireobjstore.BucketReader) *BlockQuerier {
56-
if logger == nil {
57-
logger = log.NewNopLogger()
58-
}
58+
func NewBlockQuerier(firectx context.Context, bucketReader fireobjstore.BucketReader) *BlockQuerier {
5959
return &BlockQuerier{
60-
logger: logger,
60+
firectx: contextWithBlockMetrics(firectx,
61+
newBlocksMetrics(
62+
firecontext.Registry(firectx),
63+
),
64+
),
6165
bucketReader: bucketReader,
6266
}
6367
}
@@ -67,7 +71,7 @@ func (b *BlockQuerier) reconstructMetaFromBlock(ctx context.Context, ulid ulid.U
6771
fakeMeta := block.NewMeta()
6872
fakeMeta.ULID = ulid
6973

70-
q := newSingleBlockQuerierFromMeta(b.logger, b.bucketReader, fakeMeta)
74+
q := newSingleBlockQuerierFromMeta(b.firectx, b.bucketReader, fakeMeta)
7175
defer q.Close()
7276

7377
meta, err := q.reconstructMeta(ctx)
@@ -188,7 +192,7 @@ func (b *BlockQuerier) Sync(ctx context.Context) error {
188192
continue
189193
}
190194

191-
b.queriers[pos] = newSingleBlockQuerierFromMeta(b.logger, b.bucketReader, m)
195+
b.queriers[pos] = newSingleBlockQuerierFromMeta(b.firectx, b.bucketReader, m)
192196
}
193197
// ensure queriers are in ascending order.
194198
sort.Slice(b.queriers, func(i, j int) bool {
@@ -265,7 +269,9 @@ func (mm *minMax) InRange(start, end model.Time) bool {
265269
}
266270

267271
type singleBlockQuerier struct {
268-
logger log.Logger
272+
logger log.Logger
273+
metrics *blocksMetrics
274+
269275
bucketReader fireobjstore.BucketReader
270276
meta *block.Meta
271277

@@ -282,9 +288,11 @@ type singleBlockQuerier struct {
282288
profiles parquetReader[*schemav1.Profile, *schemav1.ProfilePersister]
283289
}
284290

285-
func newSingleBlockQuerierFromMeta(logger log.Logger, bucketReader fireobjstore.BucketReader, meta *block.Meta) *singleBlockQuerier {
291+
func newSingleBlockQuerierFromMeta(firectx context.Context, bucketReader fireobjstore.BucketReader, meta *block.Meta) *singleBlockQuerier {
286292
q := &singleBlockQuerier{
287-
logger: logger,
293+
logger: firecontext.Logger(firectx),
294+
metrics: contextBlockMetrics(firectx),
295+
288296
bucketReader: fireobjstore.BucketReaderWithPrefix(bucketReader, meta.ULID.String()),
289297
meta: meta,
290298
}
@@ -727,8 +735,10 @@ func newByteSliceFromBucketReader(bucketReader fireobjstore.BucketReader, path s
727735
}
728736

729737
func (q *singleBlockQuerier) open(ctx context.Context) error {
738+
start := time.Now()
730739
sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - open")
731740
defer func() {
741+
q.metrics.blockOpeningLatency.Observe(time.Since(start).Seconds())
732742
sp.LogFields(
733743
otlog.String("block_ulid", q.meta.ULID.String()),
734744
)
@@ -755,7 +765,7 @@ func (q *singleBlockQuerier) open(ctx context.Context) error {
755765

756766
// open parquet files
757767
for _, x := range q.tables {
758-
if err := x.open(ctx, q.bucketReader); err != nil {
768+
if err := x.open(contextWithBlockMetrics(ctx, q.metrics), q.bucketReader); err != nil {
759769
return err
760770
}
761771
}
@@ -767,9 +777,11 @@ type parquetReader[M Models, P schemav1.PersisterName] struct {
767777
persister P
768778
file *parquet.File
769779
reader fireobjstore.ReaderAt
780+
metrics *blocksMetrics
770781
}
771782

772783
func (r *parquetReader[M, P]) open(ctx context.Context, bucketReader fireobjstore.BucketReader) error {
784+
r.metrics = contextBlockMetrics(ctx)
773785
filePath := r.persister.Name() + block.ParquetSuffix
774786

775787
ra, err := bucketReader.ReaderAt(ctx, filePath)
@@ -823,6 +835,7 @@ func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string,
823835
if index == -1 {
824836
return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
825837
}
838+
ctx = query.AddMetricsToContext(ctx, r.metrics.query)
826839
return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
827840
}
828841

pkg/firedb/firedb.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/samber/lo"
2626
"golang.org/x/sync/errgroup"
2727

28+
firecontext "github.com/grafana/fire/pkg/fire/context"
2829
"github.com/grafana/fire/pkg/firedb/block"
2930
commonv1 "github.com/grafana/fire/pkg/gen/common/v1"
3031
ingestv1 "github.com/grafana/fire/pkg/gen/ingester/v1"
@@ -55,29 +56,32 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
5556
type FireDB struct {
5657
services.Service
5758

59+
logger log.Logger
60+
firectx context.Context
61+
5862
cfg Config
59-
reg prometheus.Registerer
60-
logger log.Logger
6163
stopCh chan struct{}
6264
wg sync.WaitGroup
6365

64-
headLock sync.RWMutex
65-
head *Head
66-
headMetrics *headMetrics
66+
headLock sync.RWMutex
67+
head *Head
6768

6869
headFlushTimer time.Timer
6970

7071
blockQuerier *BlockQuerier
7172
}
7273

73-
func New(cfg Config, logger log.Logger, reg prometheus.Registerer) (*FireDB, error) {
74-
headMetrics := newHeadMetrics(reg)
74+
func New(firectx context.Context, cfg Config) (*FireDB, error) {
75+
reg := firecontext.Registry(firectx)
76+
77+
// ensure head metrics are registered early so they are reused for the new head
78+
firectx = contextWithHeadMetrics(firectx, newHeadMetrics(reg))
79+
7580
f := &FireDB{
76-
cfg: cfg,
77-
reg: reg,
78-
logger: logger,
79-
stopCh: make(chan struct{}, 0),
80-
headMetrics: headMetrics,
81+
cfg: cfg,
82+
logger: firecontext.Logger(firectx),
83+
firectx: firectx,
84+
stopCh: make(chan struct{}, 0),
8185
}
8286
if _, err := f.initHead(); err != nil {
8387
return nil, err
@@ -97,7 +101,7 @@ func New(cfg Config, logger log.Logger, reg prometheus.Registerer) (*FireDB, err
97101
return nil, fmt.Errorf("mkdir %s: %w", f.LocalDataPath(), err)
98102
}
99103

100-
f.blockQuerier = NewBlockQuerier(logger, bucketReader)
104+
f.blockQuerier = NewBlockQuerier(firectx, bucketReader)
101105

102106
// do an initial querier sync
103107
ctx := context.Background()
@@ -460,7 +464,7 @@ func (f *FireDB) initHead() (oldHead *Head, err error) {
460464
f.headLock.Lock()
461465
defer f.headLock.Unlock()
462466
oldHead = f.head
463-
f.head, err = NewHead(f.cfg, headWithMetrics(f.headMetrics), HeadWithLogger(f.logger))
467+
f.head, err = NewHead(f.firectx, f.cfg)
464468
if err != nil {
465469
return oldHead, err
466470
}

pkg/firedb/firedb_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"time"
1111

1212
"github.com/bufbuild/connect-go"
13-
"github.com/go-kit/log"
1413
"github.com/google/uuid"
1514
"github.com/pkg/errors"
1615
"github.com/prometheus/common/model"
@@ -33,16 +32,16 @@ func TestCreateLocalDir(t *testing.T) {
3332
dataPath := t.TempDir()
3433
localFile := dataPath + "/local"
3534
require.NoError(t, ioutil.WriteFile(localFile, []byte("d"), 0o644))
36-
_, err := New(Config{
35+
_, err := New(context.Background(), Config{
3736
DataPath: dataPath,
3837
MaxBlockDuration: 30 * time.Minute,
39-
}, log.NewNopLogger(), nil)
38+
})
4039
require.Error(t, err)
4140
require.NoError(t, os.Remove(localFile))
42-
_, err = New(Config{
41+
_, err = New(context.Background(), Config{
4342
DataPath: dataPath,
4443
MaxBlockDuration: 30 * time.Minute,
45-
}, log.NewNopLogger(), nil)
44+
})
4645
require.NoError(t, err)
4746
}
4847

@@ -133,10 +132,10 @@ func TestMergeProfilesStacktraces(t *testing.T) {
133132
step = 15 * time.Second
134133
)
135134

136-
db, err := New(Config{
135+
db, err := New(context.Background(), Config{
137136
DataPath: testDir,
138137
MaxBlockDuration: time.Duration(100000) * time.Minute, // we will manually flush
139-
}, log.NewNopLogger(), nil)
138+
})
140139
require.NoError(t, err)
141140
defer require.NoError(t, db.Close())
142141

0 commit comments

Comments
 (0)