@@ -141,23 +141,30 @@ export interface CallableRequest<T = any> {
141141 * The raw request handled by the callable.
142142 */
143143 rawRequest : Request ;
144+
145+ /**
146+ * Whether this is a streaming request.
147+ * Code can be optimized by not trying to generate a stream of chunks to
148+ * call response.sendChunk on if request.acceptsStreaming is false.
149+ * It is always safe, however, to call response.sendChunk as this will
150+ * noop if acceptsStreaming is false.
151+ */
152+ acceptsStreaming : boolean ;
144153}
145154
146155/**
147- * CallableProxyResponse exposes subset of express.Response object
148- * to allow writing partial, streaming responses back to the client .
156+ * CallableProxyResponse allows streaming response chunks and listening to signals
157+ * triggered in events such as a disconnect .
149158 */
150- export interface CallableProxyResponse {
159+ export interface CallableResponse < T = unknown > {
151160 /**
152161 * Writes a chunk of the response body to the client. This method can be called
153162 * multiple times to stream data progressively.
163+ * Returns a promise of whether the data was written. This can be false, for example,
164+ * if the request was not a streaming request. Rejects if there is a network error.
154165 */
155- write : express . Response [ "write" ] ;
156- /**
157- * Indicates whether the client has requested and can handle streaming responses.
158- * This should be checked before attempting to stream data to avoid compatibility issues.
159- */
160- acceptsStreaming : boolean ;
166+ sendChunk : ( chunk : T ) => Promise < boolean > ;
167+
161168 /**
162169 * An AbortSignal that is triggered when the client disconnects or the
163170 * request is terminated prematurely.
@@ -586,13 +593,9 @@ async function checkTokens(
586593 auth : "INVALID" ,
587594 } ;
588595
589- await Promise . all ( [
590- Promise . resolve ( ) . then ( async ( ) => {
591- verifications . auth = await checkAuthToken ( req , ctx ) ;
592- } ) ,
593- Promise . resolve ( ) . then ( async ( ) => {
594- verifications . app = await checkAppCheckToken ( req , ctx , options ) ;
595- } ) ,
596+ [ verifications . auth , verifications . app ] = await Promise . all ( [
597+ checkAuthToken ( req , ctx ) ,
598+ checkAppCheckToken ( req , ctx , options ) ,
596599 ] ) ;
597600
598601 const logPayload = {
@@ -697,9 +700,9 @@ async function checkAppCheckToken(
697700}
698701
699702type v1CallableHandler = ( data : any , context : CallableContext ) => any | Promise < any > ;
700- type v2CallableHandler < Req , Res > = (
703+ type v2CallableHandler < Req , Res , Stream > = (
701704 request : CallableRequest < Req > ,
702- response ?: CallableProxyResponse
705+ response ?: CallableResponse < Stream >
703706) => Res ;
704707
705708/** @internal **/
@@ -718,9 +721,9 @@ export interface CallableOptions<T = any> {
718721}
719722
720723/** @internal */
721- export function onCallHandler < Req = any , Res = any > (
724+ export function onCallHandler < Req = any , Res = any , Stream = unknown > (
722725 options : CallableOptions < Req > ,
723- handler : v1CallableHandler | v2CallableHandler < Req , Res > ,
726+ handler : v1CallableHandler | v2CallableHandler < Req , Res , Stream > ,
724727 version : "gcfv1" | "gcfv2"
725728) : ( req : Request , res : express . Response ) => Promise < void > {
726729 const wrapped = wrapOnCallHandler ( options , handler , version ) ;
@@ -739,9 +742,9 @@ function encodeSSE(data: unknown): string {
739742}
740743
741744/** @internal */
742- function wrapOnCallHandler < Req = any , Res = any > (
745+ function wrapOnCallHandler < Req = any , Res = any , Stream = unknown > (
743746 options : CallableOptions < Req > ,
744- handler : v1CallableHandler | v2CallableHandler < Req , Res > ,
747+ handler : v1CallableHandler | v2CallableHandler < Req , Res , Stream > ,
745748 version : "gcfv1" | "gcfv2"
746749) : ( req : Request , res : express . Response ) => Promise < void > {
747750 return async ( req : Request , res : express . Response ) : Promise < void > => {
@@ -855,27 +858,41 @@ function wrapOnCallHandler<Req = any, Res = any>(
855858 const arg : CallableRequest < Req > = {
856859 ...context ,
857860 data,
861+ acceptsStreaming,
858862 } ;
859863
860- const responseProxy : CallableProxyResponse = {
861- write ( chunk ) : boolean {
864+ const responseProxy : CallableResponse < Stream > = {
865+ sendChunk ( chunk : Stream ) : Promise < boolean > {
862866 // if client doesn't accept sse-protocol, response.write() is no-op.
863867 if ( ! acceptsStreaming ) {
864- return false ;
868+ return Promise . resolve ( false ) ;
865869 }
866870 // if connection is already closed, response.write() is no-op.
867871 if ( abortController . signal . aborted ) {
868- return false ;
872+ return Promise . resolve ( false ) ;
869873 }
870874 const formattedData = encodeSSE ( { message : chunk } ) ;
871- const wrote = res . write ( formattedData ) ;
875+ let resolve : ( wrote : boolean ) => void ;
876+ let reject : ( err : Error ) => void ;
877+ const p = new Promise < boolean > ( ( res , rej ) => {
878+ resolve = res ;
879+ reject = rej ;
880+ } ) ;
881+ const wrote = res . write ( formattedData , ( error ) => {
882+ if ( error ) {
883+ reject ( error ) ;
884+ return ;
885+ }
886+ resolve ( wrote ) ;
887+ } ) ;
888+
872889 // Reset heartbeat timer after successful write
873890 if ( wrote && heartbeatInterval !== null && heartbeatSeconds > 0 ) {
874891 scheduleHeartbeat ( ) ;
875892 }
876- return wrote ;
893+
894+ return p ;
877895 } ,
878- acceptsStreaming,
879896 signal : abortController . signal ,
880897 } ;
881898 if ( acceptsStreaming ) {
0 commit comments