
Commit 63561d7

storage: Refactor block compaction to allow shard-splitting (#2366)
* storage: Refactor block compaction to allow shard-splitting
* Rename to CompactWithSplitting for consistency
* Reintroduce compaction series testing
* Add sharding compaction level
* Add some tests for CompactWithSplitting
* Fix symbols split-compaction (#2371)
* Fix a race that was actually surfacing another real issue
* Add a test for meta min/max time
* Fix meta min/max after split

---------

Co-authored-by: Anton Kolesnikov <[email protected]>
1 parent cb6420d commit 63561d7

File tree

10 files changed: +802 -222 lines


pkg/phlaredb/compact.go

Lines changed: 272 additions & 190 deletions
Large diffs are not rendered by default.
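pkg/phlaredb/compact.go carries the bulk of the refactor, but its diff is collapsed here. The shape of the new entry point can be inferred from TestCompactWithSplitting below; the sketch that follows is a hypothetical caller built on that usage, and the exact signature and returned meta type in compact.go may differ:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/pyroscope/pkg/phlaredb"
	"github.com/grafana/pyroscope/pkg/phlaredb/sharding"
)

// compactIntoShards is a hypothetical wrapper; CompactWithSplitting, the 16
// shard count, and the meta fields below mirror their usage in compact_test.go.
func compactIntoShards(ctx context.Context, blocks []phlaredb.BlockReader, dst string) error {
	compacted, err := phlaredb.CompactWithSplitting(ctx, blocks, 16, dst)
	if err != nil {
		return err
	}
	for _, meta := range compacted {
		// Each output block carries its shard ID as an external label.
		fmt.Println(meta.Labels[sharding.CompactorShardIDLabel], meta.MinTime, meta.MaxTime)
	}
	return nil
}

func main() {} // placeholder so the sketch compiles stand-alone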

pkg/phlaredb/compact_test.go

Lines changed: 165 additions & 21 deletions

@@ -6,6 +6,7 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"path/filepath"
+	"sort"
 	"testing"
 	"time"

@@ -14,6 +15,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
+	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

@@ -23,6 +25,7 @@ import (
 	"github.com/grafana/pyroscope/pkg/objstore/client"
 	"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem"
 	"github.com/grafana/pyroscope/pkg/phlaredb/block"
+	"github.com/grafana/pyroscope/pkg/phlaredb/sharding"
 	"github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index"
 	"github.com/grafana/pyroscope/pkg/pprof/testhelper"
 )

@@ -85,6 +88,153 @@ func TestCompact(t *testing.T) {
 	require.Equal(t, expected.String(), res.String())
 }

+func TestCompactWithSplitting(t *testing.T) {
+	ctx := context.Background()
+
+	b1 := newBlock(t, func() []*testhelper.ProfileBuilder {
+		return append(
+			profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "a"),
+			profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "b")...,
+		)
+	})
+	b2 := newBlock(t, func() []*testhelper.ProfileBuilder {
+		return append(
+			append(
+				append(
+					profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "c"),
+					profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "d")...,
+				), profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "a")...,
+			),
+			profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "b")...,
+		)
+	})
+	dst := t.TempDir()
+	compacted, err := CompactWithSplitting(ctx, []BlockReader{b1, b2, b2, b1}, 16, dst)
+	require.NoError(t, err)
+
+	// 4 shards one per series.
+	require.Equal(t, 4, len(compacted))
+	require.Equal(t, "1_of_16", compacted[0].Labels[sharding.CompactorShardIDLabel])
+	require.Equal(t, "6_of_16", compacted[1].Labels[sharding.CompactorShardIDLabel])
+	require.Equal(t, "7_of_16", compacted[2].Labels[sharding.CompactorShardIDLabel])
+	require.Equal(t, "14_of_16", compacted[3].Labels[sharding.CompactorShardIDLabel])
+
+	// The series b should span from 11 to 20 and not 1 to 20.
+	require.Equal(t, model.TimeFromUnix(11), compacted[1].MinTime)
+	require.Equal(t, model.TimeFromUnix(20), compacted[1].MaxTime)
+
+	// We first verify we have all series and timestamps across querying all blocks.
+	queriers := make(Queriers, len(compacted))
+	for i, blk := range compacted {
+		queriers[i] = blockQuerierFromMeta(t, dst, blk)
+	}
+
+	err = queriers.Open(context.Background())
+	require.NoError(t, err)
+	matchAll := &ingesterv1.SelectProfilesRequest{
+		LabelSelector: "{}",
+		Type:          mustParseProfileSelector(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds"),
+		Start:         0,
+		End:           40000,
+	}
+	it, err := queriers.SelectMatchingProfiles(context.Background(), matchAll)
+	require.NoError(t, err)
+
+	seriesMap := make(map[model.Fingerprint]lo.Tuple2[phlaremodel.Labels, []model.Time])
+	for it.Next() {
+		r := it.At()
+		seriesMap[r.Fingerprint()] = lo.T2(r.Labels().WithoutPrivateLabels(), append(seriesMap[r.Fingerprint()].B, r.Timestamp()))
+	}
+	require.NoError(t, it.Err())
+	require.NoError(t, it.Close())
+	series := lo.Values(seriesMap)
+	sort.Slice(series, func(i, j int) bool {
+		return phlaremodel.CompareLabelPairs(series[i].A, series[j].A) < 0
+	})
+	require.Equal(t, []lo.Tuple2[phlaremodel.Labels, []model.Time]{
+		lo.T2(phlaremodel.LabelsFromStrings("job", "a"),
+			generateTimes(t, model.TimeFromUnix(1), model.TimeFromUnix(10)),
+		),
+		lo.T2(phlaremodel.LabelsFromStrings("job", "b"),
+			generateTimes(t, model.TimeFromUnix(11), model.TimeFromUnix(20)),
+		),
+		lo.T2(phlaremodel.LabelsFromStrings("job", "c"),
+			generateTimes(t, model.TimeFromUnix(1), model.TimeFromUnix(10)),
+		),
+		lo.T2(phlaremodel.LabelsFromStrings("job", "d"),
+			generateTimes(t, model.TimeFromUnix(11), model.TimeFromUnix(20)),
+		),
+	}, series)
+
+	// Then we query 2 different shards and verify we have a subset of series.
+	it, err = queriers[0].SelectMatchingProfiles(ctx, matchAll)
+	require.NoError(t, err)
+	seriesResult, err := queriers[0].MergeByLabels(context.Background(), it, "job")
+	require.NoError(t, err)
+	require.Equal(t,
+		[]*typesv1.Series{
+			{
+				Labels: phlaremodel.LabelsFromStrings("job", "a"),
+				Points: generatePoints(t, model.TimeFromUnix(1), model.TimeFromUnix(10)),
+			},
+		}, seriesResult)
+
+	it, err = queriers[1].SelectMatchingProfiles(ctx, matchAll)
+	require.NoError(t, err)
+	seriesResult, err = queriers[1].MergeByLabels(context.Background(), it, "job")
+	require.NoError(t, err)
+	require.Equal(t,
+		[]*typesv1.Series{
+			{
+				Labels: phlaremodel.LabelsFromStrings("job", "b"),
+				Points: generatePoints(t, model.TimeFromUnix(11), model.TimeFromUnix(20)),
+			},
+		}, seriesResult)

+	// Finally test some stacktraces resolution.
+	it, err = queriers[1].SelectMatchingProfiles(ctx, matchAll)
+	require.NoError(t, err)
+	res, err := queriers[1].MergeByStacktraces(ctx, it)
+	require.NoError(t, err)
+
+	expected := new(phlaremodel.Tree)
+	expected.InsertStack(10, "baz", "bar", "foo")
+	require.Equal(t, expected.String(), res.String())
+}
+
+// nolint:unparam
+func profileSeriesGenerator(t *testing.T, from, through time.Time, interval time.Duration, lbls ...string) []*testhelper.ProfileBuilder {
+	t.Helper()
+	var builders []*testhelper.ProfileBuilder
+	for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(interval) {
+		builders = append(builders,
+			testhelper.NewProfileBuilder(ts.UnixNano()).
+				CPUProfile().
+				WithLabels(
+					lbls...,
+				).ForStacktraceString("foo", "bar", "baz").AddSamples(1))
+	}
+	return builders
+}
+
+func generatePoints(t *testing.T, from, through model.Time) []*typesv1.Point {
+	t.Helper()
+	var points []*typesv1.Point
+	for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(time.Second) {
+		points = append(points, &typesv1.Point{Timestamp: int64(ts), Value: 1})
+	}
+	return points
+}
+
+func generateTimes(t *testing.T, from, through model.Time) []model.Time {
+	t.Helper()
+	var times []model.Time
+	for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(time.Second) {
+		times = append(times, ts)
+	}
+	return times
+}
+
 func TestProfileRowIterator(t *testing.T) {
 	b := newBlock(t, func() []*testhelper.ProfileBuilder {
 		return []*testhelper.ProfileBuilder{

@@ -268,28 +418,22 @@ func TestSeriesRewriter(t *testing.T) {
 	})
 	rows, err := newProfileRowIterator(blk)
 	require.NoError(t, err)
-	filePath := filepath.Join(t.TempDir(), block.IndexFilename)
-	idxw, err := prepareIndexWriter(context.Background(), filePath, []BlockReader{blk})
-	require.NoError(t, err)
-	it := newSeriesRewriter(rows, idxw)
-	// tests that all rows are written to the correct series index
-	require.True(t, it.Next())
-	require.Equal(t, uint32(0), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(0), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(0), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(1), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(2), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(2), it.At().row.SeriesIndex())
-	require.False(t, it.Next())
+	path := t.TempDir()
+	filePath := filepath.Join(path, block.IndexFilename)
+	idxw := newIndexRewriter(path)
+	seriesIdx := []uint32{}
+	for rows.Next() {
+		r := rows.At()
+		require.NoError(t, idxw.ReWriteRow(r))
+		seriesIdx = append(seriesIdx, r.row.SeriesIndex())
+	}
+	require.NoError(t, rows.Err())
+	require.NoError(t, rows.Close())

-	require.NoError(t, it.Err())
-	require.NoError(t, it.Close())
-	require.NoError(t, idxw.Close())
+	require.Equal(t, []uint32{0, 0, 0, 1, 2, 2}, seriesIdx)
+
+	err = idxw.Close(context.Background())
+	require.NoError(t, err)

 	idxr, err := index.NewFileReader(filePath)
 	require.NoError(t, err)
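One detail worth calling out: the three helpers at the bottom iterate inclusively over [from, through], so a series generated from second 1 to second 10 carries exactly ten profiles/points. A stand-alone illustration of that loop shape:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same inclusive iteration as profileSeriesGenerator, generatePoints,
	// and generateTimes above: both endpoints are produced.
	from, through := time.Unix(1, 0), time.Unix(10, 0)
	n := 0
	for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(time.Second) {
		n++
	}
	fmt.Println(n) // 10
}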

pkg/phlaredb/head.go

Lines changed: 1 addition & 1 deletion

@@ -488,7 +488,7 @@ func (h *Head) flush(ctx context.Context) error {
 	// It must be guaranteed that no new inserts will happen
 	// after the call start.
 	h.inFlightProfiles.Wait()
-	if len(h.profiles.slice) == 0 {
+	if h.profiles.index.totalProfiles.Load() == 0 {
 		level.Info(h.logger).Log("msg", "head empty - no block written")
 		return os.RemoveAll(h.headPath)
 	}
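The head-emptiness check now reads an atomic counter instead of taking the length of the in-memory profiles slice. Presumably this relates to the race mentioned in the commit message: the slice can change underneath flush, whereas a monotonic atomic total is safe to read concurrently and also accounts for profiles no longer held in the slice. A minimal sketch of the pattern, with hypothetical names (not the pyroscope types):

package main

import (
	"fmt"
	"sync/atomic"
)

// profileIndex is a hypothetical stand-in for the profiles index in head.go.
type profileIndex struct {
	totalProfiles atomic.Int64 // incremented once per ingested profile, never reset
}

func (ix *profileIndex) ingest() { ix.totalProfiles.Add(1) }

// empty stays correct even after in-memory data is cut into row groups,
// and Load is safe against concurrent writers.
func (ix *profileIndex) empty() bool { return ix.totalProfiles.Load() == 0 }

func main() {
	ix := &profileIndex{}
	fmt.Println(ix.empty()) // true
	ix.ingest()
	fmt.Println(ix.empty()) // false
}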

pkg/phlaredb/profile_store.go

Lines changed: 8 additions & 6 deletions

@@ -62,11 +62,13 @@ type profileStore struct {
 	flushBufferLbs []phlaremodel.Labels
 }

-func newProfileWriter(writer io.Writer) *parquet.GenericWriter[*schemav1.Profile] {
-	return parquet.NewGenericWriter[*schemav1.Profile](writer, schemav1.ProfilesSchema,
-		parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "pyroscopedb-parquet-buffers*")),
-		parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision),
-		parquet.PageBufferSize(3*1024*1024),
+func newParquetProfileWriter(writer io.Writer, options ...parquet.WriterOption) *parquet.GenericWriter[*schemav1.Profile] {
+	options = append(options, parquet.PageBufferSize(3*1024*1024))
+	options = append(options, parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision))
+	options = append(options, parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "pyroscopedb-parquet-buffers*")))
+	options = append(options, schemav1.ProfilesSchema)
+	return parquet.NewGenericWriter[*schemav1.Profile](
+		writer, options...,
 	)
 }

@@ -82,7 +84,7 @@ func newProfileStore(phlarectx context.Context) *profileStore {
 	go s.cutRowGroupLoop()
 	// Initialize writer on /dev/null
 	// TODO: Reuse parquet.Writer beyond life time of the head.
-	s.writer = newProfileWriter(io.Discard)
+	s.writer = newParquetProfileWriter(io.Discard)

 	return s
 }
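Worth noting: the refactored constructor appends its defaults (and the schema) after any caller-supplied options. If the writer applies options in order with last-write-wins semantics, the defaults would take precedence over a caller's value for the same option; whether parquet-go behaves that way is not shown here. A toy model of that ordering question (not parquet-go itself):

package main

import "fmt"

type option func(*config)

type config struct{ pageBufferSize int }

func pageBufferSize(n int) option {
	return func(c *config) { c.pageBufferSize = n }
}

// newWriter mirrors newParquetProfileWriter's shape: caller options first,
// package defaults appended afterwards.
func newWriter(opts ...option) *config {
	opts = append(opts, pageBufferSize(3*1024*1024)) // default, appended last
	c := &config{}
	for _, o := range opts {
		o(c) // applied in order: last write wins in this toy model
	}
	return c
}

func main() {
	// Under last-write-wins ordering, the caller's 1 MiB is overwritten
	// by the 3 MiB default appended after it.
	fmt.Println(newWriter(pageBufferSize(1 << 20)).pageBufferSize)
}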

pkg/phlaredb/schemas/v1/functions.go

Lines changed: 5 additions & 0 deletions

@@ -53,3 +53,8 @@ type InMemoryFunction struct {
 	// Line number in source file.
 	StartLine uint32
 }
+
+func (f *InMemoryFunction) Clone() *InMemoryFunction {
+	n := *f
+	return &n
+}

pkg/phlaredb/schemas/v1/locations.go

Lines changed: 7 additions & 0 deletions

@@ -110,6 +110,13 @@ type InMemoryLocation struct {
 	Line []InMemoryLine
 }

+func (l *InMemoryLocation) Clone() *InMemoryLocation {
+	x := *l
+	x.Line = make([]InMemoryLine, len(l.Line))
+	copy(x.Line, l.Line)
+	return &x
+}
+
 type InMemoryLine struct {
 	// The id of the corresponding profile.Function for this line.
 	FunctionId uint32
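Unlike the plain value copies in functions.go and mappings.go (whose visible fields hold no references), this Clone deep-copies the Line slice. Without that copy, the original and the clone would share a backing array. A stand-alone demonstration of the aliasing the copy avoids (toy types, not the schemav1 ones):

package main

import "fmt"

type line struct{ functionID uint32 }

type location struct{ lines []line }

func main() {
	orig := location{lines: []line{{1}, {2}}}

	shallow := orig // struct copy only: both values share the backing array

	deep := orig // Clone-style copy: give the clone its own backing array
	deep.lines = make([]line, len(orig.lines))
	copy(deep.lines, orig.lines)

	shallow.lines[0].functionID = 99
	fmt.Println(orig.lines[0].functionID) // 99: mutated through the shallow copy
	fmt.Println(deep.lines[0].functionID) // 1: the deep copy is unaffected
}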

pkg/phlaredb/schemas/v1/mappings.go

Lines changed: 5 additions & 0 deletions

@@ -73,3 +73,8 @@ type InMemoryMapping struct {
 	HasLineNumbers  bool
 	HasInlineFrames bool
 }
+
+func (m *InMemoryMapping) Clone() *InMemoryMapping {
+	n := *m
+	return &n
+}
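These Clone helpers plausibly exist so split compaction can rewrite per-shard copies of symbol entries without mutating records still shared elsewhere (the commit message mentions a symbols split-compaction fix, #2371) — that link is an assumption, not stated in the diff. A clone-before-mutate sketch using only the field visible above:

package main

import (
	"fmt"

	schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
)

func main() {
	m := &schemav1.InMemoryMapping{HasLineNumbers: true}

	// Clone before mutating so readers of m are unaffected; a value copy is
	// enough here since the struct's visible fields hold no slices or pointers.
	n := m.Clone()
	n.HasLineNumbers = false

	fmt.Println(m.HasLineNumbers, n.HasLineNumbers) // true false
}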

pkg/phlaredb/sharding/label.go

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package sharding
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/model/labels"
+)
+
+const (
+	// ShardLabel is a reserved label referencing a shard on the read path.
+	ShardLabel = "__query_shard__"
+	// CompactorShardIDLabel is the external label used to store
+	// the ID of a sharded block generated by the split-and-merge compactor. If a block
+	// doesn't have this label, it means the block hasn't been split.
+	CompactorShardIDLabel = "__compactor_shard_id__"
+)
+
+// ShardSelector holds information about the configured query shard.
+type ShardSelector struct {
+	ShardIndex uint64
+	ShardCount uint64
+}
+
+// LabelValue returns the label value to use to select this shard.
+func (shard ShardSelector) LabelValue() string {
+	return FormatShardIDLabelValue(shard.ShardIndex, shard.ShardCount)
+}
+
+// Label generates the ShardSelector as a label.
+func (shard ShardSelector) Label() labels.Label {
+	return labels.Label{
+		Name:  ShardLabel,
+		Value: shard.LabelValue(),
+	}
+}
+
+// Matcher converts ShardSelector to Matcher.
+func (shard ShardSelector) Matcher() *labels.Matcher {
+	return labels.MustNewMatcher(labels.MatchEqual, ShardLabel, shard.LabelValue())
+}
+
+// ShardFromMatchers extracts a ShardSelector and the index of the matcher it was
+// pulled from in the matcher list.
+func ShardFromMatchers(matchers []*labels.Matcher) (shard *ShardSelector, idx int, err error) {
+	for i, matcher := range matchers {
+		if matcher.Name == ShardLabel && matcher.Type == labels.MatchEqual {
+			index, count, err := ParseShardIDLabelValue(matcher.Value)
+			if err != nil {
+				return nil, i, err
+			}
+			return &ShardSelector{
+				ShardIndex: index,
+				ShardCount: count,
+			}, i, nil
+		}
+	}
+	return nil, 0, nil
+}
+
+// RemoveShardFromMatchers returns the input matchers without the label matcher on the query shard (if any).
+func RemoveShardFromMatchers(matchers []*labels.Matcher) (shard *ShardSelector, filtered []*labels.Matcher, err error) {
+	shard, idx, err := ShardFromMatchers(matchers)
+	if err != nil || shard == nil {
+		return nil, matchers, err
+	}
+
+	// Create a new slice with the shard matcher removed.
+	filtered = make([]*labels.Matcher, 0, len(matchers)-1)
+	filtered = append(filtered, matchers[:idx]...)
+	filtered = append(filtered, matchers[idx+1:]...)
+
+	return shard, filtered, nil
+}
+
+// FormatShardIDLabelValue expects a 0-based shardID, but uses a 1-based shard number in the output string.
+func FormatShardIDLabelValue(shardID, shardCount uint64) string {
+	return fmt.Sprintf("%d_of_%d", shardID+1, shardCount)
+}
+
+// ParseShardIDLabelValue returns the original (0-based) shard index and shard count parsed from a formatted value.
+func ParseShardIDLabelValue(val string) (index, shardCount uint64, _ error) {
+	// If we fail to parse the shard ID, we better not consider this block fully included in successors.
+	matches := strings.Split(val, "_")
+	if len(matches) != 3 || matches[1] != "of" {
+		return 0, 0, errors.Errorf("invalid shard ID: %q", val)
+	}
+
+	index, err := strconv.ParseUint(matches[0], 10, 64)
+	if err != nil {
+		return 0, 0, errors.Errorf("invalid shard ID: %q: %v", val, err)
+	}
+	count, err := strconv.ParseUint(matches[2], 10, 64)
+	if err != nil {
+		return 0, 0, errors.Errorf("invalid shard ID: %q: %v", val, err)
+	}
+
+	if index == 0 || count == 0 || index > count {
+		return 0, 0, errors.Errorf("invalid shard ID: %q", val)
+	}
+
+	return index - 1, count, nil
+}
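The two helpers round-trip between the 0-based index used internally and the 1-based label value stored on blocks — exactly the "6_of_16"-style values asserted in TestCompactWithSplitting above:

package main

import (
	"fmt"

	"github.com/grafana/pyroscope/pkg/phlaredb/sharding"
)

func main() {
	// 0-based shard 5 of 16 renders as the 1-based "6_of_16".
	v := sharding.FormatShardIDLabelValue(5, 16)
	fmt.Println(v) // 6_of_16

	index, count, err := sharding.ParseShardIDLabelValue(v)
	fmt.Println(index, count, err) // 5 16 <nil>

	// Malformed or out-of-range values are rejected.
	_, _, err = sharding.ParseShardIDLabelValue("17_of_16")
	fmt.Println(err) // invalid shard ID: "17_of_16"
}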
