diff --git a/beater/v2_handler.go b/beater/v2_handler.go index ae40094ae88..057d52dda04 100644 --- a/beater/v2_handler.go +++ b/beater/v2_handler.go @@ -90,7 +90,7 @@ func (v *v2Handler) handleRawModel(rawModel map[string]interface{}) (transform.T tr, err := model.modelDecoder(entry, err) if err != nil { - return tr, err + return nil, err } return tr, nil } @@ -104,7 +104,7 @@ func (v *v2Handler) readBatch(batchSize int, reader *decoder.NDJSONStreamReader, var err error var rawModel map[string]interface{} - eventables := []transform.Transformable{} + var eventables []transform.Transformable for i := 0; i < batchSize && err == nil; i++ { rawModel, err = reader.Read() if err != nil && err != io.EOF { @@ -124,6 +124,7 @@ func (v *v2Handler) readBatch(batchSize int, reader *decoder.NDJSONStreamReader, if err != nil { response.addWithOffendingDocument(SchemaValidationErr, err.Error(), reader.LastLine()) response.Invalid++ + continue } eventables = append(eventables, tr) } @@ -194,7 +195,7 @@ func (v *v2Handler) handleRequestBody(r *http.Request, ndReader *decoder.NDJSONS for { transformables, done := v.readBatch(batchSize, ndReader, resp) - if transformables != nil && len(transformables) > 0 { + if transformables != nil { err := report(r.Context(), pendingReq{ transformables: transformables, tcontext: tctx, diff --git a/beater/v2_integration_test.go b/beater/v2_integration_test.go index b3d5cec7051..563551432dd 100644 --- a/beater/v2_integration_test.go +++ b/beater/v2_integration_test.go @@ -52,16 +52,18 @@ func TestV2IntakeIntegration(t *testing.T) { handler := (&v2BackendRoute).Handler(c, report) for _, test := range []struct { - path string - name string + path string + name string + status int }{ - {path: "../testdata/intake-v2/errors.ndjson", name: "Errors"}, - {path: "../testdata/intake-v2/transactions.ndjson", name: "Transactions"}, - {path: "../testdata/intake-v2/spans.ndjson", name: "Spans"}, - {path: "../testdata/intake-v2/metrics.ndjson", name: "Metrics"}, - {path: "../testdata/intake-v2/minimal_process.ndjson", name: "MixedMinimalProcess"}, - {path: "../testdata/intake-v2/minimal_service.ndjson", name: "MinimalService"}, - {path: "../testdata/intake-v2/metadata_null_values.ndjson", name: "MetadataNullValues"}, + {status: 202, path: "../testdata/intake-v2/errors.ndjson", name: "Errors"}, + {status: 202, path: "../testdata/intake-v2/transactions.ndjson", name: "Transactions"}, + {status: 202, path: "../testdata/intake-v2/spans.ndjson", name: "Spans"}, + {status: 202, path: "../testdata/intake-v2/metrics.ndjson", name: "Metrics"}, + {status: 202, path: "../testdata/intake-v2/minimal_process.ndjson", name: "MixedMinimalProcess"}, + {status: 202, path: "../testdata/intake-v2/minimal_service.ndjson", name: "MinimalService"}, + {status: 202, path: "../testdata/intake-v2/metadata_null_values.ndjson", name: "MetadataNullValues"}, + {status: 400, path: "../testdata/intake-v2/invalid-event.ndjson", name: "InvalidEvent"}, } { b, err := loader.LoadDataAsBytes(test.path) @@ -80,7 +82,7 @@ func TestV2IntakeIntegration(t *testing.T) { r = r.WithContext(context.WithValue(r.Context(), requestTimeContextKey, reqTimestamp)) handler.ServeHTTP(w, r) - assert.Equal(t, 202, w.Code) + assert.Equal(t, test.status, w.Code) } } diff --git a/testdata/intake-v2/invalid-event.ndjson b/testdata/intake-v2/invalid-event.ndjson new file mode 100644 index 00000000000..011bef0b2bf --- /dev/null +++ b/testdata/intake-v2/invalid-event.ndjson @@ -0,0 +1,2 @@ +{"metadata": {"user": null, "process": {"ppid": null, "pid": 1234, "argv": null, "title": null}, "system": null, "service": {"name": "1234_service-12a3", "language": {"version": null, "name":"ecmascript"}, "agent": {"version": "3.14.0", "name": "elastic-node"}, "environment": null, "framework": null,"version": null, "runtime": null}}} +{ "transaction": { "id": 12345, "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981 } }