Skip to content

Commit fa7f4b1

Browse files
authored
feat(k8s): add placement tags for flexible node selection (#2390)
* add tier scheduling support to supervisor * add billing info to dequeued message w/o cache * add cache with best effort invalidation * fix invalidate circular dep * add changeset * use new plan type on runs as fallback during dequeue * tidy up * be more explicit with plan type fallback * remove additional billing check from hot path * switch to placement tags * update changeset * update platform package * start using new entitlement response * ensure skipChecks optimization validates at batch level * add optional items to add to queue manager limits * make the bool env helper only accept boolean defaults * remove redundant private field * update placement tag helper to prevent unsupported tags
1 parent f767654 commit fa7f4b1

File tree

27 files changed

+513
-56
lines changed

27 files changed

+513
-56
lines changed

.changeset/proud-nails-grin.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Add optional placement tags to dequeued messages for targeted scheduling

apps/supervisor/src/env.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const Env = z.object({
1515
OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners
1616

1717
// Workload API settings (coordinator mode) - the workload API is what the run controller connects to
18-
TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default("true"),
18+
TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true),
1919
TRIGGER_WORKLOAD_API_PROTOCOL: z
2020
.string()
2121
.transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase()))
@@ -32,7 +32,7 @@ const Env = z.object({
3232
RUNNER_PRETTY_LOGS: BoolEnv.default(false),
3333

3434
// Dequeue settings (provider mode)
35-
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
35+
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true),
3636
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
3737
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
3838
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
@@ -77,6 +77,10 @@ const Env = z.object({
7777
KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
7878
KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),
7979

80+
// Placement tags settings
81+
PLACEMENT_TAGS_ENABLED: BoolEnv.default(false),
82+
PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"),
83+
8084
// Metrics
8185
METRICS_ENABLED: BoolEnv.default(true),
8286
METRICS_COLLECT_DEFAULTS: BoolEnv.default(true),

apps/supervisor/src/envUtil.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,19 @@ import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLog
33

44
const logger = new SimpleStructuredLogger("env-util");
55

6-
export const BoolEnv = z.preprocess((val) => {
6+
const baseBoolEnv = z.preprocess((val) => {
77
if (typeof val !== "string") {
88
return val;
99
}
1010

1111
return ["true", "1"].includes(val.toLowerCase().trim());
1212
}, z.boolean());
1313

14+
// Create a type-safe version that only accepts boolean defaults
15+
export const BoolEnv = baseBoolEnv as Omit<typeof baseBoolEnv, "default"> & {
16+
default: (value: boolean) => z.ZodDefault<typeof baseBoolEnv>;
17+
};
18+
1419
export const AdditionalEnvVars = z.preprocess((val) => {
1520
if (typeof val !== "string") {
1621
return val;

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ class ManagedSupervisor {
247247
nextAttemptNumber: message.run.attemptNumber,
248248
snapshotId: message.snapshot.id,
249249
snapshotFriendlyId: message.snapshot.friendlyId,
250+
placementTags: message.placementTags,
250251
});
251252

252253
// Disabled for now

apps/supervisor/src/workloadManager/kubernetes.ts

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
type WorkloadManagerCreateOptions,
55
type WorkloadManagerOptions,
66
} from "./types.js";
7-
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
7+
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
88
import { env } from "../env.js";
99
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
1010
import { getRunnerId } from "../util.js";
@@ -13,6 +13,11 @@ type ResourceQuantities = {
1313
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
1414
};
1515

16+
interface PlacementConfig {
17+
enabled: boolean;
18+
prefix: string;
19+
}
20+
1621
export class KubernetesWorkloadManager implements WorkloadManager {
1722
private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider");
1823
private k8s: K8sApi;
@@ -28,6 +33,56 @@ export class KubernetesWorkloadManager implements WorkloadManager {
2833
}
2934
}
3035

36+
private get placementConfig(): PlacementConfig {
37+
return {
38+
enabled: env.PLACEMENT_TAGS_ENABLED,
39+
prefix: env.PLACEMENT_TAGS_PREFIX,
40+
};
41+
}
42+
43+
private addPlacementTags(
44+
podSpec: Omit<k8s.V1PodSpec, "containers">,
45+
placementTags?: PlacementTag[]
46+
): Omit<k8s.V1PodSpec, "containers"> {
47+
if (!this.placementConfig.enabled || !placementTags || placementTags.length === 0) {
48+
return podSpec;
49+
}
50+
51+
const nodeSelector: Record<string, string> = { ...podSpec.nodeSelector };
52+
53+
// Convert placement tags to nodeSelector labels
54+
for (const tag of placementTags) {
55+
const labelKey = `${this.placementConfig.prefix}/${tag.key}`;
56+
57+
// Print warnings (if any)
58+
this.printTagWarnings(tag);
59+
60+
// For now we only support single values via nodeSelector
61+
nodeSelector[labelKey] = tag.values?.[0] ?? "";
62+
}
63+
64+
return {
65+
...podSpec,
66+
nodeSelector,
67+
};
68+
}
69+
70+
private printTagWarnings(tag: PlacementTag) {
71+
if (!tag.values || tag.values.length === 0) {
72+
// No values provided
73+
this.logger.warn(
74+
"[KubernetesWorkloadManager] Placement tag has no values, using empty string",
75+
tag
76+
);
77+
} else if (tag.values.length > 1) {
78+
// Multiple values provided
79+
this.logger.warn(
80+
"[KubernetesWorkloadManager] Placement tag has multiple values, only using first one",
81+
tag
82+
);
83+
}
84+
}
85+
3186
async create(opts: WorkloadManagerCreateOptions) {
3287
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });
3388

@@ -48,7 +103,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
48103
},
49104
},
50105
spec: {
51-
...this.#defaultPodSpec,
106+
...this.addPlacementTags(this.#defaultPodSpec, opts.placementTags),
52107
terminationGracePeriodSeconds: 60 * 60,
53108
containers: [
54109
{

apps/supervisor/src/workloadManager/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type EnvironmentType, type MachinePreset } from "@trigger.dev/core/v3";
1+
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
22

33
export interface WorkloadManagerOptions {
44
workloadApiProtocol: "http" | "https";
@@ -23,6 +23,7 @@ export interface WorkloadManagerCreateOptions {
2323
version: string;
2424
nextAttemptNumber?: number;
2525
dequeuedAt: Date;
26+
placementTags?: PlacementTag[];
2627
// identifiers
2728
envId: string;
2829
envType: EnvironmentType;

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,8 @@ const EnvironmentSchema = z.object({
761761
.int()
762762
.default(60_000 * 5), // 5 minutes
763763

764+
BATCH_TRIGGER_CACHED_RUNS_CHECK_ENABLED: BoolEnv.default(false),
765+
764766
BATCH_TRIGGER_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
765767
BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
766768
BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),

apps/webapp/app/routes/resources.orgs.$organizationSlug.select-plan.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { redirectWithErrorMessage } from "~/models/message.server";
4242
import { logger } from "~/services/logger.server";
4343
import { setPlan } from "~/services/platform.v3.server";
4444
import { requireUser } from "~/services/session.server";
45+
import { engine } from "~/v3/runEngine.server";
4546
import { cn } from "~/utils/cn";
4647
import { sendToPlain } from "~/utils/plain.server";
4748

@@ -152,7 +153,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
152153
}
153154
}
154155

155-
return setPlan(organization, request, form.callerPath, payload);
156+
return setPlan(organization, request, form.callerPath, payload, {
157+
invalidateBillingCache: engine.invalidateBillingCache.bind(engine),
158+
});
156159
}
157160

158161
const pricingDefinitions = {

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,11 @@ export class DefaultQueueManager implements QueueManager {
177177
return task.queue.name ?? defaultQueueName;
178178
}
179179

180-
async validateQueueLimits(environment: AuthenticatedEnvironment): Promise<QueueValidationResult> {
181-
const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment);
180+
async validateQueueLimits(
181+
environment: AuthenticatedEnvironment,
182+
itemsToAdd?: number
183+
): Promise<QueueValidationResult> {
184+
const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment, itemsToAdd);
182185

183186
logger.debug("Queue size guard result", {
184187
queueSizeGuard,

0 commit comments

Comments
 (0)