Skip to content

Commit 1f811a0

Browse files
authored
Revert "perf(v2): static parquet page buffer size (#4208)" (#4221)
This reverts commit 77754da.
1 parent e7da795 commit 1f811a0

File tree

6 files changed

+49
-15
lines changed

6 files changed

+49
-15
lines changed

pkg/experiment/block/block.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,29 @@ const (
2323
symbolsPrefetchSize = 32 << 10
2424
)
2525

26-
const (
27-
// Each 2MB translates to an I/O read op.
28-
parquetReadBufferSize = 2 << 20
29-
parquetPageWriteBufferSize = 1 << 20
30-
)
26+
func estimateReadBufferSize(s int64) int {
27+
const minSize = 64 << 10
28+
const maxSize = 1 << 20
29+
// Parquet has global buffer map, where buffer size is key,
30+
// so we want a low cardinality here.
31+
e := nextPowerOfTwo(uint32(s / 10))
32+
if e < minSize {
33+
return minSize
34+
}
35+
return int(min(e, maxSize))
36+
}
37+
38+
// This is a verbatim copy of estimateReadBufferSize.
39+
// It's kept for the sake of clarity and to avoid confusion.
40+
func estimatePageBufferSize(s int64) int {
41+
const minSize = 64 << 10
42+
const maxSize = 1 << 20
43+
e := nextPowerOfTwo(uint32(s / 10))
44+
if e < minSize {
45+
return minSize
46+
}
47+
return int(min(e, maxSize))
48+
}
3149

3250
func estimateFooterSize(size int64) int64 {
3351
var s int64
@@ -49,3 +67,17 @@ func estimateFooterSize(size int64) int64 {
4967
}
5068
return s
5169
}
70+
71+
// nextPowerOfTwo returns the smallest power of two that is greater than or
// equal to n. Zero maps to 1.
//
// Values above 1<<31 have no representable answer in uint32; the result
// saturates at 1<<31 rather than wrapping around to 0, so callers that clamp
// the result to an upper bound keep treating such inputs as "large".
func nextPowerOfTwo(n uint32) uint32 {
	const maxPow = uint32(1) << 31
	switch {
	case n == 0:
		return 1
	case n > maxPow:
		return maxPow
	}
	// Copy the highest set bit of n-1 into every lower bit position, then
	// add one to land on the next power of two. Decrementing first keeps
	// exact powers of two unchanged.
	n--
	n |= n >> 1
	n |= n >> 2
	n |= n >> 4
	n |= n >> 8
	n |= n >> 16
	n++
	return n
}

pkg/experiment/block/compaction.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,13 @@ func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
374374
}
375375

376376
func (m *datasetCompaction) open(ctx context.Context, w io.Writer) (err error) {
377-
m.profilesWriter = newProfileWriter(w)
377+
var estimatedProfileTableSize int64
378+
for _, ds := range m.datasets {
379+
estimatedProfileTableSize += ds.sectionSize(SectionProfiles)
380+
}
381+
pageBufferSize := estimatePageBufferSize(estimatedProfileTableSize)
382+
m.profilesWriter = newProfileWriter(pageBufferSize, w)
383+
378384
m.indexRewriter = newIndexRewriter()
379385
m.symbolsRewriter = newSymbolsRewriter()
380386

pkg/experiment/block/object.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,6 @@ func (obj *Object) Download(ctx context.Context) error {
196196
return nil
197197
}
198198

199-
func (obj *Object) Path() string { return obj.path }
200-
201199
// Metadata returns the block metadata currently held by the object.
func (obj *Object) Metadata() *metastorev1.BlockMeta {
	return obj.meta
}
202200

203201
// SetMetadata replaces the block metadata held by the object with md.
func (obj *Object) SetMetadata(md *metastorev1.BlockMeta) {
	obj.meta = md
}

pkg/experiment/block/section_profiles.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func openProfileTable(_ context.Context, s *Dataset) (err error) {
4141
estimateFooterSize(size),
4242
parquet.SkipBloomFilters(true),
4343
parquet.FileReadMode(parquet.ReadModeAsync),
44-
parquet.ReadBufferSize(parquetReadBufferSize))
44+
parquet.ReadBufferSize(estimateReadBufferSize(size)))
4545
}
4646
if err != nil {
4747
return fmt.Errorf("opening profile parquet table: %w", err)
@@ -183,12 +183,12 @@ type profilesWriter struct {
183183
profiles uint64
184184
}
185185

186-
func newProfileWriter(w io.Writer) *profilesWriter {
186+
func newProfileWriter(pageBufferSize int, w io.Writer) *profilesWriter {
187187
return &profilesWriter{
188188
buf: make([]parquet.Row, 1),
189189
GenericWriter: parquet.NewGenericWriter[*schemav1.Profile](w,
190190
parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision),
191-
parquet.PageBufferSize(parquetPageWriteBufferSize),
191+
parquet.PageBufferSize(pageBufferSize),
192192
// Note that parquet keeps ALL RG pages in memory (ColumnPageBuffers).
193193
parquet.MaxRowsPerRowGroup(maxRowsPerRowGroup),
194194
schemav1.ProfilesSchema,

pkg/experiment/ingester/memdb/profiles.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/grafana/pyroscope/pkg/util/build"
1111
)
1212

13-
const segmentsParquetWriteBufferSize = 256 << 10
13+
const segmentsParquetWriteBufferSize = 32 << 10
1414

1515
func WriteProfiles(metrics *HeadMetrics, profiles []v1.InMemoryProfile) ([]byte, error) {
1616
buf := &bytes.Buffer{}

pkg/experiment/query_backend/query.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,7 @@ type blockContext struct {
8686

8787
func (b *blockContext) execute() error {
8888
var span opentracing.Span
89-
span, b.ctx = opentracing.StartSpanFromContext(b.ctx, "blockContext.execute", opentracing.Tags{
90-
"object_name": b.obj.Path(),
91-
})
89+
span, b.ctx = opentracing.StartSpanFromContext(b.ctx, "blockContext.execute")
9290
defer span.Finish()
9391

9492
if idx := b.datasetIndex(); idx != nil {

0 commit comments

Comments
 (0)