Fix: Resolve Memory Leak in Parser Buffer Management #3531
base: master
Conversation
@regevbr, I noticed that you are the author of the initial code here. Could you take a look at this change?
I think your example does not cover the situation here. Once a buffer grows, the only way for it to shrink is in one very specific case: parsing ends and there is no more data left in the buffer to parse. I suspect that case never happens for you, since the connection is never "resting". Also, since you probably have a lot of data coming in on the stream and the processing is slower, or the data is very large, the buffer keeps growing. Your solution will indeed reduce the buffer, but it can potentially reintroduce the issues that were fixed in my PR, since you will cause many synchronous buffer allocations over and over again. I can suggest 2 things:

Can you at least try approach 2 locally and see if it resolves the issue for you? What are your thoughts on my approach? In addition, can you add a bench test that recreates your scenario, and then verify that your fix works on it? @brianc, your input is also needed here, please.
          
I came here to say this as well. Even a small reproducible script would be helpful for understanding the behavior you are seeing. Also, some prior art for benchmarking: https://github.com/hjr3/node-postgres/blob/b1609b5732515f07cac32f699fbdb3d4f63d852d/packages/pg-bench/src/binary-format.ts
    
I recall intentionally doing this, or something similar, in the past for performance reasons. I don't consider it a memory leak to grow the parser's internal memory usage to the largest buffer received while the parser is allocated. It won't grow infinitely, so I don't think it's a "leak." It's just a space/time trade-off: take more space to save the time spent reallocating memory more often. I would like to see benchmarks/numbers around the trade-off to see what we give up performance-wise going the other way.
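The trade-off described above can be made concrete with a small illustrative sketch (not code from this PR): counting allocations when a buffer grows to hold N total bytes arriving in fixed-size chunks, comparing capacity doubling against an exact-fit strategy that reallocates on every chunk.

```typescript
// Illustrative only: allocation counts for two growth strategies.
// Doubling amortizes reallocations to O(log N); exact-fit reallocates per chunk.
function allocationsWithDoubling(totalBytes: number, chunk: number): number {
  let capacity = chunk
  let used = 0
  let allocations = 1 // the initial buffer
  while (used < totalBytes) {
    used += chunk
    while (used > capacity) {
      capacity *= 2 // grow by doubling; each doubling is one reallocation
      allocations++
    }
  }
  return allocations
}

function allocationsExactFit(totalBytes: number, chunk: number): number {
  // Reallocating to the exact needed size on every chunk: one allocation per chunk.
  return Math.ceil(totalBytes / chunk)
}

console.log(allocationsWithDoubling(1024, 1)) // 11 allocations
console.log(allocationsExactFit(1024, 1)) // 1024 allocations
```

This is the cost the earlier comment warns about: shrinking aggressively trades those few amortized allocations for many synchronous ones under sustained load.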
    
Force-pushed 74db528 to e23b8f5 (Compare)
  
    Fix buffer shrinking: create new appropriately-sized buffer instead of just adjusting pointers when partial messages remain. This allows garbage collection of large processed buffers that were previously kept alive due to cursor-based buffer management. Resolves memory leak where large buffers would remain in memory indefinitely even when only a few bytes of data were still needed.
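The core move this commit describes can be sketched as follows (a minimal illustration, not the PR's actual code; the function name is invented): copying the few leftover bytes into a fresh, right-sized buffer drops the last reference into the large original buffer, so it becomes eligible for garbage collection.

```typescript
// Sketch: compact the unparsed tail of a large buffer into a new small buffer,
// allowing the large processed buffer to be garbage-collected.
function shrinkToRemaining(buffer: Buffer, offset: number, length: number): Buffer {
  const compact = Buffer.allocUnsafe(length)
  // Copy only the still-needed bytes [offset, offset + length).
  buffer.copy(compact, 0, offset, offset + length)
  return compact
}
```

With the cursor-based approach, by contrast, only `offset` is advanced and the whole backing buffer stays alive however few bytes are still needed.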
Force-pushed e23b8f5 to 34b5c51 (Compare)
  
Implement sophisticated buffer management that only shrinks when the buffer is more than half empty, and reduces to half size (not the exact size) to provide wiggle room for incoming data. This prevents both memory leaks from oversized buffers and performance issues from excessive reallocations in high-throughput scenarios.
- Only shrink when buffer utilization < 50%
- When shrinking, reduce to max(half the current size, 2x the remaining data)
- Gradual reduction allows large buffers to shrink progressively
- Falls back to the cursor strategy when buffer utilization is reasonable
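The sizing rule in this commit message can be sketched as a pure function (illustrative names, not the PR's code): keep the buffer when utilization is at least 50%, otherwise halve it but never below twice the remaining data.

```typescript
// Sketch of the shrink heuristic: returns the capacity the buffer should have
// after a parse pass, given its current capacity and the unparsed bytes left.
function nextCapacity(currentCapacity: number, remainingBytes: number): number {
  const utilization = remainingBytes / currentCapacity
  if (utilization >= 0.5) {
    // Buffer is reasonably full: keep it and rely on the cursor strategy.
    return currentCapacity
  }
  // Gradual reduction: halve the capacity, but keep 2x headroom
  // for incoming data so we don't immediately reallocate again.
  return Math.max(Math.floor(currentCapacity / 2), remainingBytes * 2)
}
```

Because the reduction is at most a halving per pass, a buffer that spiked to a large size shrinks geometrically over successive quiet passes instead of in one allocation.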
Force-pushed c161069 to 68763e8 (Compare)
  
Sure, thank you for the comments.

@hjr3, sure, will try.
    
I actually took the chance to have a vibe coding challenge. I used Claude and it took 55 versions to get here, haha.

import { TransformOptions } from 'stream'
import {
  Mode,
  bindComplete,
  parseComplete,
  closeComplete,
  noData,
  portalSuspended,
  copyDone,
  replicationStart,
  emptyQuery,
  ReadyForQueryMessage,
  CommandCompleteMessage,
  CopyDataMessage,
  CopyResponse,
  NotificationResponseMessage,
  RowDescriptionMessage,
  ParameterDescriptionMessage,
  Field,
  DataRowMessage,
  ParameterStatusMessage,
  BackendKeyDataMessage,
  DatabaseError,
  BackendMessage,
  MessageName,
  AuthenticationMD5Password,
  NoticeMessage,
} from './messages'
import { BufferReader } from './buffer-reader'
// every message is prefixed with a single byte
const CODE_LENGTH = 1
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH
const UINT32_SIZE = 4
export type Packet = {
  code: number
  packet: Buffer
}
const emptyBuffer = Buffer.allocUnsafe(0)
type StreamOptions = TransformOptions & {
  mode: Mode
}
const enum MessageCodes {
  DataRow = 0x44, // D
  ParseComplete = 0x31, // 1
  BindComplete = 0x32, // 2
  CloseComplete = 0x33, // 3
  CommandComplete = 0x43, // C
  ReadyForQuery = 0x5a, // Z
  NoData = 0x6e, // n
  NotificationResponse = 0x41, // A
  AuthenticationResponse = 0x52, // R
  ParameterStatus = 0x53, // S
  BackendKeyData = 0x4b, // K
  ErrorMessage = 0x45, // E
  NoticeMessage = 0x4e, // N
  RowDescriptionMessage = 0x54, // T
  ParameterDescriptionMessage = 0x74, // t
  PortalSuspended = 0x73, // s
  ReplicationStart = 0x57, // W
  EmptyQuery = 0x49, // I
  CopyIn = 0x47, // G
  CopyOut = 0x48, // H
  CopyDone = 0x63, // c
  CopyData = 0x64, // d
}
export type MessageCallback = (msg: BackendMessage) => void
interface ParseResultMerge {
  needsMerge: true
  virtualOffset: number
}
interface ParseResultNoMerge {
  needsMerge: false
  remainingBuffer: Buffer
  remainingOffset: number
  remainingLength: number
}
type ParseResult = ParseResultMerge | ParseResultNoMerge
interface BufferLocation {
  buffer: Buffer
  offset: number
}
/**
 * VirtualBuffer provides a unified view over two separate buffers,
 * allowing seamless reading across buffer boundaries
 */
class VirtualBuffer {
  constructor(
    private oldBuffer: Buffer,
    private oldOffset: number,
    private oldLength: number,
    private newBuffer: Buffer
  ) {}
  get totalLength(): number {
    return this.oldLength + this.newBuffer.length
  }
  /**
   * Read a single byte at a virtual offset
   */
  readByteAt(virtualOffset: number): number {
    if (virtualOffset < this.oldLength) {
      return this.oldBuffer[this.oldOffset + virtualOffset]
    } else {
      return this.newBuffer[virtualOffset - this.oldLength]
    }
  }
  /**
   * Read a UInt32BE at a virtual offset, potentially spanning both buffers
   */
  readUInt32BEAt(virtualOffset: number): number {
    // If the entire UInt32 is in the old buffer
    if (virtualOffset + UINT32_SIZE <= this.oldLength) {
      return this.oldBuffer.readUInt32BE(this.oldOffset + virtualOffset)
    }
    // If the entire UInt32 is in the new buffer
    if (virtualOffset >= this.oldLength) {
      return this.newBuffer.readUInt32BE(virtualOffset - this.oldLength)
    }
    // UInt32 spans both buffers - create a temporary buffer and use readUInt32BE
    const tempBuffer = Buffer.allocUnsafe(UINT32_SIZE)
    for (let i = 0; i < UINT32_SIZE; i++) {
      tempBuffer[i] = this.readByteAt(virtualOffset + i)
    }
    return tempBuffer.readUInt32BE(0)
  }
  /**
   * Get buffer location for data, extracting into a new buffer if it spans both buffers
   * Always returns a BufferLocation with the appropriate buffer and offset
   */
  getBufferLocation(virtualOffset: number, length: number): BufferLocation {
    // Data entirely in old buffer
    if (virtualOffset + length <= this.oldLength) {
      return {
        buffer: this.oldBuffer,
        offset: this.oldOffset + virtualOffset
      }
    }
    // Data entirely in new buffer
    if (virtualOffset >= this.oldLength) {
      return {
        buffer: this.newBuffer,
        offset: virtualOffset - this.oldLength
      }
    }
    // Data spans both buffers - extract into a new buffer
    const oldBufferPart = this.oldLength - virtualOffset
    const newBufferPart = length - oldBufferPart
    const extractedBuffer = Buffer.allocUnsafe(length)
    this.oldBuffer.copy(extractedBuffer, 0, this.oldOffset + virtualOffset, this.oldOffset + this.oldLength)
    this.newBuffer.copy(extractedBuffer, oldBufferPart, 0, newBufferPart)
    return {
      buffer: extractedBuffer,
      offset: 0
    }
  }
  /**
   * Calculate remaining data after consuming up to virtualOffset
   */
  calculateRemaining(virtualOffset: number): ParseResult {
    const remainingLength = this.totalLength - virtualOffset
    if (remainingLength === 0) {
      // Nothing left
      return {
        needsMerge: false,
        remainingBuffer: emptyBuffer,
        remainingOffset: 0,
        remainingLength: 0
      }
    }
    // If all remaining data is in the new buffer
    if (virtualOffset >= this.oldLength) {
      const newOffset = virtualOffset - this.oldLength
      return {
        needsMerge: false,
        remainingBuffer: this.newBuffer,
        remainingOffset: newOffset,
        remainingLength: remainingLength
      }
    }
    // Remaining data spans both buffers - need to merge
    return {
      needsMerge: true,
      virtualOffset
    }
  }
}
export class Parser {
  private buffer: Buffer = emptyBuffer
  private bufferLength: number = 0
  private bufferOffset: number = 0
  private reader = new BufferReader()
  private mode: Mode
  
  constructor(opts?: StreamOptions) {
    if (opts?.mode === 'binary') {
      throw new Error('Binary mode not supported yet')
    }
    this.mode = opts?.mode || 'text'
  }
  
  public parse(buffer: Buffer, callback: MessageCallback) {
    const parseResult = this.parseVirtualBuffer(buffer, callback)
    
    if (parseResult.needsMerge) {
      this.updateBufferAfterParsing(parseResult.virtualOffset)
      this.mergeBuffer(buffer)
    } else {
      this.setRemainingBuffer(parseResult)
    }
  }
  private updateBufferAfterParsing(consumedBytes: number): void {
    this.bufferOffset += consumedBytes
    this.bufferLength -= consumedBytes
  }
  private setRemainingBuffer(parseResult: ParseResultNoMerge): void {
    this.buffer = parseResult.remainingBuffer
    this.bufferOffset = parseResult.remainingOffset
    this.bufferLength = parseResult.remainingLength
  }
  
  private parseVirtualBuffer(newBuffer: Buffer, callback: MessageCallback): ParseResult {
    const virtualBuffer = new VirtualBuffer(this.buffer, this.bufferOffset, this.bufferLength, newBuffer)
    let virtualOffset = 0
    
    while (virtualOffset + HEADER_LENGTH <= virtualBuffer.totalLength) {
      const messageInfo = this.readMessageHeader(virtualBuffer, virtualOffset)
      const fullMessageLength = CODE_LENGTH + messageInfo.length
      
      if (virtualOffset + fullMessageLength <= virtualBuffer.totalLength) {
        this.processCompleteMessage(virtualBuffer, virtualOffset, messageInfo, callback)
        virtualOffset += fullMessageLength
      } else {
        break // Incomplete message, stop parsing
      }
    }
    
    return virtualBuffer.calculateRemaining(virtualOffset)
  }
  private readMessageHeader(virtualBuffer: VirtualBuffer, virtualOffset: number): { code: number; length: number } {
    const code = virtualBuffer.readByteAt(virtualOffset)
    const length = virtualBuffer.readUInt32BEAt(virtualOffset + CODE_LENGTH)
    return { code, length }
  }
  private processCompleteMessage(
    virtualBuffer: VirtualBuffer,
    virtualOffset: number,
    messageInfo: { code: number; length: number },
    callback: MessageCallback
  ): void {
    const messageStartOffset = virtualOffset + HEADER_LENGTH
    const bufferLocation = virtualBuffer.getBufferLocation(messageStartOffset, messageInfo.length)
    const message = this.handlePacket(bufferLocation.offset, messageInfo.code, messageInfo.length, bufferLocation.buffer)
    callback(message)
  }
  
  private mergeBuffer(buffer: Buffer): void {
    if (this.bufferLength > 0) {
      const newLength = this.bufferLength + buffer.byteLength
      const newFullLength = newLength + this.bufferOffset
      
      if (newFullLength > this.buffer.byteLength) {
        // We can't concat the new buffer with the remaining one
        let newBuffer: Buffer
        
        if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
          // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
          newBuffer = this.buffer
        } else {
          // Allocate a new larger buffer
          let newBufferLength = this.buffer.byteLength * 2
          while (newLength >= newBufferLength) {
            newBufferLength *= 2
          }
          newBuffer = Buffer.allocUnsafe(newBufferLength)
        }
        
        // Move the remaining buffer to the new one
        this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
        this.buffer = newBuffer
        this.bufferOffset = 0
      }
      
      // Concat the new buffer with the remaining one
      buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
      this.bufferLength = newLength
    } else {
      this.buffer = buffer
      this.bufferOffset = 0
      this.bufferLength = buffer.byteLength
    }
  }
  private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage {
    switch (code) {
      case MessageCodes.BindComplete:
        return bindComplete
      case MessageCodes.ParseComplete:
        return parseComplete
      case MessageCodes.CloseComplete:
        return closeComplete
      case MessageCodes.NoData:
        return noData
      case MessageCodes.PortalSuspended:
        return portalSuspended
      case MessageCodes.CopyDone:
        return copyDone
      case MessageCodes.ReplicationStart:
        return replicationStart
      case MessageCodes.EmptyQuery:
        return emptyQuery
      case MessageCodes.DataRow:
        return this.parseDataRowMessage(offset, length, bytes)
      case MessageCodes.CommandComplete:
        return this.parseCommandCompleteMessage(offset, length, bytes)
      case MessageCodes.ReadyForQuery:
        return this.parseReadyForQueryMessage(offset, length, bytes)
      case MessageCodes.NotificationResponse:
        return this.parseNotificationMessage(offset, length, bytes)
      case MessageCodes.AuthenticationResponse:
        return this.parseAuthenticationResponse(offset, length, bytes)
      case MessageCodes.ParameterStatus:
        return this.parseParameterStatusMessage(offset, length, bytes)
      case MessageCodes.BackendKeyData:
        return this.parseBackendKeyData(offset, length, bytes)
      case MessageCodes.ErrorMessage:
        return this.parseErrorMessage(offset, length, bytes, 'error')
      case MessageCodes.NoticeMessage:
        return this.parseErrorMessage(offset, length, bytes, 'notice')
      case MessageCodes.RowDescriptionMessage:
        return this.parseRowDescriptionMessage(offset, length, bytes)
      case MessageCodes.ParameterDescriptionMessage:
        return this.parseParameterDescriptionMessage(offset, length, bytes)
      case MessageCodes.CopyIn:
        return this.parseCopyInMessage(offset, length, bytes)
      case MessageCodes.CopyOut:
        return this.parseCopyOutMessage(offset, length, bytes)
      case MessageCodes.CopyData:
        return this.parseCopyData(offset, length, bytes)
      default:
        return new DatabaseError('received invalid response: ' + code.toString(16), length, 'error')
    }
  }
  
  private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const status = this.reader.string(1)
    return new ReadyForQueryMessage(length, status)
  }
  
  private parseCommandCompleteMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const text = this.reader.cstring()
    return new CommandCompleteMessage(length, text)
  }
  
  private parseCopyData(offset: number, length: number, bytes: Buffer) {
    const chunk = bytes.slice(offset, offset + (length - 4))
    return new CopyDataMessage(length, chunk)
  }
  
  private parseCopyInMessage(offset: number, length: number, bytes: Buffer) {
    return this.parseCopyMessage(offset, length, bytes, 'copyInResponse')
  }
  
  private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) {
    return this.parseCopyMessage(offset, length, bytes, 'copyOutResponse')
  }
  
  private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) {
    this.reader.setBuffer(offset, bytes)
    const isBinary = this.reader.byte() !== 0
    const columnCount = this.reader.int16()
    const message = new CopyResponse(length, messageName, isBinary, columnCount)
    for (let i = 0; i < columnCount; i++) {
      message.columnTypes[i] = this.reader.int16()
    }
    return message
  }
  
  private parseNotificationMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const processId = this.reader.int32()
    const channel = this.reader.cstring()
    const payload = this.reader.cstring()
    return new NotificationResponseMessage(length, processId, channel, payload)
  }
  
  private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const fieldCount = this.reader.int16()
    const message = new RowDescriptionMessage(length, fieldCount)
    for (let i = 0; i < fieldCount; i++) {
      message.fields[i] = this.parseField()
    }
    return message
  }
  
  private parseField(): Field {
    const name = this.reader.cstring()
    const tableID = this.reader.uint32()
    const columnID = this.reader.int16()
    const dataTypeID = this.reader.uint32()
    const dataTypeSize = this.reader.int16()
    const dataTypeModifier = this.reader.int32()
    const mode = this.reader.int16() === 0 ? 'text' : 'binary'
    return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
  }
  
  private parseParameterDescriptionMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const parameterCount = this.reader.int16()
    const message = new ParameterDescriptionMessage(length, parameterCount)
    for (let i = 0; i < parameterCount; i++) {
      message.dataTypeIDs[i] = this.reader.int32()
    }
    return message
  }
  
  private parseDataRowMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const fieldCount = this.reader.int16()
    const fields: any[] = new Array(fieldCount)
    for (let i = 0; i < fieldCount; i++) {
      const len = this.reader.int32()
      // a -1 for length means the value of the field is null
      fields[i] = len === -1 ? null : this.reader.string(len)
    }
    return new DataRowMessage(length, fields)
  }
  
  private parseParameterStatusMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const name = this.reader.cstring()
    const value = this.reader.cstring()
    return new ParameterStatusMessage(length, name, value)
  }
  
  private parseBackendKeyData(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const processID = this.reader.int32()
    const secretKey = this.reader.int32()
    return new BackendKeyDataMessage(length, processID, secretKey)
  }
  
  public parseAuthenticationResponse(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const code = this.reader.int32()
    // TODO(bmc): maybe better types here
    const message: BackendMessage & any = {
      name: 'authenticationOk',
      length,
    }
    switch (code) {
      case 0: // AuthenticationOk
        break
      case 3: // AuthenticationCleartextPassword
        if (message.length === 8) {
          message.name = 'authenticationCleartextPassword'
        }
        break
      case 5: // AuthenticationMD5Password
        if (message.length === 12) {
          message.name = 'authenticationMD5Password'
          const salt = this.reader.bytes(4)
          return new AuthenticationMD5Password(length, salt)
        }
        break
      case 10: // AuthenticationSASL
        {
          message.name = 'authenticationSASL'
          message.mechanisms = []
          let mechanism: string
          do {
            mechanism = this.reader.cstring()
            if (mechanism) {
              message.mechanisms.push(mechanism)
            }
          } while (mechanism)
        }
        break
      case 11: // AuthenticationSASLContinue
        message.name = 'authenticationSASLContinue'
        message.data = this.reader.string(length - 8)
        break
      case 12: // AuthenticationSASLFinal
        message.name = 'authenticationSASLFinal'
        message.data = this.reader.string(length - 8)
        break
      default:
        throw new Error('Unknown authenticationOk message type ' + code)
    }
    return message
  }
  
  private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) {
    this.reader.setBuffer(offset, bytes)
    const fields: Record<string, string> = {}
    let fieldType = this.reader.string(1)
    while (fieldType !== '\0') {
      fields[fieldType] = this.reader.cstring()
      fieldType = this.reader.string(1)
    }
    const messageValue = fields.M
    const message =
      name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
    message.severity = fields.S
    message.code = fields.C
    message.detail = fields.D
    message.hint = fields.H
    message.position = fields.P
    message.internalPosition = fields.p
    message.internalQuery = fields.q
    message.where = fields.W
    message.schema = fields.s
    message.table = fields.t
    message.column = fields.c
    message.dataType = fields.d
    message.constraint = fields.n
    message.file = fields.F
    message.line = fields.L
    message.routine = fields.R
    return message
  }
}
    
@hjr3, thank you for referencing the benchmarks; they were very useful. @regevbr, I've also run the benchmark against your solution. Please take a look at the results.
    
Awesome. I created another version, in which we don't merge buffers except when extracting concrete data out of them (like strings). Can you please benchmark it as well?

import { TransformOptions } from 'stream'
import {
    Mode,
    bindComplete,
    parseComplete,
    closeComplete,
    noData,
    portalSuspended,
    copyDone,
    replicationStart,
    emptyQuery,
    ReadyForQueryMessage,
    CommandCompleteMessage,
    CopyDataMessage,
    CopyResponse,
    NotificationResponseMessage,
    RowDescriptionMessage,
    ParameterDescriptionMessage,
    Field,
    DataRowMessage,
    ParameterStatusMessage,
    BackendKeyDataMessage,
    DatabaseError,
    BackendMessage,
    MessageName,
    AuthenticationMD5Password,
    NoticeMessage,
} from './messages'
import { BufferReader } from './buffer-reader'
// every message is prefixed with a single byte
const CODE_LENGTH = 1
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH
const UINT32_SIZE = 4
export type Packet = {
    code: number
    packet: Buffer
}
const emptyBuffer = Buffer.allocUnsafe(0)
type StreamOptions = TransformOptions & {
    mode: Mode
}
const enum MessageCodes {
    DataRow = 0x44, // D
    ParseComplete = 0x31, // 1
    BindComplete = 0x32, // 2
    CloseComplete = 0x33, // 3
    CommandComplete = 0x43, // C
    ReadyForQuery = 0x5a, // Z
    NoData = 0x6e, // n
    NotificationResponse = 0x41, // A
    AuthenticationResponse = 0x52, // R
    ParameterStatus = 0x53, // S
    BackendKeyData = 0x4b, // K
    ErrorMessage = 0x45, // E
    NoticeMessage = 0x4e, // N
    RowDescriptionMessage = 0x54, // T
    ParameterDescriptionMessage = 0x74, // t
    PortalSuspended = 0x73, // s
    ReplicationStart = 0x57, // W
    EmptyQuery = 0x49, // I
    CopyIn = 0x47, // G
    CopyOut = 0x48, // H
    CopyDone = 0x63, // c
    CopyData = 0x64, // d
}
export type MessageCallback = (msg: BackendMessage) => void
interface BufferChunk {
    buffer: Buffer
    offset: number
    length: number
}
/**
 * VirtualBuffer provides a unified view over multiple buffers without merging them
 * Implements the same interface as BufferReader for compatibility
 */
class VirtualBuffer {
    private chunks: BufferChunk[] = []
    private _totalLength = 0
    private currentPosition = 0
    private encoding: string = 'utf-8'
    constructor(chunks: BufferChunk[] = []) {
        this.chunks = chunks.slice()
        this._totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0)
    }
    get totalLength(): number {
        return this._totalLength
    }
    get position(): number {
        return this.currentPosition
    }
    /**
     * Add a new buffer chunk to the virtual buffer
     */
    addChunk(buffer: Buffer, offset: number = 0, length: number = buffer.length - offset): void {
        if (length > 0) {
            this.chunks.push({ buffer, offset, length })
            this._totalLength += length
        }
    }
    /**
     * Peek at a byte at the current position without advancing
     */
    peekByte(): number {
        if (this.currentPosition >= this._totalLength) {
            throw new Error('Buffer overflow: attempted to peek beyond buffer end')
        }
        return this.readByteAt(this.currentPosition)
    }
    /**
     * Check if we can read a certain number of bytes from current position
     */
    canRead(bytes: number): boolean {
        return this.currentPosition + bytes <= this._totalLength
    }
    // BufferReader compatible methods
    /**
     * Read a single byte at the current position and advance
     */
    byte(): number {
        if (this.currentPosition >= this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }
        const byte = this.readByteAt(this.currentPosition)
        this.currentPosition++
        return byte
    }
    /**
     * Read a signed 16-bit integer (big endian) and advance
     */
    int16(): number {
        if (this.currentPosition + 2 > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }
        const value = this.readUInt16BEAt(this.currentPosition)
        this.currentPosition += 2
        // Convert unsigned to signed
        return value > 0x7FFF ? value - 0x10000 : value
    }
    /**
     * Read a signed 32-bit integer (big endian) and advance
     */
    int32(): number {
        if (this.currentPosition + UINT32_SIZE > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }
        const value = this.readUInt32BEAt(this.currentPosition)
        this.currentPosition += UINT32_SIZE
        // Convert unsigned to signed
        return value > 0x7FFFFFFF ? value - 0x100000000 : value
    }
    /**
     * Read an unsigned 32-bit integer (big endian) and advance
     */
    uint32(): number {
        if (this.currentPosition + UINT32_SIZE > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }
        const value = this.readUInt32BEAt(this.currentPosition)
        this.currentPosition += UINT32_SIZE
        return value
    }
    /**
     * Read a string of specified length and advance
     */
    string(length: number): string {
        const bytes = this.bytes(length)
        return bytes.toString(this.encoding)
    }
    /**
     * Read a null-terminated string and advance
     */
    cstring(): string {
        const start = this.currentPosition
        let end = start
        // Find the null terminator
        while (end < this._totalLength && this.readByteAt(end) !== 0) {
            end++
        }
        // Read the string content (without null terminator)
        const length = end - start
        const result = length > 0 ? this.string(length) : ''
        // Skip the null terminator if we found one
        if (end < this._totalLength) {
            this.currentPosition++ // Skip null byte
        }
        return result
    }
    /**
     * Read bytes at the current position and advance
     */
    bytes(length: number): Buffer {
        if (this.currentPosition + length > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }
        const result = this.getBytesAt(this.currentPosition, length)
        this.currentPosition += length
        return result
    }
    /**
     * Create a new VirtualBuffer from current position onwards
     */
    slice(start: number = this.currentPosition, end?: number): VirtualBuffer {
        const endPos = end ?? this._totalLength
        const newChunks: BufferChunk[] = []
        let virtualPos = 0
        for (const chunk of this.chunks) {
            const chunkEnd = virtualPos + chunk.length
            if (virtualPos >= endPos) break
            if (chunkEnd <= start) {
                virtualPos = chunkEnd
                continue
            }
            const chunkStart = Math.max(start - virtualPos, 0)
            const chunkActualEnd = Math.min(endPos - virtualPos, chunk.length)
            const newLength = chunkActualEnd - chunkStart
            if (newLength > 0) {
                newChunks.push({
                    buffer: chunk.buffer,
                    offset: chunk.offset + chunkStart,
                    length: newLength
                })
            }
            virtualPos = chunkEnd
        }
        return new VirtualBuffer(newChunks)
    }
    /**
     * Seek to a specific position
     */
    seek(position: number): void {
        if (position < 0 || position > this._totalLength) {
            throw new Error(`Invalid seek position: ${position}`)
        }
        this.currentPosition = position
    }
    /**
     * Remove consumed chunks to free memory
     */
    compact(): void {
        if (this.currentPosition === 0) return
        let remainingOffset = this.currentPosition
        let newChunks: BufferChunk[] = []
        for (const chunk of this.chunks) {
            if (remainingOffset >= chunk.length) {
                remainingOffset -= chunk.length
                continue
            }
            if (remainingOffset > 0) {
                newChunks.push({
                    buffer: chunk.buffer,
                    offset: chunk.offset + remainingOffset,
                    length: chunk.length - remainingOffset
                })
                remainingOffset = 0
            } else {
                newChunks.push(chunk)
            }
        }
        this.chunks = newChunks
        this._totalLength -= this.currentPosition
        this.currentPosition = 0
    }
    // Private helper methods
    private findChunkAndOffset(virtualOffset: number): { chunkIndex: number; offsetInChunk: number } | null {
        let currentOffset = 0
        for (let i = 0; i < this.chunks.length; i++) {
            const chunk = this.chunks[i]
            if (virtualOffset >= currentOffset && virtualOffset < currentOffset + chunk.length) {
                return {
                    chunkIndex: i,
                    offsetInChunk: virtualOffset - currentOffset
                }
            }
            currentOffset += chunk.length
        }
        return null
    }
    private readByteAt(virtualOffset: number): number {
        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }
        const chunk = this.chunks[location.chunkIndex]
        return chunk.buffer[chunk.offset + location.offsetInChunk]
    }
    private readUInt32BEAt(virtualOffset: number): number {
        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }
        const chunk = this.chunks[location.chunkIndex]
        // If the entire UInt32 fits in this chunk
        if (location.offsetInChunk + UINT32_SIZE <= chunk.length) {
            return chunk.buffer.readUInt32BE(chunk.offset + location.offsetInChunk)
        }
        // UInt32 spans multiple chunks - read byte by byte
        const tempBuffer = Buffer.allocUnsafe(UINT32_SIZE)
        for (let i = 0; i < UINT32_SIZE; i++) {
            tempBuffer[i] = this.readByteAt(virtualOffset + i)
        }
        return tempBuffer.readUInt32BE(0)
    }
    private readUInt16BEAt(virtualOffset: number): number {
        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }
        const chunk = this.chunks[location.chunkIndex]
        // If the entire UInt16 fits in this chunk
        if (location.offsetInChunk + 2 <= chunk.length) {
            return chunk.buffer.readUInt16BE(chunk.offset + location.offsetInChunk)
        }
        // UInt16 spans chunks - read byte by byte
        const tempBuffer = Buffer.allocUnsafe(2)
        for (let i = 0; i < 2; i++) {
            tempBuffer[i] = this.readByteAt(virtualOffset + i)
        }
        return tempBuffer.readUInt16BE(0)
    }
    private getBytesAt(virtualOffset: number, length: number): Buffer {
        if (length === 0) return emptyBuffer
        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }
        const startChunk = this.chunks[location.chunkIndex]
        // If all bytes fit in a single chunk, return a slice
        if (location.offsetInChunk + length <= startChunk.length) {
            return startChunk.buffer.subarray(
                startChunk.offset + location.offsetInChunk,
                startChunk.offset + location.offsetInChunk + length
            )
        }
        // Bytes span multiple chunks - copy into new buffer
        const result = Buffer.allocUnsafe(length)
        let resultOffset = 0
        let remainingLength = length
        let currentVirtualOffset = virtualOffset
        while (remainingLength > 0) {
            const loc = this.findChunkAndOffset(currentVirtualOffset)
            if (!loc) break
            const chunk = this.chunks[loc.chunkIndex]
            const availableInChunk = chunk.length - loc.offsetInChunk
            const toCopy = Math.min(remainingLength, availableInChunk)
            chunk.buffer.copy(
                result,
                resultOffset,
                chunk.offset + loc.offsetInChunk,
                chunk.offset + loc.offsetInChunk + toCopy
            )
            resultOffset += toCopy
            remainingLength -= toCopy
            currentVirtualOffset += toCopy
        }
        return result
    }
}
export class Parser {
    private virtualBuffer = new VirtualBuffer()
    private mode: Mode
    constructor(opts?: StreamOptions) {
        if (opts?.mode === 'binary') {
            throw new Error('Binary mode not supported yet')
        }
        this.mode = opts?.mode || 'text'
    }
    public parse(buffer: Buffer, callback: MessageCallback): void {
        // Add new buffer to virtual buffer
        this.virtualBuffer.addChunk(buffer)
        // Parse all complete messages
        while (this.virtualBuffer.canRead(HEADER_LENGTH)) {
            const startPosition = this.virtualBuffer.position
            // Read message header
            const code = this.virtualBuffer.byte()
            const length = this.virtualBuffer.uint32()
            const payloadLength = length - LEN_LENGTH
            // Check if we have the complete message
            if (!this.virtualBuffer.canRead(payloadLength)) {
                // Incomplete message, rewind and wait for more data
                this.virtualBuffer.seek(startPosition)
                break
            }
            // Create a slice for just this message's payload
            const messageBuffer = this.virtualBuffer.slice(
                this.virtualBuffer.position,
                this.virtualBuffer.position + payloadLength
            )
            // Advance past this message
            this.virtualBuffer.seek(this.virtualBuffer.position + payloadLength)
            // Process the message using the virtual buffer directly
            const message = this.handlePacketWithVirtualBuffer(code, length, messageBuffer)
            callback(message)
        }
        // Compact the virtual buffer to free consumed chunks
        this.virtualBuffer.compact()
    }
    private handlePacketWithVirtualBuffer(code: number, length: number, messageBuffer: VirtualBuffer): BackendMessage {
        switch (code) {
            case MessageCodes.BindComplete:
                return bindComplete
            case MessageCodes.ParseComplete:
                return parseComplete
            case MessageCodes.CloseComplete:
                return closeComplete
            case MessageCodes.NoData:
                return noData
            case MessageCodes.PortalSuspended:
                return portalSuspended
            case MessageCodes.CopyDone:
                return copyDone
            case MessageCodes.ReplicationStart:
                return replicationStart
            case MessageCodes.EmptyQuery:
                return emptyQuery
            case MessageCodes.DataRow:
                return this.parseDataRowMessage(length, messageBuffer)
            case MessageCodes.CommandComplete:
                return this.parseCommandCompleteMessage(length, messageBuffer)
            case MessageCodes.ReadyForQuery:
                return this.parseReadyForQueryMessage(length, messageBuffer)
            case MessageCodes.NotificationResponse:
                return this.parseNotificationMessage(length, messageBuffer)
            case MessageCodes.AuthenticationResponse:
                return this.parseAuthenticationResponse(length, messageBuffer)
            case MessageCodes.ParameterStatus:
                return this.parseParameterStatusMessage(length, messageBuffer)
            case MessageCodes.BackendKeyData:
                return this.parseBackendKeyData(length, messageBuffer)
            case MessageCodes.ErrorMessage:
                return this.parseErrorMessage(length, 'error', messageBuffer)
            case MessageCodes.NoticeMessage:
                return this.parseErrorMessage(length, 'notice', messageBuffer)
            case MessageCodes.RowDescriptionMessage:
                return this.parseRowDescriptionMessage(length, messageBuffer)
            case MessageCodes.ParameterDescriptionMessage:
                return this.parseParameterDescriptionMessage(length, messageBuffer)
            case MessageCodes.CopyIn:
                return this.parseCopyInMessage(length, messageBuffer)
            case MessageCodes.CopyOut:
                return this.parseCopyOutMessage(length, messageBuffer)
            case MessageCodes.CopyData:
                return this.parseCopyDataMessage(length, messageBuffer)
            default:
                return new DatabaseError('received invalid response: ' + code.toString(16), length, 'error')
        }
    }
    // Updated parsing methods that use VirtualBuffer directly
    private parseReadyForQueryMessage(length: number, reader: VirtualBuffer): ReadyForQueryMessage {
        const status = reader.string(1)
        return new ReadyForQueryMessage(length, status)
    }
    private parseCommandCompleteMessage(length: number, reader: VirtualBuffer): CommandCompleteMessage {
        const text = reader.cstring()
        return new CommandCompleteMessage(length, text)
    }
    private parseCopyDataMessage(length: number, reader: VirtualBuffer): CopyDataMessage {
        const chunk = reader.bytes(length - 4)
        return new CopyDataMessage(length, chunk)
    }
    private parseCopyInMessage(length: number, reader: VirtualBuffer): CopyResponse {
        return this.parseCopyMessage(length, 'copyInResponse', reader)
    }
    private parseCopyOutMessage(length: number, reader: VirtualBuffer): CopyResponse {
        return this.parseCopyMessage(length, 'copyOutResponse', reader)
    }
    private parseCopyMessage(length: number, messageName: MessageName, reader: VirtualBuffer): CopyResponse {
        const isBinary = reader.byte() !== 0
        const columnCount = reader.int16()
        const message = new CopyResponse(length, messageName, isBinary, columnCount)
        for (let i = 0; i < columnCount; i++) {
            message.columnTypes[i] = reader.int16()
        }
        return message
    }
    private parseNotificationMessage(length: number, reader: VirtualBuffer): NotificationResponseMessage {
        const processId = reader.int32()
        const channel = reader.cstring()
        const payload = reader.cstring()
        return new NotificationResponseMessage(length, processId, channel, payload)
    }
    private parseRowDescriptionMessage(length: number, reader: VirtualBuffer): RowDescriptionMessage {
        const fieldCount = reader.int16()
        const message = new RowDescriptionMessage(length, fieldCount)
        for (let i = 0; i < fieldCount; i++) {
            message.fields[i] = this.parseField(reader)
        }
        return message
    }
    private parseField(reader: VirtualBuffer): Field {
        const name = reader.cstring()
        const tableID = reader.uint32()
        const columnID = reader.int16()
        const dataTypeID = reader.uint32()
        const dataTypeSize = reader.int16()
        const dataTypeModifier = reader.int32()
        const mode = reader.int16() === 0 ? 'text' : 'binary'
        return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
    }
    private parseParameterDescriptionMessage(length: number, reader: VirtualBuffer): ParameterDescriptionMessage {
        const parameterCount = reader.int16()
        const message = new ParameterDescriptionMessage(length, parameterCount)
        for (let i = 0; i < parameterCount; i++) {
            message.dataTypeIDs[i] = reader.int32()
        }
        return message
    }
    private parseDataRowMessage(length: number, reader: VirtualBuffer): DataRowMessage {
        const fieldCount = reader.int16()
        const fields: any[] = new Array(fieldCount)
        for (let i = 0; i < fieldCount; i++) {
            const len = reader.int32()
            // a -1 for length means the value of the field is null
            fields[i] = len === -1 ? null : reader.string(len)
        }
        return new DataRowMessage(length, fields)
    }
    private parseParameterStatusMessage(length: number, reader: VirtualBuffer): ParameterStatusMessage {
        const name = reader.cstring()
        const value = reader.cstring()
        return new ParameterStatusMessage(length, name, value)
    }
    private parseBackendKeyData(length: number, reader: VirtualBuffer): BackendKeyDataMessage {
        const processID = reader.int32()
        const secretKey = reader.int32()
        return new BackendKeyDataMessage(length, processID, secretKey)
    }
    private parseAuthenticationResponse(length: number, reader: VirtualBuffer): BackendMessage {
        const code = reader.int32()
        const message: BackendMessage & any = {
            name: 'authenticationOk',
            length,
        }
        switch (code) {
            case 0: // AuthenticationOk
                break
            case 3: // AuthenticationCleartextPassword
                if (message.length === 8) {
                    message.name = 'authenticationCleartextPassword'
                }
                break
            case 5: // AuthenticationMD5Password
                if (message.length === 12) {
                    message.name = 'authenticationMD5Password'
                    const salt = reader.bytes(4)
                    return new AuthenticationMD5Password(length, salt)
                }
                break
            case 10: // AuthenticationSASL
            {
                message.name = 'authenticationSASL'
                message.mechanisms = []
                let mechanism: string
                do {
                    mechanism = reader.cstring()
                    if (mechanism) {
                        message.mechanisms.push(mechanism)
                    }
                } while (mechanism)
            }
                break
            case 11: // AuthenticationSASLContinue
                message.name = 'authenticationSASLContinue'
                message.data = reader.string(length - 8)
                break
            case 12: // AuthenticationSASLFinal
                message.name = 'authenticationSASLFinal'
                message.data = reader.string(length - 8)
                break
            default:
                throw new Error('Unknown authenticationOk message type ' + code)
        }
        return message
    }
    private parseErrorMessage(length: number, name: MessageName, reader: VirtualBuffer): BackendMessage {
        const fields: Record<string, string> = {}
        let fieldType = reader.string(1)
        while (fieldType !== '\0') {
            fields[fieldType] = reader.cstring()
            fieldType = reader.string(1)
        }
        const messageValue = fields.M
        const message =
            name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
        message.severity = fields.S
        message.code = fields.C
        message.detail = fields.D
        message.hint = fields.H
        message.position = fields.P
        message.internalPosition = fields.p
        message.internalQuery = fields.q
        message.where = fields.W
        message.schema = fields.s
        message.table = fields.t
        message.column = fields.c
        message.dataType = fields.d
        message.constraint = fields.n
        message.file = fields.F
        message.line = fields.L
        message.routine = fields.R
        return message
    }
}
    
           I do like where this virtual buffer approach is headed. I think that's probably somewhat ideal - it's good on space, and it doesn't allocate or copy any buffers. I think I wanted to take a stab at that long ago and didn't for whatever reason (probably laziness 😛 )
    
           We also need to benchmark the CPU utilization between all approaches (and compare them to the original code before my PR).
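A rough way to compare CPU cost between approaches (a sketch using Node's `process.cpuUsage()`, not the project's benchmark harness):

```typescript
// Measure user + system CPU time consumed by a workload, in milliseconds.
function cpuTimeMs(workload: () => void): number {
  const start = process.cpuUsage()
  workload()
  const { user, system } = process.cpuUsage(start) // microseconds since `start`
  return (user + system) / 1000
}

// Example workload: repeated Buffer.concat, the kind of copying that each
// buffering strategy pays for differently.
const ms = cpuTimeMs(() => {
  let acc = Buffer.alloc(0)
  for (let i = 0; i < 500; i++) {
    acc = Buffer.concat([acc, Buffer.alloc(1024)])
  }
})
console.log(`workload took ${ms.toFixed(2)} ms of CPU time`)
```

Single runs are noisy, so each approach should be run several times and the medians compared.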
    
           @regevbr, I've run the benchmarks again and checked the results - it looks like the first version is better.
    
           Thanks. Exciting, I wonder how that can even be... @brianc, thoughts? The fact that my first version is slightly worse than @wpietrucha's is also mind-boggling.
    
          
 Hey, I can't wait to take a look at this - sorry, I'm absolutely slammed at work right now, so it might have to be later tonight or this weekend 😬 I also trust @hjr3 and @charmander to make the right call. 😄
    
           I ran the benchmarks locally, and it does appear that the "shrink buffer when < 50% utilization" approach is the fastest. I am not sure I am seeing a 5% improvement, but benchmarks on a laptop are always a bit fuzzy. I will have to spend more time on this, but thank you for the reproduction steps!
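For context, the "shrink when under-utilized" policy can be sketched as follows (the threshold constant and function shape here are illustrative, not the PR's actual code):

```typescript
// Hypothetical compaction policy: only reallocate when most of the buffer
// is already-consumed dead space, amortizing the cost of the copy.
const SHRINK_THRESHOLD = 0.5

interface BufferState {
  buffer: Buffer
  offset: number // start of unconsumed data
  length: number // end of valid data
}

function maybeShrink(state: BufferState): BufferState {
  const live = state.length - state.offset
  if (live / state.buffer.length >= SHRINK_THRESHOLD) {
    return state // still well utilized; skip the copy
  }
  // Less than half the allocation is live data: copy the tail into a
  // right-sized buffer and drop the reference to the big one.
  const shrunk = Buffer.allocUnsafe(live)
  state.buffer.copy(shrunk, 0, state.offset, state.length)
  return { buffer: shrunk, offset: 0, length: live }
}
```

The threshold bounds the worst case: each byte is copied at most a constant number of times, which avoids the per-chunk reallocation churn the earlier PR fixed.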
    
           I agree that this is a bug, but it's surprising (in order for the root cause to be exactly as described) that you'd never get a packet that ends on a message boundary into the parser at any point (especially in a benchmark, at the end of an iteration). What happens if you trigger garbage collection manually in the benchmark?
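For anyone reproducing this: Node only exposes manual collection behind a flag, so a benchmark iteration might end with something like the following (run with `node --expose-gc`; `gcAndMeasure` is an illustrative helper, not part of the codebase):

```typescript
// globalThis.gc is only defined when Node is started with --expose-gc.
function gcAndMeasure(label: string): number {
  const gc = (globalThis as any).gc
  if (typeof gc === 'function') {
    gc() // force a full collection so heapUsed reflects only live objects
  }
  const heapUsed = process.memoryUsage().heapUsed
  console.log(`${label}: ${(heapUsed / 1024 / 1024).toFixed(1)} MB`)
  return heapUsed
}

gcAndMeasure('after benchmark iteration')
```

If heapUsed stays flat after a forced GC between iterations, the retained buffers were collectable and the growth is allocator laziness rather than a true reference leak.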
    
           The bug I’ve opened #3533 to address might be adding confusion.
    
          
 Great, let's rerun the benchmarks after this is merged and see if they make more sense.
    
           Hi all, is there any progress here? It seems like an important fix, and sadly I don't have any time to take a look.
    
Fix: Resolve Memory Leak in Parser Buffer Management
Problem
The `Parser` class in `pg-protocol` had a significant memory leak caused by improper buffer management. Large buffers would remain in memory indefinitely, even after their data was processed, leading to steadily increasing memory usage in long-running applications.

Here is what we observed in our application:
compressed-memory-leak-small.mov
Root Cause
When the parser processes messages and has remaining partial data, it would only adjust buffer cursors (`this.bufferOffset`) instead of actually shrinking the buffer. This meant that if a very large message (e.g., 1 MB) arrived, the internal buffer would grow to accommodate it but never shrink back, even if only a few bytes were still needed.

Result: the parser could hold references to very large buffers indefinitely, preventing garbage collection.
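To make the retention mechanism concrete: a `Buffer` view created with `subarray()` shares its parent's backing `ArrayBuffer`, so holding even a tiny tail view pins the whole allocation (self-contained illustration, not code from the parser):

```typescript
// An allocation this large gets its own ArrayBuffer (not Node's 8 KiB pool).
const big = Buffer.alloc(1024 * 1024)

// Keeping only the last 16 bytes as a view...
const tail = big.subarray(big.length - 16)

// ...still retains the entire 1 MB backing store:
console.log(tail.length)            // 16
console.log(tail.buffer.byteLength) // 1048576

// Copying the tail into a fresh buffer breaks the link, letting the old
// allocation be garbage collected once nothing else references it.
const detached = Buffer.from(tail)
console.log(detached.buffer.byteLength < big.length) // true
```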
Solution
When partial messages remain, create a new appropriately-sized buffer and copy only the remaining data:
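The diff itself is not inlined here; a minimal sketch of the approach, assuming the parser's pre-existing `buffer` / `bufferOffset` / `bufferLength` fields:

```typescript
// Sketch of the fix: instead of only advancing bufferOffset, copy the
// unconsumed tail into a right-sized buffer so the old (possibly huge)
// allocation can be garbage collected.
function shrinkToRemaining(buffer: Buffer, bufferOffset: number, bufferLength: number): Buffer {
  const remaining = bufferLength - bufferOffset
  const shrunk = Buffer.allocUnsafe(remaining)
  buffer.copy(shrunk, 0, bufferOffset, bufferLength)
  return shrunk
}

// Example: a 1 MB buffer with only 10 unparsed bytes left
const oldBuffer = Buffer.alloc(1024 * 1024, 0xab)
const fresh = shrinkToRemaining(oldBuffer, oldBuffer.length - 10, oldBuffer.length)
console.log(fresh.length) // 10
```

Doing this unconditionally can reintroduce the synchronous-allocation churn fixed by the earlier PR, which is why a utilization-threshold variant was also benchmarked in the discussion.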
Memory Management Flow
Before Fix:
After Fix:
Impact
Testing
The fix maintains full backward compatibility, and all existing tests should pass. The memory management improvements will be most noticeable in long-running applications that process large messages or sustained high-volume streams.
Files Changed:
`packages/pg-protocol/src/parser.ts` - Fixed buffer shrinking logic to prevent memory leaks