Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 57 additions & 27 deletions modules/module-postgres-storage/src/storage/PostgresCompactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,33 +110,63 @@ export class PostgresCompactor {
let upperOpIdLimit = BIGINT_MAX;

while (true) {
const batch = await this.db.sql`
SELECT
op,
op_id,
source_table,
table_name,
row_id,
source_key,
bucket_name
FROM
bucket_data
WHERE
group_id = ${{ type: 'int4', value: this.group_id }}
AND bucket_name >= ${{ type: 'varchar', value: bucketLower }}
AND (
(
bucket_name = ${{ type: 'varchar', value: bucketUpper }}
AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
)
OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison
)
ORDER BY
bucket_name DESC,
op_id DESC
LIMIT
${{ type: 'int4', value: this.moveBatchQueryLimit }}
`
const bucketLowerValue = bucketLower ?? '';
const bucketUpperValue = bucketUpper ?? MAX_CHAR;
const bucketLowerParam = { type: 'varchar' as const, value: bucketLowerValue };
const bucketUpperParam = { type: 'varchar' as const, value: bucketUpperValue };
const limitParam = { type: 'int4' as const, value: this.moveBatchQueryLimit };
const groupParam = { type: 'int4' as const, value: this.group_id };
const upperOpParam = { type: 'int8' as const, value: upperOpIdLimit };

const batchStatement =
upperOpIdLimit === BIGINT_MAX
? this.db.sql`
SELECT
op,
op_id,
source_table,
table_name,
row_id,
source_key,
bucket_name
FROM
bucket_data
WHERE
group_id = ${groupParam}
AND bucket_name >= ${bucketLowerParam}
AND bucket_name < ${bucketUpperParam} COLLATE "C" -- Use binary comparison
ORDER BY
bucket_name DESC,
op_id DESC
LIMIT
${limitParam}
`
: this.db.sql`
SELECT
op,
op_id,
source_table,
table_name,
row_id,
source_key,
bucket_name
FROM
bucket_data
WHERE
group_id = ${groupParam}
AND bucket_name >= ${bucketLowerParam}
AND ROW (bucket_name COLLATE "C", op_id) < ROW (
${bucketUpperParam},
${upperOpParam}
)
ORDER BY
bucket_name DESC,
op_id DESC
LIMIT
${limitParam}
`;

const batch = await batchStatement
.decoded(
pick(models.BucketData, ['op', 'source_table', 'table_name', 'source_key', 'row_id', 'op_id', 'bucket_name'])
)
Expand Down