Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Homepage = "https://github.com/simple-repository/simple-repository-browser"
[project.optional-dependencies]
test = [
"pytest",
"pytest-asyncio",
]
dev = [
"simple-repository-browser[test]",
Expand Down
31 changes: 31 additions & 0 deletions simple_repository_browser/_typing_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import sys
import typing

# Version-gated re-exports: each name comes from ``typing`` when the running
# interpreter provides it and from ``typing_extensions`` otherwise.
#
# Every branch runs at import time. None of them is hidden behind
# ``typing.TYPE_CHECKING``, because a name listed in ``__all__`` that is only
# bound for type checkers makes ``from ... import *`` raise AttributeError.

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

if sys.version_info >= (3, 12):
    from typing import override
else:
    from typing_extensions import override

if sys.version_info >= (3, 8):
    from typing import Protocol, TypedDict
else:
    from typing_extensions import Protocol, TypedDict

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias

__all__ = [
    "override",
    "Self",
    "TypedDict",
    "TypeAlias",
    "Protocol",
]
131 changes: 99 additions & 32 deletions simple_repository_browser/metadata_injector.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,135 @@
"""
Extended MetadataInjector that supports sdist (.tar.gz) and zip (.zip) formats.

This extends SimpleRepository's MetadataInjectorRepository to provide metadata extraction
for package formats beyond wheels.

"""

from dataclasses import replace
import pathlib
import tarfile
import typing
import zipfile

from simple_repository import model
from simple_repository.components.metadata_injector import MetadataInjectorRepository


def get_metadata_from_sdist(package_path: pathlib.Path) -> str:
archive = tarfile.TarFile.open(package_path)
names = archive.getnames()
def _extract_pkg_info_from_archive(
archive_names: typing.List[str],
extract_func: typing.Callable[[str], typing.Optional[typing.IO[bytes]]],
package_name: str,
) -> str:
"""
Extract PKG-INFO metadata from an archive.

Args:
archive_names: List of file names in the archive
extract_func: Function to extract a file from the archive
package_name: Name of the package for error messages

Returns:
Metadata content as string

pkg_info_files = [x.split("/") for x in names if "PKG-INFO" in x]
Raises:
ValueError: If no valid metadata is found
"""
pkg_info_files = [x.split("/") for x in archive_names if "PKG-INFO" in x]
# Sort by path length (descending) to prefer more specific/nested metadata files
ordered_pkg_info = sorted(pkg_info_files, key=lambda pth: -len(pth))

for path in ordered_pkg_info:
candidate = "/".join(path)
f = archive.extractfile(candidate)
f = extract_func(candidate)
if f is None:
continue
data = f.read().decode()
if "Metadata-Version" in data:
return data
raise ValueError(f"No metadata found in {package_path.name}")
try:
data = f.read().decode("utf-8")
if "Metadata-Version" in data:
return data
except (UnicodeDecodeError, OSError):
# Skip files that can't be decoded or read
continue

raise ValueError(f"No valid PKG-INFO metadata found in {package_name}")


def get_metadata_from_sdist(package_path: pathlib.Path) -> str:
    """Return the PKG-INFO metadata stored in a source distribution tarball.

    The tarball stays open while the shared PKG-INFO helper inspects its
    members, because members are read lazily through the archive object.

    Raises:
        ValueError: Propagated from the helper when no member holds metadata.
    """
    with tarfile.TarFile.open(package_path) as sdist:
        member_names = sdist.getnames()
        # The bound method already has the ``name -> optional file object``
        # shape the helper expects, so no wrapper function is needed.
        return _extract_pkg_info_from_archive(
            member_names,
            sdist.extractfile,
            package_path.name,
        )


def get_metadata_from_zip(package_path: pathlib.Path) -> str:
    """Extract metadata from a zip file (legacy format, used by packages like pyreadline).

    Candidate selection and validation are delegated to
    ``_extract_pkg_info_from_archive``; this function only supplies the
    zip-specific way of listing and opening members.

    Raises:
        ValueError: Propagated from the helper when no member holds metadata.
    """
    with zipfile.ZipFile(package_path) as archive:
        names = archive.namelist()

        def extract_func(candidate: str) -> typing.Optional[typing.IO[bytes]]:
            try:
                return archive.open(candidate, mode="r")
            except (KeyError, zipfile.BadZipFile):
                # A missing or corrupt member is reported as "not available"
                # so the helper moves on to the next candidate.
                return None

        return _extract_pkg_info_from_archive(names, extract_func, package_path.name)


class MetadataInjector(MetadataInjectorRepository):
    """
    Extended MetadataInjector that supports multiple package formats.

    This class extends SimpleRepository's MetadataInjectorRepository to provide
    metadata extraction for:
    - Wheel files (.whl) - handled by parent class
    - Source distributions (.tar.gz) - contains PKG-INFO files
    - Zip files (.zip) - legacy format used by some packages
    """

    # Map of supported file extensions to their extraction functions.
    # Each callable receives the injector instance and the package path.
    _EXTRACTORS: typing.Dict[
        str, typing.Callable[["MetadataInjector", pathlib.Path], str]
    ] = {
        ".whl": lambda self, path: self._get_metadata_from_wheel(path),
        ".tar.gz": lambda self, path: get_metadata_from_sdist(path),
        ".zip": lambda self, path: get_metadata_from_zip(path),
    }

    def _get_metadata_from_package(self, package_path: pathlib.Path) -> str:
        """
        Extract metadata from a package file based on its extension.

        Raises:
            ValueError: If the file name ends with none of the supported
                extensions.
        """
        package_name = package_path.name

        for extension, extractor in self._EXTRACTORS.items():
            if package_name.endswith(extension):
                return extractor(self, package_path)

        # Provide more descriptive error message
        supported_formats = ", ".join(self._EXTRACTORS)
        raise ValueError(
            f"Unsupported package format: {package_name}. "
            f"Supported formats: {supported_formats}"
        )

    def _add_metadata_attribute(
        self,
        project_page: model.ProjectDetail,
    ) -> model.ProjectDetail:
        """
        Add the data-core-metadata attribute to all supported package files.

        A file is flagged when its name ends with one of the extensions in
        ``_EXTRACTORS`` and it does not already carry a truthy
        ``dist_info_metadata``. Files with any other extension are passed
        through unchanged.
        """
        # str.endswith accepts a tuple; build it once rather than per file.
        supported_extensions = tuple(self._EXTRACTORS)
        files = []
        for file in project_page.files:
            if not file.dist_info_metadata and file.filename.endswith(
                supported_extensions
            ):
                file = replace(file, dist_info_metadata=True)
            files.append(file)
        return replace(project_page, files=tuple(files))
154 changes: 154 additions & 0 deletions simple_repository_browser/tests/test_metadata_injector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import pathlib
import typing

import httpx
import pytest
from simple_repository import SimpleRepository, model
import simple_repository.errors

from .._typing_compat import override
from ..metadata_injector import MetadataInjector


class FakeRepository(SimpleRepository):
    """An in-memory repository that serves only what a test registers.

    Tests fill ``project_pages`` (keyed by project name) and ``resources``
    (keyed by resource name) directly. Any other lookup raises the matching
    simple_repository error.
    """

    def __init__(self) -> None:
        self.project_pages: dict[str, model.ProjectDetail] = {}
        self.resources: dict[str, model.Resource] = {}

    @override
    async def get_project_page(
        self,
        project_name: str,
        *,
        request_context: typing.Optional[model.RequestContext] = None,
    ) -> model.ProjectDetail:
        """Return the registered page or raise PackageNotFoundError."""
        try:
            return self.project_pages[project_name]
        except KeyError:
            # Only a missing key means "not found"; anything else (including
            # task cancellation) must propagate instead of being masked.
            raise simple_repository.errors.PackageNotFoundError(project_name) from None

    @override
    async def get_resource(
        self,
        project_name: str,
        resource_name: str,
        *,
        request_context: typing.Optional[model.RequestContext] = None,
    ) -> model.Resource:
        """Return the registered resource or raise ResourceUnavailable.

        Resources are looked up by ``resource_name`` alone; ``project_name``
        is not consulted.
        """
        try:
            return self.resources[resource_name]
        except KeyError:
            raise simple_repository.errors.ResourceUnavailable(resource_name) from None


@pytest.fixture
def repository() -> MetadataInjector:
    """Provide a MetadataInjector layered on top of an empty FakeRepository."""
    upstream = FakeRepository()
    client = httpx.AsyncClient()
    return MetadataInjector(source=upstream, http_client=client)


@pytest.fixture(scope="session")
def cache_dir() -> pathlib.Path:
    """Return the ``cache`` directory next to this test module, creating it if needed."""
    tests_dir = pathlib.Path(__file__).parent
    target = tests_dir / "cache"
    target.mkdir(exist_ok=True)
    return target


async def download_package(url: str, cache_dir: pathlib.Path) -> pathlib.Path:
    """Return the cached copy of ``url``, downloading it first when absent.

    The cache file is named after the last ``/``-separated segment of the
    URL. An existing file is returned as-is without any network access.
    """
    destination = cache_dir / url.split("/")[-1]

    if not destination.exists():
        async with httpx.AsyncClient(timeout=30.0) as client:
            reply = await client.get(url)
            reply.raise_for_status()
            destination.write_bytes(reply.content)

    return destination


@pytest.mark.parametrize(
    ["url", "project", "version"],
    [
        (
            "https://files.pythonhosted.org/packages/a7/56/5f481ac5fcde5eb0fcfd5f3a421c0b345842cd9e0019048b8adeb17a3ecc/simple_repository-0.9.0-py3-none-any.whl",
            "simple-repository",
            "0.9.0",
        ),
        (
            "https://files.pythonhosted.org/packages/2e/19/d7c972dfe90a353dbd3efbbe1d14a5951de80c99c9dc1b93cd998d51dc0f/numpy-2.3.1.tar.gz",
            "numpy",
            "2.3.1",
        ),
        (
            "https://files.pythonhosted.org/packages/5e/20/91f4ed6fdc3c399fc58e9af1f812a1f5cb002f479494ecacc39b6be96032/numpy-1.10.0.post2.tar.gz",
            "numpy",
            "1.10.0.post2",
        ),
        (
            "https://files.pythonhosted.org/packages/bc/7c/d724ef1ec3ab2125f38a1d53285745445ec4a8f19b9bb0761b4064316679/pyreadline-2.1.zip",
            "pyreadline",
            "2.1",
        ),
    ],
)
@pytest.mark.asyncio
async def test_get_metadata_from_packages(
    cache_dir: pathlib.Path,
    repository: MetadataInjector,
    url: str,
    project: str,
    version: str,
) -> None:
    """Check that ``<file>.metadata`` is served for a wheel, two sdists and a zip."""
    local_copy = await download_package(url, cache_dir)
    archive_name = url.split("/")[-1]

    # Register the downloaded archive with the fake upstream so the injector
    # can fetch it when asked for the derived ".metadata" resource.
    upstream = typing.cast(FakeRepository, repository.source)
    upstream.resources[archive_name] = model.LocalResource(local_copy)

    served = await repository.get_resource(project, archive_name + ".metadata")
    assert isinstance(served, model.TextResource)

    text = served.text

    assert "Metadata-Version" in text
    assert f"Name: {project}" in text
    assert f"Version: {version}" in text


@pytest.mark.asyncio
async def test_add_metadata_attribute_adds_to_all_files(
    repository: MetadataInjector,
) -> None:
    """Test that metadata attributes are added to every supported file type.

    All four files are built with the same remaining arguments and differ only
    in their extension, so the extension alone decides the outcome.
    """
    project_page = model.ProjectDetail(
        meta=model.Meta("1.0"),
        name="test-project",
        files=(
            model.File("test-1.0.whl", "", {}),
            model.File("test-1.0.tar.gz", "", {}),
            model.File("test-1.0.zip", "", {}),
            model.File("test-1.0.egg", "", {}),
        ),
    )
    source = typing.cast(FakeRepository, repository.source)
    source.project_pages["test-project"] = project_page

    detail = await repository.get_project_page("test-project")

    assert detail.files[0].dist_info_metadata is True  # .whl
    assert detail.files[1].dist_info_metadata is True  # .tar.gz
    assert detail.files[2].dist_info_metadata is True  # .zip
    assert detail.files[3].dist_info_metadata is None  # .egg (unsupported extension)