Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libpq-dev libsodium-dev libgmp-dev
- run: |
pip install hatch coverage
pip install "click<8.2" hatch coverage

- run: |
sudo cp .github/openssl-ci.cnf /etc/ssl/openssl.cnf
Expand Down
8 changes: 4 additions & 4 deletions deployment/docker-build/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,20 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.35.0
image: ipfs/kubo:v0.37.0
ports:
- "4001:4001"
- "4001:4001/udp"
- "127.0.0.1:5001:5001"
volumes:
- "pyaleph-ipfs:/data/ipfs"
- "./kubo.json:/etc/kubo.json:ro"
- "./001-update-ipfs-config.sh:/container-init.d/001-update-ipfs-config.sh:ro"
environment:
- IPFS_PROFILE=server
- IPFS_TELEMETRY=off
networks:
- pyaleph
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate",
"--config-file", "/etc/kubo.json"]
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]


networks:
Expand Down
8 changes: 4 additions & 4 deletions deployment/docker-build/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.35.0
image: ipfs/kubo:v0.37.0
ports:
- "4001:4001"
- "4001:4001/udp"
- "127.0.0.1:5001:5001"
- "127.0.0.1:8080:8080"
volumes:
- "pyaleph-ipfs:/data/ipfs"
- "./kubo.json:/etc/kubo.json:ro"
- "./001-update-ipfs-config.sh:/container-init.d/001-update-ipfs-config.sh:ro"
environment:
- IPFS_PROFILE=server
- IPFS_TELEMETRY=off
networks:
- pyaleph
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate",
"--config-file", "/etc/kubo.json"]
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]

postgres:
restart: always
Expand Down
3 changes: 2 additions & 1 deletion deployment/samples/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.35.0
image: ipfs/kubo:v0.37.0
ports:
- "4001:4001"
- "4001:4001/udp"
Expand All @@ -115,6 +115,7 @@ services:
- "./001-update-ipfs-config.sh:/container-init.d/001-update-ipfs-config.sh:ro"
environment:
- IPFS_PROFILE=server
- IPFS_TELEMETRY=off
networks:
- pyaleph
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]
Expand Down
8 changes: 4 additions & 4 deletions deployment/samples/docker-monitoring/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,20 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.35.0
image: ipfs/kubo:v0.37.0
ports:
- "4001:4001"
- "4001:4001/udp"
- "127.0.0.1:5001:5001"
volumes:
- "pyaleph-ipfs:/data/ipfs"
- "./kubo.json:/etc/kubo.json:ro"
- "./001-update-ipfs-config.sh:/container-init.d/001-update-ipfs-config.sh:ro"
environment:
- IPFS_PROFILE=server
- IPFS_TELEMETRY=off
networks:
- pyaleph
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate",
"--config-file", "/etc/kubo.json"]
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]

prometheus:
restart: always
Expand Down
31 changes: 17 additions & 14 deletions src/aleph/jobs/job_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from aleph.types.message_processing_result import RejectedMessage, WillRetryMessage
from aleph.types.message_status import (
ErrorCode,
FileNotFoundException,
InvalidMessageException,
MessageContentUnavailable,
RetryMessageException,
)

Expand Down Expand Up @@ -221,32 +221,30 @@ async def _handle_retry(
pending_message: PendingMessageDb,
exception: BaseException,
) -> Union[RejectedMessage, WillRetryMessage]:
if isinstance(exception, FileNotFoundException):

# Assume if the error_code attribute exists, get it, but if not assign to general Internal Error code.
error_code = (
exception.error_code
if hasattr(exception, "error_code")
else ErrorCode.INTERNAL_ERROR
)

if isinstance(exception, MessageContentUnavailable):
LOGGER.warning(
"Could not fetch message %s, putting it back in the fetch queue: %s",
pending_message.item_hash,
str(exception),
)
error_code = exception.error_code
session.execute(
update(PendingMessageDb)
.where(PendingMessageDb.id == pending_message.id)
.values(fetched=False)
)
elif isinstance(exception, RetryMessageException):
LOGGER.warning(
"%s error (%d) - message %s marked for retry",
exception.error_code.name,
exception.error_code.value,
pending_message.item_hash,
)
error_code = exception.error_code
schedule_next_attempt(session=session, pending_message=pending_message)
else:
elif not isinstance(exception, RetryMessageException):
LOGGER.exception(
"Unexpected error while fetching message", exc_info=exception
)
error_code = ErrorCode.INTERNAL_ERROR

if pending_message.retries >= self.max_retries:
LOGGER.warning(
"Rejecting pending message: %s - too many retries",
Expand All @@ -258,6 +256,11 @@ async def _handle_retry(
exception=exception,
)
else:
LOGGER.warning(
"Message %s marked for retry: %s",
pending_message.item_hash,
str(exception),
)
schedule_next_attempt(session=session, pending_message=pending_message)
return WillRetryMessage(
pending_message=pending_message, error_code=error_code
Expand Down
6 changes: 2 additions & 4 deletions src/aleph/services/ipfs/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from aleph.services.ipfs.common import make_ipfs_client
from aleph.services.utils import get_IP
from aleph.types.message_status import FileContentUnavailable, FileUnavailable
from aleph.types.message_status import FileUnavailable
from aleph.utils import run_in_executor

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -120,9 +120,7 @@ async def get_ipfs_size(
await asyncio.sleep(0.5)
continue
except asyncio.TimeoutError:
raise FileContentUnavailable(
"Could not retrieve IPFS content at this time"
)
raise FileUnavailable("Could not retrieve IPFS content at this time")
except (
concurrent.futures.CancelledError,
aiohttp.client_exceptions.ClientConnectorError,
Expand Down
18 changes: 0 additions & 18 deletions src/aleph/types/message_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,6 @@ def __init__(self, file_hash: str):
super().__init__(f"File not found: {file_hash}")


class FileContentNotFoundException(RetryMessageException):
"""
A file required to process the message could not be found, locally and/or
on the network.
"""

def __init__(self, file_hash: str):
super().__init__(f"File content not found: {file_hash}")


class MessageContentUnavailable(FileNotFoundException):
"""
The message content is not available at the moment (storage/IPFS item types).
Expand All @@ -195,14 +185,6 @@ class FileUnavailable(FileNotFoundException):
error_code = ErrorCode.FILE_UNAVAILABLE


class FileContentUnavailable(FileContentNotFoundException):
"""
A file pointed to by the message is not available at the moment.
"""

error_code = ErrorCode.FILE_UNAVAILABLE


class NoAmendTarget(InvalidMessageException):
"""
A POST with type = amend does not specify a value in the ref field.
Expand Down
4 changes: 2 additions & 2 deletions tests/services/test_ipfs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest

from aleph.services.ipfs.service import IpfsService
from aleph.types.message_status import FileContentUnavailable
from aleph.types.message_status import FileUnavailable


@pytest.mark.asyncio
Expand Down Expand Up @@ -205,7 +205,7 @@ async def test_get_ipfs_size_timeout_error():

service = IpfsService(ipfs_client=ipfs_client)

with pytest.raises(FileContentUnavailable):
with pytest.raises(FileUnavailable):
# Mock asyncio.sleep to not actually sleep during test
with patch("asyncio.sleep", new_callable=AsyncMock):
# Execute
Expand Down
Loading