Skip to content

Commit e3a643b

Browse files
authored
integration: Check distributor health before push (#3167)
* integration: Check distributor health before push I observed a few flakes where the distributors had not discovered all ingesters. Added a distributor health check, so we can be sure everything is set up. This also changes the listen address to loopback, so it won't display the Mac OS firewall warning every time. * what a stupid linter ``` pkg/test/integration/cluster/cluster.go:365:53: `(*gatherCheck).addExpectValue` - `metricName` always receives `"pyroscope_ring_members"` (unparam) func (c *gatherCheck) addExpectValue(value float64, metricName string, labelPairs ...string) *gatherCheck { ``` * go.mod
1 parent 6e2d580 commit e3a643b

File tree

2 files changed

+139
-39
lines changed

2 files changed

+139
-39
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ require (
5454
github.com/parquet-go/parquet-go v0.18.1-0.20231004061202-cde8189c4c26
5555
github.com/pkg/errors v0.9.1
5656
github.com/prometheus/client_golang v1.17.0
57+
github.com/prometheus/client_model v0.5.0
5758
github.com/prometheus/common v0.45.0
5859
github.com/prometheus/prometheus v1.99.0
5960
github.com/samber/lo v1.38.1
@@ -189,7 +190,6 @@ require (
189190
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
190191
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
191192
github.com/prometheus/alertmanager v0.26.0 // indirect
192-
github.com/prometheus/client_model v0.5.0 // indirect
193193
github.com/prometheus/common/sigv4 v0.1.0 // indirect
194194
github.com/prometheus/exporter-toolkit v0.10.1-0.20230714054209-2f4150c63f97 // indirect
195195
github.com/prometheus/procfs v0.11.1 // indirect

pkg/test/integration/cluster/cluster.go

Lines changed: 138 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,23 @@ package cluster
22

33
import (
44
"context"
5+
"errors"
56
"flag"
67
"fmt"
78
"io"
89
"log"
10+
"math"
911
"math/rand"
1012
"net"
1113
"net/http"
1214
"os"
1315
"path/filepath"
16+
"strings"
1417
"sync"
1518
"time"
1619

1720
"github.com/prometheus/client_golang/prometheus"
21+
pm "github.com/prometheus/client_model/go"
1822
"golang.org/x/sync/errgroup"
1923

2024
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
@@ -116,6 +120,19 @@ func nodeNameFlags(nodeName string) []string {
116120
}
117121
}
118122

123+
// listenAddrFlags returns the command line flags that set the instance or
// advertise address of each listed ring/memberlist setting to listenAddr.
func listenAddrFlags(listenAddr string) []string {
	// Flag names, each including the trailing '=', that take the address as value.
	prefixes := [...]string{
		"-compactor.ring.instance-addr=",
		"-distributor.ring.instance-addr=",
		"-ingester.lifecycler.addr=",
		"-memberlist.advertise-addr=",
		"-overrides-exporter.ring.instance-addr=",
		"-query-frontend.instance-addr=",
		"-query-scheduler.ring.instance-addr=",
		"-store-gateway.sharding-ring.instance-addr=",
	}

	flags := make([]string, 0, len(prefixes))
	for _, prefix := range prefixes {
		flags = append(flags, prefix+listenAddr)
	}
	return flags
}
135+
119136
func (c *Cluster) pickHealthyComponent(targets ...string) (addr string, err error) {
120137
results := make([][]string, len(targets))
121138

@@ -150,7 +167,7 @@ func (c *Cluster) Prepare() (err error) {
150167

151168
// allocate three tcp ports per component
152169
portsPerComponent := 3
153-
listenAddr := "0.0.0.0"
170+
listenAddr := "127.0.0.1"
154171
ports, err := getFreeTCPPorts(listenAddr, len(c.Components)*portsPerComponent)
155172
if err != nil {
156173
return err
@@ -184,6 +201,8 @@ func (c *Cluster) Prepare() (err error) {
184201

185202
comp.flags = append(
186203
nodeNameFlags(comp.nodeName()),
204+
listenAddrFlags("127.0.0.1")...)
205+
comp.flags = append(comp.flags,
187206
[]string{
188207
"-tracing.enabled=false", // data race
189208
"-distributor.replication-factor=3",
@@ -237,7 +256,11 @@ func (c *Cluster) Start(ctx context.Context) (err error) {
237256

238257
notReady := make(map[*Component]error)
239258

259+
countPerTarget := map[string]int{}
260+
240261
for _, comp := range c.Components {
262+
countPerTarget[comp.Target]++
263+
241264
p, err := comp.start(ctx)
242265
if err != nil {
243266
return err
@@ -273,7 +296,14 @@ func (c *Cluster) Start(ctx context.Context) (err error) {
273296
return err
274297
}
275298

276-
return t.querierReadyCheck(ctx, 3, 3)
299+
return t.querierReadyCheck(ctx, countPerTarget["ingester"], countPerTarget["store-gateway"])
300+
}
301+
if t.Target == "distributor" {
302+
if err := t.httpReadyCheck(ctx); err != nil {
303+
return err
304+
}
305+
306+
return t.distributorReadyCheck(ctx, countPerTarget["ingester"], countPerTarget["distributor"])
277307
}
278308

279309
return t.httpReadyCheck(ctx)
@@ -327,59 +357,129 @@ type Component struct {
327357
reg *prometheus.Registry
328358
}
329359

330-
func (comp *Component) querierReadyCheck(ctx context.Context, expectedIngesters, expectedStoreGateways int) error {
331-
metrics, err := comp.reg.Gather()
360+
type gatherCheck struct {
361+
g prometheus.Gatherer
362+
conditions []gatherCoditions
363+
}
364+
365+
//nolint:unparam
366+
func (c *gatherCheck) addExpectValue(value float64, metricName string, labelPairs ...string) *gatherCheck {
367+
c.conditions = append(c.conditions, gatherCoditions{
368+
metricName: metricName,
369+
labelPairs: labelPairs,
370+
expectedValue: value,
371+
})
372+
return c
373+
}
374+
375+
type gatherCoditions struct {
376+
metricName string
377+
labelPairs []string
378+
expectedValue float64
379+
}
380+
381+
func (c *gatherCoditions) String() string {
382+
b := strings.Builder{}
383+
b.WriteString(c.metricName)
384+
b.WriteRune('{')
385+
for i := 0; i < len(c.labelPairs); i += 2 {
386+
b.WriteString(c.labelPairs[i])
387+
b.WriteRune('=')
388+
b.WriteString(c.labelPairs[i+1])
389+
b.WriteRune(',')
390+
}
391+
s := b.String()
392+
return s[:len(s)-1] + "}"
393+
}
394+
395+
func (c *gatherCoditions) matches(pairs []*pm.LabelPair) bool {
396+
outer:
397+
for i := 0; i < len(c.labelPairs); i += 2 {
398+
for _, l := range pairs {
399+
if l.GetName() != c.labelPairs[i] {
400+
continue
401+
}
402+
if l.GetValue() == c.labelPairs[i+1] {
403+
continue outer // match move to next pair
404+
}
405+
return false // value wrong
406+
}
407+
return false // label not found
408+
}
409+
return true
410+
}
411+
412+
func (comp *Component) checkMetrics() *gatherCheck {
413+
return &gatherCheck{
414+
g: comp.reg,
415+
}
416+
}
417+
418+
func (g *gatherCheck) run(ctx context.Context) error {
419+
actualValues := make([]float64, len(g.conditions))
420+
421+
// maps from metric name to condition index
422+
nameMap := make(map[string][]int)
423+
for idx, c := range g.conditions {
424+
// not a number
425+
actualValues[idx] = math.NaN()
426+
nameMap[c.metricName] = append(nameMap[c.metricName], idx)
427+
}
428+
429+
// now gather actual metrics
430+
metrics, err := g.g.Gather()
332431
if err != nil {
333432
return err
334433
}
335434

336-
activeIngesters := 0
337-
activeStoreGateways := 0
338-
339435
for _, m := range metrics {
340436
if ctx.Err() != nil {
341437
return ctx.Err()
342438
}
343439

344-
if m.GetName() == "pyroscope_ring_members" {
345-
for _, sm := range m.GetMetric() {
346-
foundIngester := false
347-
foundStoreGateway := false
348-
foundActive := false
349-
for _, l := range sm.GetLabel() {
350-
if l.GetName() == "name" && l.GetValue() == "ingester" {
351-
foundIngester = true
352-
}
353-
if l.GetName() == "name" && l.GetValue() == "store-gateway-client" {
354-
foundStoreGateway = true
355-
}
356-
if l.GetName() == "state" && l.GetValue() == "ACTIVE" {
357-
foundActive = true
358-
}
359-
}
360-
if foundIngester && foundActive {
361-
if v := sm.GetGauge().GetValue(); v > 0 {
362-
activeIngesters = int(v)
363-
}
364-
}
365-
if foundStoreGateway && foundActive {
366-
if v := sm.GetGauge().GetValue(); v > 0 {
367-
activeStoreGateways = int(v)
368-
}
440+
conditions, ok := nameMap[m.GetName()]
441+
if !ok {
442+
continue
443+
}
444+
445+
// now iterate over all label pairs
446+
for _, sm := range m.GetMetric() {
447+
// check for each condition whether it matches the labels
448+
for _, condIdx := range conditions {
449+
if g.conditions[condIdx].matches(sm.Label) {
450+
actualValues[condIdx] = sm.GetGauge().GetValue() // TODO: handle other types
369451
}
370452
}
371453
}
372454
}
373455

374-
if activeIngesters != expectedIngesters {
375-
return fmt.Errorf("expected %d active ingesters, got %d", expectedIngesters, activeIngesters)
376-
}
377-
if activeStoreGateways != expectedStoreGateways {
378-
return fmt.Errorf("expected %d active store gateways, got %d", expectedStoreGateways, activeStoreGateways)
456+
errs := make([]error, len(actualValues))
457+
for idx, actual := range actualValues {
458+
cond := g.conditions[idx]
459+
if math.IsNaN(actual) {
460+
errs[idx] = fmt.Errorf("metric for %s not found", cond.String())
461+
continue
462+
}
463+
if actual != cond.expectedValue {
464+
errs[idx] = fmt.Errorf("unexpected value for %s: expected %f, got %f", cond.String(), cond.expectedValue, actual)
465+
}
379466
}
380467

381-
return nil
468+
return errors.Join(errs...)
469+
}
470+
471+
func (comp *Component) querierReadyCheck(ctx context.Context, expectedIngesters, expectedStoreGateways int) (err error) {
472+
check := comp.checkMetrics().
473+
addExpectValue(float64(expectedIngesters), "pyroscope_ring_members", "name", "ingester", "state", "ACTIVE").
474+
addExpectValue(float64(expectedStoreGateways), "pyroscope_ring_members", "name", "store-gateway-client", "state", "ACTIVE")
475+
return check.run(ctx)
476+
}
382477

478+
func (comp *Component) distributorReadyCheck(ctx context.Context, expectedIngesters, expectedDistributors int) (err error) {
479+
check := comp.checkMetrics().
480+
addExpectValue(float64(expectedIngesters), "pyroscope_ring_members", "name", "ingester", "state", "ACTIVE").
481+
addExpectValue(float64(expectedDistributors), "pyroscope_ring_members", "name", "distributor", "state", "ACTIVE")
482+
return check.run(ctx)
383483
}
384484

385485
func (comp *Component) httpReadyCheck(ctx context.Context) error {

0 commit comments

Comments
 (0)