53 changes: 43 additions & 10 deletions pkg/metastore/compaction_raft_handler.go
@@ -1,14 +1,18 @@
package metastore

import (
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/raft"
"github.com/opentracing/opentracing-go/ext"
"go.etcd.io/bbolt"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1/raft_log"
"github.com/grafana/pyroscope/pkg/metastore/compaction"
"github.com/grafana/pyroscope/pkg/metastore/tracing"
)

type IndexReplacer interface {
@@ -43,8 +47,20 @@ func NewCompactionCommandHandler(
}

func (h *CompactionCommandHandler) GetCompactionPlanUpdate(
tx *bbolt.Tx, cmd *raft.Log, req *raft_log.GetCompactionPlanUpdateRequest,
) (*raft_log.GetCompactionPlanUpdateResponse, error) {
ctx context.Context, tx *bbolt.Tx, cmd *raft.Log, req *raft_log.GetCompactionPlanUpdateRequest,
) (resp *raft_log.GetCompactionPlanUpdateResponse, err error) {
span, _ := tracing.StartSpanFromContext(ctx, "raft.GetCompactionPlanUpdate")
span.SetTag("status_updates", len(req.StatusUpdates))
span.SetTag("assign_jobs_max", req.AssignJobsMax)
span.SetTag("raft_log_index", cmd.Index)
span.SetTag("raft_log_term", cmd.Term)
defer func() {
if err != nil {
ext.LogError(span, err)
}
span.Finish()
}()

// We need to generate a plan of the update caused by the new status
// report from the worker. The plan will be used to update the schedule
// after the Raft consensus is reached.
@@ -132,12 +148,26 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate(
})
}

span.SetTag("assigned_jobs", len(p.AssignedJobs))
span.SetTag("new_jobs", len(p.NewJobs))
span.SetTag("evicted_jobs", len(p.EvictedJobs))
return &raft_log.GetCompactionPlanUpdateResponse{Term: cmd.Term, PlanUpdate: p}, nil
}

func (h *CompactionCommandHandler) UpdateCompactionPlan(
tx *bbolt.Tx, cmd *raft.Log, req *raft_log.UpdateCompactionPlanRequest,
) (*raft_log.UpdateCompactionPlanResponse, error) {
ctx context.Context, tx *bbolt.Tx, cmd *raft.Log, req *raft_log.UpdateCompactionPlanRequest,
) (resp *raft_log.UpdateCompactionPlanResponse, err error) {
span, _ := tracing.StartSpanFromContext(ctx, "raft.UpdateCompactionPlan")
span.SetTag("raft_log_index", cmd.Index)
span.SetTag("raft_log_term", cmd.Term)
span.SetTag("request_term", req.Term)
defer func() {
if err != nil {
ext.LogError(span, err)
}
span.Finish()
}()

if req.Term != cmd.Term || req.GetPlanUpdate() == nil {
level.Warn(h.logger).Log(
"msg", "rejecting compaction plan update; term mismatch: leader has changed",
@@ -147,18 +177,18 @@ func (h *CompactionCommandHandler) UpdateCompactionPlan(
return new(raft_log.UpdateCompactionPlanResponse), nil
}

if err := h.planner.UpdatePlan(tx, req.PlanUpdate); err != nil {
if err = h.planner.UpdatePlan(tx, req.PlanUpdate); err != nil {
level.Error(h.logger).Log("msg", "failed to update compaction planner", "err", err)
return nil, err
}

if err := h.scheduler.UpdateSchedule(tx, req.PlanUpdate); err != nil {
if err = h.scheduler.UpdateSchedule(tx, req.PlanUpdate); err != nil {
level.Error(h.logger).Log("msg", "failed to update compaction schedule", "err", err)
return nil, err
}

for _, job := range req.PlanUpdate.NewJobs {
if err := h.tombstones.DeleteTombstones(tx, cmd, job.Plan.Tombstones...); err != nil {
if err = h.tombstones.DeleteTombstones(tx, cmd, job.Plan.Tombstones...); err != nil {
level.Error(h.logger).Log("msg", "failed to delete tombstones", "err", err)
return nil, err
}
@@ -170,22 +200,25 @@ func (h *CompactionCommandHandler) UpdateCompactionPlan(
level.Warn(h.logger).Log("msg", "compacted blocks are missing; skipping", "job", job.State.Name)
continue
}
if err := h.tombstones.AddTombstones(tx, cmd, blockTombstonesForCompletedJob(job)); err != nil {
if err = h.tombstones.AddTombstones(tx, cmd, blockTombstonesForCompletedJob(job)); err != nil {
level.Error(h.logger).Log("msg", "failed to add tombstones", "err", err)
return nil, err
}
for _, block := range compacted.NewBlocks {
if err := h.compactor.Compact(tx, compaction.NewBlockEntry(cmd, block)); err != nil {
if err = h.compactor.Compact(tx, compaction.NewBlockEntry(cmd, block)); err != nil {
level.Error(h.logger).Log("msg", "failed to compact block", "err", err)
return nil, err
}
}
if err := h.index.ReplaceBlocks(tx, compacted); err != nil {
if err = h.index.ReplaceBlocks(tx, compacted); err != nil {
level.Error(h.logger).Log("msg", "failed to replace blocks", "err", err)
return nil, err
}
}

span.SetTag("new_jobs", len(req.PlanUpdate.NewJobs))
span.SetTag("completed_jobs", len(req.PlanUpdate.CompletedJobs))
span.SetTag("updated_jobs", len(req.PlanUpdate.UpdatedJobs))
return &raft_log.UpdateCompactionPlanResponse{PlanUpdate: req.PlanUpdate}, nil
}

28 changes: 22 additions & 6 deletions pkg/metastore/compaction_service.go
@@ -6,6 +6,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

@@ -34,9 +36,20 @@ func NewCompactionService(
}

func (svc *CompactionService) PollCompactionJobs(
_ context.Context,
ctx context.Context,
req *metastorev1.PollCompactionJobsRequest,
) (*metastorev1.PollCompactionJobsResponse, error) {
) (resp *metastorev1.PollCompactionJobsResponse, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "CompactionService.PollCompactionJobs")
defer func() {
if err != nil {
ext.LogError(span, err)
}
span.Finish()
}()

span.SetTag("status_updates", len(req.GetStatusUpdates()))
span.SetTag("job_capacity", req.GetJobCapacity())

// This is a two-step process. To commit changes to the compaction plan,
// we need to ensure that all replicas apply exactly the same changes.
// Instead of relying on identical behavior across replicas and a
@@ -77,14 +90,14 @@ func (svc *CompactionService) PollCompactionJobs(
}

cmd := fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE)
resp, err := svc.raft.Propose(cmd, req)
proposeResp, err := svc.raft.Propose(ctx, cmd, req)
if err != nil {
if !raftnode.IsRaftLeadershipError(err) {
level.Error(svc.logger).Log("msg", "failed to prepare compaction plan", "err", err)
}
return nil, err
}
prepared := resp.(*raft_log.GetCompactionPlanUpdateResponse)
prepared := proposeResp.(*raft_log.GetCompactionPlanUpdateResponse)
planUpdate := prepared.GetPlanUpdate()

// Copy plan updates to the worker response. The job plan is only sent for
@@ -143,19 +156,22 @@ func (svc *CompactionService) PollCompactionJobs(
// scenario, and we don't want to stop the node/cluster). Instead, an
// empty response would indicate that the plan is rejected.
proposal := &raft_log.UpdateCompactionPlanRequest{Term: prepared.Term, PlanUpdate: planUpdate}
if resp, err = svc.raft.Propose(cmd, proposal); err != nil {
if proposeResp, err = svc.raft.Propose(ctx, cmd, proposal); err != nil {
if !raftnode.IsRaftLeadershipError(err) {
level.Error(svc.logger).Log("msg", "failed to update compaction plan", "err", err)
}
return nil, err
}
accepted := resp.(*raft_log.UpdateCompactionPlanResponse).GetPlanUpdate()
accepted := proposeResp.(*raft_log.UpdateCompactionPlanResponse).GetPlanUpdate()
if accepted == nil {
level.Warn(svc.logger).Log("msg", "compaction plan update rejected")
return nil, status.Error(codes.FailedPrecondition, "failed to update compaction plan")
}

// As of now, the accepted plan always matches the proposed one,
// so our prepared worker response is still valid.

span.SetTag("assigned_jobs", len(workerResp.GetCompactionJobs()))
span.SetTag("assignment_updates", len(workerResp.GetAssignments()))
return workerResp, nil
}
58 changes: 43 additions & 15 deletions pkg/metastore/fsm/fsm.go
@@ -13,16 +13,26 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/raft"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/bbolt"
"go.etcd.io/bbolt/errors"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"github.com/grafana/pyroscope/pkg/metastore/tracing"
)

type ContextRegistry interface {
Retrieve(id string) (context.Context, bool)
Delete(id string)
Size() int
}
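For orientation, a minimal in-memory implementation of this interface could look like the sketch below. This is hypothetical — the registry actually wired into the metastore lives outside this diff and also needs a way to add entries, shown here as a Store method.

```go
// Hypothetical minimal ContextRegistry: a mutex-guarded map from an opaque ID
// to the caller's context. Assumes the "context" and "sync" imports.
type contextRegistry struct {
	mu   sync.Mutex
	ctxs map[string]context.Context
}

func newContextRegistry() *contextRegistry {
	return &contextRegistry{ctxs: make(map[string]context.Context)}
}

// Store is the leader-side entry point; it is not part of the interface above.
func (r *contextRegistry) Store(id string, ctx context.Context) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ctxs[id] = ctx
}

func (r *contextRegistry) Retrieve(id string) (context.Context, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	ctx, ok := r.ctxs[id]
	return ctx, ok
}

func (r *contextRegistry) Delete(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.ctxs, id)
}

func (r *contextRegistry) Size() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.ctxs)
}
```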

// RaftHandler is a function that processes a Raft command.
// The implementation MUST be idempotent.
type RaftHandler[Req, Resp proto.Message] func(*bbolt.Tx, *raft.Log, Req) (Resp, error)
// The context parameter is used for tracing purposes and is only available on the leader.
type RaftHandler[Req, Resp proto.Message] func(context.Context, *bbolt.Tx, *raft.Log, Req) (Resp, error)
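As a usage sketch, registering a context-aware handler looks like the call below; the wrapper installed by RegisterRaftCommandHandler (further down in this file) forwards the context retrieved in applyCommand. The wiring names here are illustrative, not taken from this PR.

```go
// Illustrative registration; stateMachine and compactionHandler are assumed names.
fsm.RegisterRaftCommandHandler(
	stateMachine, // *fsm.FSM
	fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE),
	compactionHandler.GetCompactionPlanUpdate, // matches RaftHandler: func(ctx, tx, cmd, req) (resp, error)
)
```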

// StateRestorer is called during the FSM initialization
// to restore the state from a snapshot.
@@ -56,9 +66,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {

// FSM implements the raft.FSM interface.
type FSM struct {
logger log.Logger
config Config
metrics *metrics
logger log.Logger
config Config
contextRegistry ContextRegistry
metrics *metrics

mu sync.RWMutex
txns sync.WaitGroup
@@ -71,14 +82,15 @@ type FSM struct {
appliedIndex uint64
}

type handler func(tx *bbolt.Tx, cmd *raft.Log, raw []byte) (proto.Message, error)
type handler func(ctx context.Context, tx *tracingTx, cmd *raft.Log, raw []byte) (proto.Message, error)

func New(logger log.Logger, reg prometheus.Registerer, config Config) (*FSM, error) {
func New(logger log.Logger, reg prometheus.Registerer, config Config, contextRegistry ContextRegistry) (*FSM, error) {
fsm := FSM{
logger: logger,
config: config,
metrics: newMetrics(reg),
handlers: make(map[RaftLogEntryType]handler),
logger: logger,
config: config,
contextRegistry: contextRegistry,
metrics: newMetrics(reg),
handlers: make(map[RaftLogEntryType]handler),
}
db := newDB(logger, fsm.metrics, config)
if err := db.open(false); err != nil {
@@ -93,12 +105,12 @@ func (fsm *FSM) RegisterRestorer(r ...StateRestorer) {
}

func RegisterRaftCommandHandler[Req, Resp proto.Message](fsm *FSM, t RaftLogEntryType, handler RaftHandler[Req, Resp]) {
fsm.handlers[t] = func(tx *bbolt.Tx, cmd *raft.Log, raw []byte) (proto.Message, error) {
fsm.handlers[t] = func(ctx context.Context, tx *tracingTx, cmd *raft.Log, raw []byte) (proto.Message, error) {
req, err := unmarshal[Req](raw)
if err != nil {
return nil, err
}
return handler(tx, cmd, req)
return handler(ctx, tx.Tx, cmd, req)
}
}

@@ -234,6 +246,18 @@ func (fsm *FSM) applyCommand(cmd *raft.Log) any {
if err := e.UnmarshalBinary(cmd.Data); err != nil {
return errResponse(cmd, err)
}

ctx := context.Background()
if ctxID := string(cmd.Extensions); ctxID != "" {
Review comment (Contributor Author): We use the extension field of the raft Log type to carry the context identifiers:
https://github.com/hashicorp/raft/blob/c4fb904a43d299a2b9ba1000ae18a5b3f60994ab/log.go#L94
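A rough sketch of the leader-side counterpart (not part of this hunk; the registry method, ID scheme, and variable names below are illustrative assumptions, not code from this PR): the proposing node registers the request context under a fresh ID and stamps that ID into Log.Extensions before handing the entry to Raft.

```go
// Illustrative only: how the leader side might attach a context ID to a log entry.
ctxID := uuid.New().String()          // assumed ID scheme (github.com/google/uuid)
contextRegistry.Store(ctxID, ctx)     // hypothetical Store; the FSM later Retrieves/Deletes it
future := raftNode.ApplyLog(raft.Log{
	Data:       encodedCommand,       // the marshaled raft_log entry
	Extensions: []byte(ctxID),        // surfaces as cmd.Extensions in applyCommand
}, applyTimeout)
if err := future.Error(); err != nil {
	contextRegistry.Delete(ctxID)     // avoid leaking registry entries if the apply fails
}
```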

var found bool
if ctx, found = fsm.contextRegistry.Retrieve(ctxID); found {
defer fsm.contextRegistry.Delete(ctxID)
}
}

span, ctx := tracing.StartSpanFromContext(ctx, "fsm.applyCommand")
defer span.Finish()

if cmd.Index <= fsm.appliedIndex {
// Skip already applied commands at WAL restore.
// Note that the 0 index is a noop and is never applied to FSM.
@@ -253,20 +277,24 @@ func (fsm *FSM) applyCommand(cmd *raft.Log) any {

// Apply is never called concurrently with Restore, so we don't need
// to lock the FSM: db.boltdb is guaranteed to be in a consistent state.
tx, err := fsm.db.boltdb.Begin(true)
rawTx, err := fsm.db.boltdb.Begin(true)
if err != nil {
panic(fmt.Sprint("failed to begin write transaction:", err))
}

data, err := handle(tx, cmd, e.Data)
txSpan, ctx := opentracing.StartSpanFromContext(ctx, "boltdb.transaction")
txSpan.SetTag("writable", rawTx.Writable())
tx := newTracingTx(rawTx, txSpan, ctx)

data, err := handle(ctx, tx, cmd, e.Data)
if err != nil {
_ = tx.Rollback()
// NOTE(kolesnikovae): This has to be a hard failure as we assume
// that the in-memory state might have not been rolled back properly.
panic(fmt.Sprint("failed to apply command:", err))
}

if err = fsm.storeAppliedIndex(tx, cmd.Term, cmd.Index); err != nil {
if err = fsm.storeAppliedIndex(tx.Tx, cmd.Term, cmd.Index); err != nil {
panic(fmt.Sprint("failed to store applied index: %w", err))
}

48 changes: 48 additions & 0 deletions pkg/metastore/fsm/tracing_tx.go
@@ -0,0 +1,48 @@
package fsm

import (
"context"

"github.com/opentracing/opentracing-go"
"go.etcd.io/bbolt"
)

// tracingTx wraps a BoltDB transaction to automatically trace the transaction lifecycle.
// It holds a span that encompasses the entire transaction, providing visibility into
// transaction timing without requiring manual instrumentation.
//
// The span should be created by the caller and will be finished when the transaction
// is committed or rolled back.
type tracingTx struct {
*bbolt.Tx
span opentracing.Span
spanCtx context.Context // Context with the span, for child operations
}

// newTracingTx creates a tracing transaction wrapper.
// The span parameter can be nil if no tracing is desired (e.g., on follower nodes).
func newTracingTx(tx *bbolt.Tx, span opentracing.Span, spanCtx context.Context) *tracingTx {
return &tracingTx{
Tx: tx,
span: span,
spanCtx: spanCtx,
}
}

// Commit commits the transaction and finishes the span.
func (t *tracingTx) Commit() error {
if t.span != nil {
defer t.span.Finish()
t.span.LogKV("operation", "commit")
}
return t.Tx.Commit()
}

// Rollback rolls back the transaction and finishes the span.
func (t *tracingTx) Rollback() error {
if t.span != nil {
defer t.span.Finish()
t.span.LogKV("operation", "rollback")
}
return t.Tx.Rollback()
}
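A brief usage sketch, hypothetical but mirroring how applyCommand wires it on the leader; a follower that does not trace can simply pass a nil span.

```go
// Hypothetical caller: wrap a write transaction so its whole lifecycle is one span.
rawTx, err := db.Begin(true)
if err != nil {
	return err
}
txSpan, txCtx := opentracing.StartSpanFromContext(ctx, "boltdb.transaction")
tx := newTracingTx(rawTx, txSpan, txCtx)
if err := doWork(tx.Tx); err != nil { // doWork stands in for the actual bucket operations
	_ = tx.Rollback()                 // finishes the span, logging operation=rollback
	return err
}
return tx.Commit()                    // finishes the span, logging operation=commit
```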