Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3456,6 +3456,71 @@ func (og *operationGenerator) validate(ctx context.Context, tx pgx.Tx) (*opStmt,
return validateStmt, errors.Errorf("Validation FAIL:\n%s", strings.Join(errs, "\n"))
}

func (og *operationGenerator) inspect(ctx context.Context, tx pgx.Tx) (*opStmt, error) {
stmt := makeOpStmt(OpStmtDML)

var sb strings.Builder
sb.WriteString("INSPECT ")

if og.randIntn(2) == 0 {
tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "")
if err != nil {
return nil, err
}
tableExists, err := og.tableExists(ctx, tx, tableName)
if err != nil {
return nil, err
}
sb.WriteString("TABLE ")
sb.WriteString(tableName.String())
if !tableExists {
stmt.expectedExecErrors.add(pgcode.UndefinedTable)
}
} else {
database, err := og.getDatabase(ctx, tx)
if err != nil {
return nil, err
}
useExisting := og.randIntn(100) < og.pctExisting(true)
var databaseName string
if useExisting {
databaseName = database
} else {
databaseName = fmt.Sprintf("inspect_db_%s", og.newUniqueSeqNumSuffix())
stmt.expectedExecErrors.add(pgcode.InvalidCatalogName)
}
sb.WriteString("DATABASE ")
sb.WriteString(databaseName)
}

asof := og.randomInspectAsOfClause()
sb.WriteString(asof)
// If we use an ASOF time with inspect, chances are it will conflict with
// the timestamp chosen for the transaction, and return an "inconsistent AS
// OF SYSTEM TIME timestamp" error.
stmt.potentialExecErrors.addAll(codesWithConditions{
{pgcode.FeatureNotSupported, asof != ""},
})

// Always run DETACHED as this allows us to use INSPECT inside of a
// transaction. We have post-processing at the end of the run to verify
// INSPECT didn't find any issues.
sb.WriteString(" WITH OPTIONS DETACHED")
stmt.sql = sb.String()

return stmt, nil
}

func (og *operationGenerator) randomInspectAsOfClause() string {
// Use AS OF SYSTEM TIME infrequently (~10% of the time) because transactions
// are never started with AS OF SYSTEM TIME, and INSPECT with AS OF inside a
// transaction without AS OF can cause conflicts.
if og.randIntn(10) == 0 {
return fmt.Sprintf(" AS OF SYSTEM TIME '-%ds'", og.randIntn(30)+1)
}
return ""
}

type column struct {
name tree.Name
typ *types.T
Expand Down
8 changes: 7 additions & 1 deletion pkg/workload/schemachange/optype.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ const (
dropView // DROP VIEW <view>
truncateTable

// INSPECT ...
inspect // INSPECT {TABLE|DATABASE} ...

// Unimplemented operations. TODO(sql-foundations): Audit and/or implement these operations.
// alterDatabaseOwner
// alterDatabasePlacement
Expand Down Expand Up @@ -206,14 +209,15 @@ const (
)

func isDMLOpType(t opType) bool {
return t == insertRow || t == selectStmt || t == validate
return t == insertRow || t == selectStmt || t == validate || t == inspect
}

var opFuncs = []func(*operationGenerator, context.Context, pgx.Tx) (*opStmt, error){
// Non-DDL
insertRow: (*operationGenerator).insertRow,
selectStmt: (*operationGenerator).selectStmt,
validate: (*operationGenerator).validate,
inspect: (*operationGenerator).inspect,

// DDL Operations
alterDatabaseAddRegion: (*operationGenerator).addRegion,
Expand Down Expand Up @@ -273,6 +277,7 @@ var opWeights = []int{
insertRow: 10,
selectStmt: 10,
validate: 2, // validate twice more often
inspect: 1,

// DDL Operations
alterDatabaseAddRegion: 1,
Expand Down Expand Up @@ -334,6 +339,7 @@ var opDeclarativeVersion = map[opType]clusterversion.Key{
insertRow: clusterversion.MinSupported,
selectStmt: clusterversion.MinSupported,
validate: clusterversion.MinSupported,
inspect: clusterversion.V25_4,

alterPolicy: clusterversion.V25_2,
alterTableAddColumn: clusterversion.MinSupported,
Expand Down
3 changes: 3 additions & 0 deletions pkg/workload/schemachange/optype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 99 additions & 6 deletions pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,94 @@ func setupSchemaChangePromCounter(reg prometheus.Registerer) schemaChangeCounter
}
}

func (s *schemaChange) logInspectErrors(
ctx context.Context, pool *workload.MultiConnPool, log *atomicLog,
) error {
connPool := pool.Get()
conn, err := connPool.Acquire(ctx)
if err != nil {
log.printLn(fmt.Sprintf("unable to acquire connection for SHOW INSPECT ERRORS: %v", err))
return err
}
defer conn.Release()

rows, err := conn.Query(ctx, `SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY created DESC`)
if err != nil {
log.printLn(fmt.Sprintf("fetching INSPECT jobs failed: %v", err))
return err
}
jobIDs, err := pgx.CollectRows(rows, pgx.RowTo[int64])
rows.Close()
if err != nil {
log.printLn(fmt.Sprintf("collecting INSPECT job IDs failed: %v", err))
return err
}
if len(jobIDs) == 0 {
return nil
}

type InspectJobResult struct {
JobID int64 `json:"jobId"`
Status string `json:"status"`
Errors []map[string]any `json:"errors,omitempty"`
}

type InspectErrorSummary struct {
Message string `json:"message"`
Jobs []InspectJobResult `json:"jobs"`
}

summary := InspectErrorSummary{
Message: "Inspect Job Errors",
Jobs: make([]InspectJobResult, 0, len(jobIDs)),
}

var totalErrors int
for _, jobID := range jobIDs {
query := fmt.Sprintf(`
SELECT error_type, database_name, schema_name, table_name, primary_key, job_id, aost, details
FROM [SHOW INSPECT ERRORS FOR JOB %d WITH DETAILS]`, jobID)
rows, err := conn.Query(ctx, query)
if err != nil {
log.printLn(fmt.Sprintf("%s failed: %v", query, err))
continue
}
results, err := pgx.CollectRows(rows, pgx.RowToMap)
rows.Close()
if err != nil {
log.printLn(fmt.Sprintf("collecting inspect errors for job %d failed: %v", jobID, err))
continue
}

jobResult := InspectJobResult{
JobID: jobID,
}

if len(results) == 0 {
jobResult.Status = "no errors reported"
} else {
jobResult.Status = fmt.Sprintf("%d error rows", len(results))
jobResult.Errors = results
totalErrors += len(results)
}

summary.Jobs = append(summary.Jobs, jobResult)
}

// Output as JSON.
jsonBytes, err := json.MarshalIndent(summary, "", " ")
if err != nil {
log.printLn(fmt.Sprintf("failed to marshal inspect errors to JSON: %v", err))
return err
}
log.printLn(string(jsonBytes))

if totalErrors > 0 {
return errors.Newf("found %d inspect errors across %d jobs", totalErrors, len(jobIDs))
}
return nil
}

// Meta implements the workload.Generator interface.
func (s *schemaChange) Meta() workload.Meta { return schemaChangeMeta }

Expand Down Expand Up @@ -246,20 +334,22 @@ func (s *schemaChange) Ops(

ql := workload.QueryLoad{
Close: func(_ context.Context) error {
// Create a new context for shutting down the tracer provider. The
// provided context may be cancelled depending on why the workload is
// shutting down and we always want to provide a period of time to flush
// traces.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// Create a new context for shutting down the tracer provider and logging
// inspect errors. The provided context may be cancelled depending on why
// the workload is shutting down and we always want to provide a period of
// time to flush traces.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

inspectErr := s.logInspectErrors(ctx, pool, stdoutLog)

pool.Close()
watchDogPool.Close()

closeErr := s.closeJSONLogFile()
shutdownErr := tracerProvider.Shutdown(ctx)
s.schemaWorkloadResultAnnotator.logWorkloadStats(stdoutLog)
return errors.CombineErrors(closeErr, shutdownErr)
return errors.Join(inspectErr, closeErr, shutdownErr)
},
}

Expand Down Expand Up @@ -564,6 +654,9 @@ func (w *schemaChangeWorker) run(ctx context.Context) error {
return err
}
}
if _, err := conn.Exec(ctx, "SET enable_inspect_command = true;"); err != nil {
return err
}

tx, err := conn.Begin(ctx)
if err != nil {
Expand Down