Handle symdb partition initialization error #2519

Merged · 2 commits · Oct 13, 2023

127 changes: 50 additions & 77 deletions pkg/phlaredb/symdb/block_reader.go
@@ -21,6 +21,7 @@ import (
     parquetobj "github.com/grafana/pyroscope/pkg/objstore/parquet"
     "github.com/grafana/pyroscope/pkg/phlaredb/block"
     schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
+    "github.com/grafana/pyroscope/pkg/util/refctr"
 )
 
 type Reader struct {
@@ -184,6 +185,7 @@ func (r *Reader) partition(ctx context.Context, partition uint64) (*partition, error) {
         return nil, ErrPartitionNotFound
     }
     if err := p.init(ctx); err != nil {
+        p.Release()
         return nil, err
     }
     return p, nil
@@ -332,46 +334,33 @@ type stacktraceChunkReader struct {
     reader *Reader
     header StacktraceChunkHeader
 
-    m sync.Mutex
-    r int64
+    r refctr.Counter
     t *parentPointerTree
 }
 
-func (c *stacktraceChunkReader) fetch(ctx context.Context) (err error) {
+func (c *stacktraceChunkReader) fetch(ctx context.Context) error {
     span, ctx := opentracing.StartSpanFromContext(ctx, "stacktraceChunkReader.fetch")
     span.LogFields(
         otlog.Int64("size", c.header.Size),
         otlog.Uint32("nodes", c.header.StacktraceNodes),
         otlog.Uint32("stacks", c.header.Stacktraces),
     )
     defer span.Finish()
-    // Mutex is acquired to serialize access to the tree,
-    // so that the consequent callers only have access to
-    // it after it is fully loaded. Use of atomics here
-    // for reference counting is not sufficient.
-    c.m.Lock()
-    defer func() {
-        if err != nil {
-            c.r--
-        }
-        c.m.Unlock()
-    }()
-    if c.r++; c.r > 1 {
-        return nil
-    }
-    f, err := c.reader.file(StacktracesFileName)
-    if err != nil {
-        return err
-    }
-    rc, err := c.reader.bucket.GetRange(ctx, f.RelPath, c.header.Offset, c.header.Size)
-    if err != nil {
-        return err
-    }
-    defer func() {
-        err = multierror.New(err, rc.Close()).Err()
-    }()
-    // Consider pooling the buffer.
-    return c.readFrom(bufio.NewReaderSize(rc, c.reader.chunkFetchBufferSize))
+    return c.r.Inc(func() error {
+        f, err := c.reader.file(StacktracesFileName)
+        if err != nil {
+            return err
+        }
+        rc, err := c.reader.bucket.GetRange(ctx, f.RelPath, c.header.Offset, c.header.Size)
+        if err != nil {
+            return err
+        }
+        defer func() {
+            err = multierror.New(err, rc.Close()).Err()
+        }()
+        // Consider pooling the buffer.
+        return c.readFrom(bufio.NewReaderSize(rc, c.reader.chunkFetchBufferSize))
+    })
 }
 
 func (c *stacktraceChunkReader) readFrom(r io.Reader) error {
@@ -388,7 +377,7 @@ func (c *stacktraceChunkReader) readFrom(r io.Reader) error {
     crc := crc32.New(castagnoli)
     tee := io.TeeReader(r, crc)
     if _, err := t.ReadFrom(tee); err != nil {
-        return fmt.Errorf("failed to unmarshal stack treaces: %w", err)
+        return fmt.Errorf("failed to unmarshal stack traces: %w", err)
     }
     if c.header.CRC != crc.Sum32() {
         return ErrInvalidCRC
@@ -398,14 +387,9 @@
 }
 
 func (c *stacktraceChunkReader) release() {
-    // To avoid race with "fetch" caller that could
-    // have started re-reading the tree right after
-    // the reference counter has decreased to 0.
-    c.m.Lock()
-    if c.r--; c.r < 1 {
+    c.r.Dec(func() {
         c.t = nil
-    }
-    c.m.Unlock()
+    })
 }
 
 type parquetTableRange[M schemav1.Models, P schemav1.Persister[M]] struct {
@@ -415,8 +399,7 @@ type parquetTableRange[M schemav1.Models, P schemav1.Persister[M]] struct {
 
     file *parquetobj.File
 
-    m sync.RWMutex
-    r int64
+    r refctr.Counter
     s []M
 }
 
@@ -430,43 +413,35 @@ func (t *parquetTableRange[M, P]) fetch(ctx context.Context) (err error) {
         "row_groups": len(t.headers),
     })
     defer span.Finish()
-    t.m.Lock()
-    defer func() {
-        if err != nil {
-            t.r--
-        }
-        t.m.Unlock()
-    }()
-    if t.r++; t.r > 1 {
-        return nil
-    }
-    var s uint32
-    for _, h := range t.headers {
-        s += h.Rows
-    }
-    buf := make([]parquet.Row, inMemoryReaderRowsBufSize)
-    t.s = make([]M, s)
-    var offset int
-    // TODO(kolesnikovae): Row groups could be fetched in parallel.
-    rgs := t.file.RowGroups()
-    for _, h := range t.headers {
-        span.LogFields(
-            otlog.Uint32("row_group", h.RowGroup),
-            otlog.Uint32("index_row", h.Index),
-            otlog.Uint32("rows", h.Rows),
-        )
-        rg := rgs[h.RowGroup]
-        rows := rg.Rows()
-        if err := rows.SeekToRow(int64(h.Index)); err != nil {
-            return err
-        }
-        dst := t.s[offset : offset+int(h.Rows)]
-        if err := t.readRows(dst, buf, rows); err != nil {
-            return fmt.Errorf("reading row group from parquet file %q: %w", t.file.Path(), err)
-        }
-        offset += int(h.Rows)
-    }
-    return nil
+    return t.r.Inc(func() error {
+        var s uint32
+        for _, h := range t.headers {
+            s += h.Rows
+        }
+        buf := make([]parquet.Row, inMemoryReaderRowsBufSize)
+        t.s = make([]M, s)
+        var offset int
+        // TODO(kolesnikovae): Row groups could be fetched in parallel.
+        rgs := t.file.RowGroups()
+        for _, h := range t.headers {
+            span.LogFields(
+                otlog.Uint32("row_group", h.RowGroup),
+                otlog.Uint32("index_row", h.Index),
+                otlog.Uint32("rows", h.Rows),
+            )
+            rg := rgs[h.RowGroup]
+            rows := rg.Rows()
+            if err := rows.SeekToRow(int64(h.Index)); err != nil {
+                return err
+            }
+            dst := t.s[offset : offset+int(h.Rows)]
+            if err := t.readRows(dst, buf, rows); err != nil {
+                return fmt.Errorf("reading row group from parquet file %q: %w", t.file.Path(), err)
+            }
+            offset += int(h.Rows)
+        }
+        return nil
+    })
 }
 
 func (t *parquetTableRange[M, P]) readRows(dst []M, buf []parquet.Row, rows parquet.Rows) (err error) {
@@ -499,9 +474,7 @@
 }
 
 func (t *parquetTableRange[M, P]) release() {
-    t.m.Lock()
-    if t.r--; t.r < 1 {
+    t.r.Dec(func() {
         t.s = nil
-    }
-    t.m.Unlock()
+    })
 }
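
The substance of this file's change is a single discipline: take a reference, run initialization, and drop the reference again if initialization fails, so a failed init cannot leak the count. A minimal, self-contained sketch of that flow — the refCounted type, its Acquire/Release methods, and the get helper are hypothetical stand-ins for the PR's partition plumbing, not its actual API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// refCounted stands in for a partition-like resource: a reference is
// taken before init runs, so a failed init must drop it again.
type refCounted struct {
	refs int
}

func (p *refCounted) Acquire() { p.refs++ }
func (p *refCounted) Release() { p.refs-- }

func (p *refCounted) init(ctx context.Context) error {
	return errors.New("object storage unavailable") // simulated failure
}

// get mirrors the fixed Reader.partition flow.
func get(ctx context.Context, p *refCounted) (*refCounted, error) {
	p.Acquire()
	if err := p.init(ctx); err != nil {
		p.Release() // the fix: undo the reference when init fails
		return nil, err
	}
	return p, nil
}

func main() {
	p := &refCounted{}
	_, err := get(context.Background(), p)
	fmt.Println(err, "refs:", p.refs) // refs is back to 0
}
```
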
10 changes: 7 additions & 3 deletions pkg/phlaredb/symdb/block_reader_load.go
@@ -15,6 +15,13 @@ import (
     pparquet "github.com/grafana/pyroscope/pkg/parquet"
 )
 
+// Load loads all the partitions into memory. Partitions are kept
+// in memory for the whole lifetime of the Reader object.
+//
+// The main user of the function is Rewriter: it is not known in
+// advance which partitions will be fetched, but it is known that
+// all or most of them will be requested, so preloading them is
+// more efficient, at the cost of higher memory consumption.
 func (r *Reader) Load(ctx context.Context) error {
     g, ctx := errgroup.WithContext(ctx)
     g.Go(func() error { return r.loadStacktraces(ctx) })
@@ -110,9 +117,6 @@ func withRowIterator(f parquetobj.File, partitions []*partition, x loader) error {
 }
 
 func (t *parquetTableRange[M, P]) loadFrom(iter iter.Iterator[parquet.Row]) error {
-    if t.r++; t.r > 1 {
-        return nil
-    }
     var s uint32
     for _, h := range t.headers {
         s += h.Rows
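
Given the Load semantics documented above, a caller that knows it will touch most partitions can trade memory for latency. A minimal sketch under stated assumptions: the *symdb.Reader is already opened elsewhere, and the preload helper is hypothetical — only Reader.Load itself comes from this PR:

```go
package example

import (
	"context"
	"fmt"

	"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
)

// preload fetches every partition up front, trading memory for the
// per-partition fetch latency a Rewriter-style consumer would
// otherwise pay on first access. Reader construction is out of
// scope here; only the Load call is taken from this change.
func preload(ctx context.Context, r *symdb.Reader) error {
	if err := r.Load(ctx); err != nil {
		return fmt.Errorf("preloading symdb partitions: %w", err)
	}
	return nil
}
```
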
32 changes: 25 additions & 7 deletions pkg/phlaredb/symdb/resolver.go
@@ -7,6 +7,7 @@ import (
 
     "github.com/google/pprof/profile"
     "github.com/opentracing/opentracing-go"
+    "github.com/opentracing/opentracing-go/log"
     "golang.org/x/sync/errgroup"
 
     "github.com/grafana/pyroscope/pkg/model"
@@ -21,8 +22,9 @@ import (
 //
 // A new Resolver must be created for each profile.
 type Resolver struct {
-    ctx  context.Context
-    span opentracing.Span
+    ctx    context.Context
+    cancel context.CancelFunc
+    span   opentracing.Span
 
     s SymbolsReader
     g *errgroup.Group
@@ -44,6 +46,7 @@ func WithMaxConcurrent(n int) ResolverOption {
 type lazyPartition struct {
     samples map[uint32]int64
     c       chan *Symbols
+    err     chan error
     done    chan struct{}
 }
 
@@ -54,11 +57,15 @@ func NewResolver(ctx context.Context, s SymbolsReader) *Resolver {
         p: make(map[uint64]*lazyPartition),
     }
     r.span, r.ctx = opentracing.StartSpanFromContext(ctx, "NewResolver")
-    r.g, r.ctx = errgroup.WithContext(ctx)
+    r.ctx, r.cancel = context.WithCancel(r.ctx)
+    r.g, r.ctx = errgroup.WithContext(r.ctx)
     return &r
 }
 
 func (r *Resolver) Release() {
+    r.cancel()
+    // The error is already sent to the caller.
+    _ = r.g.Wait()
     r.span.Finish()
 }
 
@@ -95,6 +102,7 @@ func (r *Resolver) Partition(partition uint64) map[uint32]int64 {
     }
     p = &lazyPartition{
         samples: make(map[uint32]int64),
+        err:     make(chan error),
         done:    make(chan struct{}),
         c:       make(chan *Symbols, 1),
     }
@@ -103,16 +111,22 @@ func (r *Resolver) Partition(partition uint64) map[uint32]int64 {
     r.g.Go(func() error {
         pr, err := r.s.Partition(r.ctx, partition)
         if err != nil {
-            return err
+            r.span.LogFields(log.String("err", err.Error()))
+            select {
+            case <-r.ctx.Done():
+                return r.ctx.Err()
+            case p.err <- err:
+                return err
+            }
         }
         defer pr.Release()
-        p.c <- pr.Symbols()
         select {
         case <-r.ctx.Done():
             return r.ctx.Err()
-        case <-p.done:
-            return nil
+        case p.c <- pr.Symbols():
+            <-p.done
         }
+        return nil
     })
     return p.samples
 }
@@ -134,6 +148,8 @@ func (r *Resolver) Tree() (*model.Tree, error) {
         select {
         case <-ctx.Done():
             return ctx.Err()
+        case err := <-p.err:
+            return err
         case symbols := <-p.c:
             samples := schemav1.NewSamplesFromMap(p.samples)
             rt, err := symbols.Tree(ctx, samples)
@@ -171,6 +187,8 @@ func (r *Resolver) Profile() (*profile.Profile, error) {
         select {
         case <-ctx.Done():
             return ctx.Err()
+        case err := <-p.err:
+            return err
        case symbols := <-p.c:
             samples := schemav1.NewSamplesFromMap(p.samples)
             rp, err := symbols.Profile(ctx, samples)
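
The resolver change establishes a small channel protocol: the producer goroutine delivers either a *Symbols value or an error, and every send is guarded by ctx.Done() so a consumer that has already left (via Release, which cancels the context) cannot strand the goroutine on an unbuffered channel. A self-contained sketch of that protocol with generic names — produce, loadValue, out, and errc are illustrative, not the PR's identifiers:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func loadValue() (string, error) { return "", errors.New("partition init failed") }

// produce mirrors the PR's goroutine protocol: on failure, the error
// is handed to the consumer over an unbuffered channel, but only
// while the context is alive, so the sender can never block forever.
func produce(ctx context.Context, out chan<- string, errc chan<- error) error {
	v, err := loadValue()
	if err != nil {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case errc <- err: // consumer receives the error directly
			return err
		}
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case out <- v:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // mirrors Resolver.Release unblocking the producer
	out, errc := make(chan string, 1), make(chan error)
	go func() { _ = produce(ctx, out, errc) }()
	select {
	case err := <-errc:
		fmt.Println("propagated:", err) // prints the simulated failure
	case v := <-out:
		fmt.Println("value:", v)
	}
}
```
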
22 changes: 22 additions & 0 deletions pkg/phlaredb/symdb/resolver_test.go
@@ -2,9 +2,13 @@ package symdb
 
 import (
     "context"
+    "io"
     "testing"
 
+    "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
+
+    schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
 )
 
 func Test_memory_Resolver_ResolveProfile(t *testing.T) {
@@ -115,3 +119,21 @@ func Test_Resolver_Unreleased_Failed_Partition(t *testing.T) {
     require.NoError(t, err)
     r.Release()
 }
+
+func Test_Resolver_Error_Propagation(t *testing.T) {
+    m := new(mockSymbolsReader)
+    m.On("Partition", mock.Anything, mock.Anything).Return(nil, io.EOF).Once()
+    r := NewResolver(context.Background(), m)
+    r.AddSamples(0, schemav1.Samples{})
+    _, err := r.Tree()
+    require.ErrorIs(t, err, io.EOF)
+    r.Release()
+}
+
+type mockSymbolsReader struct{ mock.Mock }
+
+func (m *mockSymbolsReader) Partition(ctx context.Context, partition uint64) (PartitionReader, error) {
+    args := m.Called(ctx, partition)
+    r, _ := args.Get(0).(PartitionReader)
+    return r, args.Error(1)
+}
41 changes: 41 additions & 0 deletions pkg/util/refctr/refctr.go
@@ -0,0 +1,41 @@
+package refctr
+
+import "sync"
+
+type Counter struct {
+    m sync.Mutex
+    c int
+}
+
+// Inc increments the counter and calls the init function,
+// if this is the first reference. The call returns an error
+// only if the init call has failed; in that case, the
+// reference is not incremented.
+func (r *Counter) Inc(init func() error) (err error) {
+    r.m.Lock()
+    defer func() {
+        // If initialization fails, we need to make sure
+        // the next call makes another attempt.
+        if err != nil {
+            r.c--
+        }
+        r.m.Unlock()
+    }()
+    if r.c++; r.c > 1 {
+        return nil
+    }
+    // The mutex is held during the call in order to serialize
+    // access to the resources, so that subsequent callers only
+    // have access to them after initialization finishes.
+    return init()
+}
+
+// Dec decrements the counter and calls the release function,
+// if this is the last reference.
+func (r *Counter) Dec(release func()) {
+    r.m.Lock()
+    if r.c--; r.c < 1 {
+        release()
+    }
+    r.m.Unlock()
+}
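
For contrast with the open-coded mutex logic this PR removes from block_reader.go, here is how a resource wrapped in refctr.Counter is typically used. The resource type below is a hypothetical example; the Inc/Dec signatures are exactly those introduced above:

```go
package main

import (
	"fmt"

	"github.com/grafana/pyroscope/pkg/util/refctr"
)

// resource guards an expensive buffer with a refctr.Counter: the
// first Inc loads it, the last Dec frees it, and a failed init
// leaves the counter at zero so the next caller retries.
type resource struct {
	r   refctr.Counter
	buf []byte
}

func (x *resource) acquire() error {
	return x.r.Inc(func() error {
		x.buf = make([]byte, 1<<20) // runs only for the first reference
		return nil
	})
}

func (x *resource) release() {
	x.r.Dec(func() {
		x.buf = nil // runs only when the last reference is dropped
	})
}

func main() {
	var x resource
	if err := x.acquire(); err != nil {
		panic(err)
	}
	defer x.release()
	fmt.Println("buffer loaded:", len(x.buf))
}
```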