14 changes: 14 additions & 0 deletions pkg/iter/iter.go
@@ -101,6 +101,20 @@ func NewSliceSeekIterator[A constraints.Ordered](s []A) SeekIterator[A, A] {
}
}

type slicePositionIterator[T constraints.Integer, M any] struct {
i Iterator[T]
s []M
}

func NewSliceIndexIterator[T constraints.Integer, M any](s []M, i Iterator[T]) Iterator[M] {
return slicePositionIterator[T, M]{s: s, i: i}
}

func (i slicePositionIterator[T, M]) Next() bool { return i.i.Next() }
func (i slicePositionIterator[T, M]) At() M { return i.s[i.i.At()] }
func (i slicePositionIterator[T, M]) Err() error { return i.i.Err() }
func (i slicePositionIterator[T, M]) Close() error { return i.i.Close() }

type sliceSeekIterator[A constraints.Ordered] struct {
*sliceIterator[A]
}
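For context, a minimal usage sketch (not part of the diff) of the new NewSliceIndexIterator: it adapts an iterator of indices into an iterator over the corresponding slice elements. The slice and index values below are made up for illustration.

	names := []string{"mapping", "function", "location", "string"}
	indices := iter.NewSliceIterator([]uint32{2, 0})
	it := iter.NewSliceIndexIterator(names, indices)
	for it.Next() {
		fmt.Println(it.At()) // prints "location", then "mapping"
	}
	if err := it.Err(); err != nil {
		// handle the underlying iterator's error
	}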
4 changes: 0 additions & 4 deletions pkg/iter/tree.go
@@ -1,11 +1,7 @@
package iter

import (
<<<<<<< HEAD
"github.com/grafana/pyroscope/pkg/util/loser"
=======
"github.com/grafana/phlare/pkg/util/loser"
>>>>>>> ee8a92e04 (Add first draft of block compaction)
)

var _ Iterator[interface{}] = &TreeIterator[interface{}]{}
63 changes: 51 additions & 12 deletions pkg/phlaredb/block_querier.go
@@ -305,7 +305,13 @@ type singleBlockQuerier struct {
type StacktraceDB interface {
Open(ctx context.Context) error
Close() error
Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error

// Load loads the database into memory entirely.
// This method is used during compaction.
Load(context.Context) error
WriteStats(partition uint64, s *symdb.Stats)

Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error
}

type stacktraceResolverV1 struct {
@@ -321,18 +327,33 @@ func (r *stacktraceResolverV1) Close() error {
return r.stacktraces.Close()
}

func (r *stacktraceResolverV1) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
func (r *stacktraceResolverV1) Resolve(ctx context.Context, _ uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
stacktraces := repeatedColumnIter(ctx, r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraceIDs))
defer stacktraces.Close()

t := make([]int32, 0, 64)
for stacktraces.Next() {
s := stacktraces.At()
locs.addFromParquet(int64(s.Row), s.Values)

t = grow(t, len(s.Values))
for i, v := range s.Values {
t[i] = v.Int32()
}
locs.InsertStacktrace(s.Row, t)
}
return stacktraces.Err()
}

func (r *stacktraceResolverV1) WriteStats(_ uint64, s *symdb.Stats) {
s.StacktracesTotal = int(r.stacktraces.file.NumRows())
s.MaxStacktraceID = s.StacktracesTotal
}

func (r *stacktraceResolverV1) Load(context.Context) error {
// FIXME(kolesnikovae): Loading all stacktraces from the parquet file
// into memory is likely a bad choice. Instead we could convert
// it to symdb first.
return nil
}
Comment on lines +350 to +355
Collaborator Author

Compacting v1 stacktraces might be a bit challenging. It is easy if a source block is compacted entirely, since we read all of its profiles sequentially. However, in practice I guess we will need to filter based on time and series, which changes the access pattern to random access.

@cyriltovena what do you think?

Contributor

I suggest we don't compact v1.


type stacktraceResolverV2 struct {
reader *symdb.Reader
bucketReader phlareobj.Bucket
@@ -351,19 +372,25 @@ func (r *stacktraceResolverV2) Close() error {
return nil
}

func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
mr, ok := r.reader.MappingReader(mapping)
func (r *stacktraceResolverV2) Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
mr, ok := r.reader.SymbolsResolver(partition)
if !ok {
return nil
}
resolver := mr.StacktraceResolver()
defer resolver.Release()
return resolver.ResolveStacktraces(ctx, locs, stacktraceIDs)
}

return resolver.ResolveStacktraces(ctx, symdb.StacktraceInserterFn(
func(stacktraceID uint32, locations []int32) {
locs.add(int64(stacktraceID), locations)
},
), stacktraceIDs)
func (r *stacktraceResolverV2) Load(ctx context.Context) error {
return r.reader.Load(ctx)
}

func (r *stacktraceResolverV2) WriteStats(partition uint64, s *symdb.Stats) {
mr, ok := r.reader.SymbolsResolver(partition)
if ok {
mr.WriteStats(s)
}
}

func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier {
@@ -435,6 +462,18 @@ func (b *singleBlockQuerier) Index() IndexReader {
return b.index
}

func (b *singleBlockQuerier) Symbols() SymbolsReader {
return &inMemorySymbolsReader{
partitions: make(map[uint64]*inMemorySymbolsResolver),

strings: b.strings,
functions: b.functions,
locations: b.locations,
mappings: b.mappings,
stacktraces: b.stacktraces,
}
}

func (b *singleBlockQuerier) Meta() block.Meta {
if b.meta == nil {
return block.Meta{}
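A minimal sketch (not part of the diff) of calling the revised Resolve: stacktrace locations are now delivered through a symdb.StacktraceInserter instead of the removed locationsIdsByStacktraceID. Assuming symdb.StacktraceInserterFn (used by the code this PR removes) is still available, a caller could collect the results like this; db, ctx, partition, and stacktraceIDs are hypothetical.

	resolved := make(map[uint32][]int32)
	err := db.Resolve(ctx, partition, symdb.StacktraceInserterFn(
		func(stacktraceID uint32, locations []int32) {
			// Copy the slice: the inserter may be invoked with a reused buffer,
			// as the V1 resolver above does with its scratch slice t.
			resolved[stacktraceID] = append([]int32(nil), locations...)
		},
	), stacktraceIDs)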
82 changes: 82 additions & 0 deletions pkg/phlaredb/block_symbols_reader.go
@@ -0,0 +1,82 @@
package phlaredb

import (
"context"

"github.com/grafana/pyroscope/pkg/iter"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
)

// TODO(kolesnikovae): Refactor to symdb.

type SymbolsReader interface {
SymbolsResolver(partition uint64) (SymbolsResolver, error)
}

type SymbolsResolver interface {
ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error

Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation]
Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping]
Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction]
Strings(iter.Iterator[uint32]) iter.Iterator[string]

WriteStats(*symdb.Stats)
}

type inMemorySymbolsReader struct {
partitions map[uint64]*inMemorySymbolsResolver

// TODO(kolesnikovae): Split into partitions.
strings inMemoryparquetReader[string, *schemav1.StringPersister]
functions inMemoryparquetReader[*schemav1.InMemoryFunction, *schemav1.FunctionPersister]
locations inMemoryparquetReader[*schemav1.InMemoryLocation, *schemav1.LocationPersister]
mappings inMemoryparquetReader[*schemav1.InMemoryMapping, *schemav1.MappingPersister]
stacktraces StacktraceDB
}

func (r *inMemorySymbolsReader) SymbolsResolver(partition uint64) (SymbolsResolver, error) {
p, ok := r.partitions[partition]
if !ok {
p = &inMemorySymbolsResolver{
partition: partition,
reader: r,
}
r.partitions[partition] = p
}
return p, nil
}

type inMemorySymbolsResolver struct {
partition uint64
reader *inMemorySymbolsReader
}

func (s inMemorySymbolsResolver) ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error {
return s.reader.stacktraces.Resolve(ctx, s.partition, dst, stacktraces)
}

func (s inMemorySymbolsResolver) Locations(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] {
return iter.NewSliceIndexIterator(s.reader.locations.cache, i)
}

func (s inMemorySymbolsResolver) Mappings(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] {
return iter.NewSliceIndexIterator(s.reader.mappings.cache, i)
}

func (s inMemorySymbolsResolver) Functions(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] {
return iter.NewSliceIndexIterator(s.reader.functions.cache, i)
}

func (s inMemorySymbolsResolver) Strings(i iter.Iterator[uint32]) iter.Iterator[string] {
return iter.NewSliceIndexIterator(s.reader.strings.cache, i)
}

func (s inMemorySymbolsResolver) WriteStats(stats *symdb.Stats) {
s.reader.stacktraces.WriteStats(s.partition, stats)
stats.LocationsTotal = len(s.reader.locations.cache)
stats.MappingsTotal = len(s.reader.mappings.cache)
stats.FunctionsTotal = len(s.reader.functions.cache)
stats.StringsTotal = len(s.reader.strings.cache)
}
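A minimal sketch (not part of the diff) of how the SymbolsReader API above might be consumed, for example by the compaction path; the helper function and its arguments are hypothetical.

	func resolvePartition(ctx context.Context, r SymbolsReader, partition uint64, stacktraceIDs []uint32, dst symdb.StacktraceInserter) (symdb.Stats, error) {
		var stats symdb.Stats
		resolver, err := r.SymbolsResolver(partition)
		if err != nil {
			return stats, err
		}
		if err := resolver.ResolveStacktraces(ctx, dst, stacktraceIDs); err != nil {
			return stats, err
		}
		// WriteStats fills the per-partition totals: stacktraces, locations,
		// mappings, functions, and strings.
		resolver.WriteStats(&stats)
		return stats, nil
	}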
111 changes: 111 additions & 0 deletions pkg/phlaredb/block_symbols_writer.go
@@ -0,0 +1,111 @@
package phlaredb

import (
"context"
"fmt"
"path/filepath"

schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
)

// TODO(kolesnikovae): Refactor to symdb.

type SymbolsWriter interface {
SymbolsAppender(partition uint64) (SymbolsAppender, error)
}

type SymbolsAppender interface {
AppendStacktraces([]uint32, []*schemav1.Stacktrace)
AppendLocations([]uint32, []*schemav1.InMemoryLocation)
AppendMappings([]uint32, []*schemav1.InMemoryMapping)
AppendFunctions([]uint32, []*schemav1.InMemoryFunction)
AppendStrings([]uint32, []string)
}

type symbolsWriter struct {
partitions map[uint64]*symbolsAppender

locations deduplicatingSlice[*schemav1.InMemoryLocation, locationsKey, *locationsHelper, *schemav1.LocationPersister]
mappings deduplicatingSlice[*schemav1.InMemoryMapping, mappingsKey, *mappingsHelper, *schemav1.MappingPersister]
functions deduplicatingSlice[*schemav1.InMemoryFunction, functionsKey, *functionsHelper, *schemav1.FunctionPersister]
strings deduplicatingSlice[string, string, *stringsHelper, *schemav1.StringPersister]
tables []Table

symdb *symdb.SymDB
}

func newSymbolsWriter(dst string, cfg *ParquetConfig) (*symbolsWriter, error) {
w := symbolsWriter{
partitions: make(map[uint64]*symbolsAppender),
}
dir := filepath.Join(dst, symdb.DefaultDirName)
w.symdb = symdb.NewSymDB(symdb.DefaultConfig().WithDirectory(dir))
w.tables = []Table{
&w.locations,
&w.mappings,
&w.functions,
&w.strings,
}
for _, t := range w.tables {
if err := t.Init(dst, cfg, contextHeadMetrics(context.Background())); err != nil {
return nil, err
}
}
return &w, nil
}

func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error) {
p, ok := w.partitions[partition]
if !ok {
appender := w.symdb.SymbolsAppender(partition)
x := &symbolsAppender{
stacktraces: appender.StacktraceAppender(),
writer: w,
}
w.partitions[partition] = x
p = x
}
return p, nil
}

func (w *symbolsWriter) Close() error {
for _, t := range w.tables {
_, _, err := t.Flush(context.Background())
if err != nil {
return fmt.Errorf("flushing table %s: %w", t.Name(), err)
}
if err = t.Close(); err != nil {
return fmt.Errorf("closing table %s: %w", t.Name(), err)
}
}
if err := w.symdb.Flush(); err != nil {
return fmt.Errorf("flushing symbol database: %w", err)
}
return nil
}

type symbolsAppender struct {
stacktraces symdb.StacktraceAppender
writer *symbolsWriter
}

func (s symbolsAppender) AppendStacktraces(dst []uint32, stacktraces []*schemav1.Stacktrace) {
s.stacktraces.AppendStacktrace(dst, stacktraces)
}

func (s symbolsAppender) AppendLocations(dst []uint32, locations []*schemav1.InMemoryLocation) {
s.writer.locations.append(dst, locations)
}

func (s symbolsAppender) AppendMappings(dst []uint32, mappings []*schemav1.InMemoryMapping) {
s.writer.mappings.append(dst, mappings)
}

func (s symbolsAppender) AppendFunctions(dst []uint32, functions []*schemav1.InMemoryFunction) {
s.writer.functions.append(dst, functions)
}

func (s symbolsAppender) AppendStrings(dst []uint32, strings []string) {
s.writer.strings.append(dst, strings)
}
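A minimal sketch (not part of the diff) of driving the writer above when rewriting a single partition's locations; dst, cfg, partition, and locs are hypothetical, and it is assumed that append fills the dst slice with the rewritten indices, mirroring how deduplicatingSlice is used elsewhere in phlaredb.

	func writePartitionLocations(dst string, cfg *ParquetConfig, partition uint64, locs []*schemav1.InMemoryLocation) error {
		w, err := newSymbolsWriter(dst, cfg)
		if err != nil {
			return err
		}
		app, err := w.SymbolsAppender(partition)
		if err != nil {
			return err
		}
		ids := make([]uint32, len(locs))
		// Assumption: AppendLocations writes the deduplicated indices into ids.
		app.AppendLocations(ids, locs)
		return w.Close() // flushes the parquet tables and the symbol database
	}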