Skip to content

Commit fafc743

Browse files
committed
workload/schemachange: add INSPECT operation to random schema workload
Adds a new inspect operation to the schema change workload, enabling random generation of INSPECT TABLE and INSPECT DATABASE statements. Features: - Support for TABLE/DB targets, AS OF SYSTEM TIME - Always runs in DETACHED mode so that it can be run inside a transaciton - Results checked post-run via SHOW INSPECT ERRORS Errors reported in JSON, consistent with existing workload logs Closes #155483 Epic: CRDB-55075 Release note: none
1 parent 1e00b30 commit fafc743

File tree

4 files changed

+167
-2
lines changed

4 files changed

+167
-2
lines changed

pkg/workload/schemachange/operation_generator.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3456,6 +3456,71 @@ func (og *operationGenerator) validate(ctx context.Context, tx pgx.Tx) (*opStmt,
34563456
return validateStmt, errors.Errorf("Validation FAIL:\n%s", strings.Join(errs, "\n"))
34573457
}
34583458

3459+
func (og *operationGenerator) inspect(ctx context.Context, tx pgx.Tx) (*opStmt, error) {
3460+
stmt := makeOpStmt(OpStmtDML)
3461+
3462+
var sb strings.Builder
3463+
sb.WriteString("INSPECT ")
3464+
3465+
if og.randIntn(2) == 0 {
3466+
tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "")
3467+
if err != nil {
3468+
return nil, err
3469+
}
3470+
tableExists, err := og.tableExists(ctx, tx, tableName)
3471+
if err != nil {
3472+
return nil, err
3473+
}
3474+
sb.WriteString("TABLE ")
3475+
sb.WriteString(tableName.String())
3476+
if !tableExists {
3477+
stmt.expectedExecErrors.add(pgcode.UndefinedTable)
3478+
}
3479+
} else {
3480+
database, err := og.getDatabase(ctx, tx)
3481+
if err != nil {
3482+
return nil, err
3483+
}
3484+
useExisting := og.randIntn(100) < og.pctExisting(true)
3485+
var databaseName string
3486+
if useExisting {
3487+
databaseName = database
3488+
} else {
3489+
databaseName = fmt.Sprintf("inspect_db_%s", og.newUniqueSeqNumSuffix())
3490+
stmt.expectedExecErrors.add(pgcode.InvalidCatalogName)
3491+
}
3492+
sb.WriteString("DATABASE ")
3493+
sb.WriteString(databaseName)
3494+
}
3495+
3496+
asof := og.randomInspectAsOfClause()
3497+
sb.WriteString(asof)
3498+
// If we use an ASOF time with inspect, chances are it will conflict with
3499+
// the timestamp chosen for the transaction, and return an "inconsistent AS
3500+
// OF SYSTEM TIME timestamp" error.
3501+
stmt.potentialExecErrors.addAll(codesWithConditions{
3502+
{pgcode.FeatureNotSupported, asof != ""},
3503+
})
3504+
3505+
// Always run DETACHED as this allows us to use INSPECT inside of a
3506+
// transaction. We have post-processing at the end of the run to verify
3507+
// INSPECT didn't find any issues.
3508+
sb.WriteString(" WITH OPTIONS DETACHED")
3509+
stmt.sql = sb.String()
3510+
3511+
return stmt, nil
3512+
}
3513+
3514+
func (og *operationGenerator) randomInspectAsOfClause() string {
3515+
// Use AS OF SYSTEM TIME infrequently (~10% of the time) because transactions
3516+
// are never started with AS OF SYSTEM TIME, and INSPECT with AS OF inside a
3517+
// transaction without AS OF can cause conflicts.
3518+
if og.randIntn(10) == 0 {
3519+
return fmt.Sprintf(" AS OF SYSTEM TIME '-%ds'", og.randIntn(30)+1)
3520+
}
3521+
return ""
3522+
}
3523+
34593524
type column struct {
34603525
name tree.Name
34613526
typ *types.T

pkg/workload/schemachange/optype.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ const (
145145
dropView // DROP VIEW <view>
146146
truncateTable
147147

148+
// INSPECT ...
149+
inspect // INSPECT {TABLE|DATABASE} ...
150+
148151
// Unimplemented operations. TODO(sql-foundations): Audit and/or implement these operations.
149152
// alterDatabaseOwner
150153
// alterDatabasePlacement
@@ -206,14 +209,15 @@ const (
206209
)
207210

208211
func isDMLOpType(t opType) bool {
209-
return t == insertRow || t == selectStmt || t == validate
212+
return t == insertRow || t == selectStmt || t == validate || t == inspect
210213
}
211214

212215
var opFuncs = []func(*operationGenerator, context.Context, pgx.Tx) (*opStmt, error){
213216
// Non-DDL
214217
insertRow: (*operationGenerator).insertRow,
215218
selectStmt: (*operationGenerator).selectStmt,
216219
validate: (*operationGenerator).validate,
220+
inspect: (*operationGenerator).inspect,
217221

218222
// DDL Operations
219223
alterDatabaseAddRegion: (*operationGenerator).addRegion,
@@ -273,6 +277,7 @@ var opWeights = []int{
273277
insertRow: 10,
274278
selectStmt: 10,
275279
validate: 2, // validate twice more often
280+
inspect: 1,
276281

277282
// DDL Operations
278283
alterDatabaseAddRegion: 1,
@@ -334,6 +339,7 @@ var opDeclarativeVersion = map[opType]clusterversion.Key{
334339
insertRow: clusterversion.MinSupported,
335340
selectStmt: clusterversion.MinSupported,
336341
validate: clusterversion.MinSupported,
342+
inspect: clusterversion.MinSupported,
337343

338344
alterPolicy: clusterversion.V25_2,
339345
alterTableAddColumn: clusterversion.MinSupported,

pkg/workload/schemachange/optype_string.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/workload/schemachange/schemachange.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,92 @@ func setupSchemaChangePromCounter(reg prometheus.Registerer) schemaChangeCounter
163163
}
164164
}
165165

166+
func (s *schemaChange) logInspectErrors(
167+
ctx context.Context, pool *workload.MultiConnPool, log *atomicLog,
168+
) error {
169+
connPool := pool.Get()
170+
conn, err := connPool.Acquire(ctx)
171+
if err != nil {
172+
log.printLn(fmt.Sprintf("unable to acquire connection for SHOW INSPECT ERRORS: %v", err))
173+
return err
174+
}
175+
defer conn.Release()
176+
177+
rows, err := conn.Query(ctx, `SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY created DESC`)
178+
if err != nil {
179+
log.printLn(fmt.Sprintf("fetching INSPECT jobs failed: %v", err))
180+
return err
181+
}
182+
jobIDs, err := pgx.CollectRows(rows, pgx.RowTo[int64])
183+
rows.Close()
184+
if err != nil {
185+
log.printLn(fmt.Sprintf("collecting INSPECT job IDs failed: %v", err))
186+
return err
187+
}
188+
if len(jobIDs) == 0 {
189+
return nil
190+
}
191+
192+
type InspectJobResult struct {
193+
JobID int64 `json:"jobId"`
194+
Status string `json:"status"`
195+
Errors []map[string]any `json:"errors,omitempty"`
196+
}
197+
198+
type InspectErrorSummary struct {
199+
Message string `json:"message"`
200+
Jobs []InspectJobResult `json:"jobs"`
201+
}
202+
203+
summary := InspectErrorSummary{
204+
Message: "Inspect Job Errors",
205+
Jobs: make([]InspectJobResult, 0, len(jobIDs)),
206+
}
207+
208+
var totalErrors int
209+
for _, jobID := range jobIDs {
210+
query := fmt.Sprintf("SHOW INSPECT ERRORS FOR JOB %d WITH DETAILS", jobID)
211+
rows, err := conn.Query(ctx, query)
212+
if err != nil {
213+
log.printLn(fmt.Sprintf("%s failed: %v", query, err))
214+
continue
215+
}
216+
results, err := pgx.CollectRows(rows, pgx.RowToMap)
217+
rows.Close()
218+
if err != nil {
219+
log.printLn(fmt.Sprintf("collecting inspect errors for job %d failed: %v", jobID, err))
220+
continue
221+
}
222+
223+
jobResult := InspectJobResult{
224+
JobID: jobID,
225+
}
226+
227+
if len(results) == 0 {
228+
jobResult.Status = "no errors reported"
229+
} else {
230+
jobResult.Status = fmt.Sprintf("%d error rows", len(results))
231+
jobResult.Errors = results
232+
totalErrors += len(results)
233+
}
234+
235+
summary.Jobs = append(summary.Jobs, jobResult)
236+
}
237+
238+
// Output as JSON.
239+
jsonBytes, err := json.MarshalIndent(summary, "", " ")
240+
if err != nil {
241+
log.printLn(fmt.Sprintf("failed to marshal inspect errors to JSON: %v", err))
242+
return err
243+
}
244+
log.printLn(string(jsonBytes))
245+
246+
if totalErrors > 0 {
247+
return errors.Newf("found %d inspect errors across %d jobs", totalErrors, len(jobIDs))
248+
}
249+
return nil
250+
}
251+
166252
// Meta implements the workload.Generator interface.
167253
func (s *schemaChange) Meta() workload.Meta { return schemaChangeMeta }
168254

@@ -246,6 +332,8 @@ func (s *schemaChange) Ops(
246332

247333
ql := workload.QueryLoad{
248334
Close: func(_ context.Context) error {
335+
inspectErr := s.logInspectErrors(ctx, pool, stdoutLog)
336+
249337
// Create a new context for shutting down the tracer provider. The
250338
// provided context may be cancelled depending on why the workload is
251339
// shutting down and we always want to provide a period of time to flush
@@ -259,7 +347,7 @@ func (s *schemaChange) Ops(
259347
closeErr := s.closeJSONLogFile()
260348
shutdownErr := tracerProvider.Shutdown(ctx)
261349
s.schemaWorkloadResultAnnotator.logWorkloadStats(stdoutLog)
262-
return errors.CombineErrors(closeErr, shutdownErr)
350+
return errors.CombineErrors(inspectErr, errors.CombineErrors(closeErr, shutdownErr))
263351
},
264352
}
265353

@@ -564,6 +652,9 @@ func (w *schemaChangeWorker) run(ctx context.Context) error {
564652
return err
565653
}
566654
}
655+
if _, err := conn.Exec(ctx, "SET enable_inspect_command = true;"); err != nil {
656+
return err
657+
}
567658

568659
tx, err := conn.Begin(ctx)
569660
if err != nil {

0 commit comments

Comments
 (0)