Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/proud-nails-grin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add optional placement tags to dequeued messages for targeted scheduling
8 changes: 6 additions & 2 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const Env = z.object({
OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners

// Workload API settings (coordinator mode) - the workload API is what the run controller connects to
TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default("true"),
TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true),
TRIGGER_WORKLOAD_API_PROTOCOL: z
.string()
.transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase()))
Expand All @@ -32,7 +32,7 @@ const Env = z.object({
RUNNER_PRETTY_LOGS: BoolEnv.default(false),

// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
Expand Down Expand Up @@ -77,6 +77,10 @@ const Env = z.object({
KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),

// Placement tags settings
PLACEMENT_TAGS_ENABLED: BoolEnv.default(false),
PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"),

// Metrics
METRICS_ENABLED: BoolEnv.default(true),
METRICS_COLLECT_DEFAULTS: BoolEnv.default(true),
Expand Down
7 changes: 6 additions & 1 deletion apps/supervisor/src/envUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLog

const logger = new SimpleStructuredLogger("env-util");

export const BoolEnv = z.preprocess((val) => {
/**
 * Schema for boolean-like environment values.
 *
 * String inputs are lowercased and trimmed; they become `true` only when the
 * result is "true" or "1", and `false` for any other string. Non-string
 * inputs are handed to `z.boolean()` unchanged.
 */
const baseBoolEnv = z.preprocess((raw) => {
  if (typeof raw === "string") {
    const normalized = raw.toLowerCase().trim();
    return normalized === "true" || normalized === "1";
  }

  return raw;
}, z.boolean());

// Replacement signature for `.default()` that only admits boolean literals.
type BooleanOnlyDefault = {
  default: (value: boolean) => z.ZodDefault<typeof baseBoolEnv>;
};

/**
 * Same schema object as `baseBoolEnv`, re-typed so that `.default()` only
 * accepts a boolean at compile time. This is a type-level restriction only:
 * the cast does not change the underlying schema at runtime.
 */
export const BoolEnv = baseBoolEnv as Omit<typeof baseBoolEnv, "default"> & BooleanOnlyDefault;

export const AdditionalEnvVars = z.preprocess((val) => {
if (typeof val !== "string") {
return val;
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ class ManagedSupervisor {
nextAttemptNumber: message.run.attemptNumber,
snapshotId: message.snapshot.id,
snapshotFriendlyId: message.snapshot.friendlyId,
placementTags: message.placementTags,
});

// Disabled for now
Expand Down
59 changes: 57 additions & 2 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
import { env } from "../env.js";
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
import { getRunnerId } from "../util.js";
Expand All @@ -13,6 +13,11 @@ type ResourceQuantities = {
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
};

/** Settings that control how placement tags are applied to a pod spec. */
type PlacementConfig = {
  /** When false, placement tags are ignored and the pod spec is left as-is. */
  enabled: boolean;
  /** Prefix joined to each tag key with "/" to form the node label key. */
  prefix: string;
};

export class KubernetesWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider");
private k8s: K8sApi;
Expand All @@ -28,6 +33,56 @@ export class KubernetesWorkloadManager implements WorkloadManager {
}
}

/** Snapshot of the placement-tag settings taken from the parsed environment. */
private get placementConfig(): PlacementConfig {
  const { PLACEMENT_TAGS_ENABLED: enabled, PLACEMENT_TAGS_PREFIX: prefix } = env;
  return { enabled, prefix };
}

/**
 * Returns a pod spec whose `nodeSelector` also carries one label per
 * placement tag.
 *
 * The input spec is returned untouched when placement tags are disabled or
 * when no tags were supplied. Otherwise a shallow copy is returned; the
 * original `podSpec` and its `nodeSelector` are not mutated.
 *
 * @param podSpec - Base pod spec (without containers) to extend.
 * @param placementTags - Optional tags to translate into node labels.
 * @returns The original spec, or a copy with the merged `nodeSelector`.
 */
private addPlacementTags(
  podSpec: Omit<k8s.V1PodSpec, "containers">,
  placementTags?: PlacementTag[]
): Omit<k8s.V1PodSpec, "containers"> {
  const { enabled, prefix } = this.placementConfig;
  const tags = placementTags ?? [];

  if (!enabled || tags.length === 0) {
    return podSpec;
  }

  // Start from the existing selector so labels already present are preserved;
  // a tag that maps to an existing label key overwrites that entry.
  const nodeSelector = tags.reduce<Record<string, string>>(
    (selector, tag) => {
      // Surface missing or surplus values before picking one.
      this.printTagWarnings(tag);

      // nodeSelector holds a single value per label, so only the first tag
      // value is used (empty string when there is none).
      selector[`${prefix}/${tag.key}`] = tag.values?.[0] ?? "";
      return selector;
    },
    { ...podSpec.nodeSelector }
  );

  return { ...podSpec, nodeSelector };
}

/**
 * Logs a warning when a placement tag cannot be mapped one-to-one onto a
 * label value: either it has no values at all, or it has more than one.
 * Tags with exactly one value produce no output.
 *
 * @param tag - The placement tag to inspect.
 */
private printTagWarnings(tag: PlacementTag) {
  const valueCount = tag.values?.length ?? 0;

  if (valueCount === 0) {
    // Nothing to select on; the caller falls back to an empty string.
    this.logger.warn(
      "[KubernetesWorkloadManager] Placement tag has no values, using empty string",
      tag
    );
    return;
  }

  if (valueCount > 1) {
    // Extra values are dropped by the caller.
    this.logger.warn(
      "[KubernetesWorkloadManager] Placement tag has multiple values, only using first one",
      tag
    );
  }
}

async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });

Expand All @@ -48,7 +103,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
},
},
spec: {
...this.#defaultPodSpec,
...this.addPlacementTags(this.#defaultPodSpec, opts.placementTags),
terminationGracePeriodSeconds: 60 * 60,
containers: [
{
Expand Down
3 changes: 2 additions & 1 deletion apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type EnvironmentType, type MachinePreset } from "@trigger.dev/core/v3";
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";

export interface WorkloadManagerOptions {
workloadApiProtocol: "http" | "https";
Expand All @@ -23,6 +23,7 @@ export interface WorkloadManagerCreateOptions {
version: string;
nextAttemptNumber?: number;
dequeuedAt: Date;
placementTags?: PlacementTag[];
// identifiers
envId: string;
envType: EnvironmentType;
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ const EnvironmentSchema = z.object({
.int()
.default(60_000 * 5), // 5 minutes

BATCH_TRIGGER_CACHED_RUNS_CHECK_ENABLED: BoolEnv.default(false),

BATCH_TRIGGER_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { redirectWithErrorMessage } from "~/models/message.server";
import { logger } from "~/services/logger.server";
import { setPlan } from "~/services/platform.v3.server";
import { requireUser } from "~/services/session.server";
import { engine } from "~/v3/runEngine.server";
import { cn } from "~/utils/cn";
import { sendToPlain } from "~/utils/plain.server";

Expand Down Expand Up @@ -152,7 +153,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
}
}

return setPlan(organization, request, form.callerPath, payload);
return setPlan(organization, request, form.callerPath, payload, {
invalidateBillingCache: engine.invalidateBillingCache.bind(engine),
});
}

const pricingDefinitions = {
Expand Down
7 changes: 5 additions & 2 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ export class DefaultQueueManager implements QueueManager {
return task.queue.name ?? defaultQueueName;
}

async validateQueueLimits(environment: AuthenticatedEnvironment): Promise<QueueValidationResult> {
const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment);
async validateQueueLimits(
environment: AuthenticatedEnvironment,
itemsToAdd?: number
): Promise<QueueValidationResult> {
const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment, itemsToAdd);

logger.debug("Queue size guard result", {
queueSizeGuard,
Expand Down
Loading
Loading