85 changes: 75 additions & 10 deletions web_monitoring/cli/cli.py
@@ -41,7 +41,7 @@
and results between them.
"""

from cloudpathlib import CloudPath
from cloudpathlib import CloudPath, S3Client, S3Path
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from web_monitoring.utils import detect_encoding, sniff_media_type
@@ -160,6 +160,7 @@
SNIFF_MEDIA_TYPES = frozenset((
'application/octet-stream',
'application/x-download',
'binary/octet-stream',
))

# Identifies a bare media type (that is, one without parameters)
@@ -286,6 +287,43 @@ def record_time(self, url, time):
MEMENTO_STATISTICS = RequestStatistics(leaderboard_size=10, leaderboard_min=10)


class S3HashStore:
"""
Store and track content-addressed data in S3.
"""

def __init__(self, bucket: str, extra_args: dict = {}) -> None:
self.bucket = bucket
self.extra_args = extra_args
self.seen_hashes = set()
self.lock = threading.Lock()

def store(self, data: bytes, hash: str = '', content_type: str = '') -> str:
if not hash:
hash = utils.hash_content(data)

if not content_type:
content_type = 'application/octet-stream'

archive = S3Path(f's3://{self.bucket}', client=S3Client(extra_args={
**self.extra_args,
'ContentType': content_type
}))
path = archive / hash

upload = False
with self.lock:
if hash not in self.seen_hashes:
self.seen_hashes.add(hash)
upload = True

if upload and not path.exists():
logger.info(f'Uploading to S3 (hash={hash})')
path.write_bytes(data)

return path.as_url()


class WaybackRecordsWorker(threading.Thread):
"""
WaybackRecordsWorker is a thread that takes CDX records from a queue and
@@ -297,7 +335,7 @@ class WaybackRecordsWorker(threading.Thread):

def __init__(self, records, results_queue, maintainers, tags, cancel,
failure_queue=None, session_options=None, adapter=None,
unplaybackable=None, version_cache=None):
unplaybackable=None, version_cache=None, archive_storage=None):
super().__init__()
self.results_queue = results_queue
self.failure_queue = failure_queue
@@ -314,6 +352,7 @@ def __init__(self, records, results_queue, maintainers, tags, cancel,
user_agent=USER_AGENT,
**session_options)
self.wayback = wayback.WaybackClient(session=session)
self.archive_storage = archive_storage

def is_active(self):
return not self.cancel.is_set()
@@ -370,8 +409,17 @@ def process_record(self, record):
"""
memento = self.wayback.get_memento(record, exact_redirects=False)
with memento:
return self.format_memento(memento, record, self.maintainers,
self.tags)
version = self.format_memento(memento, record, self.maintainers,
self.tags)
if self.archive_storage and version['version_hash']:
url = self.archive_storage.store(
memento.content,
hash=version['version_hash'],
content_type=version['media_type']
)
version['uri'] = url

return version

def format_memento(self, memento, cdx_record, maintainers, tags):
"""
@@ -565,7 +613,7 @@ def import_ia_db_urls(*, from_date=None, to_date=None, maintainers=None,
tags=None, skip_unchanged='resolved-response',
url_pattern=None, worker_count=0,
unplaybackable_path=None, dry_run=False,
precheck_versions=False):
precheck_versions=False, archive_storage=None):
client = db.Client.from_env()
logger.info('Loading known pages from web-monitoring-db instance...')
urls, version_filter = _get_db_page_url_info(client, url_pattern)
@@ -596,7 +644,8 @@ def import_ia_db_urls(*, from_date=None, to_date=None, maintainers=None,
unplaybackable_path=unplaybackable_path,
db_client=client,
dry_run=dry_run,
version_cache=version_cache)
version_cache=version_cache,
archive_storage=archive_storage)


# TODO: this function probably be split apart so `dry_run` doesn't need to
@@ -606,7 +655,8 @@ def import_ia_urls(urls, *, from_date=None, to_date=None,
skip_unchanged='resolved-response',
version_filter=None, worker_count=0,
create_pages=True, unplaybackable_path=None,
db_client=None, dry_run=False, version_cache=None):
db_client=None, dry_run=False, version_cache=None,
archive_storage=None):
for url in urls:
if not _is_valid(url):
raise ValueError(f'Invalid URL: "{url}"')
@@ -640,7 +690,8 @@ def import_ia_urls(urls, *, from_date=None, to_date=None,
tags,
stop_event,
unplaybackable=unplaybackable,
version_cache=version_cache))
version_cache=version_cache,
archive_storage=archive_storage))
memento_thread.start()

# Show a progress meter
@@ -991,6 +1042,8 @@ def main():
--precheck Check the list of versions in web-monitoring-db
and avoid re-importing duplicates.
--dry-run Don't upload data to web-monitoring-db.
--archive-s3 <bucket> Pre-upload response bodies to this S3 bucket
before sending import data to web-monitoring-db.
"""
arguments = docopt(doc, version='0.0.1')
if arguments['import']:
@@ -1004,6 +1057,16 @@
if not arguments.get('--dry-run'):
validate_db_credentials()

archive_storage = None
archive_bucket = arguments.get('--archive-s3')
if archive_bucket:
archive_storage = S3HashStore(archive_bucket, {
'ACL': 'public-read',
# Ideally, we'd gzip stuff, but the DB needs to learn to
# correctly read gzipped items first.
# 'ContentEncoding': 'gzip'
})

start_time = datetime.now(tz=timezone.utc)
if arguments['ia']:
import_ia_urls(
@@ -1014,7 +1077,8 @@
to_date=_parse_date_argument(arguments['<to_date>']),
skip_unchanged=skip_unchanged,
unplaybackable_path=unplaybackable_path,
dry_run=arguments.get('--dry-run'))
dry_run=arguments.get('--dry-run'),
archive_storage=archive_storage)
elif arguments['ia-known-pages']:
import_ia_db_urls(
from_date=_parse_date_argument(arguments['<from_date>']),
@@ -1026,7 +1090,8 @@
worker_count=int(arguments.get('--parallel')),
unplaybackable_path=unplaybackable_path,
dry_run=arguments.get('--dry-run'),
precheck_versions=arguments.get('--precheck'))
precheck_versions=arguments.get('--precheck'),
archive_storage=archive_storage)

end_time = datetime.now(tz=timezone.utc)
print(f'Completed at {end_time.isoformat()}')
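For context, here is a minimal, hypothetical sketch of how the `S3HashStore` added above is meant to be used: response bodies are content-addressed by hash, each unique hash is uploaded at most once, and the returned S3 URL replaces the version's `uri` before the record is sent to web-monitoring-db. The bucket name and sample body below are placeholders, not values from this PR.

# Hypothetical usage sketch of S3HashStore (bucket name and data are made up).
from web_monitoring.cli.cli import S3HashStore
from web_monitoring import utils

store = S3HashStore('example-archive-bucket', extra_args={'ACL': 'public-read'})

body = b'<html><body>Example memento body</body></html>'
content_hash = utils.hash_content(body)  # same hash the importer computes

# store() uploads only if this hash has not been seen by this instance and the
# key does not already exist in the bucket, then returns a URL for the object.
url = store.store(body, hash=content_hash, content_type='text/html')
print(url)  # e.g. https://example-archive-bucket.s3.amazonaws.com/<hash>

Because `seen_hashes` is guarded by a lock and checked before `path.exists()`, a single `S3HashStore` instance can be shared across the `WaybackRecordsWorker` threads without triggering duplicate uploads.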