Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/backup/backupsink/file_sst_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp

empty := true
for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() {
if err := s.pacer.Pace(ctx); err != nil {
if _, err := s.pacer.Pace(ctx); err != nil {
return nil, err
}
if valid, err := iter.Valid(); !valid || err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/compaction_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func compactSpanEntry(
scratch = append(scratch, prefix...)
iter := sstIter.iter
for iter.SeekGE(trimmedStart); ; iter.NextKey() {
if err := pacer.Pace(ctx); err != nil {
if _, err := pacer.Pace(ctx); err != nil {
return err
}
var key storage.MVCCKey
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
// TODO(yevgeniy): rework this function: this function should simply
// return an error, and not rely on "handleError".
// It's hard to reason about this functions correctness otherwise.
_ = s.pacer.Pace(ctx)
_, _ = s.pacer.Pace(ctx)

switch r := req.(type) {
case *rowEvent:
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
// Request CPU time to use for event consumption, block if this time is
// unavailable. If there is unused CPU time left from the last call to
// Pace, then use that time instead of blocking.
if err := c.pacer.Pace(ctx); err != nil {
if _, err := c.pacer.Pace(ctx); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, val
// Keys must be added in order.
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
// Pace based on admission control before adding the key.
if err := b.pacer.Pace(ctx); err != nil {
if _, err := b.pacer.Pace(ctx); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/buildutil",
Expand Down
42 changes: 21 additions & 21 deletions pkg/kv/kvserver/rangefeed/buffered_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ type bufferedRegistration struct {
outputLoopCancelFn func()
disconnected bool

// catchUpIter is created by replcia under raftMu lock when registration is
// created. It is detached by output loop for processing and closed.
// If output loop was not started and catchUpIter is non-nil at the time
// that disconnect is called, it is closed by disconnect.
catchUpIter *CatchUpIterator
// catchUpSnap is created by a replica under the raftMu lock when a
// registration is created. It is detached by the output loop for
// processing and closed. If the output loop was not started and
// catchUpSnap is non-nil at the time that disconnect is called, it is
// closed by disconnect.
catchUpSnap *CatchUpSnapshot
}

// Number of events that have been written to the buffer but
Expand All @@ -74,7 +75,7 @@ func newBufferedRegistration(
streamCtx context.Context,
span roachpb.Span,
startTS hlc.Timestamp,
catchUpIter *CatchUpIterator,
catchUpSnap *CatchUpSnapshot,
withDiff bool,
withFiltering bool,
withOmitRemote bool,
Expand All @@ -101,7 +102,7 @@ func newBufferedRegistration(
buf: make(chan *sharedEvent, bufferSz),
blockWhenFull: blockWhenFull,
}
br.mu.catchUpIter = catchUpIter
br.mu.catchUpSnap = catchUpSnap
return br
}

Expand Down Expand Up @@ -166,9 +167,9 @@ func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
br.mu.Lock()
defer br.mu.Unlock()
if !br.mu.disconnected {
if br.mu.catchUpIter != nil {
br.mu.catchUpIter.Close()
br.mu.catchUpIter = nil
if br.mu.catchUpSnap != nil {
br.mu.catchUpSnap.Close()
br.mu.catchUpSnap = nil
}
if br.mu.outputLoopCancelFn != nil {
br.mu.outputLoopCancelFn()
Expand Down Expand Up @@ -297,20 +298,19 @@ func (br *bufferedRegistration) drainAllocations(ctx context.Context) {
// This uses the iterator provided when the registration was originally created;
// after the scan completes, the iterator will be closed.
//
// If the registration does not have a catchUpIteratorConstructor, this method
// is a no-op.
// If the registration does not have a catchUpSnap, this method is a no-op.
func (br *bufferedRegistration) maybeRunCatchUpScan(ctx context.Context) error {
catchUpIter := br.detachCatchUpIter()
if catchUpIter == nil {
catchUpSnap := br.detachCatchUpSnap()
if catchUpSnap == nil {
return nil
}
start := crtime.NowMono()
defer func() {
catchUpIter.Close()
catchUpSnap.Close()
br.metrics.RangeFeedCatchUpScanNanos.Inc(start.Elapsed().Nanoseconds())
}()

return catchUpIter.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
return catchUpSnap.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
}

// Wait for this registration to completely process its internal
Expand All @@ -335,13 +335,13 @@ func (br *bufferedRegistration) waitForCaughtUp(ctx context.Context) error {
return errors.Errorf("bufferedRegistration %v failed to empty in time", br.Range())
}

// detachCatchUpIter detaches the catchUpIter that was previously attached.
func (br *bufferedRegistration) detachCatchUpIter() *CatchUpIterator {
// detachCatchUpSnap detaches the catchUpSnap that was previously attached.
func (br *bufferedRegistration) detachCatchUpSnap() *CatchUpSnapshot {
br.mu.Lock()
defer br.mu.Unlock()
catchUpIter := br.mu.catchUpIter
br.mu.catchUpIter = nil
return catchUpIter
catchUpSnap := br.mu.catchUpSnap
br.mu.catchUpSnap = nil
return catchUpSnap
}

var overflowLogEvery = log.Every(5 * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/buffered_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestBufferedSenderOnStreamShutdown(t *testing.T) {
}

// Add our stream to the stream manager.
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpSnap */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
sm.NewStream(streamID, 1 /*rangeID*/))
require.True(t, registered)
Expand Down
Loading
Loading