Add simple generate summaries and totals functions that group by directory. #103
Draft: rwblair wants to merge 38 commits into dandi:main from rwblair:enh/directory_based_totals
Commits (38 total; the file changes shown further below reflect 14 of them):
9fa4307  Add simple generate summaries and totals functions that group by dire… (rwblair)
a3e1f71  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
948b171  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
2b71944  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
e4566a4  Update src/s3_log_extraction/summarize/_generate_all_dataset_summarie… (rwblair)
91c663f  pass dtype into rng.integers on new_index collision to match original… (rwblair)
94381d7  make regex for log file name pattern a property of the extractor class (rwblair)
1ff2b9c  Merge branch 'enh/directory_based_totals' of github.com:rwblair/s3-lo… (rwblair)
385d211  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
0e719e6  Update src/s3_log_extraction/extractors/_dandi_s3_log_access_extracto… (CodyCBakerPhD)
aeca81f  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
15ce9bf  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
c06645a  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
ad3ce93  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
51f94da  resolve conflict (CodyCBakerPhD)
d5aa76f  chore: resolve conflict (CodyCBakerPhD)
bb39862  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
89f79a4  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
345f852  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
0edea33  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
9ae6c73  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
a892f1d  Update __init__.py (CodyCBakerPhD)
a7dd8a3  Update _cli.py (CodyCBakerPhD)
f4f6066  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
62a7cdc  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
f8d9ac1  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
4d98bee  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
0834b64  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
fefb9a9  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
f29b0cb  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
b38db38  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
95836e5  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
4c47bcf  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
880549f  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
c0582f8  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
2fc79dc  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
f0fb37c  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
e39230d  Merge branch 'main' into enh/directory_based_totals (CodyCBakerPhD)
src/s3_log_extraction/summarize/__init__.py (modified, @@ -1,12 +1,15 @@). The previous top-level imports (generate_archive_totals, generate_archive_summaries, generate_all_dandiset_totals, …) are replaced by an alphabetized import list that also exposes the new dataset-level functions, and __all__ is updated to match:

```python
from ._generate_all_dandiset_summaries import generate_all_dandiset_summaries
from ._generate_all_dandiset_totals import generate_all_dandiset_totals
from ._generate_all_dataset_summaries import generate_all_dataset_summaries
from ._generate_all_dataset_totals import generate_all_dataset_totals
from ._generate_archive_summaries import generate_archive_summaries
from ._generate_archive_totals import generate_archive_totals

__all__ = [
    "generate_all_dandiset_summaries",
    "generate_all_dandiset_totals",
    "generate_all_dataset_summaries",
    "generate_all_dataset_totals",
    "generate_archive_summaries",
    "generate_archive_totals",
]
```
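For orientation, here is a minimal usage sketch (not part of this diff) of the dataset-level entry points exported above. It assumes the package is installed and that extraction has already populated the directory returned by get_extraction_directory(); note that the summaries module in the commits shown here still defines its function as generate_summaries, so the exported name generate_all_dataset_summaries is taken from this __init__.py.

```python
# Hypothetical usage sketch, not part of this PR's diff.
# Assumes extraction has already been run so the extraction directory is populated.
from s3_log_extraction.summarize import (
    generate_all_dataset_summaries,  # exported above; the module shown in this diff defines generate_summaries
    generate_all_dataset_totals,
)

# Write per-dataset TSV summaries grouped by day, asset (directory), and region ...
generate_all_dataset_summaries()

# ... then roll the per-dataset region summaries up into all_dandiset_totals.json.
generate_all_dataset_totals()
```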
src/s3_log_extraction/summarize/_generate_all_dataset_summaries.py (new file, 166 additions, 0 deletions):

```python
import collections
import datetime
import pathlib

import pandas
import tqdm

from ..config import get_extraction_directory, get_summary_directory
from ..ip_utils import load_ip_cache


def generate_summaries(level: int = 0) -> None:
    extraction_directory = get_extraction_directory()

    datasets = [item for item in extraction_directory.iterdir() if item.is_dir()]

    summary_directory = get_summary_directory()
    index_to_region = load_ip_cache(cache_type="index_to_region")

    for dataset in tqdm.tqdm(
        iterable=datasets,
        total=len(datasets),
        desc="Summarizing Datasets",
        position=0,
        leave=True,
        mininterval=5.0,
        smoothing=0,
        unit="dataset",
    ):
        dataset_id = dataset.name
        asset_directories = [file_path for file_path in dataset.rglob(pattern="*") if file_path.is_dir()]
        _summarize_dataset(
            dataset_id=dataset_id,
            asset_directories=asset_directories,
            summary_directory=summary_directory,
            index_to_region=index_to_region,
        )


def _summarize_dataset(
    *,
    dataset_id: str,
    asset_directories: list[pathlib.Path],
    summary_directory: pathlib.Path,
    index_to_region: dict[int, str],
) -> None:
    _summarize_dataset_by_day(
        asset_directories=asset_directories,
        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_day.tsv",
    )
    _summarize_dataset_by_asset(
        asset_directories=asset_directories,
        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_asset.tsv",
    )
    _summarize_dataset_by_region(
        asset_directories=asset_directories,
        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_region.tsv",
        index_to_region=index_to_region,
    )


def _summarize_dataset_by_day(*, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path) -> None:
    all_dates = []
    all_bytes_sent = []
    for asset_directory in asset_directories:
        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
        # Just copy the file over instead of reprocessing

        timestamps_file_path = asset_directory / "timestamps.txt"

        if not timestamps_file_path.exists():
            continue

        dates = [
            datetime.datetime.strptime(str(timestamp.strip()), "%y%m%d%H%M%S").strftime(format="%Y-%m-%d")
            for timestamp in timestamps_file_path.read_text().splitlines()
        ]
        all_dates.extend(dates)

        bytes_sent_file_path = asset_directory / "bytes_sent.txt"
        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]
        all_bytes_sent.extend(bytes_sent)

    summarized_activity_by_day = collections.defaultdict(int)
    for date, bytes_sent in zip(all_dates, all_bytes_sent):
        summarized_activity_by_day[date] += bytes_sent

    if len(summarized_activity_by_day) == 0:
        return

    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
    summary_table = pandas.DataFrame(
        data={
            "date": list(summarized_activity_by_day.keys()),
            "bytes_sent": list(summarized_activity_by_day.values()),
        }
    )
    summary_table.sort_values(by="date", inplace=True)
    summary_table.index = range(len(summary_table))
    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)


def _summarize_dataset_by_asset(*, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path) -> None:
    summarized_activity_by_asset = collections.defaultdict(int)
    for asset_directory in asset_directories:
        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
        # Just copy the file over instead of reprocessing
        bytes_sent_file_path = asset_directory / "bytes_sent.txt"

        if not bytes_sent_file_path.exists():
            continue

        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]

        asset_path = str(asset_directory)
        summarized_activity_by_asset[asset_path] += sum(bytes_sent)

    if len(summarized_activity_by_asset) == 0:
        return

    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
    summary_table = pandas.DataFrame(
        data={
            "asset_path": list(summarized_activity_by_asset.keys()),
            "bytes_sent": list(summarized_activity_by_asset.values()),
        }
    )
    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)


def _summarize_dataset_by_region(
    *, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path, index_to_region: dict[int, str]
) -> None:
    all_regions = []
    all_bytes_sent = []
    for asset_directory in asset_directories:
        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
        # Just copy the file over instead of reprocessing
        indexed_ips_file_path = asset_directory / "indexed_ips.txt"

        if not indexed_ips_file_path.exists():
            continue

        indexed_ips = [ip_index.strip() for ip_index in indexed_ips_file_path.read_text().splitlines()]
        regions = [index_to_region.get(ip_index.strip(), "unknown") for ip_index in indexed_ips]
        all_regions.extend(regions)

        bytes_sent_file_path = asset_directory / "bytes_sent.txt"
        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]
        all_bytes_sent.extend(bytes_sent)

    summarized_activity_by_region = collections.defaultdict(int)
    for region, bytes_sent in zip(all_regions, all_bytes_sent):
        summarized_activity_by_region[region] += bytes_sent

    if len(summarized_activity_by_region) == 0:
        return

    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
    summary_table = pandas.DataFrame(
        data={
            "region": list(summarized_activity_by_region.keys()),
            "bytes_sent": list(summarized_activity_by_region.values()),
        }
    )
    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)
```
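The summarizer above assumes each asset directory holds parallel line-oriented files (timestamps.txt, bytes_sent.txt, indexed_ips.txt) written by the extractor. As a quick illustration of the by-day grouping, here is a small self-contained sketch with invented values; the file names mirror those used in the function, but the toy directory layout exists only for demonstration.

```python
# Self-contained toy example of the by-day aggregation performed above (values are invented).
import collections
import datetime
import pathlib
import tempfile

# Fake "asset directory" holding the same parallel line-oriented files the summarizer reads.
asset_directory = pathlib.Path(tempfile.mkdtemp()) / "000001" / "blobs" / "abc"
asset_directory.mkdir(parents=True)
(asset_directory / "timestamps.txt").write_text("240101120000\n240101130000\n240102090000\n")
(asset_directory / "bytes_sent.txt").write_text("100\n250\n400\n")

# Same grouping logic: parse %y%m%d%H%M%S timestamps to dates and sum bytes per date.
summarized_activity_by_day = collections.defaultdict(int)
timestamps = (asset_directory / "timestamps.txt").read_text().splitlines()
bytes_sent = [int(value) for value in (asset_directory / "bytes_sent.txt").read_text().splitlines()]
for timestamp, nbytes in zip(timestamps, bytes_sent):
    date = datetime.datetime.strptime(timestamp, "%y%m%d%H%M%S").strftime("%Y-%m-%d")
    summarized_activity_by_day[date] += nbytes

print(dict(summarized_activity_by_day))  # {'2024-01-01': 350, '2024-01-02': 400}
```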
src/s3_log_extraction/summarize/_generate_all_dataset_totals.py (new file, 57 additions, 0 deletions):

```python
import json
import pathlib

import pandas

from ..config import get_summary_directory


def generate_all_dataset_totals(
    summary_directory: str | pathlib.Path | None = None,
) -> None:
    """
    Generate top-level totals of summarized access activity for all dandisets.

    Parameters
    ----------
    summary_directory : pathlib.Path
        Path to the folder containing all Dandiset summaries of the S3 access logs.
    """
    if summary_directory:
        summary_directory = pathlib.Path(summary_directory)
    else:
        summary_directory = get_summary_directory()

    # TODO: record progress over

    all_dandiset_totals = {}
    for dandiset_id_folder_path in summary_directory.iterdir():
        if not dandiset_id_folder_path.is_dir():
            continue  # TODO: use better structure for separating mapped activity from summaries
        dandiset_id = dandiset_id_folder_path.name

        summary_file_path = summary_directory / dandiset_id / "dandiset_summary_by_region.tsv"
        summary = pandas.read_table(filepath_or_buffer=summary_file_path)

        unique_countries = {}
        for region in summary["region"]:
            if region in ["VPN", "GitHub", "unknown"]:
                continue

            country_code, region_name = region.split("/")
            if "AWS" in country_code:
                country_code = region_name.split("-")[0].upper()

            unique_countries[country_code] = True

        number_of_unique_regions = len(summary["region"])
        number_of_unique_countries = len(unique_countries)
        all_dandiset_totals[dandiset_id] = {
            "total_bytes_sent": int(summary["bytes_sent"].sum()),
            "number_of_unique_regions": number_of_unique_regions,
            "number_of_unique_countries": number_of_unique_countries,
        }

    top_level_summary_file_path = summary_directory / "all_dandiset_totals.json"
    with top_level_summary_file_path.open(mode="w") as io:
        json.dump(obj=all_dandiset_totals, fp=io)
```
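To show what the rollup produces, a short hedged sketch that reads the resulting all_dandiset_totals.json back; get_summary_directory is the same config helper imported above, and the example entry in the comment is invented.

```python
# Hedged sketch of consuming the totals file written above; the example values are invented.
import json

from s3_log_extraction.config import get_summary_directory

totals_path = get_summary_directory() / "all_dandiset_totals.json"
all_dandiset_totals = json.loads(totals_path.read_text())
# Example shape:
# {"000003": {"total_bytes_sent": 123456789,
#             "number_of_unique_regions": 42,
#             "number_of_unique_countries": 17}}

# Rank datasets by total bytes sent.
ranked = sorted(all_dandiset_totals.items(), key=lambda item: item[1]["total_bytes_sent"], reverse=True)
for dandiset_id, totals in ranked[:10]:
    print(
        f"{dandiset_id}: {totals['total_bytes_sent']:,} bytes sent "
        f"across {totals['number_of_unique_countries']} countries"
    )
```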