From eb2ea329de1df2a2da51a1434794214045742d60 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 7 Nov 2024 21:09:55 -0800 Subject: [PATCH 01/11] Implement heartbeat and abort signal. --- src/common/providers/https.ts | 67 ++++++++++++++++++++++++++++++++--- src/v2/providers/https.ts | 8 +++++ 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index d10696837..8c4fbf529 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -40,6 +40,8 @@ const JWT_REGEX = /^[a-zA-Z0-9\-_=]+?\.[a-zA-Z0-9\-_=]+?\.([a-zA-Z0-9\-_=]+)?$/; export const CALLABLE_AUTH_HEADER = "x-callable-context-auth"; /** @internal */ export const ORIGINAL_AUTH_HEADER = "x-original-auth"; +/** @internal */ +export const DEFAULT_HEARTBEAT_SECONDS = 30; /** An express request with the wire format representation of the request body. */ export interface Request extends express.Request { @@ -146,8 +148,21 @@ export interface CallableRequest { * to allow writing partial, streaming responses back to the client. */ export interface CallableProxyResponse { + /** + * Writes a chunk of the response body to the client. This method can be called + * multiple times to stream data progressively. + */ write: express.Response["write"]; + /** + * Indicates whether the client has requested and can handle streaming responses. + * This should be checked before attempting to stream data to avoid compatibility issues. + */ acceptsStreaming: boolean; + /** + * An AbortSignal that is triggered when the client disconnects or the + * request is terminated prematurely. + */ + signal: AbortSignal; } /** @@ -692,6 +707,13 @@ export interface CallableOptions { cors: cors.CorsOptions; enforceAppCheck?: boolean; consumeAppCheckToken?: boolean; + /** + * Time in seconds between sending heartbeat messages to keep the connection + * alive. Set to `null` to disable heartbeats. + * + * Defaults to 30 seconds. + */ + heartbeatSeconds?: number | null } /** @internal */ @@ -722,6 +744,22 @@ function wrapOnCallHandler( version: "gcfv1" | "gcfv2" ): (req: Request, res: express.Response) => Promise { return async (req: Request, res: express.Response): Promise => { + const abortController = new AbortController(); + let heartbeatInterval: NodeJS.Timeout | null = null; + + const cleanup = () => { + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } + req.removeAllListeners('close'); + }; + + req.on('close', () => { + cleanup() + abortController.abort(); + }); + try { if (!isValidRequest(req)) { logger.error("Invalid request, unable to process."); @@ -791,24 +829,41 @@ function wrapOnCallHandler( ...context, data, }; - // TODO: set up optional heartbeat const responseProxy: CallableProxyResponse = { write(chunk): boolean { - if (acceptsStreaming) { - const formattedData = encodeSSE({ message: chunk }); - return res.write(formattedData); - } // if client doesn't accept sse-protocol, response.write() is no-op. + if (!acceptsStreaming) { + return false + } + // if connection is already closed, response.write() is no-op. + if (abortController.signal.aborted) { + return false + } + const formattedData = encodeSSE({ message: chunk }); + return res.write(formattedData); }, acceptsStreaming, + signal: abortController.signal }; if (acceptsStreaming) { // SSE always responds with 200 res.status(200); + const heartbeatSeconds = options.heartbeatSeconds ?? DEFAULT_HEARTBEAT_SECONDS; + if (heartbeatSeconds !== null && heartbeatSeconds > 0) { + heartbeatInterval = setInterval( + () => res.write(": ping\n"), + heartbeatSeconds * 1000 + ); + } } // For some reason the type system isn't picking up that the handler // is a one argument function. result = await (handler as any)(arg, responseProxy); + + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } } // Encode the result as JSON to preserve types like Dates. @@ -837,6 +892,8 @@ function wrapOnCallHandler( } else { res.status(status).send(body); } + } finally { + cleanup() } }; } diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 0c5fd2ba0..e29c06295 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -198,6 +198,14 @@ export interface CallableOptions extends HttpsOptions { * further decisions, such as requiring additional security checks or rejecting the request. */ consumeAppCheckToken?: boolean; + + /** + * Time in seconds between sending heartbeat messages to keep the connection + * alive. Set to `null` to disable heartbeats. + * + * Defaults to 30 seconds. + */ + heartbeatSeconds?: number | null } /** From 9fbdbfc607dd126f203957c6268d5d1af54f0a9b Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 7 Nov 2024 21:27:30 -0800 Subject: [PATCH 02/11] Add heartbeat test. --- spec/common/providers/https.spec.ts | 73 +++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index 050577564..913b8d491 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -764,7 +764,7 @@ describe("onCallHandler", () => { "application/json", {}, { accept: "text/event-stream" } - ) as any; + ); const fn = https.onCallHandler( { cors: { origin: true, methods: "POST" }, @@ -776,7 +776,7 @@ describe("onCallHandler", () => { "gcfv2" ); - const resp = await runHandler(fn, mockReq); + const resp = await runHandler(fn, mockReq as any); const data = [`data: {"message":"hello"}`, `data: {"result":"world"}`]; expect(resp.body).to.equal([...data, ""].join("\n")); }); @@ -787,7 +787,7 @@ describe("onCallHandler", () => { "application/json", {}, { accept: "text/event-stream" } - ) as any; + ); const fn = https.onCallHandler( { cors: { origin: true, methods: "POST" }, @@ -798,10 +798,75 @@ describe("onCallHandler", () => { "gcfv2" ); - const resp = await runHandler(fn, mockReq); + const resp = await runHandler(fn, mockReq as any); const data = [`data: {"error":{"message":"INTERNAL","status":"INTERNAL"}}`]; expect(resp.body).to.equal([...data, ""].join("\n")); }); + + describe("Heartbeats", () => { + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + clock.restore(); + }); + + it("sends heartbeat messages at specified interval", async () => { + const mockReq = mockRequest( + { message: "test heartbeat" }, + "application/json", + {}, + { accept: "text/event-stream" } + ); + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + heartbeatSeconds: 5 + }, + async () => { + // Simulate long-running operation + await new Promise(resolve => setTimeout(resolve, 11_000)); + return "done"; + }, + "gcfv2" + ); + + const handlerPromise = runHandler(fn, mockReq as any); + clock.tick(11_000); + const resp = await handlerPromise; + expect(resp.body).to.include(': ping\n: ping\ndata: {"result":"done"}'); + }); + + it("respects null heartbeatSeconds option", async () => { + const mockReq = mockRequest( + { message: "test no heartbeat" }, + "application/json", + {}, + { accept: "text/event-stream" } + ); + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + heartbeatSeconds: null + }, + async () => { + await new Promise(resolve => setTimeout(resolve, 31_000)); + return "done"; + }, + "gcfv2" + ); + + const handlerPromise = runHandler(fn, mockReq as any); + clock.tick(31_000); + const resp = await handlerPromise; + expect(resp.body).to.include('data: {"result":"done"}'); + }); + }); }); }); From 7ce0c1723acd5b35023b804798e67f9957e23205 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 7 Nov 2024 22:19:33 -0800 Subject: [PATCH 03/11] Update sinon.js to get clock.tickAsync() --- package-lock.json | 136 ++++++++++++++++++---------------------------- package.json | 4 +- 2 files changed, 55 insertions(+), 85 deletions(-) diff --git a/package-lock.json b/package-lock.json index dc7bb9755..923b70df1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -30,7 +30,7 @@ "@types/nock": "^10.0.3", "@types/node": "^14.18.24", "@types/node-fetch": "^3.0.3", - "@types/sinon": "^7.0.13", + "@types/sinon": "^9.0.11", "@typescript-eslint/eslint-plugin": "^5.33.1", "@typescript-eslint/parser": "^5.33.1", "api-extractor-model-me": "^0.1.1", @@ -56,7 +56,7 @@ "prettier": "^2.7.1", "protobufjs-cli": "^1.1.1", "semver": "^7.3.5", - "sinon": "^7.3.2", + "sinon": "^9.2.4", "ts-node": "^10.4.0", "typescript": "^4.3.5", "yargs": "^15.3.1" @@ -1011,31 +1011,30 @@ "type-detect": "4.0.8" } }, - "node_modules/@sinonjs/formatio": { - "version": "3.2.2", - "resolved": "https://registry.npmjs.org/@sinonjs/formatio/-/formatio-3.2.2.tgz", - "integrity": "sha512-B8SEsgd8gArBLMD6zpRw3juQ2FVSsmdd7qlevyDqzS9WTCtvF55/gAL+h6gue8ZvPYcdiPdvueM/qm//9XzyTQ==", + "node_modules/@sinonjs/fake-timers": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-6.0.1.tgz", + "integrity": "sha512-MZPUxrmFubI36XS1DI3qmI0YdN1gks62JtFZvxR67ljjSNCeK6U08Zx4msEWOXuofgqUt6zPHSi1H9fbjR/NRA==", "dev": true, "dependencies": { - "@sinonjs/commons": "^1", - "@sinonjs/samsam": "^3.1.0" + "@sinonjs/commons": "^1.7.0" } }, "node_modules/@sinonjs/samsam": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/@sinonjs/samsam/-/samsam-3.3.3.tgz", - "integrity": "sha512-bKCMKZvWIjYD0BLGnNrxVuw4dkWCYsLqFOUWw8VgKF/+5Y+mE7LfHWPIYoDXowH+3a9LsWDMo0uAP8YDosPvHQ==", + "version": "5.3.1", + "resolved": "https://registry.npmjs.org/@sinonjs/samsam/-/samsam-5.3.1.tgz", + "integrity": "sha512-1Hc0b1TtyfBu8ixF/tpfSHTVWKwCBLY4QJbkgnE7HcwyvT2xArDxb4K7dMgqRm3szI+LJbzmW/s4xxEhv6hwDg==", "dev": true, "dependencies": { - "@sinonjs/commons": "^1.3.0", - "array-from": "^2.1.1", - "lodash": "^4.17.15" + "@sinonjs/commons": "^1.6.0", + "lodash.get": "^4.4.2", + "type-detect": "^4.0.8" } }, "node_modules/@sinonjs/text-encoding": { - "version": "0.7.2", - "resolved": "https://registry.npmjs.org/@sinonjs/text-encoding/-/text-encoding-0.7.2.tgz", - "integrity": "sha512-sXXKG+uL9IrKqViTtao2Ws6dy0znu9sOaP1di/jKGW1M6VssO8vlpXCQcpZ+jisQ1tTFAC5Jo/EOzFbggBagFQ==", + "version": "0.7.3", + "resolved": "https://registry.npmjs.org/@sinonjs/text-encoding/-/text-encoding-0.7.3.tgz", + "integrity": "sha512-DE427ROAphMQzU4ENbliGYrBSYPXF+TtLg9S8vzeA+OF4ZKzoDdzfL8sxuMUGS/lgRhM6j1URSk9ghf7Xo1tyA==", "dev": true }, "node_modules/@tootallnate/once": { @@ -1287,9 +1286,18 @@ } }, "node_modules/@types/sinon": { - "version": "7.5.2", - "resolved": "https://registry.npmjs.org/@types/sinon/-/sinon-7.5.2.tgz", - "integrity": "sha512-T+m89VdXj/eidZyejvmoP9jivXgBDdkOSBVQjU9kF349NEx10QdPNGxHeZUaj1IlJ32/ewdyXJjnJxyxJroYwg==", + "version": "9.0.11", + "resolved": "https://registry.npmjs.org/@types/sinon/-/sinon-9.0.11.tgz", + "integrity": "sha512-PwP4UY33SeeVKodNE37ZlOsR9cReypbMJOhZ7BVE0lB+Hix3efCOxiJWiE5Ia+yL9Cn2Ch72EjFTRze8RZsNtg==", + "dev": true, + "dependencies": { + "@types/sinonjs__fake-timers": "*" + } + }, + "node_modules/@types/sinonjs__fake-timers": { + "version": "8.1.5", + "resolved": "https://registry.npmjs.org/@types/sinonjs__fake-timers/-/sinonjs__fake-timers-8.1.5.tgz", + "integrity": "sha512-mQkU2jY8jJEF7YHjHvsQO8+3ughTL1mcnn96igfhONmR+fUPSKIkefQYpSe8bsly2Ep7oQbn/6VG5/9/0qcArQ==", "dev": true }, "node_modules/@types/tough-cookie": { @@ -1757,12 +1765,6 @@ "resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz", "integrity": "sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==" }, - "node_modules/array-from": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/array-from/-/array-from-2.1.1.tgz", - "integrity": "sha512-GQTc6Uupx1FCavi5mPzBvVT7nEOeWMmUA9P95wpfpW1XwMSKs+KaymD5C2Up7KAUKg/mYwbsUYzdZWcoajlNZg==", - "dev": true - }, "node_modules/array-union": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/array-union/-/array-union-2.1.0.tgz", @@ -4475,12 +4477,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/lolex": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/lolex/-/lolex-4.2.0.tgz", - "integrity": "sha512-gKO5uExCXvSm6zbF562EvM+rd1kQDnB9AZBbiQVzf1ZmdDpxUSvpnAaVOP83N/31mRK8Ml8/VE8DMvsAZQ+7wg==", - "dev": true - }, "node_modules/long": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", @@ -4967,31 +4963,22 @@ } }, "node_modules/nise": { - "version": "1.5.3", - "resolved": "https://registry.npmjs.org/nise/-/nise-1.5.3.tgz", - "integrity": "sha512-Ymbac/94xeIrMf59REBPOv0thr+CJVFMhrlAkW/gjCIE58BGQdCj0x7KRCb3yz+Ga2Rz3E9XXSvUyyxqqhjQAQ==", + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/nise/-/nise-4.1.0.tgz", + "integrity": "sha512-eQMEmGN/8arp0xsvGoQ+B1qvSkR73B1nWSCh7nOt5neMCtwcQVYQGdzQMhcNscktTsWB54xnlSQFzOAPJD8nXA==", "dev": true, "dependencies": { - "@sinonjs/formatio": "^3.2.1", + "@sinonjs/commons": "^1.7.0", + "@sinonjs/fake-timers": "^6.0.0", "@sinonjs/text-encoding": "^0.7.1", "just-extend": "^4.0.2", - "lolex": "^5.0.1", "path-to-regexp": "^1.7.0" } }, - "node_modules/nise/node_modules/lolex": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/lolex/-/lolex-5.1.2.tgz", - "integrity": "sha512-h4hmjAvHTmd+25JSwrtTIuwbKdwg5NzZVRMLn9saij4SZaepCrTCxPr35H/3bjwfMJtN+t3CX8672UIkglz28A==", - "dev": true, - "dependencies": { - "@sinonjs/commons": "^1.7.0" - } - }, "node_modules/nise/node_modules/path-to-regexp": { - "version": "1.8.0", - "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-1.8.0.tgz", - "integrity": "sha512-n43JRhlUKUAlibEJhPeir1ncUID16QnEjNpwzNdO3Lm4ywrBpBZ5oLD0I6br9evr1Y9JTqwRtAh7JLoOzAQdVA==", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-1.9.0.tgz", + "integrity": "sha512-xIp7/apCFJuUHdDLWe8O1HIkb0kQrOMb/0u6FXQjemHn/ii5LrIzU6bdECnsiTF/GjZkMEKg1xdiZwNqDYlZ6g==", "dev": true, "dependencies": { "isarray": "0.0.1" @@ -6162,50 +6149,33 @@ } }, "node_modules/sinon": { - "version": "7.5.0", - "resolved": "https://registry.npmjs.org/sinon/-/sinon-7.5.0.tgz", - "integrity": "sha512-AoD0oJWerp0/rY9czP/D6hDTTUYGpObhZjMpd7Cl/A6+j0xBE+ayL/ldfggkBXUs0IkvIiM1ljM8+WkOc5k78Q==", + "version": "9.2.4", + "resolved": "https://registry.npmjs.org/sinon/-/sinon-9.2.4.tgz", + "integrity": "sha512-zljcULZQsJxVra28qIAL6ow1Z9tpattkCTEJR4RBP3TGc00FcttsP5pK284Nas5WjMZU5Yzy3kAIp3B3KRf5Yg==", + "deprecated": "16.1.1", "dev": true, "dependencies": { - "@sinonjs/commons": "^1.4.0", - "@sinonjs/formatio": "^3.2.1", - "@sinonjs/samsam": "^3.3.3", - "diff": "^3.5.0", - "lolex": "^4.2.0", - "nise": "^1.5.2", - "supports-color": "^5.5.0" + "@sinonjs/commons": "^1.8.1", + "@sinonjs/fake-timers": "^6.0.1", + "@sinonjs/samsam": "^5.3.1", + "diff": "^4.0.2", + "nise": "^4.0.4", + "supports-color": "^7.1.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/sinon" } }, "node_modules/sinon/node_modules/diff": { - "version": "3.5.0", - "resolved": "https://registry.npmjs.org/diff/-/diff-3.5.0.tgz", - "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", "dev": true, "engines": { "node": ">=0.3.1" } }, - "node_modules/sinon/node_modules/has-flag": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-3.0.0.tgz", - "integrity": "sha512-sKJf1+ceQBr4SMkvQnBDNDtf4TXpVhVGateu0t918bl30FnbE2m4vNLX+VWe/dpjlb+HugGYzW7uQXH98HPEYw==", - "dev": true, - "engines": { - "node": ">=4" - } - }, - "node_modules/sinon/node_modules/supports-color": { - "version": "5.5.0", - "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", - "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", - "dev": true, - "dependencies": { - "has-flag": "^3.0.0" - }, - "engines": { - "node": ">=4" - } - }, "node_modules/slash": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/slash/-/slash-3.0.0.tgz", diff --git a/package.json b/package.json index 4c7f861c0..eb738e1b0 100644 --- a/package.json +++ b/package.json @@ -287,7 +287,7 @@ "@types/nock": "^10.0.3", "@types/node": "^14.18.24", "@types/node-fetch": "^3.0.3", - "@types/sinon": "^7.0.13", + "@types/sinon": "^9.0.11", "@typescript-eslint/eslint-plugin": "^5.33.1", "@typescript-eslint/parser": "^5.33.1", "api-extractor-model-me": "^0.1.1", @@ -313,7 +313,7 @@ "prettier": "^2.7.1", "protobufjs-cli": "^1.1.1", "semver": "^7.3.5", - "sinon": "^7.3.2", + "sinon": "^9.2.4", "ts-node": "^10.4.0", "typescript": "^4.3.5", "yargs": "^15.3.1" From 003002f1ca2c829ad21b4c82faf402663e033968 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 7 Nov 2024 22:19:52 -0800 Subject: [PATCH 04/11] Fix tests. --- spec/common/providers/https.spec.ts | 31 +++++++++++++++++++++++++++-- spec/fixtures/mockrequest.ts | 6 ++++-- spec/helper.ts | 4 ++++ src/common/providers/https.ts | 29 +++++++++++++++------------ 4 files changed, 53 insertions(+), 17 deletions(-) diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index 913b8d491..1ecd1dceb 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -803,6 +803,33 @@ describe("onCallHandler", () => { expect(resp.body).to.equal([...data, ""].join("\n")); }); + it("stops processing when client disconnects", async () => { + const mockReq = mockRequest( + { message: "test abort" }, + "application/json", + {}, + { accept: "text/event-stream" } + ) as any; + + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + }, + async (req, resp) => { + resp.write("initial message"); + mockReq.emit('close'); + resp.write("should not be sent"); + return "done"; + }, + "gcfv2" + ); + + const resp = await runHandler(fn, mockReq); + + expect(resp.body).to.equal(`data: {"message":"initial message"}\n`); + }); + describe("Heartbeats", () => { let clock: sinon.SinonFakeTimers; @@ -836,7 +863,7 @@ describe("onCallHandler", () => { ); const handlerPromise = runHandler(fn, mockReq as any); - clock.tick(11_000); + await clock.tickAsync(11_000) const resp = await handlerPromise; expect(resp.body).to.include(': ping\n: ping\ndata: {"result":"done"}'); }); @@ -862,7 +889,7 @@ describe("onCallHandler", () => { ); const handlerPromise = runHandler(fn, mockReq as any); - clock.tick(31_000); + await clock.tickAsync(31_000); const resp = await handlerPromise; expect(resp.body).to.include('data: {"result":"done"}'); }); diff --git a/spec/fixtures/mockrequest.ts b/spec/fixtures/mockrequest.ts index c85ea36ed..c27f8e2cd 100644 --- a/spec/fixtures/mockrequest.ts +++ b/spec/fixtures/mockrequest.ts @@ -1,3 +1,5 @@ +import { EventEmitter } from 'node:stream'; + import * as jwt from 'jsonwebtoken'; import * as jwkToPem from 'jwk-to-pem'; import * as nock from 'nock'; @@ -5,14 +7,14 @@ import * as mockJWK from '../fixtures/credential/jwk.json'; import * as mockKey from '../fixtures/credential/key.json'; // MockRequest mocks an https.Request. -export class MockRequest { +export class MockRequest extends EventEmitter { public method: 'POST' | 'GET' | 'OPTIONS' = 'POST'; constructor( readonly body: any, readonly headers: { [name: string]: string } ) { - // This block intentionally left blank. + super() } public header(name: string): string { diff --git a/spec/helper.ts b/spec/helper.ts index 544061b0b..f4a6179e4 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -52,6 +52,10 @@ export function runHandler( private headers: { [name: string]: string } = {}; private callback: () => void; + constructor() { + request.on("close", () => this.end()) + } + public status(code: number) { this.statusCode = code; return this; diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 8c4fbf529..01b0a2a28 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -871,6 +871,7 @@ function wrapOnCallHandler( // If there was some result, encode it in the body. const responseBody: HttpResponseBody = { result }; + if (acceptsStreaming) { res.write(encodeSSE(responseBody)); res.end(); @@ -878,22 +879,24 @@ function wrapOnCallHandler( res.status(200).send(responseBody); } } catch (err) { - let httpErr = err; - if (!(err instanceof HttpsError)) { - // This doesn't count as an 'explicit' error. - logger.error("Unhandled error", err); - httpErr = new HttpsError("internal", "INTERNAL"); - } + if (!abortController.signal.aborted) { + let httpErr = err; + if (!(err instanceof HttpsError)) { + // This doesn't count as an 'explicit' error. + logger.error("Unhandled error", err); + httpErr = new HttpsError("internal", "INTERNAL"); + } - const { status } = httpErr.httpErrorCode; - const body = { error: httpErr.toJSON() }; - if (req.header("accept") === "text/event-stream") { - res.send(encodeSSE(body)); - } else { - res.status(status).send(body); + const { status } = httpErr.httpErrorCode; + const body = { error: httpErr.toJSON() }; + if (req.header("accept") === "text/event-stream") { + res.send(encodeSSE(body)); + } else { + res.status(status).send(body); + } } } finally { - cleanup() + cleanup(); } }; } From e1b6833d6ea25fef02353bb20706deb595f6ee71 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 7 Nov 2024 22:26:05 -0800 Subject: [PATCH 05/11] Finishing touches. --- src/common/providers/https.ts | 37 ++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 01b0a2a28..4d50ecab1 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -747,16 +747,15 @@ function wrapOnCallHandler( const abortController = new AbortController(); let heartbeatInterval: NodeJS.Timeout | null = null; - const cleanup = () => { + const clearHeartbeatInterval = () => { if (heartbeatInterval) { clearInterval(heartbeatInterval); heartbeatInterval = null; } - req.removeAllListeners('close'); - }; + } req.on('close', () => { - cleanup() + clearHeartbeatInterval(); abortController.abort(); }); @@ -859,24 +858,24 @@ function wrapOnCallHandler( // For some reason the type system isn't picking up that the handler // is a one argument function. result = await (handler as any)(arg, responseProxy); - - if (heartbeatInterval) { - clearInterval(heartbeatInterval); - heartbeatInterval = null; - } + clearHeartbeatInterval(); } - // Encode the result as JSON to preserve types like Dates. - result = encode(result); + if (!abortController.signal.aborted) { + // Encode the result as JSON to preserve types like Dates. + result = encode(result); - // If there was some result, encode it in the body. - const responseBody: HttpResponseBody = { result }; + // If there was some result, encode it in the body. + const responseBody: HttpResponseBody = { result }; - if (acceptsStreaming) { - res.write(encodeSSE(responseBody)); - res.end(); + if (acceptsStreaming) { + res.write(encodeSSE(responseBody)); + res.end(); + } else { + res.status(200).send(responseBody); + } } else { - res.status(200).send(responseBody); + res.end(); } } catch (err) { if (!abortController.signal.aborted) { @@ -894,9 +893,11 @@ function wrapOnCallHandler( } else { res.status(status).send(body); } + } else { + res.end(); } } finally { - cleanup(); + clearHeartbeatInterval(); } }; } From fe8a03031b1b9a4d7091bd54a9d128947ca418d6 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 7 Nov 2024 22:27:33 -0800 Subject: [PATCH 06/11] Fix formatting issues. --- spec/common/providers/https.spec.ts | 15 +++++++-------- spec/helper.ts | 2 +- src/common/providers/https.ts | 17 +++++++---------- src/v2/providers/https.ts | 2 +- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index 1ecd1dceb..266c5fba5 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -811,14 +811,13 @@ describe("onCallHandler", () => { { accept: "text/event-stream" } ) as any; - const fn = https.onCallHandler( { cors: { origin: true, methods: "POST" }, }, - async (req, resp) => { + (req, resp) => { resp.write("initial message"); - mockReq.emit('close'); + mockReq.emit("close"); resp.write("should not be sent"); return "done"; }, @@ -852,18 +851,18 @@ describe("onCallHandler", () => { const fn = https.onCallHandler( { cors: { origin: true, methods: "POST" }, - heartbeatSeconds: 5 + heartbeatSeconds: 5, }, async () => { // Simulate long-running operation - await new Promise(resolve => setTimeout(resolve, 11_000)); + await new Promise((resolve) => setTimeout(resolve, 11_000)); return "done"; }, "gcfv2" ); const handlerPromise = runHandler(fn, mockReq as any); - await clock.tickAsync(11_000) + await clock.tickAsync(11_000); const resp = await handlerPromise; expect(resp.body).to.include(': ping\n: ping\ndata: {"result":"done"}'); }); @@ -879,10 +878,10 @@ describe("onCallHandler", () => { const fn = https.onCallHandler( { cors: { origin: true, methods: "POST" }, - heartbeatSeconds: null + heartbeatSeconds: null, }, async () => { - await new Promise(resolve => setTimeout(resolve, 31_000)); + await new Promise((resolve) => setTimeout(resolve, 31_000)); return "done"; }, "gcfv2" diff --git a/spec/helper.ts b/spec/helper.ts index f4a6179e4..4f51d81cc 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -53,7 +53,7 @@ export function runHandler( private callback: () => void; constructor() { - request.on("close", () => this.end()) + request.on("close", () => this.end()); } public status(code: number) { diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 4d50ecab1..9811dccac 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -713,7 +713,7 @@ export interface CallableOptions { * * Defaults to 30 seconds. */ - heartbeatSeconds?: number | null + heartbeatSeconds?: number | null; } /** @internal */ @@ -752,9 +752,9 @@ function wrapOnCallHandler( clearInterval(heartbeatInterval); heartbeatInterval = null; } - } + }; - req.on('close', () => { + req.on("close", () => { clearHeartbeatInterval(); abortController.abort(); }); @@ -832,27 +832,24 @@ function wrapOnCallHandler( write(chunk): boolean { // if client doesn't accept sse-protocol, response.write() is no-op. if (!acceptsStreaming) { - return false + return false; } // if connection is already closed, response.write() is no-op. if (abortController.signal.aborted) { - return false + return false; } const formattedData = encodeSSE({ message: chunk }); return res.write(formattedData); }, acceptsStreaming, - signal: abortController.signal + signal: abortController.signal, }; if (acceptsStreaming) { // SSE always responds with 200 res.status(200); const heartbeatSeconds = options.heartbeatSeconds ?? DEFAULT_HEARTBEAT_SECONDS; if (heartbeatSeconds !== null && heartbeatSeconds > 0) { - heartbeatInterval = setInterval( - () => res.write(": ping\n"), - heartbeatSeconds * 1000 - ); + heartbeatInterval = setInterval(() => res.write(": ping\n"), heartbeatSeconds * 1000); } } // For some reason the type system isn't picking up that the handler diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index e29c06295..fd8335157 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -205,7 +205,7 @@ export interface CallableOptions extends HttpsOptions { * * Defaults to 30 seconds. */ - heartbeatSeconds?: number | null + heartbeatSeconds?: number | null; } /** From 9e0df9823bb3b29b67bfc1963e7cf557e4a97745 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 8 Nov 2024 10:28:20 -0800 Subject: [PATCH 07/11] Only send heartbeat when data hasn't been written in the given interval. --- spec/common/providers/https.spec.ts | 34 ++++++++++++++++++++++-- spec/helper.ts | 1 + src/common/providers/https.ts | 40 +++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 10 deletions(-) diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index 266c5fba5..354879e69 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -864,7 +864,37 @@ describe("onCallHandler", () => { const handlerPromise = runHandler(fn, mockReq as any); await clock.tickAsync(11_000); const resp = await handlerPromise; - expect(resp.body).to.include(': ping\n: ping\ndata: {"result":"done"}'); + const data = [": ping", ": ping", `data: {"result":"done"}`]; + expect(resp.body).to.equal([...data, ""].join("\n")); + }); + + it("doesn't send heartbeat messages if user writes data", async () => { + const mockReq = mockRequest( + { message: "test heartbeat" }, + "application/json", + {}, + { accept: "text/event-stream" } + ); + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + heartbeatSeconds: 5, + }, + async (resp, res) => { + await new Promise((resolve) => setTimeout(resolve, 3_000)); + res.write("hello"); + await new Promise((resolve) => setTimeout(resolve, 3_000)); + return "done"; + }, + "gcfv2" + ); + + const handlerPromise = runHandler(fn, mockReq as any); + await clock.tickAsync(10_000); + const resp = await handlerPromise; + const data = [`data: {"message":"hello"}`, `data: {"result":"done"}`]; + expect(resp.body).to.equal([...data, ""].join("\n")); }); it("respects null heartbeatSeconds option", async () => { @@ -890,7 +920,7 @@ describe("onCallHandler", () => { const handlerPromise = runHandler(fn, mockReq as any); await clock.tickAsync(31_000); const resp = await handlerPromise; - expect(resp.body).to.include('data: {"result":"done"}'); + expect(resp.body).to.equal('data: {"result":"done"}\n'); }); }); }); diff --git a/spec/helper.ts b/spec/helper.ts index 4f51d81cc..8f22d6ee2 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -86,6 +86,7 @@ export function runHandler( public write(writeBody: any) { this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody; + return true; } public end() { diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 9811dccac..947b0a3e3 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -747,15 +747,27 @@ function wrapOnCallHandler( const abortController = new AbortController(); let heartbeatInterval: NodeJS.Timeout | null = null; - const clearHeartbeatInterval = () => { + const clearScheduledHeartbeat = () => { if (heartbeatInterval) { - clearInterval(heartbeatInterval); + clearTimeout(heartbeatInterval); heartbeatInterval = null; } }; + const scheduleHeartbeat = (heartbeatSeconds: number) => { + clearScheduledHeartbeat(); // Clear any existing timeout + if (!abortController.signal.aborted) { + heartbeatInterval = setTimeout(() => { + if (!abortController.signal.aborted) { + res.write(": ping\n"); + scheduleHeartbeat(heartbeatSeconds); + } + }, heartbeatSeconds * 1000); + } + }; + req.on("close", () => { - clearHeartbeatInterval(); + clearScheduledHeartbeat(); abortController.abort(); }); @@ -828,6 +840,12 @@ function wrapOnCallHandler( ...context, data, }; + + const heartbeatSeconds = + options.heartbeatSeconds === undefined + ? DEFAULT_HEARTBEAT_SECONDS + : options.heartbeatSeconds; + const responseProxy: CallableProxyResponse = { write(chunk): boolean { // if client doesn't accept sse-protocol, response.write() is no-op. @@ -839,7 +857,13 @@ function wrapOnCallHandler( return false; } const formattedData = encodeSSE({ message: chunk }); - return res.write(formattedData); + const wrote = res.write(formattedData); + // + // Reset heartbeat timer after successful write + if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) { + scheduleHeartbeat(heartbeatSeconds); + } + return wrote; }, acceptsStreaming, signal: abortController.signal, @@ -847,15 +871,15 @@ function wrapOnCallHandler( if (acceptsStreaming) { // SSE always responds with 200 res.status(200); - const heartbeatSeconds = options.heartbeatSeconds ?? DEFAULT_HEARTBEAT_SECONDS; + if (heartbeatSeconds !== null && heartbeatSeconds > 0) { - heartbeatInterval = setInterval(() => res.write(": ping\n"), heartbeatSeconds * 1000); + scheduleHeartbeat(heartbeatSeconds); } } // For some reason the type system isn't picking up that the handler // is a one argument function. result = await (handler as any)(arg, responseProxy); - clearHeartbeatInterval(); + clearScheduledHeartbeat(); } if (!abortController.signal.aborted) { @@ -894,7 +918,7 @@ function wrapOnCallHandler( res.end(); } } finally { - clearHeartbeatInterval(); + clearScheduledHeartbeat(); } }; } From a0b9c694bcee1543b9477987da08776a2307367a Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Sat, 9 Nov 2024 11:28:18 -0800 Subject: [PATCH 08/11] Fix bugs. --- src/common/providers/https.ts | 26 ++++++++++++++------------ src/v2/providers/https.ts | 1 + 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 947b0a3e3..58b792293 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -747,6 +747,11 @@ function wrapOnCallHandler( const abortController = new AbortController(); let heartbeatInterval: NodeJS.Timeout | null = null; + const heartbeatSeconds = + options.heartbeatSeconds === undefined + ? DEFAULT_HEARTBEAT_SECONDS + : options.heartbeatSeconds; + const clearScheduledHeartbeat = () => { if (heartbeatInterval) { clearTimeout(heartbeatInterval); @@ -754,19 +759,19 @@ function wrapOnCallHandler( } }; - const scheduleHeartbeat = (heartbeatSeconds: number) => { - clearScheduledHeartbeat(); // Clear any existing timeout + const scheduleHeartbeat = () => { + clearScheduledHeartbeat(); if (!abortController.signal.aborted) { heartbeatInterval = setTimeout(() => { if (!abortController.signal.aborted) { res.write(": ping\n"); - scheduleHeartbeat(heartbeatSeconds); + scheduleHeartbeat(); } }, heartbeatSeconds * 1000); } }; - req.on("close", () => { + res.on("close", () => { clearScheduledHeartbeat(); abortController.abort(); }); @@ -841,27 +846,24 @@ function wrapOnCallHandler( data, }; - const heartbeatSeconds = - options.heartbeatSeconds === undefined - ? DEFAULT_HEARTBEAT_SECONDS - : options.heartbeatSeconds; - const responseProxy: CallableProxyResponse = { write(chunk): boolean { // if client doesn't accept sse-protocol, response.write() is no-op. if (!acceptsStreaming) { return false; } + // if connection is already closed, response.write() is no-op. if (abortController.signal.aborted) { return false; } + const formattedData = encodeSSE({ message: chunk }); const wrote = res.write(formattedData); - // + // Reset heartbeat timer after successful write if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) { - scheduleHeartbeat(heartbeatSeconds); + scheduleHeartbeat(); } return wrote; }, @@ -873,7 +875,7 @@ function wrapOnCallHandler( res.status(200); if (heartbeatSeconds !== null && heartbeatSeconds > 0) { - scheduleHeartbeat(heartbeatSeconds); + scheduleHeartbeat(); } } // For some reason the type system isn't picking up that the handler diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index fd8335157..321c31765 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -395,6 +395,7 @@ export function onCall>( cors: { origin, methods: "POST" }, enforceAppCheck: opts.enforceAppCheck ?? options.getGlobalOptions().enforceAppCheck, consumeAppCheckToken: opts.consumeAppCheckToken, + heartbeatSeconds: opts.heartbeatSeconds, }, fixedLen, "gcfv2" From 6a265205d14031ed8fdacb79d16197720d3b29dc Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Sat, 9 Nov 2024 11:32:13 -0800 Subject: [PATCH 09/11] Fix test. --- spec/helper.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/helper.ts b/spec/helper.ts index 8f22d6ee2..5c8ca0c76 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -94,8 +94,8 @@ export function runHandler( } public on(event: string, callback: () => void) { - if (event !== "finish") { - throw new Error("MockResponse only implements the finish event"); + if (event !== "finish" && event !== "close") { + throw new Error("MockResponse only implements close and finish event"); } this.callback = callback; } From 4a98766c762703572a22ebe88b099969c06aa5c1 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Sat, 9 Nov 2024 11:54:23 -0800 Subject: [PATCH 10/11] Formatting. --- src/common/providers/https.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 58b792293..812ff24e0 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -748,9 +748,7 @@ function wrapOnCallHandler( let heartbeatInterval: NodeJS.Timeout | null = null; const heartbeatSeconds = - options.heartbeatSeconds === undefined - ? DEFAULT_HEARTBEAT_SECONDS - : options.heartbeatSeconds; + options.heartbeatSeconds === undefined ? DEFAULT_HEARTBEAT_SECONDS : options.heartbeatSeconds; const clearScheduledHeartbeat = () => { if (heartbeatInterval) { From a6c203c42ba9a42f37556f06c8e66c190cb0a455 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 22 Nov 2024 14:56:47 -0800 Subject: [PATCH 11/11] Misc. formatting --- src/common/providers/https.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index a26b5a518..83eaba433 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -856,15 +856,12 @@ function wrapOnCallHandler( if (!acceptsStreaming) { return false; } - // if connection is already closed, response.write() is no-op. if (abortController.signal.aborted) { return false; } - const formattedData = encodeSSE({ message: chunk }); const wrote = res.write(formattedData); - // Reset heartbeat timer after successful write if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) { scheduleHeartbeat(); @@ -887,14 +884,11 @@ function wrapOnCallHandler( result = await (handler as any)(arg, responseProxy); clearScheduledHeartbeat(); } - if (!abortController.signal.aborted) { // Encode the result as JSON to preserve types like Dates. result = encode(result); - // If there was some result, encode it in the body. const responseBody: HttpResponseBody = { result }; - if (acceptsStreaming) { res.write(encodeSSE(responseBody)); res.end();