Skip to content

Commit 3490f8a

Browse files
authored
Merge pull request #155965 from jeffswenson/backport25.2-155809
release-25.2: jobs: clean up corrupted auto SQL stats compaction
2 parents 9509fc0 + 7d3aac6 commit 3490f8a

File tree

3 files changed

+97
-27
lines changed

3 files changed

+97
-27
lines changed

pkg/jobs/registry.go

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,10 +1178,14 @@ func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) erro
11781178
var done bool
11791179
var err error
11801180
done, maxID, err = r.cleanupOldJobsPage(ctx, olderThan, maxID, cleanupPageSize)
1181-
if err != nil || done {
1181+
if err != nil {
11821182
return err
11831183
}
1184+
if done {
1185+
break
1186+
}
11841187
}
1188+
return r.CleanupCorruptJobs(ctx)
11851189
}
11861190

11871191
// AbandonedJobInfoRowsCleanupQuery is used by the CLI command
@@ -1318,6 +1322,36 @@ func (r *Registry) cleanupOldJobsPage(
13181322
return !morePages, maxID, nil
13191323
}
13201324

1325+
const findCorruptJobsQuery = `
1326+
SELECT id
1327+
FROM system.jobs
1328+
LEFT JOIN system.job_info ON system.jobs.id = system.job_info.job_id
1329+
WHERE system.job_info.job_id IS NULL AND system.jobs.job_type = 'AUTO SQL STATS COMPACTION'
1330+
`
1331+
1332+
// CleanupCorruptJobs is a temporary cleanup function that deletes corrupt
1333+
// `AUTO SQL STATS COMPACTION` jobs. This function exists to clean up after
1334+
// #155165.
1335+
//
1336+
// TODO(jeffswenson): in a separate PR we should run this as a migration so we
1337+
// can guarantee the issue was cleaned up as of a specific version.
1338+
func (r *Registry) CleanupCorruptJobs(ctx context.Context) error {
1339+
return r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
1340+
datums, err := txn.QueryBuffered(ctx, "get-corrupt-jobs", txn.KV(), findCorruptJobsQuery)
1341+
if err != nil {
1342+
return errors.Wrap(err, "querying for broken sql activity stats compaction jobs")
1343+
}
1344+
for _, row := range datums {
1345+
id := jobspb.JobID(tree.MustBeDInt(row[0]))
1346+
log.Dev.Errorf(ctx, "resetting broken sql activity stats compaction job %d", id)
1347+
if err := r.deleteJob(ctx, txn, id); err != nil {
1348+
return errors.Wrapf(err, "deleting broken sql activity stats compaction job %d", id)
1349+
}
1350+
}
1351+
return nil
1352+
})
1353+
}
1354+
13211355
// DeleteTerminalJobByID deletes the given job ID if it is in a
13221356
// terminal state. If it is in a non-terminal state, an error is
13231357
// returned. This API should not be used.
@@ -1334,36 +1368,40 @@ func (r *Registry) DeleteTerminalJobByID(ctx context.Context, id jobspb.JobID) e
13341368
state := State(*row[0].(*tree.DString))
13351369
switch state {
13361370
case StateSucceeded, StateCanceled, StateFailed:
1337-
_, err := txn.Exec(
1338-
ctx, "delete-job", txn.KV(), "DELETE FROM system.jobs WHERE id = $1", id,
1339-
)
1371+
return r.deleteJob(ctx, txn, id)
1372+
default:
1373+
return errors.Newf("job %d has non-terminal state: %q", id, state)
1374+
}
1375+
})
1376+
}
1377+
1378+
func (r *Registry) deleteJob(ctx context.Context, txn isql.Txn, id jobspb.JobID) error {
1379+
_, err := txn.Exec(
1380+
ctx, "delete-job", txn.KV(), "DELETE FROM system.jobs WHERE id = $1", id,
1381+
)
1382+
if err != nil {
1383+
return err
1384+
}
1385+
for i, tbl := range jobMetadataTables {
1386+
if i > 0 {
1387+
v, err := txn.GetSystemSchemaVersion(ctx)
13401388
if err != nil {
13411389
return err
13421390
}
1343-
for i, tbl := range jobMetadataTables {
1344-
if i > 0 {
1345-
v, err := txn.GetSystemSchemaVersion(ctx)
1346-
if err != nil {
1347-
return err
1348-
}
1349-
if v.Less(clusterversion.V25_1_AddJobsTables.Version()) {
1350-
break
1351-
}
1352-
}
1353-
1354-
_, err = txn.Exec(
1355-
ctx, redact.RedactableString("delete-job-"+tbl), txn.KV(),
1356-
"DELETE FROM system."+tbl+" WHERE job_id = $1", id,
1357-
)
1358-
if err != nil {
1359-
return err
1360-
}
1391+
if v.Less(clusterversion.V25_1_AddJobsTables.Version()) {
1392+
break
13611393
}
1362-
return nil
1363-
default:
1364-
return errors.Newf("job %d has non-terminal state: %q", id, state)
13651394
}
1366-
})
1395+
1396+
_, err = txn.Exec(
1397+
ctx, redact.RedactableString("delete-job-"+tbl), txn.KV(),
1398+
"DELETE FROM system."+tbl+" WHERE job_id = $1", id,
1399+
)
1400+
if err != nil {
1401+
return err
1402+
}
1403+
}
1404+
return nil
13671405
}
13681406

13691407
// PauseRequested marks the job with id as paused-requested using the specified txn (may be nil).

pkg/jobs/registry_test.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,39 @@ func TestDeleteTerminalJobByID(t *testing.T) {
864864
require.NoError(t, r.DeleteTerminalJobByID(ctx, j.ID()))
865865
assertValidDeletion(j.ID())
866866
})
867+
}
868+
869+
func TestCleanupCorruptJobs(t *testing.T) {
870+
defer leaktest.AfterTest(t)()
871+
defer log.Scope(t).Close(t)
872+
873+
ctx := context.Background()
874+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
875+
defer s.Stopper().Stop(ctx)
876+
877+
db := sqlutils.MakeSQLRunner(sqlDB)
878+
r := s.ApplicationLayer().JobRegistry().(*Registry)
867879

880+
// Create an AUTO SQL STATS COMPACTION job in running state with proper job_info records.
881+
jobID := r.MakeJobID()
882+
db.Exec(t, `INSERT INTO system.jobs (id, status, created, job_type) VALUES ($1, 'running', now(), $2)`, jobID, jobspb.TypeAutoSQLStatsCompaction.String())
883+
db.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, 'legacy_payload', 'payload')`, jobID)
884+
885+
// Job has info records, so CleanupCorruptJobs should do nothing.
886+
require.NoError(t, r.CleanupCorruptJobs(ctx))
887+
888+
var count int
889+
db.QueryRow(t, "SELECT count(*) FROM system.jobs WHERE id = $1", jobID).Scan(&count)
890+
require.Equal(t, 1, count)
891+
892+
// Delete the job_info records to corrupt the running job.
893+
db.Exec(t, "DELETE FROM system.job_info WHERE job_id = $1", jobID)
894+
895+
// Now CleanupCorruptJobs should delete the corrupt job.
896+
require.NoError(t, r.CleanupCorruptJobs(ctx))
897+
898+
db.QueryRow(t, "SELECT count(*) FROM system.jobs WHERE id = $1", jobID).Scan(&count)
899+
require.Zero(t, count)
868900
}
869901

870902
// TestRunWithoutLoop tests that Run calls will trigger the execution of a
@@ -1073,7 +1105,6 @@ func TestJobIdleness(t *testing.T) {
10731105
})
10741106
}
10751107
})
1076-
10771108
}
10781109

10791110
// TestDisablingJobAdoptionClearsClaimSessionID tests that jobs adopted by a

pkg/sql/compact_sql_stats.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (e *scheduledSQLStatsCompactionExecutor) ExecuteJob(
174174
) error {
175175
if err := e.createSQLStatsCompactionJob(ctx, cfg, sj, txn); err != nil {
176176
e.metrics.NumFailed.Inc(1)
177+
return err
177178
}
178179

179180
e.metrics.NumStarted.Inc(1)

0 commit comments

Comments
 (0)