69 commits
87ca2b9
feat: Adding filter_cdx and warc_by_cdx commands
malteos Aug 15, 2025
e331bee
Adding unit test for filter_cdx command, some refactoring
malteos Aug 19, 2025
4dcf3a1
Adding unit tests for warc_by_cdx and index resource record
malteos Aug 20, 2025
3496c93
Adding more unit tests
malteos Aug 22, 2025
dd1e4c6
Added unit tests for matcher
malteos Aug 22, 2025
c8dbcf0
Include subpackages
malteos Aug 22, 2025
77ae6ca
Added parallelization to filter_cdx command
malteos Aug 25, 2025
bfded06
removed test file
malteos Aug 25, 2025
eeb3fbb
minor fix
malteos Aug 25, 2025
016c586
bug fix
malteos Aug 25, 2025
44f3b09
Adding S3 writter and reader support for WARCs
malteos Aug 27, 2025
5f8d9e0
added comment on CDX format
malteos Aug 27, 2025
c53562f
Making index record optional, fixing prefix S3 handling
malteos Aug 27, 2025
b48b191
adding --parallel for warc by cdx command
malteos Aug 27, 2025
c58c883
fixed progress bar
malteos Aug 27, 2025
d1d2c76
disable progress bar
malteos Aug 27, 2025
5154e70
Added aioboto3 implementation for warcer
malteos Sep 1, 2025
d13f1a8
Small clean up
malteos Sep 10, 2025
aa69c54
updated format and feat CI
malteos Sep 17, 2025
d45a3da
fixing type hints for py38
malteos Sep 17, 2025
a81a2b4
fixed types and fail fast
malteos Sep 17, 2025
72c3201
fixed types and fail fast
malteos Sep 17, 2025
d9adf03
adding max file size to aioboto3 implementation; improving test coverage
malteos Sep 17, 2025
24b263e
S3 access to CI, more unit tests
malteos Sep 19, 2025
5840c4c
fix s3 access in action
malteos Sep 19, 2025
841fe07
disable s3 tests for py < 39
malteos Sep 19, 2025
eb0a4eb
fixed syntax
malteos Sep 19, 2025
3ddf1a4
fixed bad s3 bucket
malteos Sep 19, 2025
d8a627f
adding more tests
malteos Sep 19, 2025
155db05
more tests
malteos Sep 19, 2025
c57ac3d
Merge branch 'main' into feat/warc-by-cdx
malteos Sep 22, 2025
e670d12
fix CI
malteos Sep 23, 2025
807a39d
fix CI (2)
malteos Sep 23, 2025
5d208d6
fixing Ci for windows
malteos Sep 23, 2025
c8f984c
fixing Ci for windows
malteos Sep 23, 2025
63db23c
removed duplicated code
malteos Sep 23, 2025
e75f143
more windows test fixes
malteos Sep 23, 2025
578a8ae
more windows test fixes (2)
malteos Sep 23, 2025
4eaf366
more windows test fixes (3)
malteos Sep 23, 2025
ed63d2c
re-renable other platforms
malteos Sep 23, 2025
bc5ed79
adding s3_tmpdir fixture
malteos Sep 24, 2025
6441bf6
Adding docs to README and disable duplicated test matrix
malteos Sep 24, 2025
ada22ce
WIP: Refactor for Athena integration
malteos Sep 26, 2025
d0bbd9a
Adding log arg
malteos Sep 30, 2025
dfdefbc
WIP: unified implementation
malteos Oct 1, 2025
faca33e
WIP: unified implementation (2)
malteos Oct 1, 2025
9d03d80
WIP: unified implementation (3)
malteos Oct 1, 2025
7e4c80b
Adding keyboard interupt handling
malteos Oct 1, 2025
bb3f3a7
Adding wild card test
malteos Oct 1, 2025
ad2b326
Refactor to imap
malteos Oct 1, 2025
cca857a
Refactor to imap (2)
malteos Oct 1, 2025
0e03475
Make sure tests run with empty cache dir
malteos Oct 2, 2025
3664db1
CI tests only feature
malteos Oct 2, 2025
ced19e5
Adding args; minor fix
malteos Oct 2, 2025
d6e5a25
force consitent multiprocessing behaviour across platforms
malteos Oct 2, 2025
9b2aa35
Adding resource records with warcinfo id
malteos Oct 2, 2025
494fbc5
Adding tests for unified implementation
malteos Oct 8, 2025
a8e493e
fix type hints
malteos Oct 9, 2025
f7011bd
Adding settings via environment variables
malteos Oct 9, 2025
1d8d3ff
Re-enabled all unit tests
malteos Oct 9, 2025
cc6b35e
Adding MOCK_TIME env variable
malteos Oct 9, 2025
b0bb17f
Removed cdx fetcher from filter warc command
malteos Oct 9, 2025
6020dff
Adding float tol
malteos Oct 9, 2025
7796796
WIP athena integration
malteos Oct 13, 2025
56af9d2
Merge branch 'main' into feat/warc-by-cdx
malteos Oct 13, 2025
fac56ce
Adding Athena PoC
malteos Oct 15, 2025
e58da48
Fix type hints for py38
malteos Oct 15, 2025
1f17ecf
Fixed stats
malteos Oct 15, 2025
ed7046f
Fixed Athena check
malteos Oct 15, 2025
61 changes: 54 additions & 7 deletions .github/workflows/ci.yaml
@@ -8,26 +8,39 @@ on:
branches:
- main

# These permissions are needed to interact with AWS S3 via GitHub's OIDC Token endpoint
permissions:
id-token: write
contents: read
pull-requests: read

jobs:
unit-tests:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: true
matrix:
# The full test-suite is only run with os=ubuntu and py=3.12
python-version: [
'3.8', '3.9', '3.10', '3.11', '3.12', '3.13'
'3.8',
'3.9',
'3.10',
'3.11',
'3.12',
'3.13'
]
os: [ubuntu-latest]
EXTRA: [false] # used to force includes to get included
include:
- python-version: '3.8'
os: ubuntu-22.04 # oldest version on github actions
EXTRA: true
- python-version: '3.13'
os: ubuntu-latest
env:
LOGLEVEL=DEBUG
EXTRA: true
# disabled (duplicated matrix entry)
# - python-version: '3.13'
# os: ubuntu-latest
# env:
# LOGLEVEL=DEBUG
# EXTRA: true
- python-version: '3.13'
os: macos-latest
EXTRA: true
@@ -57,9 +70,43 @@ jobs:
- name: Install cdx_toolkit
run: pip install .[test]

- name: Configure AWS credentials from OIDC (disabled for forks)
if: github.event.pull_request.head.repo.full_name == github.repository || github.event_name == 'push'
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: arn:aws:iam::837454214164:role/GitHubActions-Role
aws-region: us-east-1

- name: Disable S3 unit tests for Python 3.8 (boto3 requires Python 3.9+)
if: ${{ startsWith(matrix.python-version, '3.8') }}
uses: actions/github-script@v7
with:
script: |
core.exportVariable('CDXT_DISABLE_S3_TESTS', '1')

- name: Set environment variables for faster unit tests (requests are mocked)
uses: actions/github-script@v7
with:
script: |
core.exportVariable('CDXT_MAX_ERRORS', '2')
core.exportVariable('CDXT_WARNING_AFTER_N_ERRORS', '2')
core.exportVariable('CDXT_DEFAULT_MIN_RETRY_INTERVAL', '0.01')
core.exportVariable('CDXT_CC_INDEX_MIN_RETRY_INTERVAL', '0.01')
core.exportVariable('CDXT_CC_DATA_MIN_RETRY_INTERVAL', '0.01')
core.exportVariable('CDXT_IA_MIN_RETRY_INTERVAL', '0.01')
core.exportVariable('LOGLEVEL', 'DEBUG')

# - name: Run tests (only feature)
# run: |
# # make test_coverage
# pytest -rA -s --doctest-modules --cov-report=xml --cov-append --cov cdx_toolkit tests/filter_warc tests/filter_cdx -v -v
# coverage report

- name: Run tests
run: |
make test_coverage
# make test_coverage
pytest -rA -s --doctest-modules --cov-report=xml --cov-append --cov cdx_toolkit tests/ -v -v
coverage report

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
106 changes: 106 additions & 0 deletions README.md
@@ -256,6 +256,112 @@ get the most recent N captures: --limit and limit= will return the
oldest N captures. With the 'mixed' ordering, a large enough limit=
will get close to returning the most recent N captures.

## Filtering CDX files

The `cdxt` command-line tool can filter CDX files against a
whitelist of URLs or SURTs. The filter keeps every CDX entry that
matches at least one whitelist entry and discards all
others.

For matching, all URLs are converted into SURTs. A match occurs
when the SURT of a CDX entry starts with one of the SURT
prefixes defined in the whitelist.
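
As a rough illustration of this prefix matching, the following sketch converts whitelist entries into SURT-like keys and tests CDX SURTs against them. It is a simplified stand-in and not the toolkit's implementation: `to_surt` and `matches_whitelist` are hypothetical helpers, and real SURT canonicalization covers more cases (ports, query normalization, and so on).

```python
from urllib.parse import urlsplit


def to_surt(url):
    """Simplified SURT: reversed, comma-joined host labels, then ')' and the path."""
    if "://" not in url:
        url = "http://" + url  # whitelist entries may omit the scheme
    parts = urlsplit(url)
    host = parts.hostname or ""  # urlsplit lowercases the hostname
    if host.startswith("www."):
        host = host[4:]  # assumption: drop a leading "www." as part of the simplification
    surt = ",".join(reversed(host.split("."))) + ")" + (parts.path or "/")
    if parts.query:
        surt += "?" + parts.query
    return surt


def matches_whitelist(cdx_surt, prefixes):
    """Return True if the CDX SURT starts with at least one whitelist prefix."""
    return any(cdx_surt.startswith(prefix) for prefix in prefixes)


# Whitelist entries as they would appear in the whitelist file (one per line).
prefixes = [to_surt(entry) for entry in ["example.com", "github.com/cococrawler"]]

print(matches_whitelist("com,example)/some/page", prefixes))  # True
print(matches_whitelist("com,github)/other-repo", prefixes))  # False
```

Because matching is by prefix, an entry such as `example.com` keeps every path under that host, while `github.com/cococrawler` keeps only captures below that path.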

The CDX filter can read and write files from local and remote file
systems, like S3 buckets. Multiple input files can be defined
using a glob pattern.

```
$ cdxt filter_cdx <input_cdx_path> <whitelist_path> <output_path> \
--filter-type <url or surt> \
[--input-glob <glob pattern like "*_cdx-*.gz">]
```

For example, you can filter CDX from Common Crawl as follows:

```
$ cdxt -v filter_cdx \
s3://commoncrawl/cc-index/collections \
/local/path/to/my-url-whitelist.txt \
s3://my-s3-bucket/filtered-cdxs --filter-type url \
--input-glob "/CC-MAIN-2024-30/indexes/*.gz" --overwrite
```

The whitelist file looks like this (one entry per line):

```
example.com
github.com/cococrawler
```

Filtering throughput depends on your machine. For reference,
on an AWS EC2 c5n.xlarge instance, filtering all 300 CDX files
from CC-MAIN-2024-30 takes ~1.4 hours with 100k URLs in the whitelist.

## WARC extraction using CDX files

You can extract parts of WARC files using the cdxt command line script.
The WARC extraction can read CDX files from local and remote file
systems, like S3 buckets. Multiple CDX files can be defined
using a glob pattern. For downloading WARC parts from HTTP or S3, you can
define the download prefix, e.g., `s3://commoncrawl` for S3 download.

```
$ cdxt -v --cc warc_by_cdx \
<path_to_cdx> [--cdx-glob <glob pattern, e.g., "*.gz">] \
--prefix <output prefix> \
--warc-download-prefix=<warc download prefix, e.g., s3://commoncrawl> \
--creator <name and contact of creator> \
--operator <name and contact of operator> \
[--implementation <fsspec or aioboto3, defaults to fsspec>] \
[--write-paths-as-resource-records <one or more paths for resource records>] \
[--write-paths-as-resource-records-metadata <one or more paths for metadata of resource records>]
```

By default, we use an [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html)
implementation to read from and write to local or remote file systems.
For better S3 read/write throughput, we also provide a dedicated implementation
using [aioboto3](https://github.com/terricain/aioboto3) that you can enable with
the `--implementation=aioboto3` argument. With aioboto3, we achieved ~80 requests/second
on an AWS EC2 c5n.xlarge instance.

You can add one or more files with metadata as resource records to
the extracted WARC. For instance, this is useful for preserving the CDX filter
inputs, e.g., the whitelist. To do this, provide the
corresponding file paths via the arguments `--write-paths-as-resource-records=s3://my-s3-bucket/path/to/my-url-whitelist.txt`
and `--write-paths-as-resource-records-metadata=s3://my-s3-bucket/path/to/metadata.json`.
The metadata file is optional and supports the following optional fields:

```json
{
"warc_content_type": "str",
"uri": "str",
"http_headers": {"k": "v"},
"warc_headers_dict": {"k": "v"}
}
```

This is an example of a metadata JSON file:

```json
{
"uri": "filter_cdx.gz",
"warc_content_type": "application/cdx"
}
```
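
Hand-written JSON is easy to get subtly wrong (a trailing comma, for example, makes the file invalid), so one option is to generate the metadata file with Python's `json` module. The sketch below is only an illustration: `build_metadata` and `ALLOWED_FIELDS` are hypothetical names, and the field list is taken from the description above rather than from the toolkit's code.

```python
import json

# Optional fields named in the README section above (assumed to be the full set).
ALLOWED_FIELDS = {"warc_content_type", "uri", "http_headers", "warc_headers_dict"}


def build_metadata(**fields):
    """Serialize resource-record metadata, rejecting field names not listed above."""
    unknown = set(fields) - ALLOWED_FIELDS
    if unknown:
        raise ValueError("unknown metadata fields: %s" % sorted(unknown))
    return json.dumps(fields, indent=2)


metadata_json = build_metadata(uri="filter_cdx.gz", warc_content_type="application/cdx")
print(metadata_json)

# To store it next to the whitelist, write the string to a file, e.g.:
# with open("metadata.json", "w") as f:
#     f.write(metadata_json)
```

Serializing through `json.dumps` guarantees syntactically valid JSON, and the field check catches misspelled keys before the file is passed to `--write-paths-as-resource-records-metadata`.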

The full WARC extraction command could look like this:

```
$ cdxt -v --cc warc_by_cdx \
s3://my-s3-bucket/filtered-cdxs --cdx-glob "*.gz" \
--prefix /local/path/filtered-warcs/ \
--warc-download-prefix=s3://commoncrawl \
--creator foo --operator bob \
--write-paths-as-resource-records=s3://my-s3-bucket/path/to/my-url-whitelist.txt \
--write-paths-as-resource-records-metadata=s3://my-s3-bucket/path/to/metadata.json
```

## TODO

Content downloading needs help with charset issues, preferably
56 changes: 37 additions & 19 deletions cdx_toolkit/__init__.py
@@ -1,7 +1,6 @@
import logging
import json
from collections.abc import MutableMapping
import sys
import warnings

try:
@@ -37,7 +36,7 @@ def showNumPages(r):
elif isinstance(j, int): # ia always returns text, parsed as a json int
pages = j
else:
raise ValueError('surprised by showNumPages value of '+str(j))
raise ValueError('surprised by showNumPages value of ' + str(j))
return pages


@@ -80,18 +79,19 @@ def cdx_to_captures(resp, wb=None, warc_download_prefix=None):
lines = json.loads(text)
fields = lines.pop(0)
except (json.decoder.JSONDecodeError, KeyError, IndexError): # pragma: no cover
raise ValueError('cannot decode response, first bytes are '+repr(text[:50]))
raise ValueError('cannot decode response, first bytes are ' + repr(text[:50]))

ret = munge_fields(fields, lines)
return [CaptureObject(r, wb=wb, warc_download_prefix=warc_download_prefix) for r in ret]

raise ValueError('cannot decode response, first bytes are '+repr(text[:50])) # pragma: no cover
raise ValueError('cannot decode response, first bytes are ' + repr(text[:50])) # pragma: no cover


class CaptureObject(MutableMapping):
'''
"""
Represents a single capture of a webpage, plus less-visible info about how to fetch the content.
'''
"""

def __init__(self, data, wb=None, warc_download_prefix=None):
self.data = data
self.wb = wb
@@ -129,9 +129,9 @@ def content(self):

@property
def text(self):
'''
"""
Eventually this function will do something with the character set, but not yet.
'''
"""
return self.content.decode('utf-8', errors='replace')

# the remaining code treats self.data like a dict
@@ -176,8 +176,9 @@ def get_more(self):
if self.page == 0 and len(self.index_list) > 0 and self.endpoint < len(self.index_list):
LOGGER.info('get_more: fetching cdx from %s', self.index_list[self.endpoint])

status, objs = self.cdxfetcher.get_for_iter(self.endpoint, self.page,
params=self.params, index_list=self.index_list)
status, objs = self.cdxfetcher.get_for_iter(
self.endpoint, self.page, params=self.params, index_list=self.index_list
)
if status == 'last endpoint':
LOGGER.debug('get_more: I have reached the end')
return # caller will raise StopIteration
@@ -207,7 +208,16 @@ def __next__(self):


class CDXFetcher:
def __init__(self, source='cc', crawl=None, wb=None, warc_download_prefix=None, cc_mirror=None, cc_sort='mixed', loglevel=None):
def __init__(
self,
source='cc',
crawl=None,
wb=None,
warc_download_prefix=None,
cc_mirror=None,
cc_sort='mixed',
loglevel=None,
):
self.source = source
self.crawl = crawl
self.cc_sort = cc_sort
@@ -236,7 +246,14 @@ def __init__(self, source='cc', crawl=None, wb=None, warc_download_prefix=None,
LOGGER.setLevel(level=loglevel)

def customize_index_list(self, params):
if self.source == 'cc' and (self.crawl or 'crawl' in params or 'from' in params or 'from_ts' in params or 'to' in params or 'closest' in params):
if self.source == 'cc' and (
self.crawl
or 'crawl' in params
or 'from' in params
or 'from_ts' in params
or 'to' in params
or 'closest' in params
):
LOGGER.info('making a custom cc index list')
if self.crawl and 'crawl' not in params:
params['crawl'] = self.crawl
@@ -269,7 +286,9 @@ def get(self, url, **kwargs):
ret = []
for endpoint in index_list:
resp = myrequests_get(endpoint, params=params, cdx=True)
objs = cdx_to_captures(resp, wb=self.wb, warc_download_prefix=self.warc_download_prefix) # turns 400 and 404 into []
objs = cdx_to_captures(
resp, wb=self.wb, warc_download_prefix=self.warc_download_prefix
) # turns 400 and 404 into []
ret.extend(objs)
if 'limit' in params:
params['limit'] -= len(objs)
@@ -297,15 +316,14 @@ def iter(self, url, **kwargs):

def items(self, url, **kwargs): # pragma: no cover
warnings.warn(
'cdx.items() has been renamed to cdx.iter() and will be removed in cdx_toolkit 1.0',
FutureWarning
'cdx.items() has been renamed to cdx.iter() and will be removed in cdx_toolkit 1.0', FutureWarning
)
return self.iter(url, **kwargs)

def get_for_iter(self, endpoint, page, params={}, index_list=None):
'''
"""
Specalized get for the iterator
'''
"""
if endpoint >= len(index_list):
return 'last endpoint', []
if params.get('limit', -1) == 0:
@@ -325,12 +343,12 @@ def get_for_iter(self, endpoint, page, params={}, index_list=None):
return 'ok', ret

def get_size_estimate(self, url, as_pages=False, **kwargs):
'''
"""
Get the number of pages that match url

useful additional args: matchType='host' pageSize=1
or, url can end with * or start with *. to set the matchType
'''
"""
if 'details' in kwargs:
details = True
del kwargs['details']