diff --git a/pyproject.toml b/pyproject.toml index 2e6502e..4fcd1b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ Homepage = "https://github.com/simple-repository/simple-repository-browser" [project.optional-dependencies] test = [ "pytest", + "pytest-asyncio", ] dev = [ "simple-repository-browser[test]", diff --git a/simple_repository_browser/_typing_compat.py b/simple_repository_browser/_typing_compat.py new file mode 100644 index 0000000..0d7a7ce --- /dev/null +++ b/simple_repository_browser/_typing_compat.py @@ -0,0 +1,31 @@ +import sys +import typing + +if typing.TYPE_CHECKING: + if sys.version_info >= (3, 11): + from typing import Self + else: + from typing_extensions import Self + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +if sys.version_info >= (3, 8): + from typing import Protocol, TypedDict +else: + from typing_extensions import Protocol, TypedDict + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +__all__ = [ + "override", + "Self", + "TypedDict", + "TypeAlias", + "Protocol", +] diff --git a/simple_repository_browser/metadata_injector.py b/simple_repository_browser/metadata_injector.py index 7ced78c..020a34e 100644 --- a/simple_repository_browser/metadata_injector.py +++ b/simple_repository_browser/metadata_injector.py @@ -1,68 +1,135 @@ +""" +Extended MetadataInjector that supports sdist (.tar.gz) and zip (.zip) formats. + +This extends SimpleRepository's MetadataInjectorRepository to provide metadata extraction +for package formats beyond wheels. 
+ + """ + from dataclasses import replace import pathlib import tarfile +import typing import zipfile from simple_repository import model from simple_repository.components.metadata_injector import MetadataInjectorRepository -def get_metadata_from_sdist(package_path: pathlib.Path) -> str: - archive = tarfile.TarFile.open(package_path) - names = archive.getnames() +def _extract_pkg_info_from_archive( + archive_names: typing.List[str], + extract_func: typing.Callable[[str], typing.Optional[typing.IO[bytes]]], + package_name: str, +) -> str: + """ + Extract PKG-INFO metadata from an archive. + + Args: + archive_names: List of file names in the archive + extract_func: Function to extract a file from the archive + package_name: File name of the package archive, used in error messages + + Returns: + Metadata content as string - pkg_info_files = [x.split("/") for x in names if "PKG-INFO" in x] + Raises: + ValueError: If no valid metadata is found + """ + pkg_info_files = [x.split("/") for x in archive_names if "PKG-INFO" in x] + # Sort by path depth (component count, descending) to prefer more deeply nested metadata files ordered_pkg_info = sorted(pkg_info_files, key=lambda pth: -len(pth)) for path in ordered_pkg_info: candidate = "/".join(path) - f = archive.extractfile(candidate) + f = extract_func(candidate) if f is None: continue - data = f.read().decode() - if "Metadata-Version" in data: - return data - raise ValueError(f"No metadata found in {package_path.name}") + try: + data = f.read().decode("utf-8") + if "Metadata-Version" in data: + return data + except (UnicodeDecodeError, OSError): + # Skip files that can't be decoded or read + continue + + raise ValueError(f"No valid PKG-INFO metadata found in {package_name}") + + +def get_metadata_from_sdist(package_path: pathlib.Path) -> str: + """Extract metadata from a source distribution (.tar.gz file).""" + with tarfile.TarFile.open(package_path) as archive: + names = archive.getnames() + + def extract_func(candidate: str) -> 
typing.Optional[typing.IO[bytes]]: + return archive.extractfile(candidate) + + return _extract_pkg_info_from_archive(names, extract_func, package_path.name) def get_metadata_from_zip(package_path: pathlib.Path) -> str: - # Used by pyreadline. (a zipfile) + """Extract metadata from a zip file (legacy format, used by packages like pyreadline).""" with zipfile.ZipFile(package_path) as archive: names = archive.namelist() - pkg_info_files = [x.split("/") for x in names if "PKG-INFO" in x] - ordered_pkg_info = sorted(pkg_info_files, key=lambda pth: -len(pth)) + def extract_func(candidate: str) -> typing.Optional[typing.IO[bytes]]: + try: + return archive.open(candidate, mode="r") + except (KeyError, zipfile.BadZipFile): + return None - for path in ordered_pkg_info: - candidate = "/".join(path) - f = archive.open(candidate, mode="r") - if f is None: - continue - data = f.read().decode() - if "Metadata-Version" in data: - return data - raise ValueError(f"No metadata found in {package_path.name}") + return _extract_pkg_info_from_archive(names, extract_func, package_path.name) class MetadataInjector(MetadataInjectorRepository): + """ + Extended MetadataInjector that supports multiple package formats. 
+ + This class extends SimpleRepository's MetadataInjectorRepository to provide + metadata extraction for: + - Wheel files (.whl) - handled by parent class + - Source distributions (.tar.gz) - contains PKG-INFO files + - Zip files (.zip) - legacy format used by some packages + """ + + # Map of supported file extensions to their extraction functions + _EXTRACTORS: typing.Dict[ + str, typing.Callable[["MetadataInjector", pathlib.Path], str] + ] = { + ".whl": lambda self, path: self._get_metadata_from_wheel(path), + ".tar.gz": lambda self, path: get_metadata_from_sdist(path), + ".zip": lambda self, path: get_metadata_from_zip(path), + } + def _get_metadata_from_package(self, package_path: pathlib.Path) -> str: - if package_path.name.endswith(".whl"): - return self._get_metadata_from_wheel(package_path) - elif package_path.name.endswith(".tar.gz"): - return get_metadata_from_sdist(package_path) - elif package_path.name.endswith(".zip"): - return get_metadata_from_zip(package_path) - raise ValueError("Package provided is not a wheel") + """Extract metadata from a package file based on its extension.""" + package_name = package_path.name + + for extension, extractor in self._EXTRACTORS.items(): + if package_name.endswith(extension): + return extractor(self, package_path) + + # Provide more descriptive error message + supported_formats = ", ".join(self._EXTRACTORS.keys()) + raise ValueError( + f"Unsupported package format: {package_name}. " + f"Supported formats: {supported_formats}" + ) def _add_metadata_attribute( self, project_page: model.ProjectDetail, ) -> model.ProjectDetail: - """Add the data-core-metadata to all the packages distributed as wheels""" + """ + Add the data-core-metadata attribute to all supported package files. + + Unlike the parent class which only adds metadata attributes to wheel files, + this implementation adds them to every file with a supported extension, enabling metadata + requests for sdist and zip files as well. 
+ """ files = [] for file in project_page.files: - if file.url and not file.dist_info_metadata: + matching_extension = file.filename.endswith(tuple(self._EXTRACTORS.keys())) + if matching_extension and not file.dist_info_metadata: file = replace(file, dist_info_metadata=True) files.append(file) - project_page = replace(project_page, files=tuple(files)) - return project_page + return replace(project_page, files=tuple(files)) diff --git a/simple_repository_browser/tests/test_metadata_injector.py b/simple_repository_browser/tests/test_metadata_injector.py new file mode 100644 index 0000000..5cd1678 --- /dev/null +++ b/simple_repository_browser/tests/test_metadata_injector.py @@ -0,0 +1,154 @@ +import pathlib +import typing + +import httpx +import pytest +from simple_repository import SimpleRepository, model +import simple_repository.errors + +from .._typing_compat import override +from ..metadata_injector import MetadataInjector + + +class FakeRepository(SimpleRepository): + """A repository which serves project pages and resources from in-memory dicts.""" + + def __init__(self) -> None: + self.project_pages: dict[str, model.ProjectDetail] = {} + self.resources: dict[str, model.Resource] = {} + + @override + async def get_project_page( + self, + project_name: str, + *, + request_context: typing.Optional[model.RequestContext] = None, + ) -> model.ProjectDetail: + try: + return self.project_pages[project_name] + except: + raise simple_repository.errors.PackageNotFoundError(project_name) + + @override + async def get_resource( + self, + project_name: str, + resource_name: str, + *, + request_context: typing.Optional[model.RequestContext] = None, + ) -> model.Resource: + try: + return self.resources[resource_name] + except: + raise simple_repository.errors.ResourceUnavailable(resource_name) + + +@pytest.fixture +def repository() -> MetadataInjector: + return MetadataInjector( + source=FakeRepository(), + http_client=httpx.AsyncClient(), + ) + + +@pytest.fixture(scope="session") +def cache_dir() -> pathlib.Path: + cache_path = 
pathlib.Path(__file__).parent / "cache" + cache_path.mkdir(exist_ok=True) + return cache_path + + +async def download_package(url: str, cache_dir: pathlib.Path) -> pathlib.Path: + """Download package to cache if not already present.""" + filename = url.split("/")[-1] + cache_path = cache_dir / filename + + if cache_path.exists(): + return cache_path + + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get(url) + response.raise_for_status() + cache_path.write_bytes(response.content) + + return cache_path + + +@pytest.mark.parametrize( + ["url", "project", "version"], + [ + ( + "https://files.pythonhosted.org/packages/a7/56/5f481ac5fcde5eb0fcfd5f3a421c0b345842cd9e0019048b8adeb17a3ecc/simple_repository-0.9.0-py3-none-any.whl", + "simple-repository", + "0.9.0", + ), + ( + "https://files.pythonhosted.org/packages/2e/19/d7c972dfe90a353dbd3efbbe1d14a5951de80c99c9dc1b93cd998d51dc0f/numpy-2.3.1.tar.gz", + "numpy", + "2.3.1", + ), + ( + "https://files.pythonhosted.org/packages/5e/20/91f4ed6fdc3c399fc58e9af1f812a1f5cb002f479494ecacc39b6be96032/numpy-1.10.0.post2.tar.gz", + "numpy", + "1.10.0.post2", + ), + ( + "https://files.pythonhosted.org/packages/bc/7c/d724ef1ec3ab2125f38a1d53285745445ec4a8f19b9bb0761b4064316679/pyreadline-2.1.zip", + "pyreadline", + "2.1", + ), + ], +) +@pytest.mark.asyncio +async def test_get_metadata_from_packages( + cache_dir: pathlib.Path, + repository: MetadataInjector, + url: str, + project: str, + version: str, +) -> None: + """Test metadata extraction from different package formats.""" + package_path = await download_package(url, cache_dir) + + # Create a fake resource to test get_resource + filename = url.split("/")[-1] + fake_resource = model.LocalResource(package_path) + + fake_root = typing.cast(FakeRepository, repository.source) + fake_root.resources[filename] = fake_resource + + # Test get_resource returns metadata + resource = await repository.get_resource(project, filename + ".metadata") + assert 
isinstance(resource, model.TextResource) + + metadata = resource.text + + assert "Metadata-Version" in metadata + assert f"Name: {project}" in metadata + assert f"Version: {version}" in metadata + + +@pytest.mark.asyncio +async def test_add_metadata_attribute_adds_to_all_files( + repository: MetadataInjector, +) -> None: + """Test metadata attributes are added only to files with supported extensions.""" + project_page = model.ProjectDetail( + meta=model.Meta("1.0"), + name="test-project", + files=( + model.File("test-1.0.whl", "", {}), + model.File("test-1.0.tar.gz", "", {}), + model.File("test-1.0.zip", "", {}), + model.File("test-1.0.egg", "", {}), + ), + ) + source = typing.cast(FakeRepository, repository.source) + source.project_pages["test-project"] = project_page + + detail = await repository.get_project_page("test-project") + + assert detail.files[0].dist_info_metadata is True # .whl + assert detail.files[1].dist_info_metadata is True # .tar.gz + assert detail.files[2].dist_info_metadata is True # .zip + assert detail.files[3].dist_info_metadata is None # .egg (unsupported extension)