Skip to content

Commit 4aa3bfc

Browse files
feat(v2): implement ReadIndex for linearizable reads (#3619)
* feat(v2): implement ReadIndex for linearizable reads * chore: improve error handling and remove MinReadyDuration * revert min ready check --------- Co-authored-by: Anton Kolesnikov <[email protected]>
1 parent 88ac084 commit 4aa3bfc

File tree

5 files changed

+78
-140
lines changed

5 files changed

+78
-140
lines changed

pkg/experiment/metastore/metastore.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
87
"net"
98
"os"
109
"path/filepath"
1110
"strings"
11+
"sync"
1212
"time"
1313

1414
"github.com/go-kit/log"
@@ -40,8 +40,6 @@ const (
4040
raftTrailingLogs = 18 << 10
4141
raftSnapshotInterval = 180 * time.Second
4242
raftSnapshotThreshold = 8 << 10
43-
44-
metastoreRaftLeaderHealthServiceName = "metastore.v1.MetastoreService.RaftLeader"
4543
)
4644

4745
type Config struct {
@@ -135,8 +133,10 @@ type Metastore struct {
135133

136134
walDir string
137135

138-
metrics *metastoreMetrics
139-
client *metastoreclient.Client
136+
metrics *metastoreMetrics
137+
client *metastoreclient.Client
138+
139+
readyOnce sync.Once
140140
readySince time.Time
141141

142142
dnsProvider *dns.Provider
@@ -174,7 +174,7 @@ func (m *Metastore) Shutdown() error {
174174
return nil
175175
}
176176

177-
func (m *Metastore) starting(ctx context.Context) error {
177+
func (m *Metastore) starting(context.Context) error {
178178
if err := m.db.open(false); err != nil {
179179
return fmt.Errorf("failed to initialize database: %w", err)
180180
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package metastore
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
9+
)
10+
11+
// ReadIndex returns the current commit index and verifies leadership.
12+
func (m *Metastore) ReadIndex(context.Context, *metastorev1.ReadIndexRequest) (*metastorev1.ReadIndexResponse, error) {
13+
commitIndex := m.raft.CommitIndex()
14+
if err := m.raft.VerifyLeader().Error(); err != nil {
15+
return nil, wrapRetryableErrorWithRaftDetails(err, m.raft)
16+
}
17+
return &metastorev1.ReadIndexResponse{ReadIndex: commitIndex}, nil
18+
}
19+
20+
// waitLeaderCommitIndexAppliedLocally ensures the node is up-to-date for read operations,
21+
// providing linearizable read semantics. It calls metastore client ReadIndex
22+
// and waits for the local applied index to catch up to the returned read index.
23+
// This method should be used before performing local reads to ensure consistency.
24+
func (m *Metastore) waitLeaderCommitIndexAppliedLocally(ctx context.Context) error {
25+
r, err := m.client.ReadIndex(ctx, &metastorev1.ReadIndexRequest{})
26+
if err != nil {
27+
return err
28+
}
29+
if m.raft.AppliedIndex() >= r.ReadIndex {
30+
return nil
31+
}
32+
33+
t := time.NewTicker(10 * time.Millisecond)
34+
defer t.Stop()
35+
36+
// Wait for the read index to be applied
37+
for {
38+
select {
39+
case <-t.C:
40+
if m.raft.AppliedIndex() >= r.ReadIndex {
41+
return nil
42+
}
43+
case <-ctx.Done():
44+
return ctx.Err()
45+
}
46+
}
47+
}
48+
49+
// CheckReady verifies if the metastore is ready to serve requests by
50+
// ensuring the node is up-to-date with the leader's commit index.
51+
func (m *Metastore) CheckReady(ctx context.Context) error {
52+
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
53+
return err
54+
}
55+
m.readyOnce.Do(func() {
56+
m.readySince = time.Now()
57+
})
58+
if w := m.config.MinReadyDuration - time.Since(m.readySince); w > 0 {
59+
return fmt.Errorf("%v before reporting readiness", w)
60+
}
61+
return nil
62+
}

pkg/experiment/metastore/metastore_readindex.go

Lines changed: 0 additions & 132 deletions
This file was deleted.

pkg/experiment/metastore/metastore_state_get_profile_stats.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"math"
66
"sync"
77

8+
"github.com/go-kit/log/level"
9+
810
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
911
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
1012
"github.com/grafana/pyroscope/pkg/experiment/metastore/index"
@@ -14,7 +16,10 @@ func (m *Metastore) GetProfileStats(
1416
ctx context.Context,
1517
r *metastorev1.GetProfileStatsRequest,
1618
) (*typesv1.GetProfileStatsResponse, error) {
17-
// TODO(kolesnikovae): ReadIndex
19+
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
20+
level.Error(m.logger).Log("msg", "failed to wait for leader commit index", "err", err)
21+
return nil, err
22+
}
1823
return m.state.getProfileStats(r.TenantId, ctx)
1924
}
2025

pkg/experiment/metastore/metastore_state_query_metadata.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ func (m *Metastore) QueryMetadata(
2020
ctx context.Context,
2121
request *metastorev1.QueryMetadataRequest,
2222
) (*metastorev1.QueryMetadataResponse, error) {
23-
// TODO(kolesnikovae): ReadIndex
23+
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
24+
level.Error(m.logger).Log("msg", "failed to wait for leader commit index", "err", err)
25+
return nil, err
26+
}
2427
return m.state.listBlocksForQuery(ctx, request)
2528
}
2629

0 commit comments

Comments
 (0)