diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index f87b2f2cd5..994c8fc831 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -2,6 +2,8 @@ package phlaredb import ( "context" + "crypto/rand" + "fmt" "io/fs" "math" "os" @@ -9,18 +11,21 @@ import ( "sort" "github.com/oklog/ulid" - "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/grafana/dskit/multierror" + "github.com/grafana/dskit/runutil" + "github.com/grafana/pyroscope/pkg/iter" phlaremodel "github.com/grafana/pyroscope/pkg/model" phlareparquet "github.com/grafana/pyroscope/pkg/parquet" "github.com/grafana/pyroscope/pkg/phlaredb/block" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/phlaredb/sharding" "github.com/grafana/pyroscope/pkg/phlaredb/symdb" "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" "github.com/grafana/pyroscope/pkg/util" @@ -35,100 +40,287 @@ type BlockReader interface { } func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Meta, err error) { - srcMetas := make([]block.Meta, len(src)) - ulids := make([]string, len(src)) + metas, err := CompactWithSplitting(ctx, src, 1, dst) + if err != nil { + return block.Meta{}, err + } + return metas[0], nil +} +func CompactWithSplitting(ctx context.Context, src []BlockReader, shardsCount uint64, dst string) ( + []block.Meta, error, +) { + if shardsCount == 0 { + shardsCount = 1 + } + if len(src) <= 1 && shardsCount == 1 { + return nil, errors.New("not enough blocks to compact") + } + var ( + writers = make([]*blockWriter, shardsCount) + shardBy = shardByFingerprint + srcMetas = make([]block.Meta, len(src)) + err error + ) for i, b := range src { srcMetas[i] = b.Meta() - ulids[i] = b.Meta().ULID.String() } - meta = compactMetas(srcMetas...) - blockPath := filepath.Join(dst, meta.ULID.String()) - indexPath := filepath.Join(blockPath, block.IndexFilename) - profilePath := filepath.Join(blockPath, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) - - sp, ctx := opentracing.StartSpanFromContext(ctx, "Compact") - defer func() { - // todo: context propagation is not working through objstore - // This is because the BlockReader has no context. - sp.SetTag("src", ulids) - sp.SetTag("block_id", meta.ULID.String()) + + outBlocksTime := ulid.Now() + outMeta := compactMetas(srcMetas...) + + // create the shards writers + for i := range writers { + meta := outMeta.Clone() + meta.ULID = ulid.MustNew(outBlocksTime, rand.Reader) + writers[i], err = newBlockWriter(dst, meta) if err != nil { - sp.SetTag("error", err) + return nil, fmt.Errorf("create block writer: %w", err) + } + } + + rowsIt, err := newMergeRowProfileIterator(src) + if err != nil { + return nil, err + } + defer runutil.CloseWithLogOnErr(util.Logger, rowsIt, "close rows iterator") + + // iterate and splits the rows into series. + for rowsIt.Next() { + r := rowsIt.At() + shard := int(shardBy(r, shardsCount)) + if err := writers[shard].WriteRow(r); err != nil { + return nil, err + } + } + if err := rowsIt.Err(); err != nil { + return nil, err + } + + // Close all blocks + errs := multierror.New() + for _, w := range writers { + if err := w.Close(ctx); err != nil { + errs.Add(err) } - sp.Finish() - }() + } - if len(src) <= 1 { - return block.Meta{}, errors.New("not enough blocks to compact") + out := make([]block.Meta, 0, len(writers)) + for shard, w := range writers { + if w.meta.Stats.NumSamples > 0 { + w.meta.Labels[sharding.CompactorShardIDLabel] = sharding.FormatShardIDLabelValue(uint64(shard), shardsCount) + out = append(out, *w.meta) + } } + + // Returns all Metas + return out, errs.Err() +} + +var shardByFingerprint = func(r profileRow, shardsCount uint64) uint64 { + return uint64(r.fp) % shardsCount +} + +type blockWriter struct { + indexRewriter *indexRewriter + symbolsRewriter *symbolsRewriter + profilesWriter *profilesWriter + path string + meta *block.Meta + totalProfiles uint64 + min, max int64 +} + +func newBlockWriter(dst string, meta *block.Meta) (*blockWriter, error) { + blockPath := filepath.Join(dst, meta.ULID.String()) + if err := os.MkdirAll(blockPath, 0o777); err != nil { - return block.Meta{}, err + return nil, err } - indexw, err := prepareIndexWriter(ctx, indexPath, src) + profileWriter, err := newProfileWriter(blockPath) if err != nil { - return block.Meta{}, err + return nil, err } - profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) + return &blockWriter{ + indexRewriter: newIndexRewriter(blockPath), + symbolsRewriter: newSymbolsRewriter(blockPath), + profilesWriter: profileWriter, + path: blockPath, + meta: meta, + min: math.MaxInt64, + max: math.MinInt64, + }, nil +} + +func (bw *blockWriter) WriteRow(r profileRow) error { + err := bw.indexRewriter.ReWriteRow(r) if err != nil { - return block.Meta{}, err + return err + } + err = bw.symbolsRewriter.ReWriteRow(r) + if err != nil { + return err } - profileWriter := newProfileWriter(profileFile) - symw := symdb.NewSymDB(symdb.DefaultConfig(). - WithDirectory(filepath.Join(blockPath, symdb.DefaultDirName)). - WithParquetConfig(symdb.ParquetConfig{ - MaxBufferRowCount: defaultParquetConfig.MaxBufferRowCount, - })) + if err := bw.profilesWriter.WriteRow(r); err != nil { + return err + } + bw.totalProfiles++ + if r.timeNanos < bw.min { + bw.min = r.timeNanos + } + if r.timeNanos > bw.max { + bw.max = r.timeNanos + } + return nil +} - if err != nil { - return block.Meta{}, err +func (bw *blockWriter) Close(ctx context.Context) error { + if err := bw.indexRewriter.Close(ctx); err != nil { + return err + } + if err := bw.symbolsRewriter.Close(); err != nil { + return err } + if err := bw.profilesWriter.Close(); err != nil { + return err + } + metaFiles, err := metaFilesFromDir(bw.path) + if err != nil { + return err + } + bw.meta.Files = metaFiles + bw.meta.Stats.NumProfiles = bw.totalProfiles + bw.meta.Stats.NumSeries = bw.indexRewriter.NumSeries() + bw.meta.Stats.NumSamples = bw.symbolsRewriter.NumSamples() + bw.meta.Compaction.Deletable = bw.totalProfiles == 0 + bw.meta.MinTime = model.TimeFromUnixNano(bw.min) + bw.meta.MaxTime = model.TimeFromUnixNano(bw.max) + if _, err := bw.meta.WriteToFile(util.Logger, bw.path); err != nil { + return err + } + return nil +} - rowsIt, err := newMergeRowProfileIterator(src) +type profilesWriter struct { + *parquet.GenericWriter[*schemav1.Profile] + file *os.File + + buf []parquet.Row +} + +func newProfileWriter(path string) (*profilesWriter, error) { + profilePath := filepath.Join(path, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) + profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) if err != nil { - return block.Meta{}, err + return nil, err } - seriesRewriter := newSeriesRewriter(rowsIt, indexw) - symRewriter := newSymbolsRewriter(seriesRewriter, src, symw) - reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symRewriter)) + return &profilesWriter{ + GenericWriter: newParquetProfileWriter(profileFile, parquet.MaxRowsPerRowGroup(int64(defaultParquetConfig.MaxBufferRowCount))), + file: profileFile, + buf: make([]parquet.Row, 1), + }, nil +} - total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) +func (p *profilesWriter) WriteRow(r profileRow) error { + p.buf[0] = parquet.Row(r.row) + _, err := p.GenericWriter.WriteRows(p.buf) if err != nil { - return block.Meta{}, err + return err } - if err = symRewriter.Close(); err != nil { - return block.Meta{}, err + return nil +} + +func (p *profilesWriter) Close() error { + err := p.GenericWriter.Close() + if err != nil { + return err } + return p.file.Close() +} - // flush the index file. - if err = indexw.Close(); err != nil { - return block.Meta{}, err +func newIndexRewriter(path string) *indexRewriter { + return &indexRewriter{ + symbols: make(map[string]struct{}), + path: path, } +} - if err = profileWriter.Close(); err != nil { - return block.Meta{}, err +type indexRewriter struct { + series []struct { + labels phlaremodel.Labels + fp model.Fingerprint } - if err = symw.Flush(); err != nil { - return block.Meta{}, err + symbols map[string]struct{} + chunks []index.ChunkMeta // one chunk per series + + previousFp model.Fingerprint + + path string +} + +func (idxRw *indexRewriter) ReWriteRow(r profileRow) error { + if idxRw.previousFp != r.fp || len(idxRw.series) == 0 { + series := r.labels.Clone() + for _, l := range series { + idxRw.symbols[l.Name] = struct{}{} + idxRw.symbols[l.Value] = struct{}{} + } + idxRw.series = append(idxRw.series, struct { + labels phlaremodel.Labels + fp model.Fingerprint + }{ + labels: series, + fp: r.fp, + }) + idxRw.chunks = append(idxRw.chunks, index.ChunkMeta{ + MinTime: r.timeNanos, + MaxTime: r.timeNanos, + SeriesIndex: uint32(len(idxRw.series) - 1), + }) + idxRw.previousFp = r.fp } + idxRw.chunks[len(idxRw.chunks)-1].MaxTime = r.timeNanos + r.row.SetSeriesIndex(idxRw.chunks[len(idxRw.chunks)-1].SeriesIndex) + return nil +} - metaFiles, err := metaFilesFromDir(blockPath) +func (idxRw *indexRewriter) NumSeries() uint64 { + return uint64(len(idxRw.series)) +} + +// Close writes the index to given folder. +func (idxRw *indexRewriter) Close(ctx context.Context) error { + indexw, err := index.NewWriter(ctx, filepath.Join(idxRw.path, block.IndexFilename)) if err != nil { - return block.Meta{}, err + return err } - meta.Files = metaFiles - meta.Stats.NumProfiles = total - meta.Stats.NumSeries = seriesRewriter.NumSeries() - meta.Stats.NumSamples = symRewriter.NumSamples() - meta.Compaction.Deletable = meta.Stats.NumSamples == 0 - if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { - return block.Meta{}, err + + // Sort symbols + symbols := make([]string, 0, len(idxRw.symbols)) + for s := range idxRw.symbols { + symbols = append(symbols, s) } - return meta, nil + sort.Strings(symbols) + + // Add symbols + for _, symbol := range symbols { + if err := indexw.AddSymbol(symbol); err != nil { + return err + } + } + + // Add Series + for i, series := range idxRw.series { + if err := indexw.AddSeries(storage.SeriesRef(i), series.labels, series.fp, idxRw.chunks[i]); err != nil { + return err + } + } + + return indexw.Close() } // metaFilesFromDir returns a list of block files description from a directory. @@ -370,82 +562,6 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e }, nil } -type seriesRewriter struct { - iter.Iterator[profileRow] - - indexw *index.Writer - - seriesRef storage.SeriesRef - labels phlaremodel.Labels - previousFp model.Fingerprint - currentChunkMeta index.ChunkMeta - err error - - numSeries uint64 - done bool -} - -func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seriesRewriter { - return &seriesRewriter{ - Iterator: it, - indexw: indexw, - } -} - -func (s *seriesRewriter) NumSeries() uint64 { - return s.numSeries -} - -func (s *seriesRewriter) Next() bool { - if !s.Iterator.Next() { - if s.done { - return false - } - s.done = true - if s.previousFp != 0 { - s.currentChunkMeta.SeriesIndex = uint32(s.seriesRef) - 1 - if err := s.indexw.AddSeries(s.seriesRef-1, s.labels, s.previousFp, s.currentChunkMeta); err != nil { - s.err = err - return false - } - s.numSeries++ - } - return false - } - currentProfile := s.Iterator.At() - if s.previousFp != currentProfile.fp { - if s.previousFp != 0 { - s.currentChunkMeta.SeriesIndex = uint32(s.seriesRef) - 1 - if err := s.indexw.AddSeries(s.seriesRef-1, s.labels, s.previousFp, s.currentChunkMeta); err != nil { - s.err = err - return false - } - s.numSeries++ - } - s.seriesRef++ - s.labels = currentProfile.labels.Clone() - s.previousFp = currentProfile.fp - s.currentChunkMeta.MinTime = currentProfile.timeNanos - } - s.currentChunkMeta.MaxTime = currentProfile.timeNanos - currentProfile.row.SetSeriesIndex(uint32(s.seriesRef - 1)) - return true -} - -type rowsIterator struct { - iter.Iterator[profileRow] -} - -func newRowsIterator(it iter.Iterator[profileRow]) *rowsIterator { - return &rowsIterator{ - Iterator: it, - } -} - -func (r *rowsIterator) At() parquet.Row { - return parquet.Row(r.Iterator.At().row) -} - type dedupeProfileRowIterator struct { iter.Iterator[profileRow] @@ -469,73 +585,36 @@ func (it *dedupeProfileRowIterator) Next() bool { } } -func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) (*index.Writer, error) { - var symbols index.StringIter - indexw, err := index.NewWriter(ctx, path) - if err != nil { - return nil, err - } - for i, r := range readers { - if i == 0 { - symbols = r.Index().Symbols() - } - symbols = tsdb.NewMergedStringIter(symbols, r.Index().Symbols()) - } - - for symbols.Next() { - if err := indexw.AddSymbol(symbols.At()); err != nil { - return nil, errors.Wrap(err, "add symbol") - } - } - if symbols.Err() != nil { - return nil, errors.Wrap(symbols.Err(), "next symbol") - } - - return indexw, nil -} - type symbolsRewriter struct { - profiles iter.Iterator[profileRow] rewriters map[BlockReader]*symdb.Rewriter + w *symdb.SymDB stacktraces []uint32 - err error numSamples uint64 } -func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, w *symdb.SymDB) *symbolsRewriter { - sr := symbolsRewriter{ - profiles: it, - rewriters: make(map[BlockReader]*symdb.Rewriter, len(blocks)), - } - for _, r := range blocks { - sr.rewriters[r] = symdb.NewRewriter(w, r.Symbols()) +func newSymbolsRewriter(path string) *symbolsRewriter { + return &symbolsRewriter{ + w: symdb.NewSymDB(symdb.DefaultConfig(). + WithDirectory(filepath.Join(path, symdb.DefaultDirName)). + WithParquetConfig(symdb.ParquetConfig{ + MaxBufferRowCount: defaultParquetConfig.MaxBufferRowCount, + })), + rewriters: make(map[BlockReader]*symdb.Rewriter), } - return &sr } func (s *symbolsRewriter) NumSamples() uint64 { return s.numSamples } -func (s *symbolsRewriter) At() profileRow { return s.profiles.At() } - -func (s *symbolsRewriter) Close() error { return s.profiles.Close() } - -func (s *symbolsRewriter) Err() error { - if s.err != nil { - return s.err - } - return s.profiles.Err() -} - -func (s *symbolsRewriter) Next() bool { - if !s.profiles.Next() { - return false - } +func (s *symbolsRewriter) ReWriteRow(profile profileRow) error { var err error - profile := s.profiles.At() profile.row.ForStacktraceIDsValues(func(values []parquet.Value) { s.loadStacktracesID(values) - r := s.rewriters[profile.blockReader] + r, ok := s.rewriters[profile.blockReader] + if !ok { + r = symdb.NewRewriter(s.w, profile.blockReader.Symbols()) + s.rewriters[profile.blockReader] = r + } if err = r.Rewrite(profile.row.StacktracePartitionID(), s.stacktraces); err != nil { return } @@ -546,10 +625,13 @@ func (s *symbolsRewriter) Next() bool { } }) if err != nil { - s.err = err - return false + return err } - return true + return nil +} + +func (s *symbolsRewriter) Close() error { + return s.w.Flush() } func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index f7fe3a5685..1966f49a3c 100644 --- a/pkg/phlaredb/compact_test.go +++ b/pkg/phlaredb/compact_test.go @@ -6,6 +6,7 @@ import ( _ "net/http/pprof" "os" "path/filepath" + "sort" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,6 +25,7 @@ import ( "github.com/grafana/pyroscope/pkg/objstore/client" "github.com/grafana/pyroscope/pkg/objstore/providers/filesystem" "github.com/grafana/pyroscope/pkg/phlaredb/block" + "github.com/grafana/pyroscope/pkg/phlaredb/sharding" "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" "github.com/grafana/pyroscope/pkg/pprof/testhelper" ) @@ -85,6 +88,153 @@ func TestCompact(t *testing.T) { require.Equal(t, expected.String(), res.String()) } +func TestCompactWithSplitting(t *testing.T) { + ctx := context.Background() + + b1 := newBlock(t, func() []*testhelper.ProfileBuilder { + return append( + profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "a"), + profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "b")..., + ) + }) + b2 := newBlock(t, func() []*testhelper.ProfileBuilder { + return append( + append( + append( + profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "c"), + profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "d")..., + ), profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "a")..., + ), + profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "b")..., + ) + }) + dst := t.TempDir() + compacted, err := CompactWithSplitting(ctx, []BlockReader{b1, b2, b2, b1}, 16, dst) + require.NoError(t, err) + + // 4 shards one per series. + require.Equal(t, 4, len(compacted)) + require.Equal(t, "1_of_16", compacted[0].Labels[sharding.CompactorShardIDLabel]) + require.Equal(t, "6_of_16", compacted[1].Labels[sharding.CompactorShardIDLabel]) + require.Equal(t, "7_of_16", compacted[2].Labels[sharding.CompactorShardIDLabel]) + require.Equal(t, "14_of_16", compacted[3].Labels[sharding.CompactorShardIDLabel]) + + // The series b should span from 11 to 20 and not 1 to 20. + require.Equal(t, model.TimeFromUnix(11), compacted[1].MinTime) + require.Equal(t, model.TimeFromUnix(20), compacted[1].MaxTime) + + // We first verify we have all series and timestamps across querying all blocks. + queriers := make(Queriers, len(compacted)) + for i, blk := range compacted { + queriers[i] = blockQuerierFromMeta(t, dst, blk) + } + + err = queriers.Open(context.Background()) + require.NoError(t, err) + matchAll := &ingesterv1.SelectProfilesRequest{ + LabelSelector: "{}", + Type: mustParseProfileSelector(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds"), + Start: 0, + End: 40000, + } + it, err := queriers.SelectMatchingProfiles(context.Background(), matchAll) + require.NoError(t, err) + + seriesMap := make(map[model.Fingerprint]lo.Tuple2[phlaremodel.Labels, []model.Time]) + for it.Next() { + r := it.At() + seriesMap[r.Fingerprint()] = lo.T2(r.Labels().WithoutPrivateLabels(), append(seriesMap[r.Fingerprint()].B, r.Timestamp())) + } + require.NoError(t, it.Err()) + require.NoError(t, it.Close()) + series := lo.Values(seriesMap) + sort.Slice(series, func(i, j int) bool { + return phlaremodel.CompareLabelPairs(series[i].A, series[j].A) < 0 + }) + require.Equal(t, []lo.Tuple2[phlaremodel.Labels, []model.Time]{ + lo.T2(phlaremodel.LabelsFromStrings("job", "a"), + generateTimes(t, model.TimeFromUnix(1), model.TimeFromUnix(10)), + ), + lo.T2(phlaremodel.LabelsFromStrings("job", "b"), + generateTimes(t, model.TimeFromUnix(11), model.TimeFromUnix(20)), + ), + lo.T2(phlaremodel.LabelsFromStrings("job", "c"), + generateTimes(t, model.TimeFromUnix(1), model.TimeFromUnix(10)), + ), + lo.T2(phlaremodel.LabelsFromStrings("job", "d"), + generateTimes(t, model.TimeFromUnix(11), model.TimeFromUnix(20)), + ), + }, series) + + // Then we query 2 different shards and verify we have a subset of series. + it, err = queriers[0].SelectMatchingProfiles(ctx, matchAll) + require.NoError(t, err) + seriesResult, err := queriers[0].MergeByLabels(context.Background(), it, "job") + require.NoError(t, err) + require.Equal(t, + []*typesv1.Series{ + { + Labels: phlaremodel.LabelsFromStrings("job", "a"), + Points: generatePoints(t, model.TimeFromUnix(1), model.TimeFromUnix(10)), + }, + }, seriesResult) + + it, err = queriers[1].SelectMatchingProfiles(ctx, matchAll) + require.NoError(t, err) + seriesResult, err = queriers[1].MergeByLabels(context.Background(), it, "job") + require.NoError(t, err) + require.Equal(t, + []*typesv1.Series{ + { + Labels: phlaremodel.LabelsFromStrings("job", "b"), + Points: generatePoints(t, model.TimeFromUnix(11), model.TimeFromUnix(20)), + }, + }, seriesResult) + + // Finally test some stacktraces resolution. + it, err = queriers[1].SelectMatchingProfiles(ctx, matchAll) + require.NoError(t, err) + res, err := queriers[1].MergeByStacktraces(ctx, it) + require.NoError(t, err) + + expected := new(phlaremodel.Tree) + expected.InsertStack(10, "baz", "bar", "foo") + require.Equal(t, expected.String(), res.String()) +} + +// nolint:unparam +func profileSeriesGenerator(t *testing.T, from, through time.Time, interval time.Duration, lbls ...string) []*testhelper.ProfileBuilder { + t.Helper() + var builders []*testhelper.ProfileBuilder + for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(interval) { + builders = append(builders, + testhelper.NewProfileBuilder(ts.UnixNano()). + CPUProfile(). + WithLabels( + lbls..., + ).ForStacktraceString("foo", "bar", "baz").AddSamples(1)) + } + return builders +} + +func generatePoints(t *testing.T, from, through model.Time) []*typesv1.Point { + t.Helper() + var points []*typesv1.Point + for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(time.Second) { + points = append(points, &typesv1.Point{Timestamp: int64(ts), Value: 1}) + } + return points +} + +func generateTimes(t *testing.T, from, through model.Time) []model.Time { + t.Helper() + var times []model.Time + for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(time.Second) { + times = append(times, ts) + } + return times +} + func TestProfileRowIterator(t *testing.T) { b := newBlock(t, func() []*testhelper.ProfileBuilder { return []*testhelper.ProfileBuilder{ @@ -268,28 +418,22 @@ func TestSeriesRewriter(t *testing.T) { }) rows, err := newProfileRowIterator(blk) require.NoError(t, err) - filePath := filepath.Join(t.TempDir(), block.IndexFilename) - idxw, err := prepareIndexWriter(context.Background(), filePath, []BlockReader{blk}) - require.NoError(t, err) - it := newSeriesRewriter(rows, idxw) - // tests that all rows are written to the correct series index - require.True(t, it.Next()) - require.Equal(t, uint32(0), it.At().row.SeriesIndex()) - require.True(t, it.Next()) - require.Equal(t, uint32(0), it.At().row.SeriesIndex()) - require.True(t, it.Next()) - require.Equal(t, uint32(0), it.At().row.SeriesIndex()) - require.True(t, it.Next()) - require.Equal(t, uint32(1), it.At().row.SeriesIndex()) - require.True(t, it.Next()) - require.Equal(t, uint32(2), it.At().row.SeriesIndex()) - require.True(t, it.Next()) - require.Equal(t, uint32(2), it.At().row.SeriesIndex()) - require.False(t, it.Next()) + path := t.TempDir() + filePath := filepath.Join(path, block.IndexFilename) + idxw := newIndexRewriter(path) + seriesIdx := []uint32{} + for rows.Next() { + r := rows.At() + require.NoError(t, idxw.ReWriteRow(r)) + seriesIdx = append(seriesIdx, r.row.SeriesIndex()) + } + require.NoError(t, rows.Err()) + require.NoError(t, rows.Close()) - require.NoError(t, it.Err()) - require.NoError(t, it.Close()) - require.NoError(t, idxw.Close()) + require.Equal(t, []uint32{0, 0, 0, 1, 2, 2}, seriesIdx) + + err = idxw.Close(context.Background()) + require.NoError(t, err) idxr, err := index.NewFileReader(filePath) require.NoError(t, err) diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go index d4ed86dec8..dd8f1fb9de 100644 --- a/pkg/phlaredb/head.go +++ b/pkg/phlaredb/head.go @@ -488,7 +488,7 @@ func (h *Head) flush(ctx context.Context) error { // It must be guaranteed that no new inserts will happen // after the call start. h.inFlightProfiles.Wait() - if len(h.profiles.slice) == 0 { + if h.profiles.index.totalProfiles.Load() == 0 { level.Info(h.logger).Log("msg", "head empty - no block written") return os.RemoveAll(h.headPath) } diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index 3afe503684..9b445ceec4 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -62,11 +62,13 @@ type profileStore struct { flushBufferLbs []phlaremodel.Labels } -func newProfileWriter(writer io.Writer) *parquet.GenericWriter[*schemav1.Profile] { - return parquet.NewGenericWriter[*schemav1.Profile](writer, schemav1.ProfilesSchema, - parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "pyroscopedb-parquet-buffers*")), - parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision), - parquet.PageBufferSize(3*1024*1024), +func newParquetProfileWriter(writer io.Writer, options ...parquet.WriterOption) *parquet.GenericWriter[*schemav1.Profile] { + options = append(options, parquet.PageBufferSize(3*1024*1024)) + options = append(options, parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision)) + options = append(options, parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "pyroscopedb-parquet-buffers*"))) + options = append(options, schemav1.ProfilesSchema) + return parquet.NewGenericWriter[*schemav1.Profile]( + writer, options..., ) } @@ -82,7 +84,7 @@ func newProfileStore(phlarectx context.Context) *profileStore { go s.cutRowGroupLoop() // Initialize writer on /dev/null // TODO: Reuse parquet.Writer beyond life time of the head. - s.writer = newProfileWriter(io.Discard) + s.writer = newParquetProfileWriter(io.Discard) return s } diff --git a/pkg/phlaredb/schemas/v1/functions.go b/pkg/phlaredb/schemas/v1/functions.go index 31ba8b5037..62d723fc7d 100644 --- a/pkg/phlaredb/schemas/v1/functions.go +++ b/pkg/phlaredb/schemas/v1/functions.go @@ -53,3 +53,8 @@ type InMemoryFunction struct { // Line number in source file. StartLine uint32 } + +func (f *InMemoryFunction) Clone() *InMemoryFunction { + n := *f + return &n +} diff --git a/pkg/phlaredb/schemas/v1/locations.go b/pkg/phlaredb/schemas/v1/locations.go index 27dc019a9e..b9cbf91ba6 100644 --- a/pkg/phlaredb/schemas/v1/locations.go +++ b/pkg/phlaredb/schemas/v1/locations.go @@ -110,6 +110,13 @@ type InMemoryLocation struct { Line []InMemoryLine } +func (l *InMemoryLocation) Clone() *InMemoryLocation { + x := *l + x.Line = make([]InMemoryLine, len(l.Line)) + copy(x.Line, l.Line) + return &x +} + type InMemoryLine struct { // The id of the corresponding profile.Function for this line. FunctionId uint32 diff --git a/pkg/phlaredb/schemas/v1/mappings.go b/pkg/phlaredb/schemas/v1/mappings.go index 6b342c24fe..0d5503f6cb 100644 --- a/pkg/phlaredb/schemas/v1/mappings.go +++ b/pkg/phlaredb/schemas/v1/mappings.go @@ -73,3 +73,8 @@ type InMemoryMapping struct { HasLineNumbers bool HasInlineFrames bool } + +func (m *InMemoryMapping) Clone() *InMemoryMapping { + n := *m + return &n +} diff --git a/pkg/phlaredb/sharding/label.go b/pkg/phlaredb/sharding/label.go new file mode 100644 index 0000000000..d9db0df453 --- /dev/null +++ b/pkg/phlaredb/sharding/label.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package sharding + +import ( + "fmt" + "strconv" + "strings" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" +) + +const ( + // ShardLabel is a reserved label referencing a shard on read path. + ShardLabel = "__query_shard__" + // CompactorShardIDLabel is the external label used to store + // the ID of a sharded block generated by the split-and-merge compactor. If a block hasn't + // this label, it means the block hasn't been split. + CompactorShardIDLabel = "__compactor_shard_id__" +) + +// ShardSelector holds information about the configured query shard. +type ShardSelector struct { + ShardIndex uint64 + ShardCount uint64 +} + +// LabelValue returns the label value to use to select this shard. +func (shard ShardSelector) LabelValue() string { + return FormatShardIDLabelValue(shard.ShardIndex, shard.ShardCount) +} + +// Label generates the ShardSelector as a label. +func (shard ShardSelector) Label() labels.Label { + return labels.Label{ + Name: ShardLabel, + Value: shard.LabelValue(), + } +} + +// Matcher converts ShardSelector to Matcher. +func (shard ShardSelector) Matcher() *labels.Matcher { + return labels.MustNewMatcher(labels.MatchEqual, ShardLabel, shard.LabelValue()) +} + +// ShardFromMatchers extracts a ShardSelector and the index it was pulled from the matcher list. +func ShardFromMatchers(matchers []*labels.Matcher) (shard *ShardSelector, idx int, err error) { + for i, matcher := range matchers { + if matcher.Name == ShardLabel && matcher.Type == labels.MatchEqual { + index, count, err := ParseShardIDLabelValue(matcher.Value) + if err != nil { + return nil, i, err + } + return &ShardSelector{ + ShardIndex: index, + ShardCount: count, + }, i, nil + } + } + return nil, 0, nil +} + +// RemoveShardFromMatchers returns the input matchers without the label matcher on the query shard (if any). +func RemoveShardFromMatchers(matchers []*labels.Matcher) (shard *ShardSelector, filtered []*labels.Matcher, err error) { + shard, idx, err := ShardFromMatchers(matchers) + if err != nil || shard == nil { + return nil, matchers, err + } + + // Create a new slice with the shard matcher removed. + filtered = make([]*labels.Matcher, 0, len(matchers)-1) + filtered = append(filtered, matchers[:idx]...) + filtered = append(filtered, matchers[idx+1:]...) + + return shard, filtered, nil +} + +// FormatShardIDLabelValue expects 0-based shardID, but uses 1-based shard in the output string. +func FormatShardIDLabelValue(shardID, shardCount uint64) string { + return fmt.Sprintf("%d_of_%d", shardID+1, shardCount) +} + +// ParseShardIDLabelValue returns original (0-based) shard index and shard count parsed from formatted value. +func ParseShardIDLabelValue(val string) (index, shardCount uint64, _ error) { + // If we fail to parse shardID, we better not consider this block fully included in successors. + matches := strings.Split(val, "_") + if len(matches) != 3 || matches[1] != "of" { + return 0, 0, errors.Errorf("invalid shard ID: %q", val) + } + + index, err := strconv.ParseUint(matches[0], 10, 64) + if err != nil { + return 0, 0, errors.Errorf("invalid shard ID: %q: %v", val, err) + } + count, err := strconv.ParseUint(matches[2], 10, 64) + if err != nil { + return 0, 0, errors.Errorf("invalid shard ID: %q: %v", val, err) + } + + if index == 0 || count == 0 || index > count { + return 0, 0, errors.Errorf("invalid shard ID: %q", val) + } + + return index - 1, count, nil +} diff --git a/pkg/phlaredb/sharding/label_test.go b/pkg/phlaredb/sharding/label_test.go new file mode 100644 index 0000000000..2ce1e6b239 --- /dev/null +++ b/pkg/phlaredb/sharding/label_test.go @@ -0,0 +1,208 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package sharding + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseShard(t *testing.T) { + tests := map[string]struct { + input string + index, count uint64 + err bool + }{ + "should return error on invalid format": { + input: "lsdjf", + err: true, + }, + "should return error on invalid index (not an integer)": { + input: "a_of_3", + err: true, + }, + "should return error on invalid index (not positive)": { + input: "-1_of_3", + err: true, + }, + "should return error on invalid count (not positive)": { + input: "-1_of_-3", + err: true, + }, + "should return error on invalid index (too large)": { + input: "4_of_3", + err: true, + }, + "should return error on invalid index (too small)": { + input: "0_of_3", + err: true, + }, + "should return error on invalid separator": { + input: "1_out_3", + err: true, + }, + "should succeed on valid first shard ID": { + input: "1_of_2", + index: 0, // 0-based + count: 2, + }, + "should succeed on valid last shard selector": { + input: "2_of_2", + index: 1, // 0-based + count: 2, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + index, count, err := ParseShardIDLabelValue(testData.input) + if testData.err { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, testData.index, index) + require.Equal(t, testData.count, count) + } + }) + } +} + +func TestRemoveShardFromMatchers(t *testing.T) { + tests := map[string]struct { + input []*labels.Matcher + expectedShard *ShardSelector + expectedMatchers []*labels.Matcher + expectedError error + }{ + "should return no shard on empty label matchers": {}, + "should return no shard on no shard label matcher": { + input: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test"), + labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"), + }, + expectedShard: nil, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test"), + labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"), + }, + }, + "should return matching shard and filter out its matcher": { + input: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test"), + labels.MustNewMatcher(labels.MatchEqual, ShardLabel, ShardSelector{ShardIndex: 1, ShardCount: 8}.LabelValue()), + labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"), + }, + expectedShard: &ShardSelector{ + ShardIndex: 1, + ShardCount: 8, + }, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test"), + labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"), + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + actualShard, actualMatchers, actualError := RemoveShardFromMatchers(testData.input) + assert.Equal(t, testData.expectedShard, actualShard) + assert.Equal(t, testData.expectedError, actualError) + + // Assert same matchers. We do some optimizations in mimir-prometheus which make + // the label matchers not comparable with reflect.DeepEqual() so we're going to + // compare their string representation. + require.Len(t, actualMatchers, len(testData.expectedMatchers)) + for i := 0; i < len(testData.expectedMatchers); i++ { + assert.Equal(t, testData.expectedMatchers[i].String(), actualMatchers[i].String()) + } + }) + } +} + +func TestShardFromMatchers(t *testing.T) { + testExpr := []struct { + input []*labels.Matcher + shard *ShardSelector + idx int + err bool + }{ + { + input: []*labels.Matcher{ + {}, + { + Name: ShardLabel, + Type: labels.MatchEqual, + Value: ShardSelector{ + ShardIndex: 10, + ShardCount: 16, + }.LabelValue(), + }, + {}, + }, + shard: &ShardSelector{ + ShardIndex: 10, + ShardCount: 16, + }, + idx: 1, + err: false, + }, + { + input: []*labels.Matcher{ + { + Name: ShardLabel, + Type: labels.MatchEqual, + Value: "invalid-fmt", + }, + }, + shard: nil, + idx: 0, + err: true, + }, + { + input: []*labels.Matcher{}, + shard: nil, + idx: 0, + err: false, + }, + } + + for i, c := range testExpr { + t.Run(fmt.Sprint(i), func(t *testing.T) { + shard, idx, err := ShardFromMatchers(c.input) + if c.err { + require.NotNil(t, err) + } else { + require.Nil(t, err) + require.Equal(t, c.shard, shard) + require.Equal(t, c.idx, idx) + } + }) + } +} + +func TestFormatAndParseShardId(t *testing.T) { + r := rand.New(rand.NewSource(0)) + + const maxTests = 1000 + const maxShardCount = 10000 + + for i := 0; i < maxTests; i++ { + count := 1 + r.Intn(maxShardCount) + id := r.Intn(count) + + require.True(t, id < count) + + out := FormatShardIDLabelValue(uint64(id), uint64(count)) + nid, ncount, err := ParseShardIDLabelValue(out) + + require.NoError(t, err) + require.Equal(t, uint64(id), nid) + require.Equal(t, uint64(count), ncount) + } +} diff --git a/pkg/phlaredb/symdb/rewriter.go b/pkg/phlaredb/symdb/rewriter.go index cf1ba5d408..8d34b52f36 100644 --- a/pkg/phlaredb/symdb/rewriter.go +++ b/pkg/phlaredb/symdb/rewriter.go @@ -54,14 +54,15 @@ func (r *Rewriter) getOrCreatePartition(partition uint64) (_ *partitionRewriter, n := &partitionRewriter{name: partition} n.dst = r.symdb.PartitionWriter(partition) + // Note that the partition is not released: we want to keep + // it during the whole lifetime of the rewriter. pr, err := r.source.Partition(context.TODO(), partition) if err != nil { return nil, err } - - // Note that the partition is not released: we want to keep - // it during the whole lifetime of the rewriter. - n.src = pr.Symbols() + // We clone locations, functions, and mappings, + // because these object will be modified. + n.src = cloneSymbolsPartially(pr.Symbols()) var stats PartitionStats pr.WriteStats(&stats) @@ -242,6 +243,26 @@ func (p *partitionRewriter) InsertStacktrace(stacktrace uint32, locations []int3 p.stacktraces.values[idx] = n } +func cloneSymbolsPartially(x *Symbols) *Symbols { + n := Symbols{ + Stacktraces: x.Stacktraces, + Locations: make([]*schemav1.InMemoryLocation, len(x.Locations)), + Mappings: make([]*schemav1.InMemoryMapping, len(x.Mappings)), + Functions: make([]*schemav1.InMemoryFunction, len(x.Functions)), + Strings: x.Strings, + } + for i, l := range x.Locations { + n.Locations[i] = l.Clone() + } + for i, m := range x.Mappings { + n.Mappings[i] = m.Clone() + } + for i, f := range x.Functions { + n.Functions[i] = f.Clone() + } + return &n +} + const ( marker = 1 << 31 markerMask = math.MaxUint32 >> 1