@@ -2,7 +2,9 @@ package phlaredb
2
2
3
3
import (
4
4
"context"
5
+ "crypto/md5"
5
6
"fmt"
7
+ "io"
6
8
"os"
7
9
"strings"
8
10
"testing"
@@ -79,6 +81,53 @@ func sameProfileStream(i int) *testProfile {
79
81
return tp
80
82
}
81
83
84
+ // This will simulate a profile stream which ends and a new one starts at i > boundary
85
+ func profileStreamEndingAndStarting (boundary int ) func (int ) * testProfile {
86
+ return func (i int ) * testProfile {
87
+ tp := & testProfile {}
88
+
89
+ series := "at-beginning"
90
+ if i > boundary {
91
+ series = "at-end"
92
+ }
93
+
94
+ tp .profileName = "process_cpu:cpu:nanoseconds:cpu:nanoseconds"
95
+ tp .lbls = phlaremodel .LabelsFromStrings (
96
+ phlaremodel .LabelNameProfileType , tp .profileName ,
97
+ "job" , "test" ,
98
+ "stream" , series ,
99
+ )
100
+
101
+ tp .p .ID = uuid .MustParse (fmt .Sprintf ("00000000-0000-0000-0000-%012d" , i ))
102
+ tp .p .TimeNanos = time .Second .Nanoseconds () * int64 (i )
103
+ tp .p .Samples = []* schemav1.Sample {
104
+ {
105
+ StacktraceID : 0x1 ,
106
+ Value : 10.0 ,
107
+ },
108
+ }
109
+ tp .populateFingerprint ()
110
+ return tp
111
+ }
112
+ }
113
+
114
+ func nProfileStreams (n int ) func (int ) * testProfile {
115
+ return func (i int ) * testProfile {
116
+ tp := sameProfileStream (i / n )
117
+
118
+ tp .lbls = phlaremodel .LabelsFromStrings (
119
+ phlaremodel .LabelNameProfileType , tp .profileName ,
120
+ "job" , "test" ,
121
+ "stream" , fmt .Sprintf ("%x" , md5 .Sum ([]byte (fmt .Sprintf ("%d" , i % n )))),
122
+ )
123
+ tp .p .ID = uuid .MustParse (fmt .Sprintf ("00000000-0000-0000-0000-%012d" , i ))
124
+
125
+ tp .populateFingerprint ()
126
+ return tp
127
+
128
+ }
129
+ }
130
+
82
131
func readFullParquetFile [M any ](t * testing.T , path string ) ([]M , uint64 ) {
83
132
f , err := os .Open (path )
84
133
require .NoError (t , err )
@@ -95,8 +144,15 @@ func readFullParquetFile[M any](t *testing.T, path string) ([]M, uint64) {
95
144
reader := parquet.NewGenericReader [M ](f )
96
145
97
146
slice := make ([]M , reader .NumRows ())
98
- _ , err = reader .Read (slice )
99
- require .NoError (t , err )
147
+ offset := 0
148
+ for {
149
+ n , err := reader .Read (slice [offset :])
150
+ if err == io .EOF {
151
+ break
152
+ }
153
+ require .NoError (t , err )
154
+ offset += n
155
+ }
100
156
101
157
return slice , numRGs
102
158
}
@@ -131,13 +187,27 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) {
131
187
expectedNumRows : 100 ,
132
188
values : sameProfileStream ,
133
189
},
190
+ {
191
+ name : "a stream ending after half of the samples and a new one starting" ,
192
+ cfg : & ParquetConfig {MaxRowGroupBytes : 1828 , MaxBufferRowCount : 100000 },
193
+ expectedNumRGs : 10 ,
194
+ expectedNumRows : 100 ,
195
+ values : profileStreamEndingAndStarting (50 ),
196
+ },
134
197
{
135
198
name : "multiple row groups because of maximum row num" ,
136
199
cfg : & ParquetConfig {MaxRowGroupBytes : 128000 , MaxBufferRowCount : 10 },
137
200
expectedNumRGs : 10 ,
138
201
expectedNumRows : 100 ,
139
202
values : sameProfileStream ,
140
203
},
204
+ {
205
+ name : "a single sample per series" ,
206
+ cfg : & ParquetConfig {MaxRowGroupBytes : 1828 , MaxBufferRowCount : 100000 },
207
+ expectedNumRGs : 10 ,
208
+ expectedNumRows : 100 ,
209
+ values : nProfileStreams (100 ),
210
+ },
141
211
} {
142
212
t .Run (tc .name , func (t * testing.T ) {
143
213
path := t .TempDir ()
@@ -164,9 +234,19 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) {
164
234
rows , numRGs := readFullParquetFile [* schemav1.Profile ](t , path + "/profiles.parquet" )
165
235
require .Equal (t , int (tc .expectedNumRows ), len (rows ))
166
236
assert .Equal (t , tc .expectedNumRGs , numRGs )
167
- assert .Equal (t , "00000000-0000-0000-0000-000000000000" , rows [0 ].ID .String ())
168
- assert .Equal (t , "00000000-0000-0000-0000-000000000001" , rows [1 ].ID .String ())
169
- assert .Equal (t , "00000000-0000-0000-0000-000000000002" , rows [2 ].ID .String ())
237
+
238
+ // ensure all profiles are there
239
+ idExisting := make (map [uuid.UUID ]int , tc .expectedNumRows )
240
+ for i := range rows {
241
+ _ , ok := idExisting [rows [i ].ID ]
242
+ assert .False (t , ok , "expected ID to not exists more than once" )
243
+ idExisting [rows [i ].ID ] = i
244
+ }
245
+ for i := 0 ; i < int (tc .expectedNumRows ); i ++ {
246
+ id := uuid .MustParse (fmt .Sprintf ("00000000-0000-0000-0000-%012d" , i ))
247
+ _ , ok := idExisting [id ]
248
+ assert .True (t , ok , fmt .Sprintf ("expected ID %s to exist in output" , id .String ()))
249
+ }
170
250
})
171
251
}
172
252
}
0 commit comments