diff --git a/pkg/iter/iter.go b/pkg/iter/iter.go index 47fbe6f4e6..34a16f7db6 100644 --- a/pkg/iter/iter.go +++ b/pkg/iter/iter.go @@ -101,6 +101,20 @@ func NewSliceSeekIterator[A constraints.Ordered](s []A) SeekIterator[A, A] { } } +type slicePositionIterator[T constraints.Integer, M any] struct { + i Iterator[T] + s []M +} + +func NewSliceIndexIterator[T constraints.Integer, M any](s []M, i Iterator[T]) Iterator[M] { + return slicePositionIterator[T, M]{s: s, i: i} +} + +func (i slicePositionIterator[T, M]) Next() bool { return i.i.Next() } +func (i slicePositionIterator[T, M]) At() M { return i.s[i.i.At()] } +func (i slicePositionIterator[T, M]) Err() error { return i.i.Err() } +func (i slicePositionIterator[T, M]) Close() error { return i.i.Close() } + type sliceSeekIterator[A constraints.Ordered] struct { *sliceIterator[A] } diff --git a/pkg/iter/tree.go b/pkg/iter/tree.go index fb810d64eb..44c73ba430 100644 --- a/pkg/iter/tree.go +++ b/pkg/iter/tree.go @@ -1,11 +1,7 @@ package iter import ( -<<<<<<< HEAD "github.com/grafana/pyroscope/pkg/util/loser" -======= - "github.com/grafana/phlare/pkg/util/loser" ->>>>>>> ee8a92e04 (Add first draft of block compaction) ) var _ Iterator[interface{}] = &TreeIterator[interface{}]{} diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 7f675cc7fe..e6171e5c79 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -305,7 +305,13 @@ type singleBlockQuerier struct { type StacktraceDB interface { Open(ctx context.Context) error Close() error - Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error + + // Load the database into memory entirely. + // This method is used at compaction. + Load(context.Context) error + WriteStats(partition uint64, s *symdb.Stats) + + Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error } type stacktraceResolverV1 struct { @@ -321,18 +327,33 @@ func (r *stacktraceResolverV1) Close() error { return r.stacktraces.Close() } -func (r *stacktraceResolverV1) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error { +func (r *stacktraceResolverV1) Resolve(ctx context.Context, _ uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error { stacktraces := repeatedColumnIter(ctx, r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraceIDs)) defer stacktraces.Close() - + t := make([]int32, 0, 64) for stacktraces.Next() { s := stacktraces.At() - locs.addFromParquet(int64(s.Row), s.Values) - + t = grow(t, len(s.Values)) + for i, v := range s.Values { + t[i] = v.Int32() + } + locs.InsertStacktrace(s.Row, t) } return stacktraces.Err() } +func (r *stacktraceResolverV1) WriteStats(_ uint64, s *symdb.Stats) { + s.StacktracesTotal = int(r.stacktraces.file.NumRows()) + s.MaxStacktraceID = s.StacktracesTotal +} + +func (r *stacktraceResolverV1) Load(context.Context) error { + // FIXME(kolesnikovae): Loading all stacktraces from parquet file + // into memory is likely a bad choice. Instead we could convert + // it to symdb first. 
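+	// A possible direction (sketch only, not implemented): read the
+	// stacktraces parquet rows once and append them to an in-memory
+	// symdb partition, so subsequent Resolve calls are served from
+	// memory rather than from the file. For now, Load is a no-op for
+	// the v1 format.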
+ return nil +} + type stacktraceResolverV2 struct { reader *symdb.Reader bucketReader phlareobj.Bucket @@ -351,19 +372,25 @@ func (r *stacktraceResolverV2) Close() error { return nil } -func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error { - mr, ok := r.reader.MappingReader(mapping) +func (r *stacktraceResolverV2) Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error { + mr, ok := r.reader.SymbolsResolver(partition) if !ok { return nil } resolver := mr.StacktraceResolver() defer resolver.Release() + return resolver.ResolveStacktraces(ctx, locs, stacktraceIDs) +} - return resolver.ResolveStacktraces(ctx, symdb.StacktraceInserterFn( - func(stacktraceID uint32, locations []int32) { - locs.add(int64(stacktraceID), locations) - }, - ), stacktraceIDs) +func (r *stacktraceResolverV2) Load(ctx context.Context) error { + return r.reader.Load(ctx) +} + +func (r *stacktraceResolverV2) WriteStats(partition uint64, s *symdb.Stats) { + mr, ok := r.reader.SymbolsResolver(partition) + if ok { + mr.WriteStats(s) + } } func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier { @@ -435,6 +462,18 @@ func (b *singleBlockQuerier) Index() IndexReader { return b.index } +func (b *singleBlockQuerier) Symbols() SymbolsReader { + return &inMemorySymbolsReader{ + partitions: make(map[uint64]*inMemorySymbolsResolver), + + strings: b.strings, + functions: b.functions, + locations: b.locations, + mappings: b.mappings, + stacktraces: b.stacktraces, + } +} + func (b *singleBlockQuerier) Meta() block.Meta { if b.meta == nil { return block.Meta{} diff --git a/pkg/phlaredb/block_symbols_reader.go b/pkg/phlaredb/block_symbols_reader.go new file mode 100644 index 0000000000..e8399e1342 --- /dev/null +++ b/pkg/phlaredb/block_symbols_reader.go @@ -0,0 +1,82 @@ +package phlaredb + +import ( + "context" + + "github.com/grafana/pyroscope/pkg/iter" + schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/phlaredb/symdb" +) + +// TODO(kolesnikovae): Refactor to symdb. + +type SymbolsReader interface { + SymbolsResolver(partition uint64) (SymbolsResolver, error) +} + +type SymbolsResolver interface { + ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error + + Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] + Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] + Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] + Strings(iter.Iterator[uint32]) iter.Iterator[string] + + WriteStats(*symdb.Stats) +} + +type inMemorySymbolsReader struct { + partitions map[uint64]*inMemorySymbolsResolver + + // TODO(kolesnikovae): Split into partitions. 
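+	// Until then, the tables below are shared by all partitions, and
+	// every inMemorySymbolsResolver indexes into these shared caches.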
+ strings inMemoryparquetReader[string, *schemav1.StringPersister] + functions inMemoryparquetReader[*schemav1.InMemoryFunction, *schemav1.FunctionPersister] + locations inMemoryparquetReader[*schemav1.InMemoryLocation, *schemav1.LocationPersister] + mappings inMemoryparquetReader[*schemav1.InMemoryMapping, *schemav1.MappingPersister] + stacktraces StacktraceDB +} + +func (r *inMemorySymbolsReader) SymbolsResolver(partition uint64) (SymbolsResolver, error) { + p, ok := r.partitions[partition] + if !ok { + p = &inMemorySymbolsResolver{ + partition: partition, + reader: r, + } + r.partitions[partition] = p + } + return p, nil +} + +type inMemorySymbolsResolver struct { + partition uint64 + reader *inMemorySymbolsReader +} + +func (s inMemorySymbolsResolver) ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error { + return s.reader.stacktraces.Resolve(ctx, s.partition, dst, stacktraces) +} + +func (s inMemorySymbolsResolver) Locations(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] { + return iter.NewSliceIndexIterator(s.reader.locations.cache, i) +} + +func (s inMemorySymbolsResolver) Mappings(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] { + return iter.NewSliceIndexIterator(s.reader.mappings.cache, i) +} + +func (s inMemorySymbolsResolver) Functions(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] { + return iter.NewSliceIndexIterator(s.reader.functions.cache, i) +} + +func (s inMemorySymbolsResolver) Strings(i iter.Iterator[uint32]) iter.Iterator[string] { + return iter.NewSliceIndexIterator(s.reader.strings.cache, i) +} + +func (s inMemorySymbolsResolver) WriteStats(stats *symdb.Stats) { + s.reader.stacktraces.WriteStats(s.partition, stats) + stats.LocationsTotal = len(s.reader.locations.cache) + stats.MappingsTotal = len(s.reader.mappings.cache) + stats.FunctionsTotal = len(s.reader.functions.cache) + stats.StringsTotal = len(s.reader.strings.cache) +} diff --git a/pkg/phlaredb/block_symbols_writer.go b/pkg/phlaredb/block_symbols_writer.go new file mode 100644 index 0000000000..43fd1a09a7 --- /dev/null +++ b/pkg/phlaredb/block_symbols_writer.go @@ -0,0 +1,111 @@ +package phlaredb + +import ( + "context" + "fmt" + "path/filepath" + + schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/phlaredb/symdb" +) + +// TODO(kolesnikovae): Refactor to symdb. 
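+//
+// The write path mirrors the read path: a caller obtains a per-partition
+// SymbolsAppender from SymbolsWriter and appends symbols in dependency
+// order - strings first, then functions and mappings, then locations,
+// and finally stacktraces - as symPartitionRewriter.appendRewrite does.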
+ +type SymbolsWriter interface { + SymbolsAppender(partition uint64) (SymbolsAppender, error) +} + +type SymbolsAppender interface { + AppendStacktraces([]uint32, []*schemav1.Stacktrace) + AppendLocations([]uint32, []*schemav1.InMemoryLocation) + AppendMappings([]uint32, []*schemav1.InMemoryMapping) + AppendFunctions([]uint32, []*schemav1.InMemoryFunction) + AppendStrings([]uint32, []string) +} + +type symbolsWriter struct { + partitions map[uint64]*symbolsAppender + + locations deduplicatingSlice[*schemav1.InMemoryLocation, locationsKey, *locationsHelper, *schemav1.LocationPersister] + mappings deduplicatingSlice[*schemav1.InMemoryMapping, mappingsKey, *mappingsHelper, *schemav1.MappingPersister] + functions deduplicatingSlice[*schemav1.InMemoryFunction, functionsKey, *functionsHelper, *schemav1.FunctionPersister] + strings deduplicatingSlice[string, string, *stringsHelper, *schemav1.StringPersister] + tables []Table + + symdb *symdb.SymDB +} + +func newSymbolsWriter(dst string, cfg *ParquetConfig) (*symbolsWriter, error) { + w := symbolsWriter{ + partitions: make(map[uint64]*symbolsAppender), + } + dir := filepath.Join(dst, symdb.DefaultDirName) + w.symdb = symdb.NewSymDB(symdb.DefaultConfig().WithDirectory(dir)) + w.tables = []Table{ + &w.locations, + &w.mappings, + &w.functions, + &w.strings, + } + for _, t := range w.tables { + if err := t.Init(dst, cfg, contextHeadMetrics(context.Background())); err != nil { + return nil, err + } + } + return &w, nil +} + +func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error) { + p, ok := w.partitions[partition] + if !ok { + appender := w.symdb.SymbolsAppender(partition) + x := &symbolsAppender{ + stacktraces: appender.StacktraceAppender(), + writer: w, + } + w.partitions[partition] = x + p = x + } + return p, nil +} + +func (w *symbolsWriter) Close() error { + for _, t := range w.tables { + _, _, err := t.Flush(context.Background()) + if err != nil { + return fmt.Errorf("flushing table %s: %w", t.Name(), err) + } + if err = t.Close(); err != nil { + return fmt.Errorf("closing table %s: %w", t.Name(), err) + } + } + if err := w.symdb.Flush(); err != nil { + return fmt.Errorf("flushing symbol database: %w", err) + } + return nil +} + +type symbolsAppender struct { + stacktraces symdb.StacktraceAppender + writer *symbolsWriter +} + +func (s symbolsAppender) AppendStacktraces(dst []uint32, stacktraces []*schemav1.Stacktrace) { + s.stacktraces.AppendStacktrace(dst, stacktraces) +} + +func (s symbolsAppender) AppendLocations(dst []uint32, locations []*schemav1.InMemoryLocation) { + s.writer.locations.append(dst, locations) +} + +func (s symbolsAppender) AppendMappings(dst []uint32, mappings []*schemav1.InMemoryMapping) { + s.writer.mappings.append(dst, mappings) +} + +func (s symbolsAppender) AppendFunctions(dst []uint32, functions []*schemav1.InMemoryFunction) { + s.writer.functions.append(dst, functions) +} + +func (s symbolsAppender) AppendStrings(dst []uint32, strings []string) { + s.writer.strings.append(dst, strings) +} diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 84c31e6bf9..62dc48a1af 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -6,6 +6,7 @@ import ( "math" "os" "path/filepath" + "sort" "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" @@ -20,6 +21,7 @@ import ( phlareparquet "github.com/grafana/pyroscope/pkg/parquet" "github.com/grafana/pyroscope/pkg/phlaredb/block" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + 
"github.com/grafana/pyroscope/pkg/phlaredb/symdb" "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" "github.com/grafana/pyroscope/pkg/util" "github.com/grafana/pyroscope/pkg/util/loser" @@ -29,7 +31,7 @@ type BlockReader interface { Meta() block.Meta Profiles() []parquet.RowGroup Index() IndexReader - // todo symbdb + Symbols() SymbolsReader } func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Meta, err error) { @@ -74,28 +76,37 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Met return block.Meta{}, err } profileWriter := newProfileWriter(profileFile) - - // todo new symbdb + symw, err := newSymbolsWriter(blockPath, defaultParquetConfig) + if err != nil { + return block.Meta{}, err + } rowsIt, err := newMergeRowProfileIterator(src) if err != nil { return block.Meta{}, err } seriesRewriter := newSeriesRewriter(rowsIt, indexw) - symbolsRewriter := newSymbolsRewriter(seriesRewriter) - reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symbolsRewriter)) + symRewriter := newSymbolsRewriter(seriesRewriter, src, symw) + reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symRewriter)) total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) if err != nil { return block.Meta{}, err } + if err = symRewriter.Close(); err != nil { + return block.Meta{}, err + } + if err = symw.Close(); err != nil { + return block.Meta{}, err + } + // flush the index file. - if err := indexw.Close(); err != nil { + if err = indexw.Close(); err != nil { return block.Meta{}, err } - if err := profileWriter.Close(); err != nil { + if err = profileWriter.Close(); err != nil { return block.Meta{}, err } @@ -106,7 +117,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Met meta.Files = metaFiles meta.Stats.NumProfiles = total meta.Stats.NumSeries = seriesRewriter.NumSeries() - meta.Stats.NumSamples = symbolsRewriter.NumSamples() + meta.Stats.NumSamples = symRewriter.NumSamples() if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { return block.Meta{}, err } @@ -179,8 +190,8 @@ func parquetMetaFile(filePath string, size int64) (block.File, error) { func compactMetas(src []block.Meta) block.Meta { meta := block.NewMeta() highestCompactionLevel := 0 - ulids := make([]ulid.ULID, len(src)) - parents := make([]tsdb.BlockDesc, len(src)) + ulids := make([]ulid.ULID, 0, len(src)) + parents := make([]tsdb.BlockDesc, 0, len(src)) minTime, maxTime := model.Latest, model.Earliest labels := make(map[string]string) for _, b := range src { @@ -229,10 +240,13 @@ type profileRow struct { labels phlaremodel.Labels fp model.Fingerprint row schemav1.ProfileRow + + blockReader BlockReader } type profileRowIterator struct { profiles iter.Iterator[parquet.Row] + blockReader BlockReader index IndexReader allPostings index.Postings err error @@ -241,15 +255,16 @@ type profileRowIterator struct { chunks []index.ChunkMeta } -func newProfileRowIterator(reader parquet.RowReader, idx IndexReader) (*profileRowIterator, error) { +func newProfileRowIterator(reader parquet.RowReader, s BlockReader) (*profileRowIterator, error) { k, v := index.AllPostingsKey() - allPostings, err := idx.Postings(k, nil, v) + allPostings, err := s.Index().Postings(k, nil, v) if err != nil { return nil, err } return &profileRowIterator{ profiles: phlareparquet.NewBufferedRowReaderIterator(reader, 1024), - index: idx, + blockReader: s, + index: s.Index(), allPostings: allPostings, currentRow: 
profileRow{ seriesRef: math.MaxUint32, @@ -266,6 +281,7 @@ func (p *profileRowIterator) Next() bool { if !p.profiles.Next() { return false } + p.currentRow.blockReader = p.blockReader p.currentRow.row = schemav1.ProfileRow(p.profiles.At()) seriesIndex := p.currentRow.row.SeriesIndex() p.currentRow.timeNanos = p.currentRow.row.TimeNanos() @@ -308,10 +324,7 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e for i, s := range src { // todo: may be we could merge rowgroups in parallel but that requires locking. reader := parquet.MultiRowGroup(s.Profiles()...).Rows() - it, err := newProfileRowIterator( - reader, - s.Index(), - ) + it, err := newProfileRowIterator(reader, s) if err != nil { return nil, err } @@ -343,80 +356,6 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e }, nil } -type noopStacktraceRewriter struct{} - -func (noopStacktraceRewriter) RewriteStacktraces(src, dst []uint32) error { - copy(dst, src) - return nil -} - -type StacktraceRewriter interface { - RewriteStacktraces(src, dst []uint32) error -} - -type symbolsRewriter struct { - iter.Iterator[profileRow] - err error - - rewriter StacktraceRewriter - src, dst []uint32 - numSamples uint64 -} - -// todo remap symbols & ingest symbols -func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter { - return &symbolsRewriter{ - Iterator: it, - rewriter: noopStacktraceRewriter{}, - } -} - -func (s *symbolsRewriter) NumSamples() uint64 { - return s.numSamples -} - -func (s *symbolsRewriter) Next() bool { - if !s.Iterator.Next() { - return false - } - var err error - s.Iterator.At().row.ForStacktraceIDsValues(func(values []parquet.Value) { - s.numSamples += uint64(len(values)) - s.loadStacktracesID(values) - err = s.rewriter.RewriteStacktraces(s.src, s.dst) - if err != nil { - return - } - for i, v := range values { - values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) - } - }) - if err != nil { - s.err = err - return false - } - return true -} - -func (s *symbolsRewriter) Err() error { - if s.err != nil { - return s.err - } - return s.Iterator.Err() -} - -func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { - if cap(s.src) < len(values) { - s.src = make([]uint32, len(values)*2) - s.dst = make([]uint32, len(values)*2) - } - s.src = s.src[:len(values)] - s.dst = s.dst[:len(values)] - for i := range values { - s.src[i] = values[i].Uint32() - } -} - type seriesRewriter struct { iter.Iterator[profileRow] @@ -534,3 +473,463 @@ func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) return indexw, nil } + +type symbolsRewriter struct { + profiles iter.Iterator[profileRow] + rewriters map[BlockReader]*stacktraceRewriter + stacktraces []uint32 + err error + + numSamples uint64 +} + +func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, w SymbolsWriter) *symbolsRewriter { + sr := symbolsRewriter{ + profiles: it, + rewriters: make(map[BlockReader]*stacktraceRewriter, len(blocks)), + } + for _, r := range blocks { + sr.rewriters[r] = newStacktraceRewriter(r.Symbols(), w) + } + return &sr +} + +func (s *symbolsRewriter) NumSamples() uint64 { return s.numSamples } + +func (s *symbolsRewriter) At() profileRow { return s.profiles.At() } + +func (s *symbolsRewriter) Close() error { return s.profiles.Close() } + +func (s *symbolsRewriter) Err() error { + if s.err != nil { + return s.err + } + return s.profiles.Err() +} + +func (s *symbolsRewriter) Next() 
bool { + if !s.profiles.Next() { + return false + } + var err error + profile := s.profiles.At() + profile.row.ForStacktraceIDsValues(func(values []parquet.Value) { + s.loadStacktracesID(values) + r := s.rewriters[profile.blockReader] + if err = r.rewriteStacktraces(profile.row.StacktracePartitionID(), s.stacktraces); err != nil { + return + } + s.numSamples += uint64(len(values)) + for i, v := range values { + // FIXME: the original order is not preserved, which will affect encoding. + values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) + } + }) + if err != nil { + s.err = err + return false + } + return true +} + +func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { + s.stacktraces = grow(s.stacktraces, len(values)) + for i := range values { + s.stacktraces[i] = values[i].Uint32() + } +} + +type stacktraceRewriter struct { + reader SymbolsReader + writer SymbolsWriter + + partitions map[uint64]*symPartitionRewriter + inserter *stacktraceInserter + + // Objects below have global addressing. + // TODO(kolesnikovae): Move to partition. + locations *lookupTable[*schemav1.InMemoryLocation] + mappings *lookupTable[*schemav1.InMemoryMapping] + functions *lookupTable[*schemav1.InMemoryFunction] + strings *lookupTable[string] +} + +type symPartitionRewriter struct { + name uint64 + stats symdb.Stats + // Stacktrace identifiers are only valid within the partition. + stacktraces *lookupTable[[]int32] + resolver SymbolsResolver + appender SymbolsAppender + + r *stacktraceRewriter + + // FIXME(kolesnikovae): schemav1.Stacktrace should be just a uint32 slice: + // type Stacktrace []uint32 + current []*schemav1.Stacktrace +} + +func newStacktraceRewriter(r SymbolsReader, w SymbolsWriter) *stacktraceRewriter { + return &stacktraceRewriter{ + reader: r, + writer: w, + } +} + +func (r *stacktraceRewriter) init(partition uint64) (p *symPartitionRewriter, err error) { + if r.partitions == nil { + r.partitions = make(map[uint64]*symPartitionRewriter) + } + if p, err = r.getOrCreatePartition(partition); err != nil { + return nil, err + } + + if r.locations == nil { + r.locations = newLookupTable[*schemav1.InMemoryLocation](p.stats.LocationsTotal) + r.mappings = newLookupTable[*schemav1.InMemoryMapping](p.stats.MappingsTotal) + r.functions = newLookupTable[*schemav1.InMemoryFunction](p.stats.FunctionsTotal) + r.strings = newLookupTable[string](p.stats.StringsTotal) + } else { + r.locations.reset() + r.mappings.reset() + r.functions.reset() + r.strings.reset() + } + + r.inserter = &stacktraceInserter{ + stacktraces: p.stacktraces, + locations: r.locations, + } + + return p, nil +} + +func (r *stacktraceRewriter) getOrCreatePartition(partition uint64) (_ *symPartitionRewriter, err error) { + p, ok := r.partitions[partition] + if ok { + p.reset() + return p, nil + } + n := &symPartitionRewriter{r: r, name: partition} + if n.resolver, err = r.reader.SymbolsResolver(partition); err != nil { + return nil, err + } + if n.appender, err = r.writer.SymbolsAppender(partition); err != nil { + return nil, err + } + n.resolver.WriteStats(&n.stats) + n.stacktraces = newLookupTable[[]int32](n.stats.MaxStacktraceID) + r.partitions[partition] = n + return n, nil +} + +func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) error { + p, err := r.init(partition) + if err != nil { + return err + } + if err = p.populateUnresolved(stacktraces); err != nil { + return err + } + if p.hasUnresolved() { + return 
p.appendRewrite(stacktraces)
+	}
+	return nil
+}
+
+func (p *symPartitionRewriter) reset() {
+	p.stacktraces.reset()
+	p.current = p.current[:0]
+}
+
+func (p *symPartitionRewriter) hasUnresolved() bool {
+	return len(p.stacktraces.unresolved)+
+		len(p.r.locations.unresolved)+
+		len(p.r.mappings.unresolved)+
+		len(p.r.functions.unresolved)+
+		len(p.r.strings.unresolved) > 0
+}
+
+func (p *symPartitionRewriter) populateUnresolved(stacktraceIDs []uint32) error {
+	// Filter out all stack traces that have already been
+	// resolved, and populate the locations lookup table.
+	if err := p.resolveStacktraces(stacktraceIDs); err != nil {
+		return err
+	}
+	if len(p.r.locations.unresolved) == 0 {
+		return nil
+	}
+
+	// Resolve functions and mappings for new locations.
+	unresolvedLocs := p.r.locations.iter()
+	locations := p.resolver.Locations(unresolvedLocs)
+	for locations.Next() {
+		location := locations.At()
+		location.MappingId = p.r.mappings.tryLookup(location.MappingId)
+		for j, line := range location.Line {
+			location.Line[j].FunctionId = p.r.functions.tryLookup(line.FunctionId)
+		}
+		unresolvedLocs.setValue(location)
+	}
+	if err := locations.Err(); err != nil {
+		return err
+	}
+
+	// Resolve strings.
+	unresolvedMappings := p.r.mappings.iter()
+	mappings := p.resolver.Mappings(unresolvedMappings)
+	for mappings.Next() {
+		mapping := mappings.At()
+		mapping.BuildId = p.r.strings.tryLookup(mapping.BuildId)
+		mapping.Filename = p.r.strings.tryLookup(mapping.Filename)
+		unresolvedMappings.setValue(mapping)
+	}
+	if err := mappings.Err(); err != nil {
+		return err
+	}
+
+	unresolvedFunctions := p.r.functions.iter()
+	functions := p.resolver.Functions(unresolvedFunctions)
+	for functions.Next() {
+		function := functions.At()
+		function.Name = p.r.strings.tryLookup(function.Name)
+		function.Filename = p.r.strings.tryLookup(function.Filename)
+		function.SystemName = p.r.strings.tryLookup(function.SystemName)
+		unresolvedFunctions.setValue(function)
+	}
+	if err := functions.Err(); err != nil {
+		return err
+	}
+
+	unresolvedStrings := p.r.strings.iter()
+	strings := p.resolver.Strings(unresolvedStrings)
+	for strings.Next() {
+		unresolvedStrings.setValue(strings.At())
+	}
+	return strings.Err()
+}
+
+func (p *symPartitionRewriter) appendRewrite(stacktraces []uint32) error {
+	p.appender.AppendStrings(p.r.strings.buf, p.r.strings.values)
+	p.r.strings.updateResolved()
+
+	for _, v := range p.r.functions.values {
+		v.Name = p.r.strings.lookupResolved(v.Name)
+		v.Filename = p.r.strings.lookupResolved(v.Filename)
+		v.SystemName = p.r.strings.lookupResolved(v.SystemName)
+	}
+	p.appender.AppendFunctions(p.r.functions.buf, p.r.functions.values)
+	p.r.functions.updateResolved()
+
+	for _, v := range p.r.mappings.values {
+		v.BuildId = p.r.strings.lookupResolved(v.BuildId)
+		v.Filename = p.r.strings.lookupResolved(v.Filename)
+	}
+	p.appender.AppendMappings(p.r.mappings.buf, p.r.mappings.values)
+	p.r.mappings.updateResolved()
+
+	for _, v := range p.r.locations.values {
+		v.MappingId = p.r.mappings.lookupResolved(v.MappingId)
+		for j, line := range v.Line {
+			v.Line[j].FunctionId = p.r.functions.lookupResolved(line.FunctionId)
+		}
+	}
+	p.appender.AppendLocations(p.r.locations.buf, p.r.locations.values)
+	p.r.locations.updateResolved()
+
+	for _, v := range p.stacktraces.values {
+		for j, location := range v {
+			v[j] = int32(p.r.locations.lookupResolved(uint32(location)))
+		}
+	}
+	p.appender.AppendStacktraces(p.stacktraces.buf, p.stacktracesFromResolvedValues())
+	p.stacktraces.updateResolved()
+
+	for i, v := range stacktraces {
+		stacktraces[i] = p.stacktraces.lookupResolved(v)
+	}
+
+	return nil
+}
+
+func (p *symPartitionRewriter) resolveStacktraces(stacktraceIDs []uint32) error {
+	for i, v := range stacktraceIDs {
+		stacktraceIDs[i] = p.stacktraces.tryLookup(v)
+	}
+	if len(p.stacktraces.unresolved) == 0 {
+		return nil
+	}
+	p.stacktraces.initSorted()
+	return p.resolver.ResolveStacktraces(context.TODO(), p.r.inserter, p.stacktraces.buf)
+}
+
+func (p *symPartitionRewriter) stacktracesFromResolvedValues() []*schemav1.Stacktrace {
+	p.current = grow(p.current, len(p.stacktraces.values))
+	for i, v := range p.stacktraces.values {
+		s := p.current[i]
+		if s == nil {
+			s = &schemav1.Stacktrace{LocationIDs: make([]uint64, len(v))}
+			p.current[i] = s
+		}
+		s.LocationIDs = grow(s.LocationIDs, len(v))
+		for j, m := range v {
+			s.LocationIDs[j] = uint64(m)
+		}
+	}
+	return p.current
+}
+
+type stacktraceInserter struct {
+	stacktraces *lookupTable[[]int32]
+	locations   *lookupTable[*schemav1.InMemoryLocation]
+}
+
+func (i *stacktraceInserter) InsertStacktrace(stacktrace uint32, locations []int32) {
+	// Resolve locations for new stack traces.
+	for j, loc := range locations {
+		locations[j] = int32(i.locations.tryLookup(uint32(loc)))
+	}
+	// 'stacktrace' points to an entry in resolved, which should
+	// be a marked pointer to an unresolved value.
+	idx := i.stacktraces.resolved[stacktrace] & markerMask
+	v := &i.stacktraces.values[idx]
+	n := grow(*v, len(locations))
+	copy(n, locations)
+	// Preserve allocated capacity.
+	i.stacktraces.values[idx] = n
+}
+
+const (
+	marker     = 1 << 31
+	markerMask = math.MaxUint32 >> 1
+)
+
+type lookupTable[T any] struct {
+	// The index is the source ID, and the value is the destination ID.
+	// If the destination ID is not known yet, the element is a marked
+	// index into 'unresolved'.
+	resolved   []uint32
+	unresolved []uint32 // Points to resolved. Index matches values.
+	values     []T      // Values are populated for unresolved items.
+	buf        []uint32 // Sorted unresolved values.
+}
+
+func newLookupTable[T any](size int) *lookupTable[T] {
+	var t lookupTable[T]
+	t.init(size)
+	return &t
+}
+
+func (t *lookupTable[T]) init(size int) {
+	if cap(t.resolved) < size {
+		t.resolved = make([]uint32, size)
+		return
+	}
+	t.resolved = t.resolved[:size]
+	for i := range t.resolved {
+		t.resolved[i] = 0
+	}
+}
+
+func (t *lookupTable[T]) reset() {
+	t.unresolved = t.unresolved[:0]
+	t.values = t.values[:0]
+	t.buf = t.buf[:0]
+}
+
+// tryLookup looks up the value at x in resolved.
+// If x has not been resolved yet, x is memorized for a
+// future resolve, and the returned value is the marked
+// index into unresolved.
+func (t *lookupTable[T]) tryLookup(x uint32) uint32 {
+	if v := t.resolved[x]; v != 0 {
+		if v&marker > 0 {
+			return v // Already marked for resolve.
+		}
+		return v - 1 // Already resolved.
+	}
+	u := t.newUnresolved(x) | marker
+	t.resolved[x] = u
+	return u
+}
+
+func (t *lookupTable[T]) newUnresolved(rid uint32) uint32 {
+	t.unresolved = append(t.unresolved, rid)
+	x := len(t.values)
+	if x < cap(t.values) {
+		// Try to reuse previously allocated value.
+		t.values = t.values[:x+1]
+	} else {
+		var v T
+		t.values = append(t.values, v)
+	}
+	return uint32(x)
+}
+
+func (t *lookupTable[T]) storeResolved(i int, rid uint32) {
+	// The index is incremented to avoid 0 because it is
+	// used as a sentinel and indicates absence (resolved is
+	// a sparse slice initialized with the maximal expected
+	// size). Correspondingly, lookupResolved should
+	// decrement the index on read.
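+	// For example (hypothetical IDs): tryLookup(7) on a fresh table
+	// returns 0|marker and records 7 in unresolved; storeResolved(0, 42)
+	// then sets resolved[7] = 43, so the next tryLookup(7) returns 42.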
+ t.resolved[t.unresolved[i]] = rid + 1 +} + +func (t *lookupTable[T]) lookupResolved(x uint32) uint32 { + if x&marker > 0 { + return t.resolved[t.unresolved[x&markerMask]] - 1 + } + return x // Already resolved. +} + +// updateResolved loads indices from buf to resolved. +// It is expected that the order matches values. +func (t *lookupTable[T]) updateResolved() { + for i, rid := range t.unresolved { + t.resolved[rid] = t.buf[i] + 1 + } +} + +func (t *lookupTable[T]) initSorted() { + // Gather and sort references to unresolved values. + t.buf = grow(t.buf, len(t.unresolved)) + copy(t.buf, t.unresolved) + sort.Slice(t.buf, func(i, j int) bool { + return t.buf[i] < t.buf[j] + }) +} + +func (t *lookupTable[T]) iter() *lookupTableIterator[T] { + t.initSorted() + return &lookupTableIterator[T]{table: t} +} + +type lookupTableIterator[T any] struct { + table *lookupTable[T] + cur uint32 +} + +func (t *lookupTableIterator[T]) Next() bool { + return t.cur < uint32(len(t.table.buf)) +} + +func (t *lookupTableIterator[T]) At() uint32 { + x := t.table.buf[t.cur] + t.cur++ + return x +} + +func (t *lookupTableIterator[T]) setValue(v T) { + u := t.table.resolved[t.table.buf[t.cur-1]] + t.table.values[u&markerMask] = v +} + +func (t *lookupTableIterator[T]) Close() error { return nil } + +func (t *lookupTableIterator[T]) Err() error { return nil } + +func grow[T any](s []T, n int) []T { + if cap(s) < n { + return make([]T, n, 2*n) + } + return s[:n] +} diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index acab55278e..0c9bdd4788 100644 --- a/pkg/phlaredb/compact_test.go +++ b/pkg/phlaredb/compact_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + _ "net/http/pprof" "os" "path/filepath" "sort" @@ -11,8 +12,6 @@ import ( "testing" "time" - _ "net/http/pprof" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" "github.com/segmentio/parquet-go" @@ -119,9 +118,11 @@ func testCompact(t *testing.T, metas []*block.Meta, bkt phlareobj.Bucket, dst st "numSamples", m.Stats.NumSamples) b := NewSingleBlockQuerierFromMeta(ctx, bkt, m) g.Go(func() error { - return b.Open(ctx) + if err := b.Open(ctx); err != nil { + return err + } + return b.stacktraces.Load(ctx) }) - src = append(src, b) } @@ -136,6 +137,15 @@ func testCompact(t *testing.T, metas []*block.Meta, bkt phlareobj.Bucket, dst st "numSamples", new.Stats.NumSamples) } +type blockReaderMock struct { + BlockReader + idxr IndexReader +} + +func (m *blockReaderMock) Index() IndexReader { + return m.idxr +} + func TestProfileRowIterator(t *testing.T) { filePath := t.TempDir() + "/index.tsdb" idxw, err := index.NewWriter(context.Background(), filePath) @@ -162,7 +172,7 @@ func TestProfileRowIterator(t *testing.T) { {SeriesIndex: 1, TimeNanos: 2}, {SeriesIndex: 2, TimeNanos: 3}, }, - ), idxr) + ), &blockReaderMock{idxr: idxr}) require.NoError(t, err) assert.True(t, it.Next()) @@ -294,3 +304,96 @@ func generateParquetFile(t *testing.T, path string) { require.NoError(t, err) } } + +func Test_lookupTable(t *testing.T) { + // Given the source data set. + // Copy arbitrary subsets of items from src to dst. 
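+	// The test below drives the table through its full protocol:
+	// tryLookup, then iter/setValue, then storeResolved, and finally
+	// lookupResolved.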
+	var dst []string
+	src := []string{
+		"zero",
+		"one",
+		"two",
+		"three",
+		"four",
+		"five",
+		"six",
+		"seven",
+	}
+
+	type testCase struct {
+		description string
+		input       []uint32
+		expected    []string
+	}
+
+	testCases := []testCase{
+		{
+			description: "empty table",
+			input:       []uint32{5, 0, 3, 1, 2, 2, 4},
+			expected:    []string{"five", "zero", "three", "one", "two", "two", "four"},
+		},
+		{
+			description: "no new values",
+			input:       []uint32{2, 1, 2, 3},
+			expected:    []string{"two", "one", "two", "three"},
+		},
+		{
+			description: "new value mixed",
+			input:       []uint32{2, 1, 6, 2, 3},
+			expected:    []string{"two", "one", "six", "two", "three"},
+		},
+	}
+
+	// Try to look up values in src lazily.
+	// The table size must be greater than or
+	// equal to the size of the source data set.
+	l := newLookupTable[string](10)
+
+	populate := func(t *testing.T, x []uint32) {
+		for i, v := range x {
+			x[i] = l.tryLookup(v)
+		}
+		// Resolve values that are not yet known.
+		// Mind the order and deduplication.
+		p := -1
+		for it := l.iter(); it.Err() == nil && it.Next(); {
+			m := int(it.At())
+			if m <= p {
+				t.Fatal("iterator order invalid")
+			}
+			p = m
+			it.setValue(src[m])
+		}
+	}
+
+	resolveAppend := func() {
+		// Populate dst with the newly resolved values.
+		// Note that order in dst does not have to match src.
+		for i, v := range l.values {
+			l.storeResolved(i, uint32(len(dst)))
+			dst = append(dst, v)
+		}
+	}
+
+	resolve := func(x []uint32) []string {
+		// Look up resolved values.
+		var resolved []string
+		for _, v := range x {
+			resolved = append(resolved, dst[l.lookupResolved(v)])
+		}
+		return resolved
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.description, func(t *testing.T) {
+			l.reset()
+			populate(t, tc.input)
+			resolveAppend()
+			assert.Equal(t, tc.expected, resolve(tc.input))
+		})
+	}
+
+	assert.Len(t, dst, 7)
+	assert.NotContains(t, dst, "seven")
+}
diff --git a/pkg/phlaredb/deduplicating_slice.go b/pkg/phlaredb/deduplicating_slice.go
index 3442fb9e6f..cdeaf15ec0 100644
--- a/pkg/phlaredb/deduplicating_slice.go
+++ b/pkg/phlaredb/deduplicating_slice.go
@@ -227,3 +227,37 @@ func (s *deduplicatingSlice[M, K, H, P]) ingest(_ context.Context, elems []M, re
 
 	return nil
 }
+
+func (s *deduplicatingSlice[M, K, H, P]) append(dst []uint32, elems []M) {
+	missing := int64SlicePool.Get()[:0]
+	s.lock.RLock()
+	for i, v := range elems {
+		k := s.helper.key(v)
+		if x, ok := s.lookup[k]; ok {
+			dst[i] = uint32(x)
+		} else {
+			missing = append(missing, int64(i))
+		}
+	}
+	s.lock.RUnlock()
+	if len(missing) > 0 {
+		// Take the write lock to insert the missing values,
+		// re-checking the lookup for races in between.
+		s.lock.Lock()
+		p := uint32(len(s.slice))
+		for _, i := range missing {
+			e := elems[i]
+			k := s.helper.key(e)
+			x, ok := s.lookup[k]
+			if ok {
+				dst[i] = uint32(x)
+				continue
+			}
+			s.size.Add(s.helper.size(e))
+			s.slice = append(s.slice, s.helper.clone(e))
+			s.lookup[k] = int64(p)
+			dst[i] = p
+			p++
+		}
+		s.lock.Unlock()
+	}
+	int64SlicePool.Put(missing)
+}
diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go
index 703a691fb7..d514edb46b 100644
--- a/pkg/phlaredb/head.go
+++ b/pkg/phlaredb/head.go
@@ -317,7 +317,7 @@ func (h *Head) convertSamples(_ context.Context, r *rewriter, stacktracePartitio
 			r.locations.rewriteUint64(&stacktraces[idxSample].LocationIDs[i])
 		}
 	}
-	appender := h.symbolDB.MappingWriter(stacktracePartition).StacktraceAppender()
+	appender := h.symbolDB.SymbolsAppender(stacktracePartition).StacktraceAppender()
 	defer appender.Release()
 
 	if cap(stacktracesIds) < len(stacktraces) {
@@ -609,7 +609,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult")) _ = stacktracesByMapping.ForEach( func(mapping uint64, stacktraceSamples stacktraceSampleMap) error { - mp, ok := h.symbolDB.MappingReader(mapping) + mp, ok := h.symbolDB.SymbolsResolver(mapping) if !ok { return nil } @@ -672,7 +672,7 @@ func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSam // now add locationIDs and stacktraces _ = stacktracesByMapping.ForEach( func(mapping uint64, stacktraceSamples profileSampleMap) error { - mp, ok := h.symbolDB.MappingReader(mapping) + mp, ok := h.symbolDB.SymbolsResolver(mapping) if !ok { return nil } @@ -971,9 +971,9 @@ func (h *Head) flush(ctx context.Context) error { } // add total size symdb - symbDBFiles, error := h.SymDBFiles() - if error != nil { - return error + symbDBFiles, err := symdbMetaFiles(h.headPath) + if err != nil { + return err } for _, file := range symbDBFiles { @@ -1020,6 +1020,26 @@ func (h *Head) SymDBFiles() ([]block.File, error) { return result, nil } +func symdbMetaFiles(dir string) ([]block.File, error) { + files, err := os.ReadDir(filepath.Join(dir, symdb.DefaultDirName)) + if err != nil { + return nil, err + } + result := make([]block.File, len(files)) + for idx, f := range files { + if f.IsDir() { + continue + } + result[idx].RelPath = filepath.Join(symdb.DefaultDirName, f.Name()) + info, err := f.Info() + if err != nil { + return nil, err + } + result[idx].SizeBytes = uint64(info.Size()) + } + return result, nil +} + // Move moves the head directory to local blocks. The call is not thread-safe: // no concurrent reads and writes are allowed. // diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go index e9fcd2671b..1c44f24a76 100644 --- a/pkg/phlaredb/sample_merge.go +++ b/pkg/phlaredb/sample_merge.go @@ -56,20 +56,12 @@ func newLocationsIdsByStacktraceID(size int) locationsIdsByStacktraceID { } } -func (l locationsIdsByStacktraceID) addFromParquet(stacktraceID int64, locs []parquet.Value) { - l.byStacktraceID[stacktraceID] = make([]int32, len(locs)) - for i, locationID := range locs { - locID := locationID.Uint64() - l.ids[int64(locID)] = struct{}{} - l.byStacktraceID[stacktraceID][i] = int32(locID) - } -} - -func (l locationsIdsByStacktraceID) add(stacktraceID int64, locs []int32) { - l.byStacktraceID[stacktraceID] = make([]int32, len(locs)) +func (l locationsIdsByStacktraceID) InsertStacktrace(stacktraceID uint32, locs []int32) { + s := make([]int32, len(locs)) + l.byStacktraceID[int64(stacktraceID)] = s for i, locationID := range locs { l.ids[int64(locationID)] = struct{}{} - l.byStacktraceID[stacktraceID][i] = locationID + s[i] = locationID } } diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go index d661ab6fb4..bfcc0ec298 100644 --- a/pkg/phlaredb/schemas/v1/profiles.go +++ b/pkg/phlaredb/schemas/v1/profiles.go @@ -42,10 +42,11 @@ var ( phlareparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))), }) - maxProfileRow parquet.Row - seriesIndexColIndex int - stacktraceIDColIndex int - timeNanoColIndex int + maxProfileRow parquet.Row + seriesIndexColIndex int + stacktraceIDColIndex int + timeNanoColIndex int + stacktracePartitionColIndex int ) func init() { @@ -68,6 +69,11 @@ func init() { panic(fmt.Errorf("StacktraceID column not found")) } stacktraceIDColIndex = stacktraceIDCol.ColumnIndex + stacktracePartitionCol, ok := profilesSchema.Lookup("StacktracePartition") + if !ok { + panic(fmt.Errorf("StacktracePartition column not found")) + } 
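+	// Cache the column index so that ProfileRow accessors can read the
+	// value positionally, without a schema lookup per row.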
+ stacktracePartitionColIndex = stacktracePartitionCol.ColumnIndex } type Sample struct { @@ -471,6 +477,10 @@ func (p ProfileRow) SeriesIndex() uint32 { return p[seriesIndexColIndex].Uint32() } +func (p ProfileRow) StacktracePartitionID() uint64 { + return p[stacktracePartitionColIndex].Uint64() +} + func (p ProfileRow) TimeNanos() int64 { var ts int64 for i := len(p) - 1; i >= 0; i-- { diff --git a/pkg/phlaredb/symdb/format.go b/pkg/phlaredb/symdb/format.go index 83ba737cb2..04924bbf4b 100644 --- a/pkg/phlaredb/symdb/format.go +++ b/pkg/phlaredb/symdb/format.go @@ -80,8 +80,8 @@ type IndexFile struct { // Version-specific parts. - // StacktraceChunkHeaders are sorted by mapping - // name and chunk index in ascending order. + // StacktraceChunkHeaders are sorted by + // partition and chunk index in ascending order. StacktraceChunkHeaders StacktraceChunkHeaders CRC uint32 @@ -201,29 +201,29 @@ func (h *StacktraceChunkHeaders) UnmarshalBinary(b []byte) error { return nil } -type stacktraceChunkHeadersByMappingAndIndex StacktraceChunkHeaders +type stacktraceChunkHeadersByPartitionAndIndex StacktraceChunkHeaders -func (h stacktraceChunkHeadersByMappingAndIndex) Len() int { +func (h stacktraceChunkHeadersByPartitionAndIndex) Len() int { return len(h.Entries) } -func (h stacktraceChunkHeadersByMappingAndIndex) Less(i, j int) bool { +func (h stacktraceChunkHeadersByPartitionAndIndex) Less(i, j int) bool { a, b := h.Entries[i], h.Entries[j] - if a.MappingName == b.MappingName { + if a.Partition == b.Partition { return a.ChunkIndex < b.ChunkIndex } - return a.MappingName < b.MappingName + return a.Partition < b.Partition } -func (h stacktraceChunkHeadersByMappingAndIndex) Swap(i, j int) { +func (h stacktraceChunkHeadersByPartitionAndIndex) Swap(i, j int) { h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] } type StacktraceChunkHeader struct { - Offset int64 // Relative to the mapping offset. + Offset int64 Size int64 - MappingName uint64 // MappingName the chunk refers to. + Partition uint64 ChunkIndex uint16 ChunkEncoding ChunkEncoding _ [5]byte // Reserved. @@ -247,7 +247,7 @@ const ( func (h *StacktraceChunkHeader) marshal(b []byte) { binary.BigEndian.PutUint64(b[0:8], uint64(h.Offset)) binary.BigEndian.PutUint64(b[8:16], uint64(h.Size)) - binary.BigEndian.PutUint64(b[16:24], h.MappingName) + binary.BigEndian.PutUint64(b[16:24], h.Partition) binary.BigEndian.PutUint16(b[24:26], h.ChunkIndex) b[27] = byte(h.ChunkEncoding) // 5 bytes reserved. @@ -262,7 +262,7 @@ func (h *StacktraceChunkHeader) marshal(b []byte) { func (h *StacktraceChunkHeader) unmarshal(b []byte) { h.Offset = int64(binary.BigEndian.Uint64(b[0:8])) h.Size = int64(binary.BigEndian.Uint64(b[8:16])) - h.MappingName = binary.BigEndian.Uint64(b[16:24]) + h.Partition = binary.BigEndian.Uint64(b[16:24]) h.ChunkIndex = binary.BigEndian.Uint16(b[24:26]) h.ChunkEncoding = ChunkEncoding(b[27]) // 5 bytes reserved. 
@@ -333,7 +333,7 @@ func (f *IndexFile) WriteTo(dst io.Writer) (n int64, err error) {
 		return w.offset, fmt.Errorf("toc write: %w", err)
 	}
 
-	sort.Sort(stacktraceChunkHeadersByMappingAndIndex(f.StacktraceChunkHeaders))
+	sort.Sort(stacktraceChunkHeadersByPartitionAndIndex(f.StacktraceChunkHeaders))
 	sch, _ := f.StacktraceChunkHeaders.MarshalBinary()
 	if _, err = w.Write(sch); err != nil {
 		return w.offset, fmt.Errorf("stacktrace chunk headers: %w", err)
diff --git a/pkg/phlaredb/symdb/interfaces.go b/pkg/phlaredb/symdb/interfaces.go
index 2b9baaac84..1010bea37e 100644
--- a/pkg/phlaredb/symdb/interfaces.go
+++ b/pkg/phlaredb/symdb/interfaces.go
@@ -11,24 +11,22 @@ import (
 //
 // In the package, Mapping represents all the versions of a binary.
 
-type MappingWriter interface {
-	// StacktraceAppender provides exclusive write access
-	// to the stack traces of the mapping.
-	//
-	// StacktraceAppender.Release must be called in order
-	// to dispose the object and release the lock.
-	// Released resolver must not be used.
+type SymbolsAppender interface {
 	StacktraceAppender() StacktraceAppender
 }
 
-type MappingReader interface {
-	// StacktraceResolver provides non-exclusive read
-	// access to the stack traces of the mapping.
-	//
-	// StacktraceResolver.Release must be called in order
-	// to dispose the object and release the lock.
-	// Released resolver must not be used.
+type SymbolsResolver interface {
 	StacktraceResolver() StacktraceResolver
+	WriteStats(*Stats)
+}
+
+type Stats struct {
+	StacktracesTotal int
+	LocationsTotal   int
+	MappingsTotal    int
+	FunctionsTotal   int
+	StringsTotal     int
+	MaxStacktraceID  int
 }
 
 type StacktraceAppender interface {
diff --git a/pkg/phlaredb/symdb/mapping_memory.go b/pkg/phlaredb/symdb/partition_memory.go
similarity index 76%
rename from pkg/phlaredb/symdb/mapping_memory.go
rename to pkg/phlaredb/symdb/partition_memory.go
index 2182f7b37e..58c78deed7 100644
--- a/pkg/phlaredb/symdb/mapping_memory.go
+++ b/pkg/phlaredb/symdb/partition_memory.go
@@ -12,21 +12,19 @@ import (
 )
 
 var (
-	_ MappingReader = (*inMemoryMapping)(nil)
-	_ MappingWriter = (*inMemoryMapping)(nil)
+	_ SymbolsResolver = (*inMemoryPartition)(nil)
+	_ SymbolsAppender = (*inMemoryPartition)(nil)
 
 	_ StacktraceAppender = (*stacktraceAppender)(nil)
 	_ StacktraceResolver = (*stacktraceResolverMemory)(nil)
 )
 
-type inMemoryMapping struct {
+type inMemoryPartition struct {
 	name uint64
 
 	maxNodesPerChunk uint32
 	// maxStackDepth uint32
 
-	// Stack traces originating from the mapping (binary):
-	// their bottom frames (roots) refer to this mapping.
 	stacktraceMutex    sync.RWMutex
 	stacktraceHashToID map[uint64]uint32
 	stacktraceChunks   []*stacktraceChunk
@@ -34,33 +32,44 @@
 	stacktraceChunkHeaders []StacktraceChunkHeader
 }
 
-func (b *inMemoryMapping) StacktraceAppender() StacktraceAppender {
+func (b *inMemoryPartition) StacktraceAppender() StacktraceAppender {
 	b.stacktraceMutex.RLock()
-	// Assuming there is at least one chunk.
-	c := b.stacktraceChunks[len(b.stacktraceChunks)-1]
+	c := b.currentStacktraceChunk()
 	b.stacktraceMutex.RUnlock()
 	return &stacktraceAppender{
-		mapping: b,
-		chunk:   c,
+		partition: b,
+		chunk:     c,
 	}
 }
 
-func (b *inMemoryMapping) StacktraceResolver() StacktraceResolver {
+func (b *inMemoryPartition) StacktraceResolver() StacktraceResolver {
 	return &stacktraceResolverMemory{
-		mapping: b,
+		partition: b,
 	}
 }
 
+func (b *inMemoryPartition) WriteStats(s *Stats) {
+	b.stacktraceMutex.RLock()
+	c := b.currentStacktraceChunk()
+	s.MaxStacktraceID = int(c.stid + c.tree.len())
+	s.StacktracesTotal = len(b.stacktraceHashToID)
+	b.stacktraceMutex.RUnlock()
+}
+
 // stacktraceChunkForInsert returns a chunk for insertion:
 // if the existing one has capacity, or a new one, if the former is full.
 // Must be called with the stacktraces mutex write lock held.
-func (b *inMemoryMapping) stacktraceChunkForInsert(x int) *stacktraceChunk {
-	c := b.stacktraceChunks[len(b.stacktraceChunks)-1]
+func (b *inMemoryPartition) stacktraceChunkForInsert(x int) *stacktraceChunk {
+	c := b.currentStacktraceChunk()
 	if n := c.tree.len() + uint32(x); b.maxNodesPerChunk > 0 && n >= b.maxNodesPerChunk {
+		// Calculate number of stacks in the chunk.
+		s := uint32(len(b.stacktraceHashToID))
+		c.stacks = s - c.stacks
 		c = &stacktraceChunk{
-			mapping: b,
-			tree:    newStacktraceTree(defaultStacktraceTreeSize),
-			stid:    c.stid + b.maxNodesPerChunk,
+			partition: b,
+			tree:      newStacktraceTree(defaultStacktraceTreeSize),
+			stid:      c.stid + b.maxNodesPerChunk,
+			stacks:    s,
 		}
 		b.stacktraceChunks = append(b.stacktraceChunks, c)
 	}
@@ -69,17 +78,23 @@
 
 // stacktraceChunkForRead returns a chunk for reads.
 // Must be called with the stacktraces mutex read lock held.
-func (b *inMemoryMapping) stacktraceChunkForRead(i int) (*stacktraceChunk, bool) {
+func (b *inMemoryPartition) stacktraceChunkForRead(i int) (*stacktraceChunk, bool) {
 	if i < len(b.stacktraceChunks) {
 		return b.stacktraceChunks[i], true
 	}
 	return nil, false
 }
 
+func (b *inMemoryPartition) currentStacktraceChunk() *stacktraceChunk {
+	// Assuming there is at least one chunk.
+	return b.stacktraceChunks[len(b.stacktraceChunks)-1]
+}
+
 type stacktraceChunk struct {
-	mapping *inMemoryMapping
-	stid    uint32 // Initial stack trace ID.
-	tree    *stacktraceTree
+	partition *inMemoryPartition
+	tree      *stacktraceTree
+	stid      uint32 // Initial stack trace ID.
+	stacks    uint32 // Number of stack traces in the chunk.
 }
 
 func (s *stacktraceChunk) WriteTo(dst io.Writer) (int64, error) {
@@ -87,7 +102,7 @@
 }
 
 type stacktraceAppender struct {
-	mapping     *inMemoryMapping
+	partition   *inMemoryPartition
 	chunk       *stacktraceChunk
 	releaseOnce sync.Once
 }
@@ -103,13 +118,13 @@ func (a *stacktraceAppender) AppendStacktrace(dst []uint32, s []*v1.Stacktrace)
 		misses int
 	)
 
-	a.mapping.stacktraceMutex.RLock()
+	a.partition.stacktraceMutex.RLock()
 	for i, x := range s {
-		if dst[i], found = a.mapping.stacktraceHashToID[hashLocations(x.LocationIDs)]; !found {
+		if dst[i], found = a.partition.stacktraceHashToID[hashLocations(x.LocationIDs)]; !found {
 			misses++
 		}
 	}
-	a.mapping.stacktraceMutex.RUnlock()
+	a.partition.stacktraceMutex.RUnlock()
 	if misses == 0 {
 		return
 	}
@@ -125,10 +140,10 @@ func (a *stacktraceAppender) AppendStacktrace(dst []uint32, s []*v1.Stacktrace)
 	// Instead of inserting stacks one by one, it is better to
 	// build a tree, and merge it to the existing one.
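 	// A sketch of that optimization (not implemented here): build a
 	// detached stacktraceTree from the missed stacks outside the lock,
 	// merge it into the shared tree under a single write lock, and
 	// remap the fresh node IDs afterwards.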
- a.mapping.stacktraceMutex.Lock() - defer a.mapping.stacktraceMutex.Unlock() + a.partition.stacktraceMutex.Lock() + defer a.partition.stacktraceMutex.Unlock() - m := int(a.mapping.maxNodesPerChunk) + m := int(a.partition.maxNodesPerChunk) t, j := a.chunk.tree, a.chunk.stid for i, v := range dst[:len(s)] { if v != 0 { @@ -142,7 +157,7 @@ func (a *stacktraceAppender) AppendStacktrace(dst []uint32, s []*v1.Stacktrace) // If we're close to the max nodes limit and can // potentially exceed it, we take the next chunk, // even if there are some space. - a.chunk = a.mapping.stacktraceChunkForInsert(len(x)) + a.chunk = a.partition.stacktraceChunkForInsert(len(x)) t, j = a.chunk.tree, a.chunk.stid } @@ -150,7 +165,7 @@ func (a *stacktraceAppender) AppendStacktrace(dst []uint32, s []*v1.Stacktrace) // we don't need to check the map. id = t.insert(x) + j h := hashLocations(x) - a.mapping.stacktraceHashToID[h] = id + a.partition.stacktraceHashToID[h] = id dst[i] = id } } @@ -174,7 +189,7 @@ func hashLocations(s []uint64) uint64 { } type stacktraceResolverMemory struct { - mapping *inMemoryMapping + partition *inMemoryPartition } const defaultStacktraceDepth = 64 @@ -196,7 +211,7 @@ func (p *stacktraceLocationsPool) put(x []int32) { func (r *stacktraceResolverMemory) ResolveStacktraces(_ context.Context, dst StacktraceInserter, stacktraces []uint32) (err error) { // TODO(kolesnikovae): Add option to do resolve concurrently. // Depends on StacktraceInserter implementation. - for _, sr := range SplitStacktraces(stacktraces, r.mapping.maxNodesPerChunk) { + for _, sr := range SplitStacktraces(stacktraces, r.partition.maxNodesPerChunk) { if err = r.ResolveStacktracesChunk(dst, sr); err != nil { return err } @@ -212,10 +227,10 @@ func (r *stacktraceResolverMemory) ResolveStacktraces(_ context.Context, dst Sta // the options, the package provides. func (r *stacktraceResolverMemory) ResolveStacktracesChunk(dst StacktraceInserter, sr StacktracesRange) error { - r.mapping.stacktraceMutex.RLock() - c, found := r.mapping.stacktraceChunkForRead(int(sr.chunk)) + r.partition.stacktraceMutex.RLock() + c, found := r.partition.stacktraceChunkForRead(int(sr.chunk)) if !found { - r.mapping.stacktraceMutex.RUnlock() + r.partition.stacktraceMutex.RUnlock() return ErrInvalidStacktraceRange } t := stacktraceTree{nodes: c.tree.nodes} @@ -227,7 +242,7 @@ func (r *stacktraceResolverMemory) ResolveStacktracesChunk(dst StacktraceInserte // races when the slice grows: in the worst case, the underlying // capacity will be retained and thus not be eligible for GC during // the call. - r.mapping.stacktraceMutex.RUnlock() + r.partition.stacktraceMutex.RUnlock() s := stacktraceLocations.get() // Restore the original stacktrace ID. 
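 	// For example (hypothetical numbers): with maxNodesPerChunk = 1<<20
 	// and chunk index 2, the chunk-local ID 5 maps back to the global
 	// ID 5 + 2*(1<<20).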
off := sr.offset() diff --git a/pkg/phlaredb/symdb/mapping_memory_test.go b/pkg/phlaredb/symdb/partition_memory_test.go similarity index 95% rename from pkg/phlaredb/symdb/mapping_memory_test.go rename to pkg/phlaredb/symdb/partition_memory_test.go index 316703f6d9..82fc9b0bb9 100644 --- a/pkg/phlaredb/symdb/mapping_memory_test.go +++ b/pkg/phlaredb/symdb/partition_memory_test.go @@ -22,7 +22,7 @@ func Test_StacktraceAppender_shards(t *testing.T) { }, }) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -48,8 +48,8 @@ func Test_StacktraceAppender_shards(t *testing.T) { }) assert.Equal(t, []uint32{18}, sids[:1]) - require.Len(t, db.mappings, 1) - m := db.mappings[0] + require.Len(t, db.partitions, 1) + m := db.partitions[0] require.Len(t, m.stacktraceChunks, 3) c1 := m.stacktraceChunks[0] @@ -67,7 +67,7 @@ func Test_StacktraceAppender_shards(t *testing.T) { t.Run("WithoutMaxStacktraceTreeNodesPerChunk", func(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -81,8 +81,8 @@ func Test_StacktraceAppender_shards(t *testing.T) { }) assert.Equal(t, []uint32{3, 2, 4, 5, 6}, sids) - require.Len(t, db.mappings, 1) - m := db.mappings[0] + require.Len(t, db.partitions, 1) + m := db.partitions[0] require.Len(t, m.stacktraceChunks, 1) c1 := m.stacktraceChunks[0] @@ -166,7 +166,7 @@ func Test_StacktraceResolver_stacktraces_split(t *testing.T) { func Test_Stacktrace_append_existing(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() sids := make([]uint32, 2) @@ -185,7 +185,7 @@ func Test_Stacktrace_append_existing(t *testing.T) { func Test_Stacktrace_append_empty(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -205,7 +205,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { t.Run("single chunk", func(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -218,7 +218,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { {LocationIDs: []uint64{5, 2, 1}}, }) - mr, _ := db.MappingReader(0) + mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -237,7 +237,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { }, }) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -274,10 +274,10 @@ func Test_Stacktraces_append_resolve(t *testing.T) { */ sids := make([]uint32, len(stacktraces)) a.AppendStacktrace(sids, stacktraces) - require.Len(t, db.mappings[0].stacktraceChunks, 6) + require.Len(t, db.partitions[0].stacktraceChunks, 6) t.Run("adjacent shards at beginning", func(t *testing.T) { - mr, _ := db.MappingReader(0) + mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -290,7 +290,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { }) t.Run("adjacent shards at end", func(t *testing.T) { - mr, _ := db.MappingReader(0) + mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -303,7 +303,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { }) t.Run("non-adjacent shards", func(t *testing.T) { - mr, _ := db.MappingReader(0) + 
mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -348,13 +348,13 @@ func Test_Stacktraces_memory_resolve_pprof(t *testing.T) { sids := make([]uint32, len(stacktraces)) db := NewSymDB(new(Config)) - mw := db.MappingWriter(0) + mw := db.SymbolsAppender(0) a := mw.StacktraceAppender() defer a.Release() a.AppendStacktrace(sids, stacktraces) - mr, ok := db.MappingReader(0) + mr, ok := db.SymbolsResolver(0) require.True(t, ok) r := mr.StacktraceResolver() defer r.Release() @@ -378,13 +378,13 @@ func Test_Stacktraces_memory_resolve_chunked(t *testing.T) { }, } db := NewSymDB(cfg) - mw := db.MappingWriter(0) + mw := db.SymbolsAppender(0) a := mw.StacktraceAppender() defer a.Release() a.AppendStacktrace(sids, stacktraces) - mr, ok := db.MappingReader(0) + mr, ok := db.SymbolsResolver(0) require.True(t, ok) r := mr.StacktraceResolver() defer r.Release() @@ -417,7 +417,7 @@ func Test_Stacktraces_memory_resolve_concurrency(t *testing.T) { // Allocate stacktrace IDs. sids := make([]uint32, len(stacktraces)) db := NewSymDB(cfg) - mw := db.MappingWriter(0) + mw := db.SymbolsAppender(0) a := mw.StacktraceAppender() a.AppendStacktrace(sids, stacktraces) a.Release() @@ -438,7 +438,7 @@ func Test_Stacktraces_memory_resolve_concurrency(t *testing.T) { go func() { defer wg.Done() - a := db.MappingWriter(0).StacktraceAppender() + a := db.SymbolsAppender(0).StacktraceAppender() defer a.Release() for j := 0; j < appends; j++ { @@ -452,7 +452,7 @@ func Test_Stacktraces_memory_resolve_concurrency(t *testing.T) { go func() { defer wg.Done() - mr, ok := db.MappingReader(0) + mr, ok := db.SymbolsResolver(0) if !ok { return } diff --git a/pkg/phlaredb/symdb/stacktrace_tree_test.go b/pkg/phlaredb/symdb/stacktrace_tree_test.go index b8e3d776ae..94f1796844 100644 --- a/pkg/phlaredb/symdb/stacktrace_tree_test.go +++ b/pkg/phlaredb/symdb/stacktrace_tree_test.go @@ -88,7 +88,7 @@ func Test_stacktrace_tree_encoding_group(t *testing.T) { } func Test_stacktrace_tree_encoding_rand(t *testing.T) { - // TODO: Fuzzing. With random data it's easy to hit overflow. + // TODO: Fuzzing. 
nodes := make([]node, 1<<20) for i := range nodes { nodes[i] = node{ diff --git a/pkg/phlaredb/symdb/mapping_reader_file.go b/pkg/phlaredb/symdb/symbols_reader_file.go similarity index 55% rename from pkg/phlaredb/symdb/mapping_reader_file.go rename to pkg/phlaredb/symdb/symbols_reader_file.go index 1aa2dbdf41..c2b8167be0 100644 --- a/pkg/phlaredb/symdb/mapping_reader_file.go +++ b/pkg/phlaredb/symdb/symbols_reader_file.go @@ -6,6 +6,7 @@ import ( "fmt" "hash/crc32" "io" + "sort" "sync" "github.com/grafana/dskit/multierror" @@ -15,23 +16,23 @@ import ( ) var ( - _ MappingReader = (*mappingFileReader)(nil) + _ SymbolsResolver = (*partitionFileReader)(nil) _ StacktraceResolver = (*stacktraceResolverFile)(nil) ) type Reader struct { bucket objstore.BucketReader - maxConcurrentChunkFetch int - chunkFetchBufferSize int + maxConcurrentChunks int + chunkFetchBufferSize int - idx IndexFile - mappings map[uint64]*mappingFileReader + idx IndexFile + partitions map[uint64]*partitionFileReader } const ( - defaultMaxConcurrentChunkFetch = 8 - defaultChunkFetchBufferSize = 4096 + defaultMaxConcurrentChunks = 1 + defaultChunkFetchBufferSize = 4096 ) // NOTE(kolesnikovae): @@ -41,16 +42,16 @@ const ( type ReaderConfig struct { BucketReader objstore.BucketReader - MaxConcurrentChunkFetch int - ChunkFetchBufferSize int + MaxConcurrentChunks int + ChunkFetchBufferSize int } func Open(ctx context.Context, b objstore.BucketReader) (*Reader, error) { r := Reader{ bucket: b, - maxConcurrentChunkFetch: defaultMaxConcurrentChunkFetch, - chunkFetchBufferSize: defaultChunkFetchBufferSize, + maxConcurrentChunks: defaultMaxConcurrentChunks, + chunkFetchBufferSize: defaultChunkFetchBufferSize, } if err := r.open(ctx); err != nil { return nil, err @@ -71,46 +72,96 @@ func (r *Reader) open(ctx context.Context) error { return err } // TODO(kolesnikovae): Load in a smarter way as headers are ordered. - r.mappings = make(map[uint64]*mappingFileReader, len(r.idx.StacktraceChunkHeaders.Entries)/3) + r.partitions = make(map[uint64]*partitionFileReader, len(r.idx.StacktraceChunkHeaders.Entries)/3) for _, h := range r.idx.StacktraceChunkHeaders.Entries { - r.mapping(h.MappingName).addStacktracesChunk(h) + r.partition(h.Partition).addStacktracesChunk(h) } return nil } -func (r *Reader) mapping(n uint64) *mappingFileReader { - if m, ok := r.mappings[n]; ok { +func (r *Reader) partition(n uint64) *partitionFileReader { + if m, ok := r.partitions[n]; ok { return m } - m := &mappingFileReader{reader: r} - r.mappings[n] = m + m := &partitionFileReader{reader: r} + r.partitions[n] = m return m } -func (r *Reader) MappingReader(mappingName uint64) (MappingReader, bool) { - m, ok := r.mappings[mappingName] +func (r *Reader) SymbolsResolver(partition uint64) (SymbolsResolver, bool) { + m, ok := r.partitions[partition] return m, ok } -type mappingFileReader struct { +// Load causes the reader to load all contents into memory.
+func (r *Reader) Load(ctx context.Context) error { + partitions := make([]*partitionFileReader, len(r.partitions)) + var i int + for _, v := range r.partitions { + partitions[i] = v + i++ + } + sort.Slice(partitions, func(i, j int) bool { + return partitions[i].stacktraceChunks[0].header.Offset < + partitions[j].stacktraceChunks[0].header.Offset + }) + + offset := partitions[0].stacktraceChunks[0].header.Offset + var size int64 + for i = range partitions { + for _, c := range partitions[i].stacktraceChunks { + size += c.header.Size + } + } + + rc, err := r.bucket.GetRange(ctx, StacktracesFileName, offset, size) + if err != nil { + return err + } + defer func() { + err = multierror.New(err, rc.Close()).Err() + }() + + buf := bufio.NewReaderSize(rc, r.chunkFetchBufferSize) + for _, p := range partitions { + for _, c := range p.stacktraceChunks { + if err = c.readFrom(io.LimitReader(buf, c.header.Size)); err != nil { + return err + } + } + } + + return nil +} + +type partitionFileReader struct { reader *Reader stacktraceChunks []*stacktraceChunkFileReader } -func (m *mappingFileReader) StacktraceResolver() StacktraceResolver { +func (m *partitionFileReader) StacktraceResolver() StacktraceResolver { return &stacktraceResolverFile{ - mapping: m, + partition: m, } } -func (m *mappingFileReader) addStacktracesChunk(h StacktraceChunkHeader) { +func (m *partitionFileReader) WriteStats(s *Stats) { + var nodes uint32 + for _, c := range m.stacktraceChunks { + s.StacktracesTotal += int(c.header.Stacktraces) + nodes += c.header.StacktraceNodes + } + s.MaxStacktraceID = int(nodes) +} + +func (m *partitionFileReader) addStacktracesChunk(h StacktraceChunkHeader) { m.stacktraceChunks = append(m.stacktraceChunks, &stacktraceChunkFileReader{ reader: m.reader, header: h, }) } -func (m *mappingFileReader) stacktraceChunkReader(i uint32) *stacktraceChunkFileReader { +func (m *partitionFileReader) stacktraceChunkReader(i uint32) *stacktraceChunkFileReader { if int(i) < len(m.stacktraceChunks) { return m.stacktraceChunks[i] } @@ -118,7 +169,7 @@ func (m *mappingFileReader) stacktraceChunkReader(i uint32) *stacktraceChunkFile } type stacktraceResolverFile struct { - mapping *mappingFileReader + partition *partitionFileReader } func (r *stacktraceResolverFile) Release() {} @@ -126,37 +177,24 @@ func (r *stacktraceResolverFile) Release() {} var ErrInvalidStacktraceRange = fmt.Errorf("invalid range: stack traces can't be resolved") func (r *stacktraceResolverFile) ResolveStacktraces(ctx context.Context, dst StacktraceInserter, s []uint32) error { - if len(r.mapping.stacktraceChunks) == 0 { + if len(s) == 0 { + return nil + } + if len(r.partition.stacktraceChunks) == 0 { return ErrInvalidStacktraceRange } // First, we determine the chunks needed for the range. // All chunks in a block must have the same StacktraceMaxNodes. - sr := SplitStacktraces(s, r.mapping.stacktraceChunks[0].header.StacktraceMaxNodes) - - // TODO(kolesnikovae): - // Chunks are fetched concurrently, but inserted to dst sequentially, - // to avoid race condition on the implementation end: - // - Add maxConcurrentChunkResolve option that controls the behaviour. - // - Caching: already fetched chunks should be cached (serialized or not). 
+ sr := SplitStacktraces(s, r.partition.stacktraceChunks[0].header.StacktraceMaxNodes) g, ctx := errgroup.WithContext(ctx) - g.SetLimit(r.mapping.reader.maxConcurrentChunkFetch) - rs := make([]*stacktracesResolve, len(sr)) - for i, c := range sr { - rs[i] = r.newResolve(ctx, dst, c) - g.Go(rs[i].fetch) - } - if err := g.Wait(); err != nil { - return err - } - - for _, cr := range rs { - cr.resolveStacktracesChunk(dst) - cr.release() + g.SetLimit(r.partition.reader.maxConcurrentChunks) + for _, c := range sr { + g.Go(r.newResolve(ctx, dst, c).do) } - return nil + return g.Wait() } func (r *stacktraceResolverFile) newResolve(ctx context.Context, dst StacktraceInserter, c StacktracesRange) *stacktracesResolve { @@ -179,8 +217,17 @@ type stacktracesResolve struct { c StacktracesRange } +func (r *stacktracesResolve) do() error { + if err := r.fetch(); err != nil { + return err + } + r.resolveStacktracesChunk(r.dst) + r.release() + return nil +} + func (r *stacktracesResolve) fetch() (err error) { - if r.cr = r.r.mapping.stacktraceChunkReader(r.c.chunk); r.cr == nil { + if r.cr = r.r.partition.stacktraceChunkReader(r.c.chunk); r.cr == nil { return ErrInvalidStacktraceRange } if r.t, err = r.cr.fetch(r.ctx); err != nil { @@ -207,6 +254,10 @@ type stacktraceChunkFileReader struct { header StacktraceChunkHeader m sync.Mutex tree *parentPointerTree + // Indicates that the chunk has been loaded into + // memory with a Load call and should not be released + // until the block is closed. + loaded bool } func (c *stacktraceChunkFileReader) fetch(ctx context.Context) (_ *parentPointerTree, err error) { @@ -215,7 +266,6 @@ func (c *stacktraceChunkFileReader) fetch(ctx context.Context) (_ *parentPointer if c.tree != nil { return c.tree, nil } - rc, err := c.reader.bucket.GetRange(ctx, StacktracesFileName, c.header.Offset, c.header.Size) if err != nil { return nil, err @@ -223,36 +273,41 @@ func (c *stacktraceChunkFileReader) fetch(ctx context.Context) (_ *parentPointer defer func() { err = multierror.New(err, rc.Close()).Err() }() + // Consider pooling the buffer. + buf := bufio.NewReaderSize(rc, c.reader.chunkFetchBufferSize) + if err = c.readFrom(buf); err != nil { + return nil, err + } + return c.tree, nil +} +func (c *stacktraceChunkFileReader) readFrom(r io.Reader) error { // NOTE(kolesnikovae): Pool of node chunks could reduce // the alloc size, but it may affect memory locality. // Although, properly aligned chunks of, say, 1-4K nodes // which is 8-32KiB respectively, should not make things // much worse than they are. Worth experimenting. t := newParentPointerTree(c.header.StacktraceNodes) - // We unmarshal the tree speculatively, before validating // the checksum. Even random bytes can be unmarshalled to // a tree not causing any errors, therefore it is vital // to verify the correctness of the data. crc := crc32.New(castagnoli) - tee := io.TeeReader(rc, crc) - - // Consider pooling the buffer.
- buf := bufio.NewReaderSize(tee, c.reader.chunkFetchBufferSize) - if _, err = t.ReadFrom(buf); err != nil { - return nil, fmt.Errorf("failed to unmarshal stack treaces: %w", err) + tee := io.TeeReader(r, crc) + if _, err := t.ReadFrom(tee); err != nil { + return fmt.Errorf("failed to unmarshal stack traces: %w", err) } if c.header.CRC != crc.Sum32() { - return nil, ErrInvalidCRC + return ErrInvalidCRC } c.tree = t - return t, nil + return nil } func (c *stacktraceChunkFileReader) reset() { c.m.Lock() - c.tree = nil + if !c.loaded { + c.tree = nil + } c.m.Unlock() } diff --git a/pkg/phlaredb/symdb/mapping_reader_file_test.go b/pkg/phlaredb/symdb/symbols_reader_file_test.go similarity index 95% rename from pkg/phlaredb/symdb/mapping_reader_file_test.go rename to pkg/phlaredb/symdb/symbols_reader_file_test.go index af46f9bf10..03be33139e 100644 --- a/pkg/phlaredb/symdb/mapping_reader_file_test.go +++ b/pkg/phlaredb/symdb/symbols_reader_file_test.go @@ -19,7 +19,7 @@ func Test_Reader_Open(t *testing.T) { } db := NewSymDB(cfg) - w := db.MappingWriter(1) + w := db.SymbolsAppender(1) a := w.StacktraceAppender() sids := make([]uint32, 5) a.AppendStacktrace(sids, []*schemav1.Stacktrace{ @@ -37,7 +37,7 @@ func Test_Reader_Open(t *testing.T) { require.NoError(t, err) x, err := Open(context.Background(), b) require.NoError(t, err) - mr, ok := x.MappingReader(1) + mr, ok := x.SymbolsResolver(1) require.True(t, ok) dst := new(mockStacktraceInserter) diff --git a/pkg/phlaredb/symdb/mapping_writer_file.go b/pkg/phlaredb/symdb/symbols_writer_file.go similarity index 96% rename from pkg/phlaredb/symdb/mapping_writer_file.go rename to pkg/phlaredb/symdb/symbols_writer_file.go index d14e741080..d0f2336c2a 100644 --- a/pkg/phlaredb/symdb/mapping_writer_file.go +++ b/pkg/phlaredb/symdb/symbols_writer_file.go @@ -38,13 +38,13 @@ func (w *Writer) writeStacktraceChunk(ci int, c *stacktraceChunk) (err error) { h := StacktraceChunkHeader{ Offset: w.scd.w.offset, Size: 0, // Set later. - MappingName: c.mapping.name, + Partition: c.partition.name, ChunkIndex: uint16(ci), ChunkEncoding: ChunkEncodingGroupVarint, - Stacktraces: 0, // TODO + Stacktraces: c.stacks, StacktraceNodes: c.tree.len(), StacktraceMaxDepth: 0, // TODO - StacktraceMaxNodes: c.mapping.maxNodesPerChunk, + StacktraceMaxNodes: c.partition.maxNodesPerChunk, CRC: 0, // Set later.
} crc := crc32.New(castagnoli) diff --git a/pkg/phlaredb/symdb/mapping_writer_file_test.go b/pkg/phlaredb/symdb/symbols_writer_file_test.go similarity index 84% rename from pkg/phlaredb/symdb/mapping_writer_file_test.go rename to pkg/phlaredb/symdb/symbols_writer_file_test.go index 5f031d3fe9..0e38cd0904 100644 --- a/pkg/phlaredb/symdb/mapping_writer_file_test.go +++ b/pkg/phlaredb/symdb/symbols_writer_file_test.go @@ -22,7 +22,7 @@ func Test_Writer_IndexFile(t *testing.T) { sids := make([]uint32, 5) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() a.AppendStacktrace(sids, []*schemav1.Stacktrace{ {LocationIDs: []uint64{3, 2, 1}}, @@ -34,7 +34,7 @@ func Test_Writer_IndexFile(t *testing.T) { assert.Equal(t, []uint32{3, 2, 11, 16, 18}, sids) a.Release() - w = db.MappingWriter(1) + w = db.SymbolsAppender(1) a = w.StacktraceAppender() a.AppendStacktrace(sids, []*schemav1.Stacktrace{ {LocationIDs: []uint64{3, 2, 1}}, @@ -46,9 +46,9 @@ func Test_Writer_IndexFile(t *testing.T) { assert.Equal(t, []uint32{3, 2, 11, 16, 18}, sids) a.Release() - require.Len(t, db.mappings, 2) - require.Len(t, db.mappings[0].stacktraceChunks, 3) - require.Len(t, db.mappings[1].stacktraceChunks, 3) + require.Len(t, db.partitions, 2) + require.Len(t, db.partitions[0].stacktraceChunks, 3) + require.Len(t, db.partitions[1].stacktraceChunks, 3) require.NoError(t, db.Flush()) @@ -75,10 +75,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 0, Size: 10, - MappingName: 0x0, + Partition: 0x0, ChunkIndex: 0x0, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x2, StacktraceNodes: 0x4, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -87,10 +87,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 10, Size: 15, - MappingName: 0x0, + Partition: 0x0, ChunkIndex: 0x1, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x1, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -99,10 +99,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 25, Size: 15, - MappingName: 0x0, + Partition: 0x0, ChunkIndex: 0x2, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x3, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -111,10 +111,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 40, Size: 10, - MappingName: 0x1, + Partition: 0x1, ChunkIndex: 0x0, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x2, StacktraceNodes: 0x4, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -123,10 +123,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 50, Size: 15, - MappingName: 0x1, + Partition: 0x1, ChunkIndex: 0x1, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x1, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -135,10 +135,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 65, Size: 15, - MappingName: 0x1, + Partition: 0x1, ChunkIndex: 0x2, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x3, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -146,7 +146,7 @@ func Test_Writer_IndexFile(t *testing.T) { }, }, }, - CRC: 0x5bbecabf, + CRC: 0x6418eaed, } assert.Equal(t, expected, idx) diff --git a/pkg/phlaredb/symdb/symdb.go b/pkg/phlaredb/symdb/symdb.go index 9c6e8ba3ca..38a640dd15 100644 --- a/pkg/phlaredb/symdb/symdb.go +++ b/pkg/phlaredb/symdb/symdb.go @@ -12,8 +12,8 @@ type SymDB struct { writer *Writer stats stats - m sync.RWMutex - mappings map[uint64]*inMemoryMapping + m sync.RWMutex + partitions map[uint64]*inMemoryPartition wg 
sync.WaitGroup stop chan struct{} @@ -32,7 +32,7 @@ const statsUpdateInterval = 10 * time.Second type stats struct { memorySize atomic.Uint64 - mappings atomic.Uint32 + partitions atomic.Uint32 } func DefaultConfig() *Config { @@ -57,27 +57,27 @@ func NewSymDB(c *Config) *SymDB { c = DefaultConfig() } db := &SymDB{ - config: c, - writer: NewWriter(c.Dir), - mappings: make(map[uint64]*inMemoryMapping), - stop: make(chan struct{}), + config: c, + writer: NewWriter(c.Dir), + partitions: make(map[uint64]*inMemoryPartition), + stop: make(chan struct{}), } db.wg.Add(1) go db.updateStats() return db } -func (s *SymDB) MappingWriter(mappingName uint64) MappingWriter { - return s.mapping(mappingName) +func (s *SymDB) SymbolsAppender(partition uint64) SymbolsAppender { + return s.partition(partition) } -func (s *SymDB) MappingReader(mappingName uint64) (MappingReader, bool) { - return s.lookupMapping(mappingName) +func (s *SymDB) SymbolsResolver(partition uint64) (SymbolsResolver, bool) { + return s.lookupPartition(partition) } -func (s *SymDB) lookupMapping(mappingName uint64) (*inMemoryMapping, bool) { +func (s *SymDB) lookupPartition(partition uint64) (*inMemoryPartition, bool) { s.m.RLock() - p, ok := s.mappings[mappingName] + p, ok := s.partitions[partition] if ok { s.m.RUnlock() return p, true @@ -86,26 +86,26 @@ func (s *SymDB) lookupMapping(mappingName uint64) (*inMemoryMapping, bool) { return nil, false } -func (s *SymDB) mapping(mappingName uint64) *inMemoryMapping { - p, ok := s.lookupMapping(mappingName) +func (s *SymDB) partition(partition uint64) *inMemoryPartition { + p, ok := s.lookupPartition(partition) if ok { return p } s.m.Lock() - if p, ok = s.mappings[mappingName]; ok { + if p, ok = s.partitions[partition]; ok { s.m.Unlock() return p } - p = &inMemoryMapping{ - name: mappingName, + p = &inMemoryPartition{ + name: partition, maxNodesPerChunk: s.config.Stacktraces.MaxNodesPerChunk, stacktraceHashToID: make(map[uint64]uint32, defaultStacktraceTreeSize/2), } p.stacktraceChunks = append(p.stacktraceChunks, &stacktraceChunk{ - tree: newStacktraceTree(defaultStacktraceTreeSize), - mapping: p, + tree: newStacktraceTree(defaultStacktraceTreeSize), + partition: p, }) - s.mappings[mappingName] = p + s.partitions[partition] = p s.m.Unlock() return p } @@ -114,9 +114,9 @@ func (s *SymDB) Flush() error { close(s.stop) s.wg.Wait() s.m.RLock() - m := make([]*inMemoryMapping, len(s.mappings)) + m := make([]*inMemoryPartition, len(s.partitions)) var i int - for _, v := range s.mappings { + for _, v := range s.partitions { m[i] = v i++ } @@ -156,7 +156,7 @@ func (s *SymDB) updateStats() { return case <-t.C: s.m.RLock() - s.stats.mappings.Store(uint32(len(s.mappings))) + s.stats.partitions.Store(uint32(len(s.partitions))) s.stats.memorySize.Store(uint64(s.calculateMemoryFootprint())) s.m.RUnlock() } @@ -165,7 +165,7 @@ func (s *SymDB) updateStats() { // calculateMemoryFootprint estimates the memory footprint. func (s *SymDB) calculateMemoryFootprint() (v int) { - for _, m := range s.mappings { + for _, m := range s.partitions { m.stacktraceMutex.RLock() v += len(m.stacktraceChunkHeaders) * stacktraceChunkHeaderSize for _, c := range m.stacktraceChunks {
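For reference, this is the round trip the renamed SymbolsAppender/SymbolsResolver API supports, as the tests above exercise it. A minimal sketch: the printInserter type and the InsertStacktrace(uint32, []int32) signature are assumptions modelled on the package's mockStacktraceInserter test double, and the IDs are sorted before resolution since the resolver splits them into per-chunk ranges.

package main

import (
	"context"
	"fmt"
	"sort"

	schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
	"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
)

// printInserter is a hypothetical StacktraceInserter implementation;
// the method signature is an assumption based on the test doubles.
type printInserter struct{}

func (printInserter) InsertStacktrace(id uint32, locations []int32) {
	fmt.Println(id, locations)
}

func main() {
	db := symdb.NewSymDB(symdb.DefaultConfig())

	// Append two stack traces under partition 0; their assigned
	// stacktrace IDs are written into sids.
	a := db.SymbolsAppender(0).StacktraceAppender()
	sids := make([]uint32, 2)
	a.AppendStacktrace(sids, []*schemav1.Stacktrace{
		{LocationIDs: []uint64{3, 2, 1}},
		{LocationIDs: []uint64{2, 1}},
	})
	a.Release()

	// Resolve the same IDs back into location ID sequences.
	ids := append([]uint32(nil), sids...)
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	mr, ok := db.SymbolsResolver(0)
	if !ok {
		return
	}
	r := mr.StacktraceResolver()
	defer r.Release()
	_ = r.ResolveStacktraces(context.Background(), printInserter{}, ids)
}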
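The Load path added to the file-backed reader is what compaction builds on: rather than fetching chunks lazily per resolve, the whole stacktraces file is read once, front to back, and kept in memory. A sketch of that flow under the new API; the loadForCompaction wrapper and the objstore import path are assumptions, while Open, Load, SymbolsResolver, and WriteStats are the calls introduced or renamed in this change.

package sketch

import (
	"context"
	"fmt"

	"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
	"github.com/thanos-io/objstore" // assumed import path for objstore.BucketReader
)

// loadForCompaction is a hypothetical wrapper showing the intended call order.
func loadForCompaction(ctx context.Context, bucket objstore.BucketReader, partition uint64) error {
	r, err := symdb.Open(ctx, bucket)
	if err != nil {
		return err
	}
	// Load reads all stacktrace chunks in a single ranged request;
	// per the new loaded flag, chunks read this way are meant to
	// survive reset() until the block is closed.
	if err := r.Load(ctx); err != nil {
		return err
	}
	mr, ok := r.SymbolsResolver(partition)
	if !ok {
		return fmt.Errorf("partition %d not found", partition)
	}
	var s symdb.Stats
	mr.WriteStats(&s)
	fmt.Println(s.StacktracesTotal, s.MaxStacktraceID)
	return nil
}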
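The extracted readFrom keeps the speculative-unmarshal pattern the inline comments describe: the chunk is decoded while an io.TeeReader feeds the Castagnoli CRC, and the decoded tree is only accepted once the checksum matches the header. A self-contained sketch of that pattern, with a byte slice standing in for a chunk and io.ReadAll standing in for parentPointerTree.ReadFrom:

package main

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

func main() {
	payload := []byte("stacktrace chunk bytes")
	// The writer stores this sum in StacktraceChunkHeader.CRC.
	sum := crc32.Checksum(payload, castagnoli)

	// Reader side: decode through a TeeReader so the checksum is
	// computed over exactly the bytes consumed by the decoder.
	crc := crc32.New(castagnoli)
	tee := io.TeeReader(bytes.NewReader(payload), crc)
	decoded, err := io.ReadAll(tee) // stand-in for the tree decoder
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	if sum != crc.Sum32() {
		// Even random bytes can decode without error, so the CRC
		// check is what actually validates the data.
		fmt.Println("checksum mismatch: discard decoded data")
		return
	}
	fmt.Println("ok:", len(decoded), "bytes verified")
}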