Skip to content

Commit 9ea9ad3

Browse files
authored
otel: use server.Config grpc server options (#4358)
1 parent d41599a commit 9ea9ad3

File tree

4 files changed

+64
-23
lines changed

4 files changed

+64
-23
lines changed

pkg/api/api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
"context"
1010
"flag"
1111
"fmt"
12-
1312
"net/http"
1413

1514
"connectrpc.com/connect"
15+
1616
"github.com/felixge/fgprof"
1717
"github.com/go-kit/log"
1818
"github.com/grafana/dskit/kv/memberlist"
@@ -187,10 +187,10 @@ func (a *API) RegisterOverridesExporter(oe *exporter.OverridesExporter) {
187187
}
188188

189189
// RegisterDistributor registers the endpoints associated with the distributor.
190-
func (a *API) RegisterDistributor(d *distributor.Distributor, limits *validation.Overrides, multitenancyEnabled bool) {
190+
func (a *API) RegisterDistributor(d *distributor.Distributor, limits *validation.Overrides, multitenancyEnabled bool, cfg server.Config) {
191191
writePathOpts := a.registerOptionsWritePath(limits)
192192
pyroscopeHandler := pyroscope.NewPyroscopeIngestHandler(d, a.logger)
193-
otlpHandler := otlp.NewOTLPIngestHandler(d, a.logger, multitenancyEnabled)
193+
otlpHandler := otlp.NewOTLPIngestHandler(cfg, d, a.logger, multitenancyEnabled)
194194

195195
a.RegisterRoute("/ingest", pyroscopeHandler, writePathOpts...)
196196
a.RegisterRoute("/pyroscope/ingest", pyroscopeHandler, writePathOpts...)

pkg/ingester/otlp/ingest_handler.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,18 @@ import (
77
"strings"
88

99
"connectrpc.com/connect"
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/codes"
12+
"google.golang.org/grpc/keepalive"
13+
"google.golang.org/protobuf/proto"
14+
1015
"github.com/go-kit/log"
1116
"github.com/go-kit/log/level"
1217
"github.com/google/uuid"
18+
"github.com/grafana/dskit/server"
1319
"github.com/grafana/dskit/user"
1420
pprofileotlp "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"
1521
v1 "go.opentelemetry.io/proto/otlp/common/v1"
16-
"google.golang.org/grpc"
17-
"google.golang.org/grpc/codes"
18-
"google.golang.org/protobuf/proto"
1922

2023
"google.golang.org/grpc/status"
2124

@@ -44,14 +47,14 @@ type PushService interface {
4447
PushParsed(ctx context.Context, req *distirbutormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error)
4548
}
4649

47-
func NewOTLPIngestHandler(svc PushService, l log.Logger, me bool) Handler {
50+
func NewOTLPIngestHandler(cfg server.Config, svc PushService, l log.Logger, me bool) Handler {
4851
h := &ingestHandler{
4952
svc: svc,
5053
log: l,
5154
multitenancyEnabled: me,
5255
}
5356

54-
grpcServer := grpc.NewServer()
57+
grpcServer := newGrpcServer(cfg)
5558
pprofileotlp.RegisterProfilesServiceServer(grpcServer, h)
5659

5760
h.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -69,6 +72,34 @@ func NewOTLPIngestHandler(svc PushService, l log.Logger, me bool) Handler {
6972
return h
7073
}
7174

75+
func newGrpcServer(cfg server.Config) *grpc.Server {
76+
grpcKeepAliveOptions := keepalive.ServerParameters{
77+
MaxConnectionIdle: cfg.GRPCServerMaxConnectionIdle,
78+
MaxConnectionAge: cfg.GRPCServerMaxConnectionAge,
79+
MaxConnectionAgeGrace: cfg.GRPCServerMaxConnectionAgeGrace,
80+
Time: cfg.GRPCServerTime,
81+
Timeout: cfg.GRPCServerTimeout,
82+
}
83+
84+
grpcKeepAliveEnforcementPolicy := keepalive.EnforcementPolicy{
85+
MinTime: cfg.GRPCServerMinTimeBetweenPings,
86+
PermitWithoutStream: cfg.GRPCServerPingWithoutStreamAllowed,
87+
}
88+
89+
grpcOptions := []grpc.ServerOption{
90+
grpc.KeepaliveParams(grpcKeepAliveOptions),
91+
grpc.KeepaliveEnforcementPolicy(grpcKeepAliveEnforcementPolicy),
92+
grpc.MaxRecvMsgSize(cfg.GRPCServerMaxRecvMsgSize),
93+
grpc.MaxSendMsgSize(cfg.GRPCServerMaxSendMsgSize),
94+
grpc.MaxConcurrentStreams(uint32(cfg.GRPCServerMaxConcurrentStreams)),
95+
grpc.NumStreamWorkers(uint32(cfg.GRPCServerNumWorkers)),
96+
}
97+
98+
grpcOptions = append(grpcOptions, cfg.GRPCOptions...)
99+
100+
return grpc.NewServer(grpcOptions...)
101+
}
102+
72103
func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
73104
h.handler.ServeHTTP(w, r)
74105
}

pkg/ingester/otlp/ingest_handler_test.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,26 @@ package otlp
22

33
import (
44
"context"
5+
"flag"
56
"os"
67
"sort"
78
"strings"
89
"testing"
910

10-
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
11-
"github.com/grafana/pyroscope/pkg/distributor/model"
12-
phlaremodel "github.com/grafana/pyroscope/pkg/model"
13-
"github.com/grafana/pyroscope/pkg/og/convert/pprof/strprofile"
14-
"github.com/grafana/pyroscope/pkg/test"
15-
"github.com/grafana/pyroscope/pkg/test/mocks/mockotlp"
16-
11+
"github.com/grafana/dskit/server"
1712
"github.com/stretchr/testify/assert"
1813
"github.com/stretchr/testify/mock"
1914
"github.com/stretchr/testify/require"
2015
v1experimental2 "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"
2116
v1 "go.opentelemetry.io/proto/otlp/common/v1"
2217
v1experimental "go.opentelemetry.io/proto/otlp/profiles/v1development"
18+
19+
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
20+
"github.com/grafana/pyroscope/pkg/distributor/model"
21+
phlaremodel "github.com/grafana/pyroscope/pkg/model"
22+
"github.com/grafana/pyroscope/pkg/og/convert/pprof/strprofile"
23+
"github.com/grafana/pyroscope/pkg/test"
24+
"github.com/grafana/pyroscope/pkg/test/mocks/mockotlp"
2325
)
2426

2527
func TestGetServiceNameFromAttributes(t *testing.T) {
@@ -334,7 +336,7 @@ func TestConversion(t *testing.T) {
334336
}}}}},
335337
Dictionary: &b.dictionary}
336338
logger := test.NewTestingLogger(t)
337-
h := NewOTLPIngestHandler(svc, logger, false)
339+
h := NewOTLPIngestHandler(testConfig(), svc, logger, false)
338340
_, err := h.Export(context.Background(), req)
339341

340342
if td.expectedError == "" {
@@ -429,7 +431,7 @@ func TestSampleAttributes(t *testing.T) {
429431
}}}}},
430432
Dictionary: &otlpb.dictionary}
431433
logger := test.NewTestingLogger(t)
432-
h := NewOTLPIngestHandler(svc, logger, false)
434+
h := NewOTLPIngestHandler(testConfig(), svc, logger, false)
433435
_, err := h.Export(context.Background(), req)
434436
assert.NoError(t, err)
435437
require.Equal(t, 1, len(profiles))
@@ -593,7 +595,7 @@ func TestDifferentServiceNames(t *testing.T) {
593595
Dictionary: &otlpb.dictionary}
594596

595597
logger := test.NewTestingLogger(t)
596-
h := NewOTLPIngestHandler(svc, logger, false)
598+
h := NewOTLPIngestHandler(testConfig(), svc, logger, false)
597599
_, err := h.Export(context.Background(), req)
598600
require.NoError(t, err)
599601

@@ -649,3 +651,10 @@ func (o *otlpbuilder) addstr(s string) int32 {
649651
o.dictionary.StringTable = append(o.dictionary.StringTable, s)
650652
return idx
651653
}
654+
655+
func testConfig() server.Config {
656+
cfg := server.Config{}
657+
fs := flag.NewFlagSet("test", flag.PanicOnError)
658+
cfg.RegisterFlags(fs)
659+
return cfg
660+
}

pkg/pyroscope/modules.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ import (
1010
"time"
1111

1212
"connectrpc.com/connect"
13+
"google.golang.org/genproto/googleapis/api/httpbody"
14+
"google.golang.org/grpc/health/grpc_health_v1"
15+
"google.golang.org/protobuf/encoding/protojson"
16+
"gopkg.in/yaml.v3"
17+
1318
"github.com/go-kit/log"
1419
"github.com/go-kit/log/level"
1520
"github.com/grafana/dskit/dns"
@@ -29,10 +34,6 @@ import (
2934
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
3035
"golang.org/x/net/http2"
3136
"golang.org/x/net/http2/h2c"
32-
"google.golang.org/genproto/googleapis/api/httpbody"
33-
"google.golang.org/grpc/health/grpc_health_v1"
34-
"google.golang.org/protobuf/encoding/protojson"
35-
"gopkg.in/yaml.v3"
3637

3738
statusv1 "github.com/grafana/pyroscope/api/gen/proto/go/status/v1"
3839
"github.com/grafana/pyroscope/pkg/adhocprofiles"
@@ -321,7 +322,7 @@ func (f *Pyroscope) initDistributor() (services.Service, error) {
321322
if err != nil {
322323
return nil, err
323324
}
324-
f.API.RegisterDistributor(d, f.Overrides, f.Cfg.MultitenancyEnabled)
325+
f.API.RegisterDistributor(d, f.Overrides, f.Cfg.MultitenancyEnabled, f.Cfg.Server)
325326
return d, nil
326327
}
327328

0 commit comments

Comments
 (0)