|
| 1 | +'use strict' |
| 2 | + |
| 3 | +const { createInflate, createGunzip, createBrotliDecompress, createZstdDecompress } = require('node:zlib') |
| 4 | +const { pipeline } = require('node:stream') |
| 5 | +const DecoratorHandler = require('../handler/decorator-handler') |
| 6 | + |
| 7 | +/** @typedef {import('node:stream').Transform} Transform */ |
| 8 | +/** @typedef {import('node:stream').Transform} Controller */ |
| 9 | +/** @typedef {Transform&import('node:zlib').Zlib} DecompressorStream */ |
| 10 | + |
| 11 | +/** @type {Record<string, () => DecompressorStream>} */ |
| 12 | +const supportedEncodings = { |
| 13 | + gzip: createGunzip, |
| 14 | + 'x-gzip': createGunzip, |
| 15 | + br: createBrotliDecompress, |
| 16 | + deflate: createInflate, |
| 17 | + compress: createInflate, |
| 18 | + 'x-compress': createInflate, |
| 19 | + ...(createZstdDecompress ? { zstd: createZstdDecompress } : {}) |
| 20 | +} |
| 21 | + |
| 22 | +const defaultSkipStatusCodes = /** @type {const} */ ([204, 304]) |
| 23 | + |
| 24 | +let warningEmitted = /** @type {boolean} */ (false) |
| 25 | + |
| 26 | +/** |
| 27 | + * @typedef {Object} DecompressHandlerOptions |
| 28 | + * @property {number[]|Readonly<number[]>} [skipStatusCodes=[204, 304]] - List of status codes to skip decompression for |
| 29 | + * @property {boolean} [skipErrorResponses] - Whether to skip decompression for error responses (status codes >= 400) |
| 30 | + */ |
| 31 | + |
| 32 | +class DecompressHandler extends DecoratorHandler { |
| 33 | + /** @type {Transform[]} */ |
| 34 | + #decompressors = [] |
| 35 | + /** @type {NodeJS.WritableStream&NodeJS.ReadableStream|null} */ |
| 36 | + #pipelineStream |
| 37 | + /** @type {Readonly<number[]>} */ |
| 38 | + #skipStatusCodes |
| 39 | + /** @type {boolean} */ |
| 40 | + #skipErrorResponses |
| 41 | + |
| 42 | + constructor (handler, { skipStatusCodes = defaultSkipStatusCodes, skipErrorResponses = true } = {}) { |
| 43 | + super(handler) |
| 44 | + this.#skipStatusCodes = skipStatusCodes |
| 45 | + this.#skipErrorResponses = skipErrorResponses |
| 46 | + } |
| 47 | + |
| 48 | + /** |
| 49 | + * Determines if decompression should be skipped based on encoding and status code |
| 50 | + * @param {string} contentEncoding - Content-Encoding header value |
| 51 | + * @param {number} statusCode - HTTP status code of the response |
| 52 | + * @returns {boolean} - True if decompression should be skipped |
| 53 | + */ |
| 54 | + #shouldSkipDecompression (contentEncoding, statusCode) { |
| 55 | + if (!contentEncoding || statusCode < 200) return true |
| 56 | + if (this.#skipStatusCodes.includes(statusCode)) return true |
| 57 | + if (this.#skipErrorResponses && statusCode >= 400) return true |
| 58 | + return false |
| 59 | + } |
| 60 | + |
| 61 | + /** |
| 62 | + * Creates a chain of decompressors for multiple content encodings |
| 63 | + * |
| 64 | + * @param {string} encodings - Comma-separated list of content encodings |
| 65 | + * @returns {Array<DecompressorStream>} - Array of decompressor streams |
| 66 | + */ |
| 67 | + #createDecompressionChain (encodings) { |
| 68 | + const parts = encodings.split(',') |
| 69 | + |
| 70 | + /** @type {DecompressorStream[]} */ |
| 71 | + const decompressors = [] |
| 72 | + |
| 73 | + for (let i = parts.length - 1; i >= 0; i--) { |
| 74 | + const encoding = parts[i].trim() |
| 75 | + if (!encoding) continue |
| 76 | + |
| 77 | + if (!supportedEncodings[encoding]) { |
| 78 | + decompressors.length = 0 // Clear if unsupported encoding |
| 79 | + return decompressors // Unsupported encoding |
| 80 | + } |
| 81 | + |
| 82 | + decompressors.push(supportedEncodings[encoding]()) |
| 83 | + } |
| 84 | + |
| 85 | + return decompressors |
| 86 | + } |
| 87 | + |
| 88 | + /** |
| 89 | + * Sets up event handlers for a decompressor stream using readable events |
| 90 | + * @param {DecompressorStream} decompressor - The decompressor stream |
| 91 | + * @param {Controller} controller - The controller to coordinate with |
| 92 | + * @returns {void} |
| 93 | + */ |
| 94 | + #setupDecompressorEvents (decompressor, controller) { |
| 95 | + decompressor.on('readable', () => { |
| 96 | + let chunk |
| 97 | + while ((chunk = decompressor.read()) !== null) { |
| 98 | + const result = super.onResponseData(controller, chunk) |
| 99 | + if (result === false) { |
| 100 | + break |
| 101 | + } |
| 102 | + } |
| 103 | + }) |
| 104 | + |
| 105 | + decompressor.on('error', (error) => { |
| 106 | + super.onResponseError(controller, error) |
| 107 | + }) |
| 108 | + } |
| 109 | + |
| 110 | + /** |
| 111 | + * Sets up event handling for a single decompressor |
| 112 | + * @param {Controller} controller - The controller to handle events |
| 113 | + * @returns {void} |
| 114 | + */ |
| 115 | + #setupSingleDecompressor (controller) { |
| 116 | + const decompressor = this.#decompressors[0] |
| 117 | + this.#setupDecompressorEvents(decompressor, controller) |
| 118 | + |
| 119 | + decompressor.on('end', () => { |
| 120 | + super.onResponseEnd(controller, {}) |
| 121 | + }) |
| 122 | + } |
| 123 | + |
| 124 | + /** |
| 125 | + * Sets up event handling for multiple chained decompressors using pipeline |
| 126 | + * @param {Controller} controller - The controller to handle events |
| 127 | + * @returns {void} |
| 128 | + */ |
| 129 | + #setupMultipleDecompressors (controller) { |
| 130 | + const lastDecompressor = this.#decompressors[this.#decompressors.length - 1] |
| 131 | + this.#setupDecompressorEvents(lastDecompressor, controller) |
| 132 | + |
| 133 | + this.#pipelineStream = pipeline(this.#decompressors, (err) => { |
| 134 | + if (err) { |
| 135 | + super.onResponseError(controller, err) |
| 136 | + return |
| 137 | + } |
| 138 | + super.onResponseEnd(controller, {}) |
| 139 | + }) |
| 140 | + } |
| 141 | + |
| 142 | + /** |
| 143 | + * Cleans up decompressor references to prevent memory leaks |
| 144 | + * @returns {void} |
| 145 | + */ |
| 146 | + #cleanupDecompressors () { |
| 147 | + this.#decompressors.length = 0 |
| 148 | + this.#pipelineStream = null |
| 149 | + } |
| 150 | + |
| 151 | + /** |
| 152 | + * @param {Controller} controller |
| 153 | + * @param {number} statusCode |
| 154 | + * @param {Record<string, string | string[] | undefined>} headers |
| 155 | + * @param {string} statusMessage |
| 156 | + * @returns {void} |
| 157 | + */ |
| 158 | + onResponseStart (controller, statusCode, headers, statusMessage) { |
| 159 | + const contentEncoding = headers['content-encoding'] |
| 160 | + |
| 161 | + // If content encoding is not supported or status code is in skip list |
| 162 | + if (this.#shouldSkipDecompression(contentEncoding, statusCode)) { |
| 163 | + return super.onResponseStart(controller, statusCode, headers, statusMessage) |
| 164 | + } |
| 165 | + |
| 166 | + const decompressors = this.#createDecompressionChain(contentEncoding.toLowerCase()) |
| 167 | + |
| 168 | + if (decompressors.length === 0) { |
| 169 | + this.#cleanupDecompressors() |
| 170 | + return super.onResponseStart(controller, statusCode, headers, statusMessage) |
| 171 | + } |
| 172 | + |
| 173 | + this.#decompressors = decompressors |
| 174 | + |
| 175 | + // Remove compression headers since we're decompressing |
| 176 | + const { 'content-encoding': _, 'content-length': __, ...newHeaders } = headers |
| 177 | + |
| 178 | + if (this.#decompressors.length === 1) { |
| 179 | + this.#setupSingleDecompressor(controller) |
| 180 | + } else { |
| 181 | + this.#setupMultipleDecompressors(controller) |
| 182 | + } |
| 183 | + |
| 184 | + super.onResponseStart(controller, statusCode, newHeaders, statusMessage) |
| 185 | + } |
| 186 | + |
| 187 | + /** |
| 188 | + * @param {Controller} controller |
| 189 | + * @param {Buffer} chunk |
| 190 | + * @returns {void} |
| 191 | + */ |
| 192 | + onResponseData (controller, chunk) { |
| 193 | + if (this.#decompressors.length > 0) { |
| 194 | + this.#decompressors[0].write(chunk) |
| 195 | + return |
| 196 | + } |
| 197 | + super.onResponseData(controller, chunk) |
| 198 | + } |
| 199 | + |
| 200 | + /** |
| 201 | + * @param {Controller} controller |
| 202 | + * @param {Record<string, string | string[]> | undefined} trailers |
| 203 | + * @returns {void} |
| 204 | + */ |
| 205 | + onResponseEnd (controller, trailers) { |
| 206 | + if (this.#decompressors.length > 0) { |
| 207 | + this.#decompressors[0].end() |
| 208 | + this.#cleanupDecompressors() |
| 209 | + return |
| 210 | + } |
| 211 | + super.onResponseEnd(controller, trailers) |
| 212 | + } |
| 213 | + |
| 214 | + /** |
| 215 | + * @param {Controller} controller |
| 216 | + * @param {Error} err |
| 217 | + * @returns {void} |
| 218 | + */ |
| 219 | + onResponseError (controller, err) { |
| 220 | + if (this.#decompressors.length > 0) { |
| 221 | + for (const decompressor of this.#decompressors) { |
| 222 | + decompressor.destroy(err) |
| 223 | + } |
| 224 | + this.#cleanupDecompressors() |
| 225 | + } |
| 226 | + super.onResponseError(controller, err) |
| 227 | + } |
| 228 | +} |
| 229 | + |
| 230 | +/** |
| 231 | + * Creates a decompression interceptor for HTTP responses |
| 232 | + * @param {DecompressHandlerOptions} [options] - Options for the interceptor |
| 233 | + * @returns {Function} - Interceptor function |
| 234 | + */ |
| 235 | +function createDecompressInterceptor (options = {}) { |
| 236 | + // Emit experimental warning only once |
| 237 | + if (!warningEmitted) { |
| 238 | + process.emitWarning( |
| 239 | + 'DecompressInterceptor is experimental and subject to change', |
| 240 | + 'ExperimentalWarning' |
| 241 | + ) |
| 242 | + warningEmitted = true |
| 243 | + } |
| 244 | + |
| 245 | + return (dispatch) => { |
| 246 | + return (opts, handler) => { |
| 247 | + const decompressHandler = new DecompressHandler(handler, options) |
| 248 | + return dispatch(opts, decompressHandler) |
| 249 | + } |
| 250 | + } |
| 251 | +} |
| 252 | + |
| 253 | +module.exports = createDecompressInterceptor |
0 commit comments