Skip to content

Commit 68da7be

Browse files
committed
Add some experiments with schemas
1 parent 91a5511 commit 68da7be

File tree

3 files changed

+294
-0
lines changed

3 files changed

+294
-0
lines changed

pkg/firedb/schemas/schema_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package schema
2+
3+
import (
4+
"testing"
5+
6+
"github.com/grafana/fire/pkg/firedb"
7+
v1 "github.com/grafana/fire/pkg/firedb/schemas/v1"
8+
"github.com/segmentio/parquet-go"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestSchema(t *testing.T) {
13+
14+
originalSchema := parquet.SchemaOf(&firedb.Profile{})
15+
16+
v1Schema := v1.ProfilesSchema()
17+
require.Equal(t, originalSchema.String(), v1Schema.String())
18+
}

pkg/firedb/schemas/v1/schema.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package v1
2+
3+
import (
4+
"reflect"
5+
"strings"
6+
"unicode"
7+
"unicode/utf8"
8+
9+
"github.com/segmentio/parquet-go"
10+
"github.com/segmentio/parquet-go/compress"
11+
"github.com/segmentio/parquet-go/deprecated"
12+
"github.com/segmentio/parquet-go/encoding"
13+
"github.com/segmentio/parquet-go/format"
14+
)
15+
16+
type Group []*groupField
17+
18+
func (g Group) String() string {
19+
s := new(strings.Builder)
20+
parquet.PrintSchema(s, "", g)
21+
return s.String()
22+
}
23+
24+
func (g Group) Type() parquet.Type { return &groupType{} }
25+
26+
func (g Group) Optional() bool { return false }
27+
28+
func (g Group) Repeated() bool { return false }
29+
30+
func (g Group) Required() bool { return true }
31+
32+
func (g Group) Leaf() bool { return false }
33+
34+
func (g Group) Fields() []parquet.Field {
35+
fields := make([]parquet.Field, len(g))
36+
for pos := range g {
37+
fields[pos] = g[pos]
38+
}
39+
return fields
40+
}
41+
42+
func (g Group) Encoding() encoding.Encoding { return nil }
43+
44+
func (g Group) Compression() compress.Codec { return nil }
45+
46+
func (g Group) GoType() reflect.Type { return goTypeOfGroup(g) }
47+
48+
// exportedStructFieldName returns name with its first rune converted to
// upper case, so that it can serve as an exported Go struct field name.
//
// An empty name is returned unchanged. Without that guard,
// utf8.DecodeRuneInString reports (utf8.RuneError, 0) for the empty string
// and the result would be the replacement character "\uFFFD" rather than "".
func exportedStructFieldName(name string) string {
	if name == "" {
		return name
	}
	first, size := utf8.DecodeRuneInString(name)
	return string(unicode.ToUpper(first)) + name[size:]
}
52+
53+
func goTypeOfGroup(node parquet.Node) reflect.Type {
54+
fields := node.Fields()
55+
structFields := make([]reflect.StructField, len(fields))
56+
for i, field := range fields {
57+
structFields[i].Name = exportedStructFieldName(field.Name())
58+
structFields[i].Type = field.GoType()
59+
// TODO: can we reconstruct a struct tag that would be valid if a value
60+
// of this type were passed to SchemaOf?
61+
}
62+
return reflect.StructOf(structFields)
63+
}
64+
65+
type groupField struct {
66+
parquet.Node
67+
name string
68+
}
69+
70+
type groupType struct{}
71+
72+
func (groupType) String() string { return "group" }
73+
74+
func (groupType) Kind() parquet.Kind {
75+
panic("cannot call Kind on parquet group")
76+
}
77+
78+
func (groupType) Compare(parquet.Value, parquet.Value) int {
79+
panic("cannot compare values on parquet group")
80+
}
81+
82+
func (groupType) NewColumnIndexer(int) parquet.ColumnIndexer {
83+
panic("cannot create column indexer from parquet group")
84+
}
85+
86+
func (groupType) NewDictionary(int, int, []byte) parquet.Dictionary {
87+
panic("cannot create dictionary from parquet group")
88+
}
89+
90+
func (t groupType) NewColumnBuffer(int, int) parquet.ColumnBuffer {
91+
panic("cannot create column buffer from parquet group")
92+
}
93+
94+
func (t groupType) NewPage(int, int, []byte) parquet.Page {
95+
panic("cannot create page from parquet group")
96+
}
97+
98+
func (groupType) Encode(_, _ []byte, _ encoding.Encoding) ([]byte, error) {
99+
panic("cannot encode parquet group")
100+
}
101+
102+
func (groupType) Decode(_, _ []byte, _ encoding.Encoding) ([]byte, error) {
103+
panic("cannot decode parquet group")
104+
}
105+
106+
func (groupType) Length() int { return 0 }
107+
108+
func (groupType) EstimateSize(int) int64 { return 0 }
109+
110+
func (groupType) ColumnOrder() *format.ColumnOrder { return nil }
111+
112+
func (groupType) PhysicalType() *format.Type { return nil }
113+
114+
func (groupType) LogicalType() *format.LogicalType { return nil }
115+
116+
func (groupType) ConvertedType() *deprecated.ConvertedType { return nil }
117+
118+
func (f *groupField) Name() string { return f.name }
119+
120+
func (f *groupField) Value(base reflect.Value) reflect.Value {
121+
return base.MapIndex(reflect.ValueOf(&f.name).Elem())
122+
}
123+
124+
func ProfilesSchema() *parquet.Schema {
125+
stringRef := parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked)
126+
sampleType := parquet.Group{
127+
"Type": stringRef,
128+
"Unit": stringRef,
129+
}
130+
131+
externalLabels := parquet.Repeated(Group{
132+
{name: "Name", Node: stringRef},
133+
{name: "Value", Node: stringRef},
134+
})
135+
136+
pprofLabels := parquet.Repeated(Group{
137+
{name: "Key", Node: stringRef},
138+
{name: "Str", Node: parquet.Optional(stringRef)},
139+
{name: "Num", Node: parquet.Optional(parquet.Int(64))},
140+
{name: "NumUnit", Node: parquet.Optional(stringRef)},
141+
})
142+
143+
s := parquet.NewSchema("Profile", Group{
144+
{name: "ID", Node: parquet.UUID()},
145+
{name: "ExternalLabels", Node: externalLabels},
146+
{name: "Types", Node: parquet.Repeated(sampleType)},
147+
{name: "Samples", Node: parquet.Repeated(Group{
148+
{name: "LocationIds", Node: parquet.Repeated(parquet.Uint(64))},
149+
{name: "Values", Node: parquet.Repeated(parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked))},
150+
{name: "Labels", Node: pprofLabels},
151+
})},
152+
{name: "DropFrames", Node: stringRef},
153+
{name: "KeepFrames", Node: stringRef},
154+
{name: "TimeNanos", Node: parquet.Timestamp(parquet.Nanosecond)},
155+
{name: "DurationNanos", Node: parquet.Int(64)},
156+
{name: "PeriodType", Node: parquet.Optional(sampleType)},
157+
{name: "Period", Node: parquet.Int(64)},
158+
{name: "Comments", Node: parquet.Repeated(stringRef)},
159+
{name: "DefaultSampleType", Node: parquet.Int(64)},
160+
})
161+
return s
162+
}

pkg/firedb/schemas/v2/schema.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package v2
2+
3+
import (
4+
"github.com/polarsignals/frostdb/dynparquet"
5+
"github.com/segmentio/parquet-go"
6+
)
7+
8+
// Column names used by the v2 profiles schema.
const (
	// Identity and labels.
	ColumnID     = "id"
	ColumnLabels = "labels"

	// Sample description and sample data.
	ColumnSampleType  = "sample_type"
	ColumnSampleUnit  = "sample_unit"
	ColumnLocationIDs = "location_ids"
	ColumnSamples     = "samples"
	ColumnPprofLabels = "pprof_labels"

	// Frame filters.
	ColumnDropFrames = "drop_frames"
	ColumnKeepFrames = "keep_frames"

	// Timing.
	ColumnTimeNanos     = "time_nanos"
	ColumnDurationNanos = "duration_nanos"

	// Sampling period.
	ColumnPeriod     = "period"
	ColumnPeriodType = "period_type"
	ColumnPeriodUnit = "period_unit"

	// NOTE(review): Profiles in this file defines no column with this name —
	// confirm whether that is intentional.
	ColumnComments = "comments"
)
25+
26+
func Profiles() *dynparquet.Schema {
27+
stringRef := parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked)
28+
29+
labels := parquet.Repeated(parquet.Group{
30+
"Name": stringRef,
31+
"Value": stringRef,
32+
})
33+
34+
pprofLabels := parquet.Repeated(parquet.Group{
35+
"Key": stringRef,
36+
"Str": parquet.Optional(stringRef),
37+
"Num": parquet.Optional(parquet.Int(64)),
38+
"NumUnit": parquet.Optional(stringRef),
39+
})
40+
41+
return dynparquet.NewSchema(
42+
"profiles",
43+
[]dynparquet.ColumnDefinition{
44+
{
45+
Name: ColumnID,
46+
StorageLayout: parquet.Int(64),
47+
Dynamic: false,
48+
}, {
49+
Name: ColumnLabels,
50+
StorageLayout: labels,
51+
Dynamic: false,
52+
}, {
53+
Name: ColumnSampleType,
54+
StorageLayout: stringRef,
55+
Dynamic: false,
56+
}, {
57+
Name: ColumnSampleUnit,
58+
StorageLayout: stringRef,
59+
Dynamic: false,
60+
}, {
61+
Name: ColumnLocationIDs,
62+
StorageLayout: parquet.Repeated(parquet.Repeated(parquet.Uint(64))),
63+
Dynamic: false,
64+
}, {
65+
Name: ColumnSamples,
66+
StorageLayout: parquet.Repeated(parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked)),
67+
Dynamic: false,
68+
}, {
69+
Name: ColumnPprofLabels,
70+
StorageLayout: pprofLabels,
71+
Dynamic: false,
72+
}, {
73+
Name: ColumnDropFrames,
74+
StorageLayout: stringRef,
75+
Dynamic: false,
76+
}, {
77+
Name: ColumnKeepFrames,
78+
StorageLayout: stringRef,
79+
Dynamic: false,
80+
}, {
81+
Name: ColumnTimeNanos,
82+
StorageLayout: parquet.Timestamp(parquet.Nanosecond),
83+
Dynamic: false,
84+
}, {
85+
Name: ColumnDurationNanos,
86+
StorageLayout: parquet.Int(64),
87+
Dynamic: false,
88+
}, {
89+
Name: ColumnPeriod,
90+
StorageLayout: parquet.Int(64),
91+
Dynamic: false,
92+
}, {
93+
Name: ColumnPeriodType,
94+
StorageLayout: stringRef,
95+
Dynamic: false,
96+
}, {
97+
Name: ColumnPeriodUnit,
98+
StorageLayout: stringRef,
99+
Dynamic: true,
100+
},
101+
},
102+
[]dynparquet.SortingColumn{
103+
dynparquet.Ascending(ColumnID),
104+
dynparquet.Ascending(ColumnSampleType),
105+
dynparquet.Ascending(ColumnSampleUnit),
106+
dynparquet.Ascending(ColumnPeriodType),
107+
dynparquet.Ascending(ColumnPeriodUnit),
108+
dynparquet.NullsFirst(dynparquet.Ascending(ColumnLabels)),
109+
dynparquet.NullsFirst(dynparquet.Ascending(ColumnLocationIDs)),
110+
dynparquet.Ascending(ColumnTimeNanos),
111+
dynparquet.NullsFirst(dynparquet.Ascending(ColumnPprofLabels)),
112+
},
113+
)
114+
}

0 commit comments

Comments
 (0)