Skip to content

Commit 2383667

Browse files
authored
Stop Disk cleaner when storage bucket is not defined (#3135)
* Stop Bucket cleaner when storage bucket is not defined Signed-off-by: Drumil Patel <[email protected]> * Delete storage bucket when high disk utilisation Signed-off-by: Drumil Patel <[email protected]> * Create new abstraction for getting uploaded block Signed-off-by: Drumil Patel <[email protected]> * Fix retention Signed-off-by: Drumil Patel <[email protected]> * Update blockdeletable check for both cleanuphighDiskUtilisation as well as deleteUploadedBlock Signed-off-by: Drumil Patel <[email protected]> * Fix Test for Retention Signed-off-by: Drumil Patel <[email protected]> * Fix Test for Retention Signed-off-by: Drumil Patel <[email protected]> * Fix Pkg Ingester Signed-off-by: Drumil Patel <[email protected]> --------- Signed-off-by: Drumil Patel <[email protected]>
1 parent cf9b77d commit 2383667

File tree

2 files changed

+42
-21
lines changed

2 files changed

+42
-21
lines changed

pkg/ingester/retention.go

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (dc *diskCleaner) DeleteUploadedBlocks(ctx context.Context) int {
153153
}
154154

155155
for _, block := range blocks {
156-
if !dc.isBlockDeletable(block) {
156+
if !block.Uploaded || !dc.isExpired(block) {
157157
continue
158158
}
159159

@@ -233,7 +233,7 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context)
233233
prevVolumeStats := &diskutil.VolumeStats{}
234234
filesDeleted := 0
235235
for _, block := range blocks {
236-
if !dc.isBlockDeletable(block) {
236+
if !dc.isExpired(block) {
237237
continue
238238
}
239239

@@ -289,12 +289,12 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context)
289289
}
290290

291291
// isBlockDeletable returns true if this block can be deleted.
292-
func (dc *diskCleaner) isBlockDeletable(block *tenantBlock) bool {
292+
func (dc *diskCleaner) isExpired(block *tenantBlock) bool {
293293
// TODO(kolesnikovae):
294294
// Expiry defaults to -querier.query-store-after which should be deprecated,
295295
// blocks-storage.bucket-store.ignore-blocks-within can be used instead.
296296
expiryTs := time.Now().Add(-dc.policy.Expiry)
297-
return block.Uploaded && ulid.Time(block.ID.Time()).Before(expiryTs)
297+
return ulid.Time(block.ID.Time()).Before(expiryTs)
298298
}
299299

300300
// blocksByUploadAndAge implements sorting tenantBlock by uploaded then by age
@@ -359,6 +359,32 @@ type realFSBlockManager struct {
359359
FS fileSystem
360360
}
361361

362+
func (bm *realFSBlockManager) getUploadedBlockIds(tenantID string) (map[ulid.ULID]struct{}, error) {
363+
localDirPath := filepath.Join(bm.Root, tenantID, phlareDBLocalPath)
364+
365+
shipperPath := filepath.Join(localDirPath, shipper.MetaFilename)
366+
bytes, err := fs.ReadFile(bm.FS, shipperPath)
367+
if err != nil {
368+
if os.IsNotExist(err) {
369+
return make(map[ulid.ULID]struct{}), nil
370+
}
371+
return nil, err
372+
}
373+
374+
var meta shipper.Meta
375+
err = json.Unmarshal(bytes, &meta)
376+
if err != nil {
377+
return nil, err
378+
}
379+
380+
uploadedBlockIDs := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
381+
for _, id := range meta.Uploaded {
382+
uploadedBlockIDs[id] = struct{}{}
383+
}
384+
385+
return uploadedBlockIDs, nil
386+
}
387+
362388
func (bm *realFSBlockManager) GetTenantIDs(ctx context.Context) ([]string, error) {
363389
if ctx.Err() != nil {
364390
return nil, ctx.Err()
@@ -391,23 +417,11 @@ func (bm *realFSBlockManager) GetBlocksForTenant(ctx context.Context, tenantID s
391417
return nil, err
392418
}
393419

394-
shipperPath := filepath.Join(localDirPath, shipper.MetaFilename)
395-
bytes, err := fs.ReadFile(bm.FS, shipperPath)
420+
uploadedBlockIDs, err := bm.getUploadedBlockIds(tenantID)
396421
if err != nil {
397422
return nil, err
398423
}
399424

400-
var meta shipper.Meta
401-
err = json.Unmarshal(bytes, &meta)
402-
if err != nil {
403-
return nil, err
404-
}
405-
406-
uploadedBlockIDs := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
407-
for _, id := range meta.Uploaded {
408-
uploadedBlockIDs[id] = struct{}{}
409-
}
410-
411425
// Read blocks.
412426
blocks := make([]*tenantBlock, 0)
413427
for _, blockDir := range blockDirs {

pkg/ingester/retention_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,14 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) {
252252
BytesAvailable: 100,
253253
BytesTotal: 200,
254254
}, nil).
255-
Once() // Expect the loop to break after a single block delete (since the subsequent blocks aren't uploaded).
255+
Once()
256+
vc.On("HasHighDiskUtilization", mock.Anything).
257+
Return(&diskutil.VolumeStats{
258+
HighDiskUtilization: false,
259+
BytesAvailable: 100,
260+
BytesTotal: 200,
261+
}, nil).
262+
Once()
256263

257264
dc := newDiskCleaner(log.NewNopLogger(), e, defaultRetentionPolicy(), phlaredb.Config{
258265
DataPath: "./data",
@@ -261,7 +268,7 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) {
261268
dc.volumeChecker = vc
262269

263270
deleted, bytesFreed, hadHighDisk := dc.CleanupBlocksWhenHighDiskUtilization(context.Background())
264-
require.Equal(t, 1, deleted)
271+
require.Equal(t, 2, deleted)
265272
require.Equal(t, 100, bytesFreed)
266273
require.True(t, hadHighDisk)
267274
})
@@ -316,7 +323,7 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) {
316323
})
317324
}
318325

319-
func TestDiskCleaner_isBlockDeletable(t *testing.T) {
326+
func TestDiskCleaner_isBlockDeletableForUploadedBlocks(t *testing.T) {
320327
tests := []struct {
321328
Name string
322329
Expiry time.Duration
@@ -369,7 +376,7 @@ func TestDiskCleaner_isBlockDeletable(t *testing.T) {
369376
t.Run(tt.Name, func(t *testing.T) {
370377
dc.policy.Expiry = tt.Expiry
371378

372-
got := dc.isBlockDeletable(tt.Block)
379+
got := tt.Block.Uploaded && dc.isExpired(tt.Block)
373380
require.Equal(t, tt.Want, got)
374381
})
375382
}

0 commit comments

Comments
 (0)