Skip to content

Commit b47fbaf

Browse files
author
Ron cohen
committed
WIP in intake v2.
1 parent fb10720 commit b47fbaf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+3006
-1029
lines changed

beater/handlers.go

Lines changed: 80 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ import (
1818

1919
conf "github.com/elastic/apm-server/config"
2020
"github.com/elastic/apm-server/decoder"
21+
"github.com/elastic/apm-server/model"
2122
"github.com/elastic/apm-server/processor"
2223
perr "github.com/elastic/apm-server/processor/error"
23-
"github.com/elastic/apm-server/processor/healthcheck"
2424
"github.com/elastic/apm-server/processor/sourcemap"
2525
"github.com/elastic/apm-server/processor/transaction"
26+
2627
"github.com/elastic/apm-server/utility"
2728
"github.com/elastic/beats/libbeat/logp"
2829
"github.com/elastic/beats/libbeat/monitoring"
@@ -35,10 +36,13 @@ const (
3536
FrontendErrorsURL = "/v1/client-side/errors"
3637
HealthCheckURL = "/healthcheck"
3738
SourcemapsURL = "/v1/client-side/sourcemaps"
39+
StreamBackendURL = "/v2/intake"
3840

3941
rateLimitCacheSize = 1000
4042
rateLimitBurstMultiplier = 2
4143

44+
v2TransformBatchSize = 100
45+
4246
supportedHeaders = "Content-Type, Content-Encoding, Accept"
4347
supportedMethods = "POST, OPTIONS"
4448
)
@@ -58,6 +62,10 @@ type serverResponse struct {
5862
counter *monitoring.Int
5963
}
6064

65+
func (s *serverResponse) IsError() bool {
66+
return s.err != nil
67+
}
68+
6169
var (
6270
serverMetrics = monitoring.Default.NewRegistry("apm-server.server", monitoring.PublishExpvar)
6371
counter = func(s string) *monitoring.Int {
@@ -117,13 +125,18 @@ var (
117125
}
118126
}
119127

128+
errHeaderMissing = errors.New("header must be first object in stream")
129+
120130
Routes = map[string]routeMapping{
121131
BackendTransactionsURL: {backendHandler, transaction.NewProcessor},
122132
FrontendTransactionsURL: {frontendHandler, transaction.NewProcessor},
123-
BackendErrorsURL: {backendHandler, perr.NewProcessor},
124-
FrontendErrorsURL: {frontendHandler, perr.NewProcessor},
125-
HealthCheckURL: {healthCheckHandler, healthcheck.NewProcessor},
126-
SourcemapsURL: {sourcemapHandler, sourcemap.NewProcessor},
133+
134+
StreamBackendURL: {streamBackendHandler, nil},
135+
136+
BackendErrorsURL: {backendHandler, perr.NewProcessor},
137+
FrontendErrorsURL: {frontendHandler, perr.NewProcessor},
138+
HealthCheckURL: {healthCheckHandler, nil},
139+
SourcemapsURL: {sourcemapHandler, sourcemap.NewProcessor},
127140
}
128141
)
129142

@@ -163,19 +176,29 @@ func concurrencyLimitHandler(beaterConfig *Config, h http.Handler) http.Handler
163176
}
164177

165178
func backendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
179+
extractors := []decoder.Extractor{}
180+
if beaterConfig.AugmentEnabled {
181+
extractors = append(extractors, decoder.SystemExtractor)
182+
}
183+
166184
return logHandler(
167185
concurrencyLimitHandler(beaterConfig,
168186
authHandler(beaterConfig.SecretToken,
169-
processRequestHandler(pf, conf.Config{}, report,
170-
decoder.DecodeSystemData(decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize), beaterConfig.AugmentEnabled)))))
187+
processRequestHandler(pf, conf.TransformConfig{}, report, extractors,
188+
decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize)))))
171189
}
172190

173191
func frontendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
192+
extractors := []decoder.Extractor{}
193+
if beaterConfig.AugmentEnabled {
194+
extractors = append(extractors, decoder.UserExtractor)
195+
}
196+
174197
smapper, err := beaterConfig.Frontend.memoizedSmapMapper()
175198
if err != nil {
176199
logp.NewLogger("handler").Error(err.Error())
177200
}
178-
config := conf.Config{
201+
config := conf.TransformConfig{
179202
SmapMapper: smapper,
180203
LibraryPattern: regexp.MustCompile(beaterConfig.Frontend.LibraryPattern),
181204
ExcludeFromGrouping: regexp.MustCompile(beaterConfig.Frontend.ExcludeFromGrouping),
@@ -185,8 +208,21 @@ func frontendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter)
185208
concurrencyLimitHandler(beaterConfig,
186209
ipRateLimitHandler(beaterConfig.Frontend.RateLimit,
187210
corsHandler(beaterConfig.Frontend.AllowOrigins,
188-
processRequestHandler(pf, config, report,
189-
decoder.DecodeUserData(decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize), beaterConfig.AugmentEnabled)))))))
211+
processRequestHandler(pf, config, report, extractors,
212+
decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize)))))))
213+
}
214+
215+
func streamBackendHandler(_ ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
216+
extractors := []decoder.Extractor{
217+
decoder.SystemExtractor,
218+
}
219+
220+
requestDecodeer := decoder.StreamDecodeLimitJSONData(beaterConfig.MaxUnzippedSize)
221+
222+
return logHandler(
223+
concurrencyLimitHandler(beaterConfig,
224+
authHandler(beaterConfig.SecretToken,
225+
processStreamRequest(v2TransformBatchSize, conf.TransformConfig{}, report, extractors, requestDecodeer))))
190226
}
191227

192228
func sourcemapHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
@@ -197,7 +233,7 @@ func sourcemapHandler(pf ProcessorFactory, beaterConfig *Config, report reporter
197233
return logHandler(
198234
killSwitchHandler(beaterConfig.Frontend.isEnabled(),
199235
authHandler(beaterConfig.SecretToken,
200-
processRequestHandler(pf, conf.Config{SmapMapper: smapper}, report, decoder.DecodeSourcemapFormData))))
236+
processRequestHandler(pf, conf.TransformConfig{SmapMapper: smapper}, report, []decoder.Extractor{}, decoder.DecodeSourcemapFormData))))
201237
}
202238

203239
func healthCheckHandler(_ ProcessorFactory, _ *Config, _ reporter) http.Handler {
@@ -346,14 +382,14 @@ func corsHandler(allowedOrigins []string, h http.Handler) http.Handler {
346382
})
347383
}
348384

349-
func processRequestHandler(pf ProcessorFactory, config conf.Config, report reporter, decode decoder.Decoder) http.Handler {
385+
func processRequestHandler(pf ProcessorFactory, config conf.TransformConfig, report reporter, extractors []decoder.Extractor, decode func(req *http.Request) (map[string]interface{}, error)) http.Handler {
350386
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
351-
res := processRequest(r, pf, config, report, decode)
387+
res := processRequest(r, pf, config, report, extractors, decode)
352388
sendStatus(w, r, res)
353389
})
354390
}
355391

356-
func processRequest(r *http.Request, pf ProcessorFactory, config conf.Config, report reporter, decode decoder.Decoder) serverResponse {
392+
func processRequest(r *http.Request, pf ProcessorFactory, config conf.TransformConfig, report reporter, extractors []decoder.Extractor, decode func(req *http.Request) (map[string]interface{}, error)) serverResponse {
357393
processor := pf()
358394

359395
if r.Method != "POST" {
@@ -369,16 +405,18 @@ func processRequest(r *http.Request, pf ProcessorFactory, config conf.Config, re
369405

370406
}
371407

372-
if err = processor.Validate(data); err != nil {
373-
return cannotValidateResponse(err)
408+
transformables, tctx, response := ProcessPayload(data, processor)
409+
if response.err != nil {
410+
return response
374411
}
375412

376-
payload, err := processor.Decode(data)
377-
if err != nil {
378-
return cannotDecodeResponse(err)
413+
req := pendingReq{
414+
transformable: transformables,
415+
config: config,
416+
context: tctx,
379417
}
380418

381-
if err = report(pendingReq{payload: payload, config: config}); err != nil {
419+
if err = report(req); err != nil {
382420
if strings.Contains(err.Error(), "publisher is being stopped") {
383421
return serverShuttingDownResponse(err)
384422
}
@@ -388,6 +426,24 @@ func processRequest(r *http.Request, pf ProcessorFactory, config conf.Config, re
388426
return acceptedResponse
389427
}
390428

429+
func ProcessPayload(data map[string]interface{}, p processor.Processor) (model.TransformableBatch, *model.TransformContext, serverResponse) {
430+
var err error
431+
if err = p.Validate(data); err != nil {
432+
return nil, nil, cannotValidateResponse(err)
433+
}
434+
435+
transformationContext, err := model.DecodeContext(data, err)
436+
if err != nil {
437+
return nil, nil, cannotDecodeResponse(err)
438+
}
439+
440+
transformables, err := p.Decode(data)
441+
if err != nil {
442+
return nil, nil, cannotDecodeResponse(err)
443+
}
444+
return transformables, transformationContext, okResponse
445+
}
446+
391447
func sendStatus(w http.ResponseWriter, r *http.Request, res serverResponse) {
392448
contentType := "text/plain; charset=utf-8"
393449
if acceptsJSON(r) {
@@ -397,7 +453,10 @@ func sendStatus(w http.ResponseWriter, r *http.Request, res serverResponse) {
397453
w.WriteHeader(res.code)
398454

399455
responseCounter.Inc()
400-
res.counter.Inc()
456+
if res.counter != nil {
457+
res.counter.Inc()
458+
}
459+
401460
if res.err == nil {
402461
responseSuccesses.Inc()
403462
return

beater/metrics.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package beater
2+
3+
// import "github.com/elastic/beats/libbeat/monitoring"
4+
5+
// type EntityMetrics struct {
6+
// registry *monitoring.Registry
7+
// validationCount *monitoring.Int
8+
// validationError *monitoring.Int
9+
// transformations *monitoring.Int
10+
// }
11+
12+
// var errorMetricsReg = monitoring.Default.NewRegistry("apm-server.entity.error")
13+
14+
// var ErrorMetrics = EntityHandlingMetrics{
15+
// registry: errorMetricsReg,
16+
// transformations: monitoring.NewInt(errorMetricsReg, "transformations"),
17+
// validationTotal: monitoring.NewInt(errorMetricsReg, "validation.total"),
18+
// validationError: monitoring.NewInt(errorMetricsReg, "validation.errors"),
19+
// }
20+
21+
// var transactionMetricsReg = monitoring.Default.NewRegistry("apm-server.entity.transaction")
22+
23+
// var TransactionMetrics = EntityHandlingMetrics{
24+
// registry: transactionMetricsReg,
25+
// transformations: monitoring.NewInt(transactionMetricsReg, "transformations"),
26+
// validationTotal: monitoring.NewInt(transactionMetricsReg, "validation.total"),
27+
// validationError: monitoring.NewInt(transactionMetricsReg, "validation.errors"),
28+
// }
29+
30+
// // var errorMetrics =
31+
32+
// // var validationCount =
33+
34+
// // validationError = monitoring.NewInt(errorMetrics, "validation.errors")
35+
// // transformations = monitoring.NewInt(errorMetrics, "transformations")

beater/pub.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/pkg/errors"
99

1010
"github.com/elastic/apm-server/config"
11-
pr "github.com/elastic/apm-server/processor"
11+
"github.com/elastic/apm-server/model"
1212
"github.com/elastic/beats/libbeat/beat"
1313
)
1414

@@ -27,8 +27,9 @@ type publisher struct {
2727
}
2828

2929
type pendingReq struct {
30-
payload pr.Payload
31-
config config.Config
30+
transformable []model.Transformable
31+
config config.TransformConfig
32+
context *model.TransformContext
3233
}
3334

3435
var (
@@ -102,6 +103,9 @@ func (p *publisher) Send(req pendingReq) error {
102103

103104
func (p *publisher) run() {
104105
for req := range p.pendingRequests {
105-
p.client.PublishAll(req.payload.Transform(req.config))
106+
for _, transformable := range req.transformable {
107+
// todo: is this threadsafe?
108+
p.client.Publish(transformable.Transform(req.config, req.context))
109+
}
106110
}
107111
}

0 commit comments

Comments
 (0)