@@ -10,6 +10,7 @@ import (
10
10
11
11
"github.com/cespare/xxhash/v2"
12
12
"github.com/google/pprof/profile"
13
+ "github.com/grafana/pyroscope/ebpf/sd"
13
14
"github.com/klauspost/compress/gzip"
14
15
"github.com/prometheus/prometheus/model/labels"
15
16
)
@@ -26,21 +27,96 @@ var (
26
27
}
27
28
)
28
29
30
+ type SampleType uint32
31
+
32
+ var SampleTypeCpu = SampleType (0 )
33
+ var SampleTypeMem = SampleType (1 )
34
+
35
+ type SampleAggregation bool
36
+
37
+ var (
38
+ // SampleAggregated mean samples are accumulated in ebpf, no need to dedup these
39
+ SampleAggregated = SampleAggregation (true )
40
+ // SampleNotAggregated mean values are not accumulated in ebpf, but streamed to userspace with value=1
41
+ // TODO make consider aggregating python in ebpf as well
42
+ SampleNotAggregated = SampleAggregation (false )
43
+ )
44
+
45
+ type CollectProfilesCallback func (sample ProfileSample )
46
+
47
+ type SamplesCollector interface {
48
+ CollectProfiles (callback CollectProfilesCallback ) error
49
+ }
50
+
51
+ type ProfileSample struct {
52
+ Target * sd.Target
53
+ Pid uint32
54
+ SampleType SampleType
55
+ Aggregation SampleAggregation
56
+ Stack []string
57
+ Value uint64
58
+ Value2 uint64
59
+ }
60
+
61
+ type BuildersOptions struct {
62
+ SampleRate int64
63
+ PerPIDProfile bool
64
+ }
65
+
66
+ type builderHashKey struct {
67
+ labelsHash uint64
68
+ pid uint32
69
+ sampleType SampleType
70
+ }
71
+
29
72
type ProfileBuilders struct {
30
- Builders map [uint64 ]* ProfileBuilder
31
- SampleRate int64
73
+ Builders map [builderHashKey ]* ProfileBuilder
74
+ opt BuildersOptions
32
75
}
33
76
34
- func NewProfileBuilders (sampleRate int64 ) * ProfileBuilders {
35
- return & ProfileBuilders {Builders : make (map [uint64 ]* ProfileBuilder ), SampleRate : sampleRate }
77
+ func NewProfileBuilders (options BuildersOptions ) * ProfileBuilders {
78
+ return & ProfileBuilders {Builders : make (map [builderHashKey ]* ProfileBuilder ), opt : options }
36
79
}
37
80
38
- func (b ProfileBuilders ) BuilderForTarget (hash uint64 , labels labels.Labels ) * ProfileBuilder {
39
- res := b .Builders [hash ]
81
+ func Collect (builders * ProfileBuilders , collector SamplesCollector ) error {
82
+ return collector .CollectProfiles (func (sample ProfileSample ) {
83
+ builders .AddSample (& sample )
84
+ })
85
+ }
86
+
87
+ func (b * ProfileBuilders ) AddSample (sample * ProfileSample ) {
88
+ bb := b .BuilderForSample (sample )
89
+ if sample .Aggregation == SampleAggregated {
90
+ bb .CreateSample (sample )
91
+ } else {
92
+ bb .CreateSampleOrAddValue (sample )
93
+ }
94
+ }
95
+
96
+ func (b * ProfileBuilders ) BuilderForSample (sample * ProfileSample ) * ProfileBuilder {
97
+ labelsHash , labels := sample .Target .Labels ()
98
+
99
+ k := builderHashKey {labelsHash : labelsHash , sampleType : sample .SampleType }
100
+ if b .opt .PerPIDProfile {
101
+ k .pid = sample .Pid
102
+ }
103
+ res := b .Builders [k ]
40
104
if res != nil {
41
105
return res
42
106
}
43
107
108
+ var sampleType []* profile.ValueType
109
+ var periodType * profile.ValueType
110
+ var period int64
111
+ if sample .SampleType == SampleTypeCpu {
112
+ sampleType = []* profile.ValueType {{Type : "cpu" , Unit : "nanoseconds" }}
113
+ periodType = & profile.ValueType {Type : "cpu" , Unit : "nanoseconds" }
114
+ period = time .Second .Nanoseconds () / b .opt .SampleRate
115
+ } else {
116
+ sampleType = []* profile.ValueType {{Type : "alloc_objects" , Unit : "count" }, {Type : "alloc_space" , Unit : "bytes" }}
117
+ periodType = & profile.ValueType {Type : "space" , Unit : "bytes" }
118
+ period = 512 * 1024 // todo
119
+ }
44
120
builder := & ProfileBuilder {
45
121
locations : make (map [string ]* profile.Location ),
46
122
functions : make (map [string ]* profile.Function ),
@@ -52,16 +128,16 @@ func (b ProfileBuilders) BuilderForTarget(hash uint64, labels labels.Labels) *Pr
52
128
ID : 1 ,
53
129
},
54
130
},
55
- SampleType : [] * profile. ValueType {{ Type : "cpu" , Unit : "nanoseconds" }} ,
56
- Period : time . Second . Nanoseconds () / b . SampleRate ,
57
- PeriodType : & profile. ValueType { Type : "cpu" , Unit : "nanoseconds" } ,
131
+ SampleType : sampleType ,
132
+ Period : period ,
133
+ PeriodType : periodType ,
58
134
TimeNanos : time .Now ().UnixNano (),
59
135
},
60
136
tmpLocationIDs : make ([]uint64 , 0 , 128 ),
61
137
tmpLocations : make ([]* profile.Location , 0 , 128 ),
62
138
}
63
139
res = builder
64
- b .Builders [hash ] = res
140
+ b .Builders [k ] = res
65
141
return res
66
142
}
67
143
@@ -76,36 +152,31 @@ type ProfileBuilder struct {
76
152
tmpLocationIDs []uint64
77
153
}
78
154
79
- func (p * ProfileBuilder ) CreateSample (stacktrace []string , value uint64 ) {
80
- sample := & profile.Sample {
81
- Value : []int64 {int64 (value ) * p .Profile .Period },
82
- }
83
- for _ , s := range stacktrace {
84
- loc := p .addLocation (s )
85
- sample .Location = append (sample .Location , loc )
155
+ func (p * ProfileBuilder ) CreateSample (inputSample * ProfileSample ) {
156
+ sample := p .newSample (inputSample )
157
+ p .addValue (inputSample , sample )
158
+ for i , s := range inputSample .Stack {
159
+ sample .Location [i ] = p .addLocation (s )
86
160
}
87
161
p .Profile .Sample = append (p .Profile .Sample , sample )
88
162
}
89
163
90
- func (p * ProfileBuilder ) CreateSampleOrAddValue (stacktrace []string , value uint64 ) {
91
- scaledValue := int64 (value ) * p .Profile .Period
164
+ func (p * ProfileBuilder ) CreateSampleOrAddValue (inputSample * ProfileSample ) {
92
165
p .tmpLocations = p .tmpLocations [:0 ]
93
166
p .tmpLocationIDs = p .tmpLocationIDs [:0 ]
94
- for _ , s := range stacktrace {
167
+ for _ , s := range inputSample . Stack {
95
168
loc := p .addLocation (s )
96
169
p .tmpLocations = append (p .tmpLocations , loc )
97
170
p .tmpLocationIDs = append (p .tmpLocationIDs , loc .ID )
98
171
}
99
172
h := xxhash .Sum64 (uint64Bytes (p .tmpLocationIDs ))
100
173
sample := p .sampleHashToSample [h ]
101
174
if sample != nil {
102
- sample . Value [ 0 ] += scaledValue
175
+ p . addValue ( inputSample , sample )
103
176
return
104
177
}
105
- sample = & profile.Sample {
106
- Location : make ([]* profile.Location , len (p .tmpLocations )),
107
- Value : []int64 {scaledValue },
108
- }
178
+ sample = p .newSample (inputSample )
179
+ p .addValue (inputSample , sample )
109
180
copy (sample .Location , p .tmpLocations )
110
181
p .sampleHashToSample [h ] = sample
111
182
p .Profile .Sample = append (p .Profile .Sample , sample )
@@ -177,3 +248,22 @@ func uint64Bytes(s []uint64) []byte {
177
248
hdr .Data = uintptr (unsafe .Pointer (& s [0 ]))
178
249
return bs
179
250
}
251
+ func (p * ProfileBuilder ) newSample (inputSample * ProfileSample ) * profile.Sample {
252
+ sample := new (profile.Sample )
253
+ if inputSample .SampleType == SampleTypeCpu {
254
+ sample .Value = []int64 {0 }
255
+ } else {
256
+ sample .Value = []int64 {0 , 0 }
257
+ }
258
+ sample .Location = make ([]* profile.Location , len (inputSample .Stack ))
259
+ return sample
260
+ }
261
+
262
+ func (p * ProfileBuilder ) addValue (inputSample * ProfileSample , sample * profile.Sample ) {
263
+ if inputSample .SampleType == SampleTypeCpu {
264
+ sample .Value [0 ] += int64 (inputSample .Value ) * p .Profile .Period
265
+ } else {
266
+ sample .Value [0 ] += int64 (inputSample .Value )
267
+ sample .Value [1 ] += int64 (inputSample .Value2 )
268
+ }
269
+ }
0 commit comments