Skip to content

Commit 255fce2

Browse files
fix: Fix handling of duplicate delete requests (#18460)
1 parent 1e9d2f5 commit 255fce2

File tree

7 files changed

+301
-14
lines changed

7 files changed

+301
-14
lines changed

pkg/compactor/deletion/delete_request_batch.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,15 @@ func (b *deleteRequestBatch) addDeleteRequest(dr *DeleteRequest) {
7070
b.count++
7171
}
7272

73-
func (b *deleteRequestBatch) checkDuplicate(deleteRequest DeleteRequest) error {
73+
func (b *deleteRequestBatch) checkDuplicate(deleteRequest DeleteRequest) (bool, error) {
7474
ur, ok := b.deleteRequestsToProcess[deleteRequest.UserID]
7575
if !ok {
76-
return nil
76+
return false, nil
7777
}
7878
for _, requestLoadedForProcessing := range ur.requests {
7979
isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest)
8080
if err != nil {
81-
return err
81+
return false, err
8282
}
8383
if isDuplicate {
8484
level.Info(util_log.Logger).Log(
@@ -88,10 +88,11 @@ func (b *deleteRequestBatch) checkDuplicate(deleteRequest DeleteRequest) error {
8888
"user", deleteRequest.UserID,
8989
)
9090
b.duplicateRequests = append(b.duplicateRequests, deleteRequest)
91+
return true, nil
9192
}
9293
}
9394

94-
return nil
95+
return false, nil
9596
}
9697

9798
func (b *deleteRequestBatch) expired(userID []byte, chk retention.Chunk, lbls labels.Labels, skipRequest func(*DeleteRequest) bool) (bool, filter.Func) {
@@ -163,3 +164,14 @@ func (b *deleteRequestBatch) getAllRequestsForUser(userID string) []*DeleteReque
163164

164165
return userRequests.requests
165166
}
167+
168+
func (b *deleteRequestBatch) getAllRequests() []*DeleteRequest {
169+
requests := make([]*DeleteRequest, 0, b.count)
170+
for _, ur := range b.deleteRequestsToProcess {
171+
for _, request := range ur.requests {
172+
requests = append(requests, request)
173+
}
174+
}
175+
176+
return requests
177+
}

pkg/compactor/deletion/delete_requests_db_sqlite.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ const (
3030
)
3131

3232
type sqlQuery struct {
33-
query string
34-
execOpts *sqlitex.ExecOptions
33+
query string
34+
execOpts *sqlitex.ExecOptions
35+
postUpdateExecCallback func(numChanges int) error
3536
}
3637

3738
type sqliteDB struct {
@@ -141,6 +142,11 @@ func (s *sqliteDB) Exec(ctx context.Context, updatesData bool, queries ...sqlQue
141142
if err := sqlitex.Execute(conn, query.query, query.execOpts); err != nil {
142143
return err
143144
}
145+
if updatesData && query.postUpdateExecCallback != nil {
146+
if err := query.postUpdateExecCallback(conn.Changes()); err != nil {
147+
return err
148+
}
149+
}
144150
}
145151

146152
if updatesData {

pkg/compactor/deletion/delete_requests_manager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,9 +353,13 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess(kind DeleteRequestsK
353353
continue
354354
}
355355
}
356-
if err := batch.checkDuplicate(deleteRequest); err != nil {
356+
isDuplicate, err := batch.checkDuplicate(deleteRequest)
357+
if err != nil {
357358
return nil, err
358359
}
360+
if isDuplicate {
361+
continue
362+
}
359363
if reqCount >= d.batchSize {
360364
logBatchTruncation(reqCount, len(deleteRequests))
361365
break

0 commit comments

Comments
 (0)