diff --git a/pkg/iter/batch_async.go b/pkg/iter/batch_async.go new file mode 100644 index 0000000000..0a098ceb9a --- /dev/null +++ b/pkg/iter/batch_async.go @@ -0,0 +1,119 @@ +package iter + +type AsyncBatchIterator[T, N any] struct { + idx int + batch []N + buffered []N + + close chan struct{} + done chan struct{} + c chan batch[N] + delegate Iterator[T] + + clone func(T) N + release func([]N) +} + +type batch[T any] struct { + buffered []T + done chan struct{} +} + +const minBatchSize = 64 + +func NewAsyncBatchIterator[T, N any]( + iterator Iterator[T], + size int, + clone func(T) N, + release func([]N), +) *AsyncBatchIterator[T, N] { + if size == 0 { + size = minBatchSize + } + x := &AsyncBatchIterator[T, N]{ + idx: -1, + batch: make([]N, 0, size), + buffered: make([]N, 0, size), + close: make(chan struct{}), + done: make(chan struct{}), + c: make(chan batch[N]), + clone: clone, + release: release, + delegate: iterator, + } + go x.iterate() + return x +} + +func (x *AsyncBatchIterator[T, N]) Next() bool { + if x.idx < 0 || x.idx >= len(x.batch)-1 { + if !x.loadBatch() { + return false + } + } + x.idx++ + return true +} + +func (x *AsyncBatchIterator[T, N]) At() N { return x.batch[x.idx] } + +func (x *AsyncBatchIterator[T, N]) iterate() { + defer func() { + close(x.c) + close(x.done) + }() + for x.fillBuffer() { + b := batch[N]{ + buffered: x.buffered, + done: make(chan struct{}), + } + select { + case x.c <- b: + // Wait for the next loadBatch call. + <-b.done + case <-x.close: + return + } + } +} + +func (x *AsyncBatchIterator[T, N]) loadBatch() bool { + var b batch[N] + select { + case b = <-x.c: + case <-x.done: + } + if len(b.buffered) == 0 { + return false + } + // Swap buffers and signal "iterate" goroutine + // that x.buffered can be used: it will + // immediately start filling the buffer. 
+ x.buffered, x.batch = x.batch, b.buffered + x.idx = -1 + close(b.done) + return true +} + +func (x *AsyncBatchIterator[T, N]) fillBuffer() bool { + x.buffered = x.buffered[:cap(x.buffered)] + x.release(x.buffered) + for i := range x.buffered { + if !x.delegate.Next() { + x.buffered = x.buffered[:i] + break + } + x.buffered[i] = x.clone(x.delegate.At()) + } + return len(x.buffered) > 0 +} + +func (x *AsyncBatchIterator[T, N]) Close() error { + close(x.close) + <-x.done + return x.delegate.Close() +} + +func (x *AsyncBatchIterator[T, N]) Err() error { + return x.delegate.Err() +} diff --git a/pkg/iter/batch_async_test.go b/pkg/iter/batch_async_test.go new file mode 100644 index 0000000000..9035abf543 --- /dev/null +++ b/pkg/iter/batch_async_test.go @@ -0,0 +1,116 @@ +package iter + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_AsyncBatchIter(t *testing.T) { + type testCase struct { + description string + seqSize int + bufSize int + } + testCases := []testCase{ + { + description: "empty iterator", + seqSize: 0, + bufSize: 1, + }, + { + description: "empty iterator, zero buffer", + seqSize: 0, + bufSize: 0, + }, + { + description: "zero buffer", + seqSize: 10, + bufSize: 0, + }, + { + description: "iterator < buffer", + seqSize: 5, + bufSize: 10, + }, + { + description: "iterator == buffer", + seqSize: 10, + bufSize: 10, + }, + { + description: "iterator > buffer", + seqSize: 25, + bufSize: 10, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.description, func(t *testing.T) { + x := NewAsyncBatchIterator[int, int]( + newSeqIterator(tc.seqSize), + tc.bufSize, + func(i int) int { return i }, + func([]int) {}, + ) + var p, c int + for x.Next() { + i := x.At() + require.Equal(t, 1, i-p) + p = i + c++ + } + require.Equal(t, tc.seqSize, c) + require.NoError(t, x.Err()) + require.NoError(t, x.Close()) + }) + } +} + +type seqIterator struct{ n, c int } + +func newSeqIterator(n int) *seqIterator { + return &seqIterator{n: n} +} + +func (x *seqIterator) Next() bool { + if x.c < x.n { + x.c++ + return true + } + return false +} + +func (x *seqIterator) At() int { return x.c } + +func (x *seqIterator) Close() error { return nil } +func (x *seqIterator) Err() error { return nil } + +// Benchmark_AsyncBatchIterator-10 91417 13353 ns/op 17017 B/op 10 allocs/op +func Benchmark_AsyncBatchIterator(b *testing.B) { + b.ReportAllocs() + var n int + for i := 0; i < b.N; i++ { + x := NewAsyncBatchIterator[int, int]( + newSeqIterator(1<<20), + 1<<10, + func(i int) int { return i }, + func([]int) {}, + ) + for x.Next() { + n += x.At() + } + } +} + +// Benchmark_BufferedIterator-10 12 99730976 ns/op 10047 B/op 8 allocs/op +func Benchmark_BufferedIterator(b *testing.B) { + b.ReportAllocs() + var n int + for i := 0; i < b.N; i++ { + x := NewBufferedIterator[int](newSeqIterator(1<<20), 1<<10) + for x.Next() { + n += x.At() + } + } +} diff --git a/pkg/iter/iter.go b/pkg/iter/iter.go index 7a00660c98..12a183426f 100644 --- a/pkg/iter/iter.go +++ b/pkg/iter/iter.go @@ -152,6 +152,9 @@ func (i *sliceIterator[A]) Close() error { } func Slice[T any](it Iterator[T]) ([]T, error) { + if s, ok := it.(*sliceIterator[T]); ok { + return s.list, nil + } var result []T defer it.Close() for it.Next() { diff --git a/pkg/iter/tee.go b/pkg/iter/tee.go new file mode 100644 index 0000000000..479f703d63 --- /dev/null +++ b/pkg/iter/tee.go @@ -0,0 +1,170 @@ +package iter + +import ( + "math" + "sync" +) + +const defaultTeeBufferSize = 4096 + +// Tee returns 2 independent iterators from a 
single iterable.
+//
+// The original iterator must not be used anywhere else; it is, however,
+// the caller's responsibility to close it and handle its error after all
+// the tee iterators have finished.
+//
+// Tee buffers source objects and frees them eventually: when an object
+// from the source iterator is consumed, its ownership is transferred to
+// Tee. Therefore, the caller must ensure the source iterator never reuses
+// objects returned by At.
+//
+// Tee never blocks the leader iterator; instead, it grows the internal
+// buffer: if any of the returned iterators is abandoned, all source
+// iterator objects will be held in the buffer.
+func Tee[T any](iter Iterator[T]) (a, b Iterator[T]) {
+    s := newTee[T](iter, 2, defaultTeeBufferSize)
+    return s[0], s[1]
+}
+
+func TeeN[T any](iter Iterator[T], n int) []Iterator[T] {
+    return newTee[T](iter, n, defaultTeeBufferSize)
+}
+
+// NOTE(kolesnikovae): The implementation design aims for simplicity.
+// A more efficient tee can be implemented on top of a linked
+// list of small arrays:
+//  - More efficient (de-)allocations (chunk pool).
+//  - Less/no mutex contention.
+
+func newTee[T any](iter Iterator[T], n, bufSize int) []Iterator[T] {
+    if n < 0 {
+        return nil
+    }
+    s := &sharedIterator[T]{
+        s: int64(bufSize),
+        i: iter,
+        t: make([]int64, n),
+        v: make([]T, 0, bufSize),
+    }
+    t := make([]Iterator[T], n)
+    for i := range s.t {
+        t[i] = &tee[T]{
+            s: s,
+            n: i,
+        }
+    }
+    return t
+}
+
+type sharedIterator[T any] struct {
+    s int64
+    i Iterator[T]
+    e error
+    t []int64
+    m sync.RWMutex
+    v []T
+    w int64
+}
+
+func (s *sharedIterator[T]) next(n int) bool {
+    s.m.RLock()
+    if s.t[n] < s.w {
+        s.t[n]++
+        s.m.RUnlock()
+        return true
+    }
+    s.m.RUnlock()
+    s.m.Lock()
+    defer s.m.Unlock()
+    if s.t[n] < s.w {
+        s.t[n]++
+        return true
+    }
+    // All the memoized items were consumed.
+    if s.e != nil {
+        return false
+    }
+    s.clean() // Conditionally clean consumed values.
+    // Fetch the next batch from the source iterator.
+    var i int64
+    for ; i < s.s; i++ {
+        if !s.i.Next() {
+            break
+        }
+        s.v = append(s.v, s.i.At())
+    }
+    s.e = s.i.Err()
+    s.w += i
+    if i != 0 {
+        s.t[n]++
+        return true
+    }
+    return false
+}
+
+func (s *sharedIterator[T]) clean() {
+    lo := int64(-1)
+    for _, v := range s.t {
+        if v < lo || lo == -1 {
+            lo = v
+        }
+    }
+    if lo < s.s {
+        return
+    }
+    if lo == math.MaxInt64 {
+        // All iterators have been closed.
+        return
+    }
+    // Remove the values consumed by all iterators: shift the
+    // remaining values to the beginning and update the
+    // iterator offsets accordingly.
+    lo--
+    var v T
+    for i := range s.v[:lo] {
+        s.v[i] = v
+    }
+    s.v = s.v[:copy(s.v, s.v[lo:])]
+    s.w -= lo
+    for i := range s.t {
+        if s.t[i] != math.MaxInt64 {
+            s.t[i] -= lo
+        }
+    }
+}
+
+func (s *sharedIterator[T]) at(n int) T {
+    s.m.RLock()
+    v := s.v[s.t[n]-1]
+    s.m.RUnlock()
+    return v
+}
+
+func (s *sharedIterator[T]) close(n int) {
+    s.m.RLock()
+    s.t[n] = math.MaxInt64
+    s.m.RUnlock()
+}
+
+func (s *sharedIterator[T]) err() error {
+    s.m.RLock()
+    e := s.e
+    s.m.RUnlock()
+    return e
+}
+
+type tee[T any] struct {
+    s *sharedIterator[T]
+    n int
+}
+
+func (t *tee[T]) Next() bool { return t.s.next(t.n) }
+
+func (t *tee[T]) At() T { return t.s.at(t.n) }
+
+func (t *tee[T]) Err() error { return t.s.err() }
+
+func (t *tee[T]) Close() error {
+    t.s.close(t.n)
+    return nil
+}
diff --git a/pkg/iter/tee_test.go b/pkg/iter/tee_test.go
new file mode 100644
index 0000000000..29bc0f810d
--- /dev/null
+++ b/pkg/iter/tee_test.go
@@ -0,0 +1,170 @@
+package iter
+
+import (
+    "runtime"
+    "sync"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+)
+
+func Test_Tee_All(t *testing.T) {
+    type testCase struct {
+        name    string
+        items   int
+        bufSize int
+        iters   int
+    }
+    testCases := []testCase{
+        {
+            name:    "empty",
+            items:   0,
+            iters:   10,
+            bufSize: 512,
+        },
+        {
+            name:    "no iterators",
+            bufSize: 512,
+        },
+        {
+            name:    "single iterator",
+            items:   1000,
+            iters:   1,
+            bufSize: 512,
+        },
+        {
+            name:    "matches buffer size",
+            items:   512,
+            iters:   10,
+            bufSize: 512,
+        },
+        {
+            name:    "larger than buffer",
+            items:   1000,
+            iters:   10,
+            bufSize: 512,
+        },
+        {
+            name:    "less than buffer",
+            items:   7,
+            iters:   10,
+            bufSize: 512,
+        },
+    }
+    for _, tc := range testCases {
+        tc := tc
+        t.Run(tc.name, func(t *testing.T) {
+            var wg sync.WaitGroup
+            s := newSeqIterator(tc.items)
+            n := make([]int, tc.iters)
+            it := newTee[int](s, tc.iters, tc.bufSize)
+            for i, x := range it {
+                x := x
+                i := i
+                wg.Add(1)
+                go func() {
+                    defer wg.Done()
+                    for x.Next() {
+                        n[i]++
+                    }
+                    assert.NoError(t, x.Close())
+                    assert.NoError(t, x.Err())
+                }()
+            }
+            wg.Wait()
+            for i, v := range n {
+                assert.Equal(t, tc.items, v)
+                assert.False(t, it[i].Next())
+                assert.NoError(t, it[i].Close())
+                assert.NoError(t, it[i].Err())
+            }
+            assert.False(t, s.Next())
+            assert.NoError(t, s.Close())
+            assert.NoError(t, s.Err())
+        })
+    }
+}
+
+func Test_Tee_Lag(t *testing.T) {
+    const (
+        items   = 1000
+        iters   = 10
+        bufSize = 512
+    )
+    var wg sync.WaitGroup
+    s := newSeqIterator(items)
+    n := make([]int, iters)
+    it := newTee[int](s, iters, bufSize)
+    for i, x := range it {
+        x := x
+        i := i
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            // Each iterator consumes (i+1) * (items/iters) items.
+            for (n[i] < (i+1)*(items/iters)) && x.Next() {
+                n[i]++
+            }
+            assert.NoError(t, x.Close())
+            assert.NoError(t, x.Err())
+        }()
+    }
+    wg.Wait()
+    for i, v := range n {
+        assert.Equal(t, (i+1)*(items/iters), v)
+        assert.False(t, it[i].Next())
+        assert.NoError(t, it[i].Close())
+        assert.NoError(t, it[i].Err())
+    }
+    assert.False(t, s.Next())
+    assert.NoError(t, s.Close())
+    assert.NoError(t, s.Err())
+}
+
+func Test_Tee_BufferReuse(t *testing.T) {
+    const (
+        items   = 1 << 20
+        iters   = 2
+        bufSize = 512
+    )
+
+    var wg sync.WaitGroup
+    s := newSeqIterator(items)
+    n := make([]int, iters)
+    it := newTee[int](s, iters, bufSize)
+    for i, x := range it {
+        x := x
+        i := i
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            var j int
+            for x.Next() {
+                n[i]++
+                j++
+                // Let others consume: yield every 4096 iterations.
+                if j%(4<<10) == 0 {
+                    runtime.Gosched()
+                }
+            }
+            assert.NoError(t, x.Close())
+            assert.NoError(t, x.Err())
+        }()
+    }
+    wg.Wait()
+
+    for i, v := range n {
+        assert.Equal(t, items, v)
+        assert.False(t, it[i].Next())
+        assert.NoError(t, it[i].Close())
+        assert.NoError(t, it[i].Err())
+    }
+    assert.False(t, s.Next())
+    assert.NoError(t, s.Close())
+    assert.NoError(t, s.Err())
+
+    // Might be flaky.
+    // Typically, for the given test, the expected
+    // buffer capacity is within [10K:100K].
+    assert.Less(t, cap(it[0].(*tee[int]).s.v), 2*items)
+}
diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go
index 988109c30c..2cf7090e1f 100644
--- a/pkg/phlaredb/block_querier.go
+++ b/pkg/phlaredb/block_querier.go
@@ -1483,13 +1483,3 @@ func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string,
     ctx = query.AddMetricsToContext(ctx, r.metrics.query)
     return query.NewSyncIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
 }
-
-func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] {
-    column, found := source.Schema().Lookup(strings.Split(columnName, ".")...)
-    if !found {
-        return iter.NewErrIterator[*query.RepeatedRow[T]](fmt.Errorf("column '%s' not found in parquet file", columnName))
-    }
-
-    opentracing.SpanFromContext(ctx).SetTag("columnName", columnName)
-    return query.NewRepeatedPageIterator(ctx, rows, source.RowGroups(), column.ColumnIndex, 1e4)
-}
diff --git a/pkg/phlaredb/block_querier_symbols.go b/pkg/phlaredb/block_querier_symbols.go
index 48ca86c0b3..408b82fd0b 100644
--- a/pkg/phlaredb/block_querier_symbols.go
+++ b/pkg/phlaredb/block_querier_symbols.go
@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "io"
+    "strings"
 
     "github.com/grafana/dskit/multierror"
     "github.com/grafana/dskit/runutil"
@@ -15,6 +16,7 @@ import (
     phlareobj "github.com/grafana/pyroscope/pkg/objstore"
     parquetobj "github.com/grafana/pyroscope/pkg/objstore/parquet"
     "github.com/grafana/pyroscope/pkg/phlaredb/block"
+    "github.com/grafana/pyroscope/pkg/phlaredb/query"
     schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
     "github.com/grafana/pyroscope/pkg/phlaredb/symdb"
     "github.com/grafana/pyroscope/pkg/util"
@@ -87,14 +89,18 @@ func (r *symbolsResolverV1) Partition(_ context.Context, _ uint64) (symdb.Partit
 type stacktraceResolverV1 struct{ r *symbolsResolverV1 }
 
 func (r stacktraceResolverV1) ResolveStacktraceLocations(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error {
-    it := repeatedColumnIter(ctx, r.r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraces))
-    defer it.Close()
+    column, err := schemav1.ResolveColumnByPath(r.r.stacktraces.file.Schema(), strings.Split("LocationIDs.list.element", "."))
+    if err != nil {
+        return err
+    }
+    it := query.NewRepeatedRowIterator(ctx, iter.NewSliceIterator(stacktraces), r.r.stacktraces.file.RowGroups(), column.ColumnIndex)
+    defer runutil.CloseWithErrCapture(&err, it, "failed to close stack trace stream")
     t := make([]int32, 0, 64)
     for it.Next() {
         s := it.At()
-        t = grow(t, len(s.Values))
-        for i, v := range s.Values {
-            t[i] = v.Int32()
+        t = grow(t, len(s.Values[0]))
+        for i, v := range s.Values[0] {
+            t[i] = v.Int32()
         }
         dst.InsertStacktrace(s.Row, t)
     }
diff --git a/pkg/phlaredb/query/profiles.go b/pkg/phlaredb/query/profiles.go
deleted file mode 100644
index 92a9d48905..0000000000
--- a/pkg/phlaredb/query/profiles.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package query
-
-// type ProfileValue struct {
-// Profile
-// Value int64
-// }
-
-// type
SeriesIterator struct {
-// rowNums Iterator
-
-// curr Profile
-// buffer [][]parquet.Value
-// }
-
-// func NewSeriesIterator(rowNums Iterator, fp model.Fingerprint, lbs phlaremodel.Labels) *SeriesIterator {
-// return &SeriesIterator{
-// rowNums: rowNums,
-// curr: Profile{fp: fp, labels: lbs},
-// }
-// }
-
-// func (p *SeriesIterator) Next() bool {
-// if !p.rowNums.Next() {
-// return false
-// }
-// if p.buffer == nil {
-// p.buffer = make([][]parquet.Value, 2)
-// }
-// result := p.rowNums.At()
-// p.curr.RowNum = result.RowNumber[0]
-// p.buffer = result.Columns(p.buffer, "TimeNanos")
-// p.curr.t = model.TimeFromUnixNano(p.buffer[0][0].Int64())
-// return true
-// }
-
-// func (p *SeriesIterator) At() Profile {
-// return p.curr
-// }
-
-// func (p *SeriesIterator) Err() error {
-// return p.rowNums.Err()
-// }
-
-// func (p *SeriesIterator) Close() error {
-// return p.rowNums.Close()
-// }
diff --git a/pkg/phlaredb/query/repeated.go b/pkg/phlaredb/query/repeated.go
index a903d22cf1..40ff939e0d 100644
--- a/pkg/phlaredb/query/repeated.go
+++ b/pkg/phlaredb/query/repeated.go
@@ -4,345 +4,396 @@ import (
     "context"
     "fmt"
     "io"
+    "sync"
     "time"
 
     "github.com/grafana/dskit/multierror"
     "github.com/opentracing/opentracing-go"
     otlog "github.com/opentracing/opentracing-go/log"
     "github.com/parquet-go/parquet-go"
-    "github.com/samber/lo"
 
     "github.com/grafana/pyroscope/pkg/iter"
 )
 
 type RepeatedRow[T any] struct {
     Row    T
-    Values []parquet.Value
+    Values [][]parquet.Value
 }
 
-type repeatedPageIterator[T any] struct {
-    rows     iter.Iterator[T]
-    column   int
-    readSize int
-    ctx      context.Context
-    span     opentracing.Span
-
-    rgs                 []parquet.RowGroup
-    startRowGroupRowNum int64
-
-    currentPage     parquet.Page
-    startPageRowNum int64
-
-    pageNextRowNum int64
+type repeatedRowIterator[T any] struct {
+    columns iter.Iterator[[][]parquet.Value]
+    rows    iter.Iterator[T]
+}
 
-    currentPages parquet.Pages
-    valueReader  parquet.ValueReader
+const (
+    // Batch size specifies how many rows are read
+    // from a column at once. Note that the batched rows
+    // are buffered in memory but do not reference the
+    // pages they were read from.
+    defaultRepeatedRowIteratorBatchSize = 128
 
-    rowFinished bool
-    skipping    bool
-    err         error
-    done        bool // because we advance the iterator to seek in advance we remember if we are done
-    currentValue *RepeatedRow[T]
-    buffer         []parquet.Value
-    originalBuffer []parquet.Value
-}
+    // The value specifies how many individual values are
+    // read (decoded) from the page at once.
+    //
+    // An overly large read size does not make much sense:
+    // although it does not impact read amplification, as
+    // the page is already fully read, decoding of the
+    // values is not free.
+    //
+    // Roughly: the upper bound on how many values we
+    // expect per row.
+    repeatedRowColumnIteratorReadSize = 2 << 10
+)
 
-// NewRepeatedPageIterator returns an iterator that iterates over the repeated values in a column.
-// The iterator can only seek forward and so rows should be sorted by row number.
-func NewRepeatedPageIterator[T any]( +func NewRepeatedRowIterator[T any]( ctx context.Context, rows iter.Iterator[T], - rgs []parquet.RowGroup, - column int, - readSize int, -) iter.Iterator[*RepeatedRow[T]] { - if readSize <= 0 { - panic("readSize must be greater than 0") + rowGroups []parquet.RowGroup, + columns ...int, +) iter.Iterator[RepeatedRow[T]] { + rows, rowNumbers := iter.Tee(rows) + return &repeatedRowIterator[T]{ + rows: rows, + columns: NewMultiColumnIterator(ctx, + WrapWithRowNumber(rowNumbers), + defaultRepeatedRowIteratorBatchSize, + rowGroups, columns...), + } +} + +func (x *repeatedRowIterator[T]) Next() bool { + if !x.rows.Next() { + return false } - buffer := make([]parquet.Value, readSize) - done := !rows.Next() - span, ctx := opentracing.StartSpanFromContext(ctx, "NewRepeatedPageIterator") - return &repeatedPageIterator[T]{ - ctx: ctx, - span: span, - rows: rows, - rgs: rgs, - column: column, - readSize: readSize, - buffer: buffer[:0], - originalBuffer: buffer, - currentValue: &RepeatedRow[T]{}, - done: done, - rowFinished: true, - skipping: false, + return x.columns.Next() +} + +func (x *repeatedRowIterator[T]) At() RepeatedRow[T] { + return RepeatedRow[T]{ + Values: x.columns.At(), + Row: x.rows.At(), } } -// seekRowNum the row num to seek to. -func (it *repeatedPageIterator[T]) seekRowNum() int64 { - switch i := any(it.rows.At()).(type) { +func (x *repeatedRowIterator[T]) Err() error { + return x.columns.Err() +} + +func (x *repeatedRowIterator[T]) Close() error { + return x.columns.Close() +} + +type rowNumberIterator[T any] struct{ it iter.Iterator[T] } + +func WrapWithRowNumber[T any](it iter.Iterator[T]) iter.Iterator[int64] { + return &rowNumberIterator[T]{it} +} + +func (x *rowNumberIterator[T]) Next() bool { return x.it.Next() } +func (x *rowNumberIterator[T]) Err() error { return x.it.Err() } +func (x *rowNumberIterator[T]) Close() error { return x.it.Close() } + +func (x *rowNumberIterator[T]) At() int64 { + v := any(x.it.At()) + switch r := v.(type) { case RowGetter: - return i.RowNumber() + return r.RowNumber() case int64: - return i + return r case uint32: - return int64(i) + return int64(r) default: - panic(fmt.Sprintf("unknown row type: %T", it.rows.At())) + panic(fmt.Sprintf("unknown row type: %T", v)) } } -func (it *repeatedPageIterator[T]) Next() bool { -Outer: - for { - if it.done { - return false - } - for len(it.rgs) != 0 && (it.seekRowNum() >= (it.startRowGroupRowNum + it.rgs[0].NumRows())) { - if !it.closeCurrentPages() { - return false - } - it.startRowGroupRowNum += it.rgs[0].NumRows() - it.rgs = it.rgs[1:] - } - if len(it.rgs) == 0 { +type multiColumnIterator struct { + r []iter.Iterator[int64] + c []iter.Iterator[[]parquet.Value] + v [][]parquet.Value +} + +func NewMultiColumnIterator( + ctx context.Context, + rows iter.Iterator[int64], + batchSize int, + rowGroups []parquet.RowGroup, + columns ...int, +) iter.Iterator[[][]parquet.Value] { + m := multiColumnIterator{ + c: make([]iter.Iterator[[]parquet.Value], len(columns)), + v: make([][]parquet.Value, len(columns)), + // Even if there is just one column, we do need to tee it, + // as the source rows iterator is owned by caller, and we + // must never close it on our own. 
+ r: iter.TeeN(rows, len(columns)), + } + for i, column := range columns { + m.c[i] = iter.NewAsyncBatchIterator[[]parquet.Value]( + NewRepeatedRowColumnIterator(ctx, m.r[i], rowGroups, column), + batchSize, + CloneParquetValues, + ReleaseParquetValues, + ) + } + return &m +} + +func (m *multiColumnIterator) Next() bool { + for i, x := range m.c { + if !x.Next() { return false } - if it.currentPages == nil { - it.currentPages = it.rgs[0].ColumnChunks()[it.column].Pages() - } - // read a new page. - if it.currentPage == nil { - // SeekToRow seek across and within pages. So the next position in the page will the be the row. - seekTo := it.seekRowNum() - it.startRowGroupRowNum - if err := it.currentPages.SeekToRow(seekTo); err != nil { - it.err = err - it.currentPages = nil // we can set it to nil since somehow it was closed. - return false - } - it.startPageRowNum = it.seekRowNum() - it.pageNextRowNum = 0 - it.buffer = it.buffer[:0] - it.rowFinished = true - it.skipping = false - var err error - pageReadStart := time.Now() - it.currentPage, err = it.currentPages.ReadPage() - pageReadDurationMs := time.Since(pageReadStart).Milliseconds() - if err != nil { - if err == io.EOF { - continue - } - it.err = err - return false - } - it.span.LogFields( - otlog.String("msg", "Page read"), - otlog.Int64("startRowGroupRowNum", it.startRowGroupRowNum), - otlog.Int64("startPageRowNum", it.startPageRowNum), - otlog.Int64("pageRowNum", it.currentPage.NumRows()), - otlog.Int64("duration_ms", pageReadDurationMs), - ) - it.valueReader = it.currentPage.Values() - } - // if there's no more value in that page we can skip it. - if it.seekRowNum() >= it.startPageRowNum+it.currentPage.NumRows() { - it.currentPage = nil - continue - } + m.v[i] = x.At() + } + return true +} - // only read values if the buffer is empty - if len(it.buffer) == 0 { - // reading values.... - it.buffer = it.originalBuffer - n, err := it.valueReader.ReadValues(it.buffer) - if err != nil && err != io.EOF { - it.err = err - return false - } - it.buffer = it.buffer[:n] - // no more buffer, move to next page - if len(it.buffer) == 0 { - it.done = !it.rows.Next() // if the page has no more data the current row is over. - it.currentPage = nil - continue - } - } +func (m *multiColumnIterator) At() [][]parquet.Value { return m.v } - // we have data in the buffer. - it.currentValue.Row = it.rows.At() - start, next, ok := it.readNextRow() - if ok && it.rowFinished { - if it.seekRowNum() > it.startPageRowNum+it.pageNextRowNum { - it.pageNextRowNum++ - it.buffer = it.buffer[next:] - continue Outer - } - it.pageNextRowNum++ - it.currentValue.Values = it.buffer[:next] - it.buffer = it.buffer[next:] // consume the values. 
- it.done = !it.rows.Next() - return true +func (m *multiColumnIterator) Err() error { + var err multierror.MultiError + for i := range m.c { + err.Add(m.c[i].Err()) + err.Add(m.r[i].Err()) + } + return err.Err() +} + +func (m *multiColumnIterator) Close() error { + var err multierror.MultiError + for i := range m.c { + err.Add(m.c[i].Close()) + err.Add(m.r[i].Close()) + } + return err.Err() +} + +var ErrSeekOutOfRange = fmt.Errorf("bug: south row is out of range") + +type repeatedRowColumnIterator struct { + ctx context.Context + span opentracing.Span + + rows iter.Iterator[int64] + rgs []parquet.RowGroup + column int + readSize int + + pages parquet.Pages + page parquet.Page + + minRGRowNum int64 + maxRGRowNum int64 + maxPageRowNum int64 + + rowsRead int64 + rowsFetched int64 + pageBytes int64 + + vit *repeatedValuePageIterator + prev int64 + err error +} + +func NewRepeatedRowColumnIterator(ctx context.Context, rows iter.Iterator[int64], rgs []parquet.RowGroup, column int) iter.Iterator[[]parquet.Value] { + r := repeatedRowColumnIterator{ + rows: rows, + rgs: rgs, + column: column, + vit: getRepeatedValuePageIteratorFromPool(), + readSize: repeatedRowColumnIteratorReadSize, + } + r.span, r.ctx = opentracing.StartSpanFromContext(ctx, "RepeatedRowColumnIterator") + return &r +} + +func (x *repeatedRowColumnIterator) Next() bool { + if !x.rows.Next() || x.err != nil { + return false + } + rn := x.rows.At() + if rn >= x.maxRGRowNum { + if !x.seekRowGroup(rn) { + return false } - // we read a partial row or we're skipping a row. - if it.rowFinished || it.skipping { - it.rowFinished = false - // skip until we find the next row. - if it.seekRowNum() > it.startPageRowNum+it.pageNextRowNum { - last := it.buffer[start].RepetitionLevel() - if it.skipping && last == 0 { - it.buffer = it.buffer[start:] - it.pageNextRowNum++ - it.skipping = false - it.rowFinished = true - } else { - if start != 0 { - next = start + 1 - } - it.buffer = it.buffer[next:] - it.skipping = true - } - continue Outer - } - it.currentValue.Values = it.buffer[:next] - it.buffer = it.buffer[next:] // consume the values. - return true + } + rn -= x.minRGRowNum + if x.page == nil || rn >= x.maxPageRowNum { + if !x.readPage(rn) { + return false } - // this is the start of a new row. - if !it.rowFinished && it.buffer[start].RepetitionLevel() == 0 { - // consume values up to the new start if there is - if start >= 1 { - it.currentValue.Values = it.buffer[:start] - it.buffer = it.buffer[start:] // consume the values. - return true - } - // or move to the next row. - it.pageNextRowNum++ - it.done = !it.rows.Next() - it.rowFinished = true - continue Outer + // readPage ensures that the first row in the + // page matches rn, therefore we don't need to + // skip anything. + x.prev = rn - 1 + } + // Skip rows to the rn. + next := int(rn - x.prev) + x.prev = rn + for i := 0; i < next; i++ { + if !x.vit.Next() { + x.err = ErrSeekOutOfRange + return false } - it.currentValue.Values = it.buffer[:next] - it.buffer = it.buffer[next:] // consume the values. 
- return true } + x.rowsRead++ + return true } -func (it *repeatedPageIterator[T]) readNextRow() (int, int, bool) { - start := 0 - foundStart := false - for i, v := range it.buffer { - if v.RepetitionLevel() == 0 && !foundStart { - foundStart = true - start = i +func (x *repeatedRowColumnIterator) seekRowGroup(rn int64) bool { + for i, rg := range x.rgs { + x.minRGRowNum = x.maxRGRowNum + x.maxRGRowNum += rg.NumRows() + if rn >= x.maxRGRowNum { continue } - if v.RepetitionLevel() == 0 && foundStart { - return start, i, true - } + x.rgs = x.rgs[i+1:] + return x.openChunk(rg) } - return start, len(it.buffer), false + return false } -func (it *repeatedPageIterator[T]) closeCurrentPages() bool { - if it.currentPages != nil { - if err := it.currentPages.Close(); err != nil { - it.err = err - it.currentPages = nil +func (x *repeatedRowColumnIterator) openChunk(rg parquet.RowGroup) bool { + x.page = nil + x.vit.reset(nil, 0) + if x.pages != nil { + if x.err = x.pages.Close(); x.err != nil { return false } - it.currentPages = nil } + x.pages = rg.ColumnChunks()[x.column].Pages() return true } -// At returns the current value. -// Only valid after a call to Next. -// The returned value is reused on the next call to Next and should not be retained. -func (it *repeatedPageIterator[T]) At() *RepeatedRow[T] { - return it.currentValue -} - -func (it *repeatedPageIterator[T]) Err() error { - return it.err -} - -func (it *repeatedPageIterator[T]) Close() error { - defer it.span.Finish() - if it.currentPages != nil { - if err := it.currentPages.Close(); err != nil { - return err +func (x *repeatedRowColumnIterator) readPage(rn int64) bool { + if x.err = x.ctx.Err(); x.err != nil { + return false + } + if x.err = x.pages.SeekToRow(rn); x.err != nil { + return false + } + readPageStart := time.Now() + if x.page, x.err = x.pages.ReadPage(); x.err != nil { + if x.err != io.EOF { + x.span.LogFields(otlog.Error(x.err)) + return false + } + x.err = nil + // ReadPage should never return page along with EOF, + // however this is not a strict contract. + if x.page == nil { + return false } - it.currentPages = nil } - return nil + pageReadDurationMs := time.Since(readPageStart).Milliseconds() + // NumRows return the number of row in the page + // not counting skipped ones (because of SeekToRow). + // The implementation is quite expensive, therefore + // we should call it once per page. 
+ x.pageBytes += x.page.Size() + pageNumRows := x.page.NumRows() + x.maxPageRowNum = rn + pageNumRows + x.rowsFetched += pageNumRows + x.vit.reset(x.page, x.readSize) + x.span.LogFields( + otlog.String("msg", "Page read"), + otlog.Int64("min_rg_row", x.minRGRowNum), + otlog.Int64("max_rg_row", x.maxRGRowNum), + otlog.Int64("seek_row", x.minRGRowNum+rn), + otlog.Int64("page_read_ms", pageReadDurationMs), + otlog.Int64("page_num_rows", pageNumRows), + ) + return true } -type MultiRepeatedRow[T any] struct { - Row T - Values [][]parquet.Value +func (x *repeatedRowColumnIterator) At() []parquet.Value { return x.vit.At() } +func (x *repeatedRowColumnIterator) Err() error { return x.err } +func (x *repeatedRowColumnIterator) Close() error { + putRepeatedValuePageIteratorToPool(x.vit) + err := x.pages.Close() + x.span.LogFields( + otlog.Int64("page_bytes", x.pageBytes), + otlog.Int64("rows_fetched", x.rowsFetched), + otlog.Int64("rows_read", x.rowsRead), + ) + x.span.Finish() + return err } -type multiRepeatedPageIterator[T any] struct { - iters []iter.Iterator[*RepeatedRow[T]] - asyncNext []<-chan bool - err error - curr *MultiRepeatedRow[T] +var repeatedValuePageIteratorPool = sync.Pool{New: func() any { return new(repeatedValuePageIterator) }} + +func getRepeatedValuePageIteratorFromPool() *repeatedValuePageIterator { + return repeatedValuePageIteratorPool.Get().(*repeatedValuePageIterator) } -// NewMultiRepeatedPageIterator returns an iterator that iterates over the values of repeated columns nested together. -// Each column is iterate over in parallel. -// If one column is finished, the iterator will return false. -func NewMultiRepeatedPageIterator[T any](iters ...iter.Iterator[*RepeatedRow[T]]) iter.Iterator[*MultiRepeatedRow[T]] { - return &multiRepeatedPageIterator[T]{ - iters: iters, - asyncNext: make([]<-chan bool, len(iters)), - curr: &MultiRepeatedRow[T]{ - Values: make([][]parquet.Value, len(iters)), - }, - } +func putRepeatedValuePageIteratorToPool(x *repeatedValuePageIterator) { + x.reset(nil, 0) + repeatedValuePageIteratorPool.Put(x) } -func (it *multiRepeatedPageIterator[T]) Next() bool { - for i := range it.iters { - i := i - it.asyncNext[i] = lo.Async(func() bool { - next := it.iters[i].Next() - if next { - it.curr.Values[i] = it.iters[i].At().Values - if i == 0 { - it.curr.Row = it.iters[i].At().Row - } - } - return next - }) - } - next := true - for i := range it.iters { - if !<-it.asyncNext[i] { - next = false - } - } - return next +// RepeatedValuePageIterator iterates over repeated fields. +// FIXME(kolesnikovae): Definition level is ignored. 
+type repeatedValuePageIterator struct { + page parquet.ValueReader + buf []parquet.Value + off int + row []parquet.Value + err error } -func (it *multiRepeatedPageIterator[T]) At() *MultiRepeatedRow[T] { - return it.curr +func NewRepeatedValuePageIterator(page parquet.Page, readSize int) iter.Iterator[[]parquet.Value] { + var r repeatedValuePageIterator + r.reset(page, readSize) + return &r } -func (it *multiRepeatedPageIterator[T]) Err() error { - errs := multierror.New() - errs.Add(it.err) - for _, i := range it.iters { - errs.Add(i.Err()) +func (x *repeatedValuePageIterator) At() []parquet.Value { return x.row } +func (x *repeatedValuePageIterator) Err() error { return x.err } +func (x *repeatedValuePageIterator) Close() error { return nil } + +func (x *repeatedValuePageIterator) reset(page parquet.Page, readSize int) { + if cap(x.buf) < readSize { + x.buf = make([]parquet.Value, 0, readSize) + } + x.page = nil + if page != nil { + x.page = page.Values() } - return errs.Err() + x.buf = x.buf[:0] + x.row = x.row[:0] + x.err = nil + x.off = 0 } -func (it *multiRepeatedPageIterator[T]) Close() error { - errs := multierror.New() - for _, i := range it.iters { - errs.Add(i.Close()) +func (x *repeatedValuePageIterator) Next() bool { + if x.err != nil { + return false + } + x.row = x.row[:0] + var err error + var n int +loop: + for { + buf := x.buf[x.off:] + for _, v := range buf { + if v.RepetitionLevel() == 0 && len(x.row) > 0 { + // Found a new row. + break loop + } + x.row = append(x.row, v) + x.off++ + } + // Refill the buffer. + x.buf = x.buf[:cap(x.buf)] + x.off = 0 + n, err = x.page.ReadValues(x.buf) + x.buf = x.buf[:n] + if err != nil && err != io.EOF { + x.err = err + } + if n == 0 { + break + } } - return errs.Err() + return len(x.row) > 0 } diff --git a/pkg/phlaredb/query/repeated_test.go b/pkg/phlaredb/query/repeated_test.go index e11c2a7316..f187210424 100644 --- a/pkg/phlaredb/query/repeated_test.go +++ b/pkg/phlaredb/query/repeated_test.go @@ -7,12 +7,13 @@ import ( "github.com/google/go-cmp/cmp" "github.com/parquet-go/parquet-go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/pyroscope/pkg/iter" ) -type RepeatedTestRow struct { +type repeatedTestRow struct { List []int64 } @@ -24,11 +25,11 @@ func (t testRowGetter) RowNumber() int64 { return t.RowNum } -func Test_RepeatedIterator(t *testing.T) { +func Test_RepeatedRowIterator_SingleColumn(t *testing.T) { for _, tc := range []struct { name string rows []testRowGetter - rgs [][]RepeatedTestRow + rgs [][]repeatedTestRow expected []RepeatedRow[testRowGetter] readSize int }{ @@ -39,7 +40,7 @@ func Test_RepeatedIterator(t *testing.T) { {1}, {2}, }, - rgs: [][]RepeatedTestRow{ + rgs: [][]repeatedTestRow{ { {[]int64{1, 1, 1, 1}}, {[]int64{2}}, @@ -47,9 +48,9 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{0}, []parquet.Value{parquet.ValueOf(1), parquet.ValueOf(1), parquet.ValueOf(1), parquet.ValueOf(1)}}, - {testRowGetter{1}, []parquet.Value{parquet.ValueOf(2)}}, - {testRowGetter{2}, []parquet.Value{parquet.ValueOf(3), parquet.ValueOf(4)}}, + {testRowGetter{0}, [][]parquet.Value{{parquet.ValueOf(1), parquet.ValueOf(1), parquet.ValueOf(1), parquet.ValueOf(1)}}}, + {testRowGetter{1}, [][]parquet.Value{{parquet.ValueOf(2)}}}, + {testRowGetter{2}, [][]parquet.Value{{parquet.ValueOf(3), parquet.ValueOf(4)}}}, }, }, { @@ -59,7 +60,7 @@ func Test_RepeatedIterator(t *testing.T) { {2}, {7}, }, - rgs: [][]RepeatedTestRow{ + rgs: 
[][]repeatedTestRow{ { {[]int64{1}}, {[]int64{2}}, @@ -77,9 +78,9 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{0}, []parquet.Value{parquet.ValueOf(1)}}, - {testRowGetter{2}, []parquet.Value{parquet.ValueOf(3)}}, - {testRowGetter{7}, []parquet.Value{parquet.ValueOf(8)}}, + {testRowGetter{0}, [][]parquet.Value{{parquet.ValueOf(1)}}}, + {testRowGetter{2}, [][]parquet.Value{{parquet.ValueOf(3)}}}, + {testRowGetter{7}, [][]parquet.Value{{parquet.ValueOf(8)}}}, }, }, { @@ -89,7 +90,7 @@ func Test_RepeatedIterator(t *testing.T) { {1}, {2}, }, - rgs: [][]RepeatedTestRow{ + rgs: [][]repeatedTestRow{ { {[]int64{1, 2, 3}}, {[]int64{4, 5, 6}}, @@ -97,9 +98,9 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{0}, []parquet.Value{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}, - {testRowGetter{1}, []parquet.Value{parquet.ValueOf(4), parquet.ValueOf(5), parquet.ValueOf(6)}}, - {testRowGetter{2}, []parquet.Value{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}, + {testRowGetter{0}, [][]parquet.Value{{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}}, + {testRowGetter{1}, [][]parquet.Value{{parquet.ValueOf(4), parquet.ValueOf(5), parquet.ValueOf(6)}}}, + {testRowGetter{2}, [][]parquet.Value{{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}}, }, }, { @@ -107,7 +108,7 @@ func Test_RepeatedIterator(t *testing.T) { rows: []testRowGetter{ {0}, {1}, {2}, {6}, {7}, {8}, }, - rgs: [][]RepeatedTestRow{ + rgs: [][]repeatedTestRow{ { {[]int64{1, 2, 3}}, {[]int64{4, 5, 6}}, @@ -125,12 +126,12 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{0}, []parquet.Value{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}, - {testRowGetter{1}, []parquet.Value{parquet.ValueOf(4), parquet.ValueOf(5), parquet.ValueOf(6)}}, - {testRowGetter{2}, []parquet.Value{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}, - {testRowGetter{6}, []parquet.Value{parquet.ValueOf(19), parquet.ValueOf(20), parquet.ValueOf(21)}}, - {testRowGetter{7}, []parquet.Value{parquet.ValueOf(22), parquet.ValueOf(23), parquet.ValueOf(24)}}, - {testRowGetter{8}, []parquet.Value{parquet.ValueOf(25), parquet.ValueOf(26), parquet.ValueOf(27)}}, + {testRowGetter{0}, [][]parquet.Value{{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}}, + {testRowGetter{1}, [][]parquet.Value{{parquet.ValueOf(4), parquet.ValueOf(5), parquet.ValueOf(6)}}}, + {testRowGetter{2}, [][]parquet.Value{{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}}, + {testRowGetter{6}, [][]parquet.Value{{parquet.ValueOf(19), parquet.ValueOf(20), parquet.ValueOf(21)}}}, + {testRowGetter{7}, [][]parquet.Value{{parquet.ValueOf(22), parquet.ValueOf(23), parquet.ValueOf(24)}}}, + {testRowGetter{8}, [][]parquet.Value{{parquet.ValueOf(25), parquet.ValueOf(26), parquet.ValueOf(27)}}}, }, }, { @@ -138,7 +139,7 @@ func Test_RepeatedIterator(t *testing.T) { rows: []testRowGetter{ {1}, }, - rgs: [][]RepeatedTestRow{ + rgs: [][]repeatedTestRow{ { {[]int64{1, 2, 3}}, {[]int64{4, 5, 6}}, @@ -146,7 +147,7 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{1}, []parquet.Value{parquet.ValueOf(4), parquet.ValueOf(5), parquet.ValueOf(6)}}, + {testRowGetter{1}, [][]parquet.Value{{parquet.ValueOf(4), parquet.ValueOf(5), parquet.ValueOf(6)}}}, }, }, { @@ -157,7 +158,7 @@ func Test_RepeatedIterator(t 
*testing.T) { {5}, {7}, }, - rgs: [][]RepeatedTestRow{ + rgs: [][]repeatedTestRow{ { {[]int64{1, 2, 3}}, // 0 {[]int64{4, 5, 6}}, @@ -173,10 +174,10 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{0}, []parquet.Value{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}, - {testRowGetter{2}, []parquet.Value{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}, - {testRowGetter{5}, []parquet.Value{parquet.ValueOf(10), parquet.ValueOf(11), parquet.ValueOf(12)}}, - {testRowGetter{7}, []parquet.Value{parquet.ValueOf(13), parquet.ValueOf(14), parquet.ValueOf(15)}}, + {testRowGetter{0}, [][]parquet.Value{{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}}, + {testRowGetter{2}, [][]parquet.Value{{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}}, + {testRowGetter{5}, [][]parquet.Value{{parquet.ValueOf(10), parquet.ValueOf(11), parquet.ValueOf(12)}}}, + {testRowGetter{7}, [][]parquet.Value{{parquet.ValueOf(13), parquet.ValueOf(14), parquet.ValueOf(15)}}}, }, }, { @@ -187,7 +188,7 @@ func Test_RepeatedIterator(t *testing.T) { {8}, {10}, }, - rgs: [][]RepeatedTestRow{ + rgs: [][]repeatedTestRow{ { {[]int64{1, 2, 3}}, // 0 {[]int64{4, 5, 6}}, @@ -208,10 +209,10 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{0}, []parquet.Value{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}, - {testRowGetter{2}, []parquet.Value{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}, - {testRowGetter{8}, []parquet.Value{parquet.ValueOf(10), parquet.ValueOf(11), parquet.ValueOf(12)}}, - {testRowGetter{10}, []parquet.Value{parquet.ValueOf(13), parquet.ValueOf(14), parquet.ValueOf(15)}}, + {testRowGetter{0}, [][]parquet.Value{{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}}, + {testRowGetter{2}, [][]parquet.Value{{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}}}, + {testRowGetter{8}, [][]parquet.Value{{parquet.ValueOf(10), parquet.ValueOf(11), parquet.ValueOf(12)}}}, + {testRowGetter{10}, [][]parquet.Value{{parquet.ValueOf(13), parquet.ValueOf(14), parquet.ValueOf(15)}}}, }, }, { @@ -222,7 +223,7 @@ func Test_RepeatedIterator(t *testing.T) { {8}, {10}, }, - rgs: [][]RepeatedTestRow{ + rgs: [][]repeatedTestRow{ { {[]int64{1, 2, 3}}, // 0 {[]int64{4, 5}}, @@ -243,10 +244,10 @@ func Test_RepeatedIterator(t *testing.T) { }, }, expected: []RepeatedRow[testRowGetter]{ - {testRowGetter{0}, []parquet.Value{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}, - {testRowGetter{2}, []parquet.Value{parquet.ValueOf(7)}}, - {testRowGetter{8}, []parquet.Value{parquet.ValueOf(10), parquet.ValueOf(11), parquet.ValueOf(12)}}, - {testRowGetter{10}, []parquet.Value{parquet.ValueOf(13), parquet.ValueOf(14)}}, + {testRowGetter{0}, [][]parquet.Value{{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}}}, + {testRowGetter{2}, [][]parquet.Value{{parquet.ValueOf(7)}}}, + {testRowGetter{8}, [][]parquet.Value{{parquet.ValueOf(10), parquet.ValueOf(11), parquet.ValueOf(12)}}}, + {testRowGetter{10}, [][]parquet.Value{{parquet.ValueOf(13), parquet.ValueOf(14)}}}, }, }, } { @@ -262,9 +263,9 @@ func Test_RepeatedIterator(t *testing.T) { } groups = append(groups, buffer) } - actual := readPageIterator(t, - NewRepeatedPageIterator( - context.Background(), iter.NewSliceIterator(tc.rows), groups, 0, tc.readSize)) + actual := readRepeatedRowIterator(t, + NewRepeatedRowIterator(context.Background(), + iter.NewSliceIterator(tc.rows), groups, 
0)) if diff := cmp.Diff(tc.expected, actual, int64ParquetComparer()); diff != "" { t.Errorf("result mismatch (-want +got):\n%s", diff) } @@ -274,31 +275,56 @@ func Test_RepeatedIterator(t *testing.T) { } } -type MultiRepeatedItem struct { +func Test_RepeatedRowIterator_Cancellation(t *testing.T) { + var groups []parquet.RowGroup + for _, rg := range [][]repeatedTestRow{ + { + {[]int64{1, 1, 1, 1}}, + {[]int64{2}}, + {[]int64{3, 4}}, + }, + } { + buffer := parquet.NewBuffer() + for _, row := range rg { + require.NoError(t, buffer.Write(row)) + } + groups = append(groups, buffer) + } + + rows := iter.NewSliceIterator([]testRowGetter{{0}}) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + it := NewRepeatedRowIterator(ctx, rows, groups, 0) + assert.False(t, it.Next()) + assert.Error(t, context.Canceled, it.Err()) + assert.NoError(t, it.Close()) +} + +type multiColumnItem struct { X int64 Y int64 } -type MultiRepeatedTestRow struct { - List []MultiRepeatedItem +type multiColumnRepeatedTestRow struct { + List []multiColumnItem } -func Test_MultiRepeatedPageIterator(t *testing.T) { +func Test_RepeatedRowPageIterator_MultipleColumns(t *testing.T) { for _, tc := range []struct { name string rows []testRowGetter - rgs [][]MultiRepeatedTestRow - expected []MultiRepeatedRow[testRowGetter] + rgs [][]multiColumnRepeatedTestRow + expected []RepeatedRow[testRowGetter] }{ { name: "single row group", rows: []testRowGetter{ {0}, }, - rgs: [][]MultiRepeatedTestRow{ + rgs: [][]multiColumnRepeatedTestRow{ { { - List: []MultiRepeatedItem{ + List: []multiColumnItem{ {1, 2}, {3, 4}, {5, 6}, @@ -306,7 +332,7 @@ func Test_MultiRepeatedPageIterator(t *testing.T) { }, }, }, - expected: []MultiRepeatedRow[testRowGetter]{ + expected: []RepeatedRow[testRowGetter]{ { testRowGetter{0}, [][]parquet.Value{ @@ -323,21 +349,21 @@ func Test_MultiRepeatedPageIterator(t *testing.T) { {4}, {7}, }, - rgs: [][]MultiRepeatedTestRow{ + rgs: [][]multiColumnRepeatedTestRow{ { - {List: []MultiRepeatedItem{{0, 0}, {0, 0}}}, - {List: []MultiRepeatedItem{{1, 2}, {3, 4}}}, // 1 - {List: []MultiRepeatedItem{{0, 0}, {0, 0}}}, + {List: []multiColumnItem{{0, 0}, {0, 0}}}, + {List: []multiColumnItem{{1, 2}, {3, 4}}}, // 1 + {List: []multiColumnItem{{0, 0}, {0, 0}}}, }, { - {List: []MultiRepeatedItem{{0, 0}, {0, 0}}}, - {List: []MultiRepeatedItem{{5, 6}, {7, 8}}}, // 4 - {List: []MultiRepeatedItem{{0, 0}, {0, 0}}}, - {List: []MultiRepeatedItem{{0, 0}, {0, 0}}}, - {List: []MultiRepeatedItem{{9, 10}}}, // 7 + {List: []multiColumnItem{{0, 0}, {0, 0}}}, + {List: []multiColumnItem{{5, 6}, {7, 8}}}, // 4 + {List: []multiColumnItem{{0, 0}, {0, 0}}}, + {List: []multiColumnItem{{0, 0}, {0, 0}}}, + {List: []multiColumnItem{{9, 10}}}, // 7 }, }, - expected: []MultiRepeatedRow[testRowGetter]{ + expected: []RepeatedRow[testRowGetter]{ { testRowGetter{1}, [][]parquet.Value{ @@ -372,13 +398,9 @@ func Test_MultiRepeatedPageIterator(t *testing.T) { } groups = append(groups, buffer) } - actual := readMultiPageIterator(t, - NewMultiRepeatedPageIterator( - NewRepeatedPageIterator( - context.Background(), iter.NewSliceIterator(tc.rows), groups, 0, 1000), - NewRepeatedPageIterator( - context.Background(), iter.NewSliceIterator(tc.rows), groups, 1, 1000), - ), + actual := readRepeatedRowIterator(t, + NewRepeatedRowIterator(context.Background(), + iter.NewSliceIterator(tc.rows), groups, 0, 1), ) if diff := cmp.Diff(tc.expected, actual, int64ParquetComparer()); diff != "" { t.Errorf("result mismatch (-want +got):\n%s", diff) @@ -387,37 +409,13 @@ func 
Test_MultiRepeatedPageIterator(t *testing.T) { } } -// readPageIterator reads all the values from the iterator and returns the result. -// Result are copied to avoid keeping reference between next calls. -func readPageIterator(t *testing.T, it iter.Iterator[*RepeatedRow[testRowGetter]]) []RepeatedRow[testRowGetter] { +func readRepeatedRowIterator(t *testing.T, it iter.Iterator[RepeatedRow[testRowGetter]]) []RepeatedRow[testRowGetter] { defer func() { require.NoError(t, it.Close()) }() var result []RepeatedRow[testRowGetter] for it.Next() { current := RepeatedRow[testRowGetter]{ - Row: it.At().Row, - Values: make([]parquet.Value, len(it.At().Values)), - } - copy(current.Values, it.At().Values) - if len(result) > 0 && current.Row.RowNumber() == result[len(result)-1].Row.RowNumber() { - result[len(result)-1].Values = append(result[len(result)-1].Values, current.Values...) - continue - } - - result = append(result, current) - } - require.NoError(t, it.Err()) - return result -} - -func readMultiPageIterator(t *testing.T, it iter.Iterator[*MultiRepeatedRow[testRowGetter]]) []MultiRepeatedRow[testRowGetter] { - defer func() { - require.NoError(t, it.Close()) - }() - var result []MultiRepeatedRow[testRowGetter] - for it.Next() { - current := MultiRepeatedRow[testRowGetter]{ Row: it.At().Row, Values: make([][]parquet.Value, len(it.At().Values)), } diff --git a/pkg/phlaredb/query/util.go b/pkg/phlaredb/query/util.go index 72f01b1be3..6e5b5cef29 100644 --- a/pkg/phlaredb/query/util.go +++ b/pkg/phlaredb/query/util.go @@ -3,10 +3,11 @@ package query import ( "strings" - pq "github.com/parquet-go/parquet-go" + "github.com/colega/zeropool" + "github.com/parquet-go/parquet-go" ) -func GetColumnIndexByPath(pf *pq.File, s string) (index, depth int) { +func GetColumnIndexByPath(pf *parquet.File, s string) (index, depth int) { colSelector := strings.Split(s, ".") n := pf.Root() for len(colSelector) > 0 { @@ -22,7 +23,127 @@ func GetColumnIndexByPath(pf *pq.File, s string) (index, depth int) { return n.Index(), depth } -func HasColumn(pf *pq.File, s string) bool { +func HasColumn(pf *parquet.File, s string) bool { index, _ := GetColumnIndexByPath(pf, s) return index >= 0 } + +func RowGroupBoundaries(groups []parquet.RowGroup) []int64 { + b := make([]int64, len(groups)) + var o int64 + for i := range b { + o += groups[i].NumRows() + b[i] = o + } + return b +} + +func SplitRows(rows, groups []int64) [][]int64 { + switch len(groups) { + case 0: + return nil + case 1: + return [][]int64{rows} + } + // Sanity check: max row must be less than + // the number of rows in the last group. + if rows[len(rows)-1] >= groups[len(groups)-1] { + panic(ErrSeekOutOfRange) + } + split := make([][]int64, len(groups)) + var j, r int + maxRow := groups[j] + for i, rn := range rows { + if rn < maxRow { + continue + } + split[j], rows = rows[:i-r], rows[i-r:] + r = i + // Find matching group. + for x, v := range groups[j:] { + if rn >= v { + continue + } + j += x + break + } + maxRow = groups[j] + } + // Last bit. + split[j] = rows + // Subtract group offset from the row numbers, + // which makes them local to the group. 
+ for i, g := range split[1:] { + offset := groups[i] + for n := range g { + g[n] -= offset + } + } + return split +} + +var parquetValuesPool = zeropool.New(func() []parquet.Value { return nil }) + +func CloneParquetValues(values []parquet.Value) []parquet.Value { + p := parquetValuesPool.Get() + if l := len(values); cap(p) < l { + p = make([]parquet.Value, 0, 2*l) + } + p = p[:len(values)] + for i, v := range values { + p[i] = v.Clone() + } + return p +} + +func ReleaseParquetValues(b [][]parquet.Value) { + for _, s := range b { + if cap(s) > 0 { + parquetValuesPool.Put(s) + } + } +} + +var uint64valuesPool = zeropool.New(func() []uint64 { return nil }) + +func CloneUint64ParquetValues(values []parquet.Value) []uint64 { + uint64s := uint64valuesPool.Get() + if l := len(values); cap(uint64s) < l { + uint64s = make([]uint64, 0, 2*l) + } + uint64s = uint64s[:len(values)] + for i, v := range values { + uint64s[i] = v.Uint64() + } + return uint64s +} + +func ReleaseUint64Values(b [][]uint64) { + for _, s := range b { + if len(s) > 0 { + uint64valuesPool.Put(s) + } + } +} + +var uint32valuesPool = zeropool.New(func() []uint32 { return nil }) + +func CloneUint32ParquetValues(values []parquet.Value) []uint32 { + uint32s := uint32valuesPool.Get() + if l := len(values); cap(uint32s) < l { + uint32s = make([]uint32, 0, 2*l) + } + uint32s = uint32s[:len(values)] + for i, v := range values { + uint32s[i] = v.Uint32() + } + return uint32s +} + +func ReleaseUint32Values(b [][]uint32) { + for _, s := range b { + if len(s) > 0 { + uint32valuesPool.Put(s) + } + } +} diff --git a/pkg/phlaredb/query/util_test.go b/pkg/phlaredb/query/util_test.go new file mode 100644 index 0000000000..a8e6167ba7 --- /dev/null +++ b/pkg/phlaredb/query/util_test.go @@ -0,0 +1,63 @@ +package query + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_SplitRows(t *testing.T) { + type testCase struct { + rows []int64 + groups []int64 + expected [][]int64 + } + + testCases := []testCase{ + { + // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, .............. 
100] + // [ ][10 ][50 ][77 ][110] + // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9][ 0, 1 ][ ][ 23][ ] + rows: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100}, + groups: []int64{10, 50, 77, 101, 110}, + expected: [][]int64{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {0, 1}, nil, {23}, nil}, + }, + { + // * + // [0, 1][2, 3][4] + // [0, 1][0, 1][0] + // * + // [0][ ][ ] + rows: []int64{0}, + groups: []int64{2, 4, 5}, + expected: [][]int64{{0}, nil, nil}, + }, + { + // * + // [0, 1][2, 3][4] + // [0, 1][0, 1][0] + // * + // [ ][ ][0] + rows: []int64{4}, + groups: []int64{2, 4, 5}, + expected: [][]int64{{}, nil, {0}}, + }, + { + // * * * * + // [0, 1][2, 3][4, 5][6, 7][8, 9] + // [0, 1][0, 1][0, 1][0, 1][0, 1] + // * * * * + // [ ][ 1][0 ][ 1][ 1] + rows: []int64{3, 4, 7, 9}, + groups: []int64{2, 4, 6, 8, 10}, + expected: [][]int64{{}, {1}, {0}, {1}, {1}}, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run("", func(t *testing.T) { + assert.Equal(t, tc.expected, SplitRows(tc.rows, tc.groups)) + }) + } +} diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go index 4327a31dfe..c9e425ad03 100644 --- a/pkg/phlaredb/sample_merge.go +++ b/pkg/phlaredb/sample_merge.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/google/pprof/profile" + "github.com/grafana/dskit/runutil" "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/prometheus/common/model" @@ -15,6 +16,7 @@ import ( "github.com/grafana/pyroscope/pkg/iter" phlaremodel "github.com/grafana/pyroscope/pkg/model" "github.com/grafana/pyroscope/pkg/phlaredb/query" + v1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" "github.com/grafana/pyroscope/pkg/phlaredb/symdb" ) @@ -43,7 +45,6 @@ func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[ func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - Block") defer sp.Finish() - m := make(seriesByLabels) columnName := "TotalValue" if b.meta.Version == 1 { @@ -71,63 +72,63 @@ type Source interface { RowGroups() []parquet.RowGroup } -func mergeByStacktraces(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], r *symdb.Resolver) error { +func mergeByStacktraces(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], r *symdb.Resolver) (err error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "mergeByStacktraces") defer sp.Finish() - // clone the rows to be able to iterate over them twice - multiRows, err := iter.CloneN(rows, 2) - if err != nil { + var columns v1.SampleColumns + if err = columns.Resolve(profileSource.Schema()); err != nil { return err } - it := query.NewMultiRepeatedPageIterator( - repeatedColumnIter(ctx, profileSource, "Samples.list.element.StacktraceID", multiRows[0]), - repeatedColumnIter(ctx, profileSource, "Samples.list.element.Value", multiRows[1]), + profiles := query.NewRepeatedRowIterator(ctx, rows, profileSource.RowGroups(), + columns.StacktraceID.ColumnIndex, + columns.Value.ColumnIndex, ) - defer it.Close() - for it.Next() { - values := it.At().Values - p := r.Partition(it.At().Row.StacktracePartition()) - for i := 0; i < len(values[0]); i++ { - p[uint32(values[0][i].Int64())] += values[1][i].Int64() + defer runutil.CloseWithErrCapture(&err, profiles, "failed to close profile stream") + for profiles.Next() { + p := profiles.At() + partition := r.Partition(p.Row.StacktracePartition()) + stacktraces := p.Values[0] + values := 
p.Values[1]
+        for i, sid := range stacktraces {
+            partition[sid.Uint32()] += values[i].Int64()
+        }
     }
-    return it.Err()
+    return profiles.Err()
 }
 
-func mergeBySpans(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], r *symdb.Resolver, spanSelector phlaremodel.SpanSelector) error {
+func mergeBySpans(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], r *symdb.Resolver, spanSelector phlaremodel.SpanSelector) (err error) {
     sp, ctx := opentracing.StartSpanFromContext(ctx, "mergeBySpans")
     defer sp.Finish()
-    if _, found := profileSource.Schema().Lookup(strings.Split("Samples.list.element.SpanID", ".")...); !found {
-        return nil
-    }
-    // clone the rows to be able to iterate over them twice
-    multiRows, err := iter.CloneN(rows, 3)
-    if err != nil {
+    var columns v1.SampleColumns
+    if err = columns.Resolve(profileSource.Schema()); err != nil {
         return err
     }
-    it := query.NewMultiRepeatedPageIterator(
-        repeatedColumnIter(ctx, profileSource, "Samples.list.element.StacktraceID", multiRows[0]),
-        repeatedColumnIter(ctx, profileSource, "Samples.list.element.Value", multiRows[1]),
-        repeatedColumnIter(ctx, profileSource, "Samples.list.element.SpanID", multiRows[2]),
+    if !columns.HasSpanID() {
+        return nil
+    }
+    profiles := query.NewRepeatedRowIterator(ctx, rows, profileSource.RowGroups(),
+        columns.StacktraceID.ColumnIndex,
+        columns.Value.ColumnIndex,
+        columns.SpanID.ColumnIndex,
     )
-    defer it.Close()
-    for it.Next() {
-        values := it.At().Values
-        p := r.Partition(it.At().Row.StacktracePartition())
-        stacktraces := values[0]
-        sampleValues := values[1]
-        spans := values[2]
-        for i := 0; i < len(stacktraces); i++ {
+    defer runutil.CloseWithErrCapture(&err, profiles, "failed to close profile stream")
+    for profiles.Next() {
+        p := profiles.At()
+        partition := r.Partition(p.Row.StacktracePartition())
+        stacktraces := p.Values[0]
+        values := p.Values[1]
+        spans := p.Values[2]
+        for i, sid := range stacktraces {
             spanID := spans[i].Uint64()
             if spanID == 0 {
                 continue
             }
             if _, ok := spanSelector[spanID]; ok {
-                p[uint32(stacktraces[i].Int64())] += sampleValues[i].Int64()
+                partition[sid.Uint32()] += values[i].Int64()
             }
         }
     }
-    return it.Err()
+    return profiles.Err()
 }
 
 type seriesByLabels map[string]*typesv1.Series
@@ -146,20 +147,23 @@ func (m seriesByLabels) normalize() []*typesv1.Series {
     return result
 }
 
-func mergeByLabels(ctx context.Context, profileSource Source, columnName string, rows iter.Iterator[Profile], m seriesByLabels, by ...string) error {
-    it := repeatedColumnIter(ctx, profileSource, columnName, rows)
-
-    defer it.Close()
+func mergeByLabels(ctx context.Context, profileSource Source, columnName string, rows iter.Iterator[Profile], m seriesByLabels, by ...string) (err error) {
+    column, err := v1.ResolveColumnByPath(profileSource.Schema(), strings.Split(columnName, "."))
+    if err != nil {
+        return err
+    }
+    profiles := query.NewRepeatedRowIterator(ctx, rows, profileSource.RowGroups(), column.ColumnIndex)
+    defer runutil.CloseWithErrCapture(&err, profiles, "failed to close profile stream")
    labelsByFingerprint := map[model.Fingerprint]string{}
    labelBuf := make([]byte, 0, 1024)
-    for it.Next() {
-        values := it.At()
+    for profiles.Next() {
+        values := profiles.At()
         p := values.Row
         var total int64
-        for _, e := range values.Values {
-            total += e.Int64()
+        for _, e := range values.Values[0] {
+            total += e.Int64()
         }
         labelsByString, ok := labelsByFingerprint[p.Fingerprint()]
         if !ok {
@@ -185,5 +189,5 @@ func mergeByLabels(ctx context.Context, profileSource Source, columnName string,
                 Value:
float64(total), }) } - return it.Err() + return profiles.Err() } diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go index 7de4a9eb59..afb147c715 100644 --- a/pkg/phlaredb/schemas/v1/profiles.go +++ b/pkg/phlaredb/schemas/v1/profiles.go @@ -5,6 +5,7 @@ import ( "io" "math" "sort" + "strings" "unsafe" "github.com/google/uuid" @@ -79,6 +80,44 @@ func init() { stacktracePartitionColIndex = stacktracePartitionCol.ColumnIndex } +var ( + sampleStacktraceIDColumnPath = strings.Split("Samples.list.element.StacktraceID", ".") + sampleValueColumnPath = strings.Split("Samples.list.element.Value", ".") + sampleSpanIDColumnPath = strings.Split("Samples.list.element.SpanID", ".") +) + +var ErrColumnNotFound = fmt.Errorf("column path not found") + +type SampleColumns struct { + StacktraceID parquet.LeafColumn + Value parquet.LeafColumn + SpanID parquet.LeafColumn +} + +func (c *SampleColumns) Resolve(schema *parquet.Schema) error { + var err error + if c.StacktraceID, err = ResolveColumnByPath(schema, sampleStacktraceIDColumnPath); err != nil { + return err + } + if c.Value, err = ResolveColumnByPath(schema, sampleValueColumnPath); err != nil { + return err + } + // Optional. + c.SpanID, _ = ResolveColumnByPath(schema, sampleSpanIDColumnPath) + return nil +} + +func (c *SampleColumns) HasSpanID() bool { + return c.SpanID.Node != nil +} + +func ResolveColumnByPath(schema *parquet.Schema, path []string) (parquet.LeafColumn, error) { + if c, ok := schema.Lookup(path...); ok { + return c, nil + } + return parquet.LeafColumn{}, fmt.Errorf("%w: %v", ErrColumnNotFound, path) +} + type Sample struct { StacktraceID uint64 `parquet:",delta"` Value int64 `parquet:",delta"`
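
For illustration, a minimal consumption sketch of the two building blocks introduced in pkg/iter — not part of the change; it assumes it lives in the iter package and reuses the newSeqIterator test helper defined above:

// Sketch: fan a source out to two readers and batch one of them
// asynchronously. AsyncBatchIterator.Close closes its delegate (tee a),
// while closing the source remains the caller's responsibility, as the
// Tee documentation requires.
func exampleTeeWithAsyncBatches() error {
    src := newSeqIterator(1 << 10)
    a, b := Tee[int](src)
    batched := NewAsyncBatchIterator[int, int](
        a, 64,
        func(i int) int { return i }, // clone: take ownership of the item
        func([]int) {},               // release: recycle a consumed batch
    )
    for batched.Next() {
        _ = batched.At()
    }
    for b.Next() {
        _ = b.At()
    }
    if err := batched.Close(); err != nil {
        return err
    }
    if err := b.Close(); err != nil {
        return err
    }
    return src.Close()
}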
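
A worked example of the RowGroupBoundaries/SplitRows pair from pkg/phlaredb/query/util.go — a sketch mirroring the test cases above, not part of the change:

// SplitRows partitions sorted absolute row numbers by the cumulative
// row-group boundaries and rebases them to group-local indices.
func exampleSplitRows() {
    // Three row groups holding 2, 2 and 1 rows respectively.
    boundaries := []int64{2, 4, 5} // as produced by RowGroupBoundaries
    split := SplitRows([]int64{3, 4}, boundaries)
    // split == [][]int64{{}, {1}, {0}}:
    // absolute row 3 is row 1 of group 1; absolute row 4 is row 0 of group 2.
    _ = split
}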
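
And a sketch of how NewRepeatedRowIterator is consumed — an illustrative example assuming package query; the column indices would normally come from schemav1.ResolveColumnByPath or SampleColumns. At yields one RepeatedRow per requested source row, with Values holding one value slice per requested column:

// Sketch: resolve two repeated columns for a set of row numbers.
// Real call sites capture the Close error, e.g. via runutil.CloseWithErrCapture.
func exampleRepeatedRows(ctx context.Context, rgs []parquet.RowGroup, rows iter.Iterator[int64], stacktraceIDCol, valueCol int) error {
    profiles := NewRepeatedRowIterator(ctx, rows, rgs, stacktraceIDCol, valueCol)
    defer profiles.Close()
    for profiles.Next() {
        row := profiles.At()
        ids, values := row.Values[0], row.Values[1] // one slice per column
        for i := range ids {
            _, _ = ids[i].Uint32(), values[i].Int64()
        }
    }
    return profiles.Err()
}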