From d9a5b98b7d16795f10752253c1b6664f3a686ea5 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 21 Feb 2025 10:58:57 +0000 Subject: [PATCH] Add configurable redis TTL on realtime streams --- apps/webapp/app/env.server.ts | 5 ++++ .../realtime/redisRealtimeStreams.server.ts | 25 +++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c644503f42..ae870834a9 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -399,6 +399,11 @@ const EnvironmentSchema = z.object({ MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500), REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"), + REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000), + REALTIME_STREAM_TTL: z.coerce + .number() + .int() + .default(60 * 60 * 24), // 1 day in seconds BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"), BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"), diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index 6eaee46a33..a01aa36ca4 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -3,6 +3,7 @@ import { AuthenticatedEnvironment } from "../apiAuth.server"; import { logger } from "../logger.server"; import { StreamIngestor, StreamResponder } from "./types"; import { LineTransformStream } from "./utils.server"; +import { env } from "~/env.server"; export type RealtimeStreamsOptions = { redis: RedisOptions | undefined; @@ -152,10 +153,30 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { value, }); - await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value); + await redis.xadd( + streamKey, + "MAXLEN", + "~", + String(env.REALTIME_STREAM_MAX_LENGTH), + "*", + "data", + value + ); } - await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL); + // Send the END_SENTINEL and set TTL with a pipeline. + const pipeline = redis.pipeline(); + pipeline.xadd( + streamKey, + "MAXLEN", + "~", + String(env.REALTIME_STREAM_MAX_LENGTH), + "*", + "data", + END_SENTINEL + ); + pipeline.expire(streamKey, env.REALTIME_STREAM_TTL); + await pipeline.exec(); return new Response(null, { status: 200 }); } catch (error) {