Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions dshell/plugins/quic/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# SPDX-License-Identifier: BSD-3-Clause
"""
Minimal QUIC long-header helpers (pure-Python, CI-friendly).

We only parse the QUIC *long header* preamble:
- header form (always "long" here)
- version (uint32)
- DCID (Destination Connection ID)
- SCID (Source Connection ID)

RFC 9000 §17.2 layout (prefix):
[first byte][version(4)][dcid_len(1)][dcid][scid_len(1)][scid]...
We intentionally stop after SCID; fields after that vary by packet type.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class QuicLongHeaderMeta:
header_form: str # "long"
version: int # 32-bit version (0 for version negotiation)
dcid: bytes # Destination Connection ID
scid: bytes # Source Connection ID


def is_quic_long_header(data: bytes) -> bool:
"""
True if the buffer looks like a QUIC *long header* (MSB set) and is long enough
for [first byte][version][dcid_len] (>= 6 bytes).
"""
return len(data) >= 6 and (data[0] & 0x80) == 0x80


def parse_quic_long_header(data: bytes) -> Optional[QuicLongHeaderMeta]:
"""
Parse the QUIC long header preamble and return QuicLongHeaderMeta, or None if invalid.
Safe to call on arbitrary UDP payloads.
"""
try:
if not is_quic_long_header(data):
return None

# version: 4 bytes after first byte
version = int.from_bytes(data[1:5], "big")

p = 5
# DCID length + DCID
dcid_len = data[p]
p += 1
if p + dcid_len > len(data):
return None
dcid = data[p : p + dcid_len]
p += dcid_len

# SCID length + SCID
if p >= len(data):
return None
scid_len = data[p]
p += 1
if p + scid_len > len(data):
return None
scid = data[p : p + scid_len]

return QuicLongHeaderMeta(
header_form="long",
version=version,
dcid=dcid,
scid=scid,
)
except Exception:
# Be defensive: never raise from helpers
return None


__all__ = [
"QuicLongHeaderMeta",
"is_quic_long_header",
"parse_quic_long_header",
]
129 changes: 129 additions & 0 deletions dshell/plugins/quic/quic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# SPDX-License-Identifier: BSD-3-Clause
"""
QUIC (RFC 9000) decoder (long-header metadata)

Parses QUIC *long header* preamble from UDP payloads to extract:
- version (uint32)
- DCID (Destination Connection ID)
- SCID (Source Connection ID)

Defaults to UDP/443 but --ports can be used to add/override.
Never throws on malformed inputs; logs at debug and skips.

Example (table):
decode -r traffic.pcap -p quic --ports 443,8443

Columns:
ts, src, sport, dst, dport, version, dcid, scid, cid_len
"""

from __future__ import annotations

# Framework import (kept only here so helpers remain importable without Dshell deps)
from dshell.decoder import Decoder # type: ignore

# Import pure-Python helpers (no Dshell/pcap deps)
from ._helpers import (
QuicLongHeaderMeta,
is_quic_long_header as _is_quic_long_header,
parse_quic_long_header as _parse_quic_long_header,
)


class quic(Decoder): # Dshell convention: class name == plugin name
"""
QUIC long-header metadata extractor.
Uses pure-Python helpers for safe parsing.
"""

def __init__(self):
# Human-facing metadata shown by `decode -p quic -h`
Decoder.__init__(
self,
name="quic",
description="Extract QUIC long-header metadata (version, DCID, SCID)",
author="Akindotcome",
filter="udp", # BPF-level; we'll further filter by port(s)
)
# Default port(s); allow override via --ports
self.ports = {443}

# Output columns for table/CSV reporters
self.columns = (
"ts",
"src",
"sport",
"dst",
"dport",
"version",
"dcid",
"scid",
"cid_len",
)

def options(self, opts):
"""
Add CLI options: --ports 443,8443
"""
opts.add_option(
"ports",
"comma-separated UDP ports to consider as QUIC",
default="443",
)

def preparse(self, opts):
"""
Process CLI options before decoding starts.
"""
ports = set()
for tok in str(opts.ports).split(","):
tok = tok.strip()
if not tok:
continue
try:
ports.add(int(tok))
except ValueError:
self.warn(f"Invalid port '{tok}', skipping")
if ports:
self.ports = ports

def packet(self, pkt):
"""
Per-packet hook. `pkt` is Dshell’s packet object with helpers like:
- pkt.time
- pkt.ip, pkt.udp
- pkt.src, pkt.dst, pkt.sport, pkt.dport
- pkt.data (payload bytes)
If your Dshell version differs, map these accordingly.
"""
try:
# Only targeted UDP ports
if pkt.dport not in self.ports and pkt.sport not in self.ports:
return

data = bytes(pkt.data or b"")
meta = _parse_quic_long_header(data)
if not meta:
return # not a QUIC long header

# Render CIDs in hex for readability
dcid_hex = meta.dcid.hex()
scid_hex = meta.scid.hex()
cid_len = f"{len(meta.dcid)}/{len(meta.scid)}"

# Emit a row (works with table/CSV/JSON reporters)
self.write(
ts=pkt.time,
src=pkt.src,
sport=pkt.sport,
dst=pkt.dst,
dport=pkt.dport,
version=meta.version,
dcid=dcid_hex,
scid=scid_hex,
cid_len=cid_len,
)
except Exception as e:
# Be defensive; never take down the decode loop
self.debug(f"quic: decode error: {e!r}")
return
44 changes: 44 additions & 0 deletions tests/test_quic_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import importlib.util
import pathlib
import sys

HELPERS = pathlib.Path(__file__).parents[1] / "dshell" / "plugins" / "quic" / "_helpers.py"
spec = importlib.util.spec_from_file_location("dshell_quic_helpers", str(HELPERS))
mod = importlib.util.module_from_spec(spec)

# IMPORTANT: register the module before executing so dataclasses/typing can resolve it
sys.modules[spec.name] = mod
spec.loader.exec_module(mod)

# Pull symbols under test
is_quic_long_header = mod.is_quic_long_header
parse_quic_long_header = mod.parse_quic_long_header
QuicLongHeaderMeta = mod.QuicLongHeaderMeta


def test_quic_long_header_parsing_basic():
first = bytes([0xC3]) # long-header bit set
version = (1).to_bytes(4, "big")
dcid = b"\x01\x02\x03\x04\x05\x06\x07\x08"
scid = b"\xAA\xBB\xCC\xDD"
blob = first + version + bytes([len(dcid)]) + dcid + bytes([len(scid)]) + scid

assert is_quic_long_header(blob) is True

meta = parse_quic_long_header(blob)
assert isinstance(meta, QuicLongHeaderMeta)
assert meta.header_form == "long"
assert meta.version == 1
assert meta.dcid == dcid
assert meta.scid == scid


def test_not_quic_or_too_short():
assert is_quic_long_header(b"\x10\x00") is False
assert parse_quic_long_header(b"\x10\x00") is None


def test_malformed_lengths():
bad = bytes([0x80]) + b"\x00\x00\x00\x01" + bytes([50]) # dcid_len=50, exceeds buffer
assert is_quic_long_header(bad) is True
assert parse_quic_long_header(bad) is None