Skip to content

Commit d248cfa

Browse files
jonesphillippetebacondarwin
authored andcommitted
Small fixes for pipelines commands, defaults.
1 parent d1cb921 commit d248cfa

File tree

8 files changed

+49
-33
lines changed

8 files changed

+49
-33
lines changed

.changeset/brave-memes-raise.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"wrangler": patch
3+
---
4+
5+
Changes fileSizeMB->file-size for compaction arg. Small fixes for pipelines commands

packages/wrangler/src/__tests__/pipelines.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,7 +1366,7 @@ describe("wrangler pipelines", () => {
13661366
13671367
Batching:
13681368
File Size: none
1369-
Time Interval: 30s
1369+
Time Interval: 300s
13701370
13711371
Format:
13721372
Type: json"
@@ -1400,7 +1400,7 @@ describe("wrangler pipelines", () => {
14001400
14011401
Batching:
14021402
File Size: none
1403-
Time Interval: 30s
1403+
Time Interval: 300s
14041404
14051405
Format:
14061406
Type: json"
@@ -1564,7 +1564,7 @@ describe("wrangler pipelines", () => {
15641564
15651565
Batching:
15661566
File Size: none
1567-
Time Interval: 30s
1567+
Time Interval: 300s
15681568
15691569
Format:
15701570
Type: json"

packages/wrangler/src/pipelines/cli/setup.ts

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
deleteStream,
1515
validateSql,
1616
} from "../client";
17+
import { SINK_DEFAULTS } from "../defaults";
1718
import { authorizeR2Bucket } from "../index";
1819
import {
1920
displayUsageExamples,
@@ -231,10 +232,8 @@ async function buildField(
231232
{ title: "string", value: "string" },
232233
{ title: "int32", value: "int32" },
233234
{ title: "int64", value: "int64" },
234-
{ title: "u_int32", value: "u_int32" },
235-
{ title: "u_int64", value: "u_int64" },
236-
{ title: "f32", value: "f32" },
237-
{ title: "f64", value: "f64" },
235+
{ title: "float32", value: "float32" },
236+
{ title: "float64", value: "float64" },
238237
{ title: "bool", value: "bool" },
239238
{ title: "timestamp", value: "timestamp" },
240239
{ title: "json", value: "json" },
@@ -419,13 +418,16 @@ async function setupR2Sink(
419418
});
420419
}
421420

422-
const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
423-
defaultValue: "100",
424-
});
421+
const fileSizeMB = await prompt(
422+
"Roll file when size reaches (MB, minimum 5):",
423+
{
424+
defaultValue: "100",
425+
}
426+
);
425427
const intervalSeconds = await prompt(
426-
"Roll file when time reaches (seconds):",
428+
"Roll file when time reaches (seconds, minimum 10):",
427429
{
428-
defaultValue: "300",
430+
defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds),
429431
}
430432
);
431433

@@ -511,17 +513,20 @@ async function setupDataCatalogSink(setupConfig: SetupConfig): Promise<void> {
511513
{ title: "zstd", value: "zstd" },
512514
{ title: "lz4", value: "lz4" },
513515
],
514-
defaultOption: 0,
515-
fallbackOption: 0,
516+
defaultOption: 3,
517+
fallbackOption: 3,
516518
});
517519

518-
const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
519-
defaultValue: "100",
520-
});
520+
const fileSizeMB = await prompt(
521+
"Roll file when size reaches (MB, minimum 5):",
522+
{
523+
defaultValue: "100",
524+
}
525+
);
521526
const intervalSeconds = await prompt(
522-
"Roll file when time reaches (seconds):",
527+
"Roll file when time reaches (seconds, minimum 10):",
523528
{
524-
defaultValue: "300",
529+
defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds),
525530
}
526531
);
527532

packages/wrangler/src/pipelines/cli/sinks/create.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ export const pipelinesSinksCreateCommand = createCommand({
6969
"roll-size": {
7070
describe: "Roll file size in MB",
7171
type: "number",
72-
default: SINK_DEFAULTS.rolling_policy.file_size_bytes / (1024 * 1024),
7372
},
7473
"roll-interval": {
7574
describe: "Roll file interval in seconds",
@@ -183,7 +182,7 @@ export const pipelinesSinksCreateCommand = createCommand({
183182
}
184183

185184
if (args.rollSize || args.rollInterval) {
186-
let file_size_bytes: number =
185+
let file_size_bytes: number | undefined =
187186
SINK_DEFAULTS.rolling_policy.file_size_bytes;
188187
let interval_seconds: number =
189188
SINK_DEFAULTS.rolling_policy.interval_seconds;
@@ -196,7 +195,7 @@ export const pipelinesSinksCreateCommand = createCommand({
196195
}
197196

198197
sinkConfig.config.rolling_policy = {
199-
file_size_bytes,
198+
...(file_size_bytes !== undefined && { file_size_bytes }),
200199
interval_seconds,
201200
};
202201
}

packages/wrangler/src/pipelines/cli/sinks/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export function displaySinkConfiguration(
5555

5656
const batching: Record<string, string> = {
5757
"File Size":
58-
fileSizeBytes === 0
58+
fileSizeBytes === undefined || fileSizeBytes === 0
5959
? "none"
6060
: `${Math.round(fileSizeBytes / (1024 * 1024))}MB`,
6161
"Time Interval": `${intervalSeconds}s`,

packages/wrangler/src/pipelines/cli/streams/utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ function generateSampleValue(field: SchemaField): SampleValue {
135135
return 42;
136136
case "int64":
137137
return "9223372036854775807"; // Large numbers as strings to avoid JS precision issues
138-
case "f32":
139-
case "f64":
138+
case "float32":
139+
case "float64":
140140
return 3.14;
141141
case "json":
142142
return { example: "json_value" };

packages/wrangler/src/pipelines/defaults.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ export const SINK_DEFAULTS = {
77
row_group_bytes: 1024 * 1024 * 1024,
88
} as ParquetFormat,
99
rolling_policy: {
10-
file_size_bytes: 0,
11-
interval_seconds: 30,
10+
file_size_bytes: undefined,
11+
interval_seconds: 300,
1212
},
1313
r2: {
1414
path: "",
@@ -38,11 +38,18 @@ export function applyDefaultsToSink(sink: Sink): Sink {
3838

3939
if (!withDefaults.config.rolling_policy) {
4040
withDefaults.config.rolling_policy = {
41-
file_size_bytes: SINK_DEFAULTS.rolling_policy.file_size_bytes,
4241
interval_seconds: SINK_DEFAULTS.rolling_policy.interval_seconds,
4342
};
43+
// Only add file_size_bytes if it has a value
44+
if (SINK_DEFAULTS.rolling_policy.file_size_bytes !== undefined) {
45+
withDefaults.config.rolling_policy.file_size_bytes =
46+
SINK_DEFAULTS.rolling_policy.file_size_bytes;
47+
}
4448
} else {
45-
if (!withDefaults.config.rolling_policy.file_size_bytes) {
49+
if (
50+
!withDefaults.config.rolling_policy.file_size_bytes &&
51+
SINK_DEFAULTS.rolling_policy.file_size_bytes !== undefined
52+
) {
4653
withDefaults.config.rolling_policy.file_size_bytes =
4754
SINK_DEFAULTS.rolling_policy.file_size_bytes;
4855
}

packages/wrangler/src/pipelines/types.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ export type SchemaField = {
137137
| "bool"
138138
| "int32"
139139
| "int64"
140-
| "f32"
141-
| "f64"
140+
| "float32"
141+
| "float64"
142142
| "string"
143143
| "timestamp"
144144
| "json"
@@ -169,7 +169,7 @@ export type Sink = {
169169
time_pattern: string;
170170
};
171171
rolling_policy?: {
172-
file_size_bytes: number;
172+
file_size_bytes?: number;
173173
interval_seconds: number;
174174
};
175175
// r2_data_catalog specific fields
@@ -207,7 +207,7 @@ export interface CreateSinkRequest {
207207
time_pattern: string;
208208
};
209209
rolling_policy?: {
210-
file_size_bytes: number;
210+
file_size_bytes?: number;
211211
interval_seconds: number;
212212
};
213213
// R2 credentials (for r2 type)

0 commit comments

Comments
 (0)