Skip to content

Commit a7ac751

Browse files
authored
Updated catalog compaction arg, small pipelines fixes (#10743)
* Updated catalog compaction enable argument from targetSizeMb to target-size * Small fixes for pipelines commands, defaults.
1 parent 81fd733 commit a7ac751

File tree

10 files changed

+56
-40
lines changed

10 files changed

+56
-40
lines changed

.changeset/brave-memes-raise.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"wrangler": patch
3+
---
4+
5+
Renames the `--targetSizeMb` argument of `wrangler r2 bucket catalog compaction enable` to `--target-size`. Small fixes for pipelines commands and defaults.

packages/wrangler/src/__tests__/pipelines.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,7 +1366,7 @@ describe("wrangler pipelines", () => {
13661366
13671367
Batching:
13681368
File Size: none
1369-
Time Interval: 30s
1369+
Time Interval: 300s
13701370
13711371
Format:
13721372
Type: json"
@@ -1400,7 +1400,7 @@ describe("wrangler pipelines", () => {
14001400
14011401
Batching:
14021402
File Size: none
1403-
Time Interval: 30s
1403+
Time Interval: 300s
14041404
14051405
Format:
14061406
Type: json"
@@ -1564,7 +1564,7 @@ describe("wrangler pipelines", () => {
15641564
15651565
Batching:
15661566
File Size: none
1567-
Time Interval: 30s
1567+
Time Interval: 300s
15681568
15691569
Format:
15701570
Type: json"

packages/wrangler/src/__tests__/r2.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,7 +1301,7 @@ For more details, refer to: https://developers.cloudflare.com/r2/api/s3/tokens/"
13011301
)
13021302
);
13031303
await runWrangler(
1304-
"r2 bucket catalog compaction enable testBucket --token fakecloudflaretoken --targetSizeMb 512"
1304+
"r2 bucket catalog compaction enable testBucket --token fakecloudflaretoken --target-size 512"
13051305
);
13061306
expect(std.out).toMatchInlineSnapshot(
13071307
`"✨ Successfully enabled file compaction for the data catalog for bucket 'testBucket'."`
@@ -1332,8 +1332,8 @@ For more details, refer to: https://developers.cloudflare.com/r2/api/s3/tokens/"
13321332
-v, --version Show version number [boolean]
13331333
13341334
OPTIONS
1335-
--targetSizeMb The target size for compacted files (allowed values: 64, 128, 256, 512) [number] [default: 128]
1336-
--token A cloudflare api token with access to R2 and R2 Data Catalog which will be used to read/write files for compaction. [string] [required]"
1335+
--target-size The target size for compacted files in MB (allowed values: 64, 128, 256, 512) [number] [default: 128]
1336+
--token A cloudflare api token with access to R2 and R2 Data Catalog which will be used to read/write files for compaction. [string] [required]"
13371337
`);
13381338
expect(std.err).toMatchInlineSnapshot(`
13391339
"X [ERROR] Not enough non-option arguments: got 0, need at least 1
@@ -1345,7 +1345,7 @@ For more details, refer to: https://developers.cloudflare.com/r2/api/s3/tokens/"
13451345
it("should error if --token is not provided", async () => {
13461346
await expect(
13471347
runWrangler(
1348-
"r2 bucket catalog compaction enable testBucket --targetSizeMb 512"
1348+
"r2 bucket catalog compaction enable testBucket --target-size 512"
13491349
)
13501350
).rejects.toThrowErrorMatchingInlineSnapshot(
13511351
`[Error: Missing required argument: token]`

packages/wrangler/src/pipelines/cli/setup.ts

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
deleteStream,
1515
validateSql,
1616
} from "../client";
17+
import { SINK_DEFAULTS } from "../defaults";
1718
import { authorizeR2Bucket } from "../index";
1819
import {
1920
displayUsageExamples,
@@ -231,10 +232,8 @@ async function buildField(
231232
{ title: "string", value: "string" },
232233
{ title: "int32", value: "int32" },
233234
{ title: "int64", value: "int64" },
234-
{ title: "u_int32", value: "u_int32" },
235-
{ title: "u_int64", value: "u_int64" },
236-
{ title: "f32", value: "f32" },
237-
{ title: "f64", value: "f64" },
235+
{ title: "float32", value: "float32" },
236+
{ title: "float64", value: "float64" },
238237
{ title: "bool", value: "bool" },
239238
{ title: "timestamp", value: "timestamp" },
240239
{ title: "json", value: "json" },
@@ -419,13 +418,16 @@ async function setupR2Sink(
419418
});
420419
}
421420

422-
const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
423-
defaultValue: "100",
424-
});
421+
const fileSizeMB = await prompt(
422+
"Roll file when size reaches (MB, minimum 5):",
423+
{
424+
defaultValue: "100",
425+
}
426+
);
425427
const intervalSeconds = await prompt(
426-
"Roll file when time reaches (seconds):",
428+
"Roll file when time reaches (seconds, minimum 10):",
427429
{
428-
defaultValue: "300",
430+
defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds),
429431
}
430432
);
431433

@@ -511,17 +513,20 @@ async function setupDataCatalogSink(setupConfig: SetupConfig): Promise<void> {
511513
{ title: "zstd", value: "zstd" },
512514
{ title: "lz4", value: "lz4" },
513515
],
514-
defaultOption: 0,
515-
fallbackOption: 0,
516+
defaultOption: 3,
517+
fallbackOption: 3,
516518
});
517519

518-
const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
519-
defaultValue: "100",
520-
});
520+
const fileSizeMB = await prompt(
521+
"Roll file when size reaches (MB, minimum 5):",
522+
{
523+
defaultValue: "100",
524+
}
525+
);
521526
const intervalSeconds = await prompt(
522-
"Roll file when time reaches (seconds):",
527+
"Roll file when time reaches (seconds, minimum 10):",
523528
{
524-
defaultValue: "300",
529+
defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds),
525530
}
526531
);
527532

packages/wrangler/src/pipelines/cli/sinks/create.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ export const pipelinesSinksCreateCommand = createCommand({
6969
"roll-size": {
7070
describe: "Roll file size in MB",
7171
type: "number",
72-
default: SINK_DEFAULTS.rolling_policy.file_size_bytes / (1024 * 1024),
7372
},
7473
"roll-interval": {
7574
describe: "Roll file interval in seconds",
@@ -183,7 +182,7 @@ export const pipelinesSinksCreateCommand = createCommand({
183182
}
184183

185184
if (args.rollSize || args.rollInterval) {
186-
let file_size_bytes: number =
185+
let file_size_bytes: number | undefined =
187186
SINK_DEFAULTS.rolling_policy.file_size_bytes;
188187
let interval_seconds: number =
189188
SINK_DEFAULTS.rolling_policy.interval_seconds;
@@ -196,7 +195,7 @@ export const pipelinesSinksCreateCommand = createCommand({
196195
}
197196

198197
sinkConfig.config.rolling_policy = {
199-
file_size_bytes,
198+
...(file_size_bytes !== undefined && { file_size_bytes }),
200199
interval_seconds,
201200
};
202201
}

packages/wrangler/src/pipelines/cli/sinks/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export function displaySinkConfiguration(
5555

5656
const batching: Record<string, string> = {
5757
"File Size":
58-
fileSizeBytes === 0
58+
fileSizeBytes === undefined || fileSizeBytes === 0
5959
? "none"
6060
: `${Math.round(fileSizeBytes / (1024 * 1024))}MB`,
6161
"Time Interval": `${intervalSeconds}s`,

packages/wrangler/src/pipelines/cli/streams/utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ function generateSampleValue(field: SchemaField): SampleValue {
135135
return 42;
136136
case "int64":
137137
return "9223372036854775807"; // Large numbers as strings to avoid JS precision issues
138-
case "f32":
139-
case "f64":
138+
case "float32":
139+
case "float64":
140140
return 3.14;
141141
case "json":
142142
return { example: "json_value" };

packages/wrangler/src/pipelines/defaults.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ export const SINK_DEFAULTS = {
77
row_group_bytes: 1024 * 1024 * 1024,
88
} as ParquetFormat,
99
rolling_policy: {
10-
file_size_bytes: 0,
11-
interval_seconds: 30,
10+
file_size_bytes: undefined,
11+
interval_seconds: 300,
1212
},
1313
r2: {
1414
path: "",
@@ -38,11 +38,18 @@ export function applyDefaultsToSink(sink: Sink): Sink {
3838

3939
if (!withDefaults.config.rolling_policy) {
4040
withDefaults.config.rolling_policy = {
41-
file_size_bytes: SINK_DEFAULTS.rolling_policy.file_size_bytes,
4241
interval_seconds: SINK_DEFAULTS.rolling_policy.interval_seconds,
4342
};
43+
// Only add file_size_bytes if it has a value
44+
if (SINK_DEFAULTS.rolling_policy.file_size_bytes !== undefined) {
45+
withDefaults.config.rolling_policy.file_size_bytes =
46+
SINK_DEFAULTS.rolling_policy.file_size_bytes;
47+
}
4448
} else {
45-
if (!withDefaults.config.rolling_policy.file_size_bytes) {
49+
if (
50+
!withDefaults.config.rolling_policy.file_size_bytes &&
51+
SINK_DEFAULTS.rolling_policy.file_size_bytes !== undefined
52+
) {
4653
withDefaults.config.rolling_policy.file_size_bytes =
4754
SINK_DEFAULTS.rolling_policy.file_size_bytes;
4855
}

packages/wrangler/src/pipelines/types.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ export type SchemaField = {
137137
| "bool"
138138
| "int32"
139139
| "int64"
140-
| "f32"
141-
| "f64"
140+
| "float32"
141+
| "float64"
142142
| "string"
143143
| "timestamp"
144144
| "json"
@@ -169,7 +169,7 @@ export type Sink = {
169169
time_pattern: string;
170170
};
171171
rolling_policy?: {
172-
file_size_bytes: number;
172+
file_size_bytes?: number;
173173
interval_seconds: number;
174174
};
175175
// r2_data_catalog specific fields
@@ -207,7 +207,7 @@ export interface CreateSinkRequest {
207207
time_pattern: string;
208208
};
209209
rolling_policy?: {
210-
file_size_bytes: number;
210+
file_size_bytes?: number;
211211
interval_seconds: number;
212212
};
213213
// R2 credentials (for r2 type)

packages/wrangler/src/r2/catalog.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,9 @@ export const r2BucketCatalogCompactionEnableCommand = createCommand({
181181
type: "string",
182182
demandOption: true,
183183
},
184-
targetSizeMb: {
184+
"target-size": {
185185
describe:
186-
"The target size for compacted files (allowed values: 64, 128, 256, 512)",
186+
"The target size for compacted files in MB (allowed values: 64, 128, 256, 512)",
187187
type: "number",
188188
demandOption: false,
189189
default: 128,
@@ -209,7 +209,7 @@ export const r2BucketCatalogCompactionEnableCommand = createCommand({
209209
config,
210210
accountId,
211211
args.bucket,
212-
args.targetSizeMb
212+
args.targetSize
213213
);
214214

215215
logger.log(

0 commit comments

Comments
 (0)