Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,786 changes: 2,564 additions & 2,222 deletions packages/examples/cvat/exchange-oracle/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/examples/cvat/exchange-oracle/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ starlette = ">=0.40.0" # avoid the vulnerability with multipart/form-data
cryptography = "<44.0.0" # human-protocol-sdk -> pgpy dep requires cryptography < 45
aiocache = {extras = ["msgpack", "redis"], version = "^0.12.3"} # convenient api for redis (async)
cachelib = "^0.13.0" # convenient api for redis (sync)
human-protocol-sdk = "^4.3.0"
human-protocol-sdk = "5.0.1"


[tool.poetry.group.dev.dependencies]
Expand Down
13 changes: 9 additions & 4 deletions packages/examples/cvat/exchange-oracle/src/core/oracle_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class JobLauncherEvent_EscrowCreated(OracleEvent):
pass # escrow is enough


class JobLauncherEvent_EscrowCanceled(OracleEvent):
class JobLauncherEvent_EscrowCanceledRequested(OracleEvent):
pass # escrow is enough


Expand All @@ -38,7 +38,7 @@ class RejectedAssignmentInfo(BaseModel):
assignments: list[RejectedAssignmentInfo]


class ExchangeOracleEvent_JobCreationFailed(OracleEvent):
class ExchangeOracleEvent_EscrowFailed(OracleEvent):
# no task_id, escrow is enough for now
reason: str

Expand All @@ -55,15 +55,20 @@ class ReputationOracleEvent_EscrowCompleted(OracleEvent):
pass


class ReputationOracleEvent_EscrowCanceled(OracleEvent):
pass # no data


_event_type_map = {
JobLauncherEventTypes.escrow_created: JobLauncherEvent_EscrowCreated,
JobLauncherEventTypes.escrow_canceled: JobLauncherEvent_EscrowCanceled,
JobLauncherEventTypes.cancellation_requested: JobLauncherEvent_EscrowCanceledRequested,
RecordingOracleEventTypes.job_completed: RecordingOracleEvent_JobCompleted,
RecordingOracleEventTypes.submission_rejected: RecordingOracleEvent_SubmissionRejected,
ExchangeOracleEventTypes.job_creation_failed: ExchangeOracleEvent_JobCreationFailed,
ExchangeOracleEventTypes.escrow_failed: ExchangeOracleEvent_EscrowFailed,
ExchangeOracleEventTypes.job_finished: ExchangeOracleEvent_JobFinished,
ExchangeOracleEventTypes.escrow_cleaned: ExchangeOracleEvent_EscrowCleaned,
ReputationOracleEventTypes.escrow_completed: ReputationOracleEvent_EscrowCompleted,
ReputationOracleEventTypes.escrow_canceled: ReputationOracleEvent_EscrowCanceled,
}


Expand Down
8 changes: 5 additions & 3 deletions packages/examples/cvat/exchange-oracle/src/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ class TaskTypes(str, Enum, metaclass=BetterEnumMeta):
audio_transcription = "audio_transcription"
audio_attribute_annotation = "audio_attribute_annotation"


class AudinoTaskTypes(str, Enum):
audio_transcription = ["is_start", "is_end", "is_transcription"]
audio_attribute_annotation= ["is_start", "is_end"]
audio_attribute_annotation = ["is_start", "is_end"]


class CvatLabelTypes(str, Enum, metaclass=BetterEnumMeta):
Expand All @@ -68,14 +69,14 @@ class OracleWebhookTypes(str, Enum, metaclass=BetterEnumMeta):


class ExchangeOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
job_creation_failed = "job_creation_failed"
escrow_failed = "escrow_failed"
job_finished = "job_finished"
escrow_cleaned = "escrow_cleaned"


class JobLauncherEventTypes(str, Enum, metaclass=BetterEnumMeta):
escrow_created = "escrow_created"
escrow_canceled = "escrow_canceled"
cancellation_requested = "cancellation_requested"


class RecordingOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
Expand All @@ -86,6 +87,7 @@ class RecordingOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
class ReputationOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
# TODO: rename to ReputationOracleEventType
escrow_completed = "escrow_completed"
escrow_canceled = "escrow_canceled"


class OracleWebhookStatuses(str, Enum, metaclass=BetterEnumMeta):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import src.services.webhook as oracle_db_service
from src import db
from src.core.config import CronConfig
from src.core.oracle_events import ExchangeOracleEvent_JobCreationFailed
from src.core.oracle_events import ExchangeOracleEvent_EscrowFailed
from src.core.types import JobStatuses, OracleWebhookTypes, ProjectStatuses
from src.crons._cron_job import cron_job
from src.db import SessionLocal
Expand Down Expand Up @@ -168,7 +168,7 @@ def track_task_creation(logger: logging.Logger, session: Session) -> None:
escrow_address=project.escrow_address,
chain_id=project.chain_id,
type=OracleWebhookTypes.job_launcher,
event=ExchangeOracleEvent_JobCreationFailed(reason=reason),
event=ExchangeOracleEvent_EscrowFailed(reason=reason),
)
elif status == cvat_api.UploadStatus.FINISHED:
try:
Expand Down Expand Up @@ -200,7 +200,7 @@ def track_task_creation(logger: logging.Logger, session: Session) -> None:
escrow_address=project.escrow_address,
chain_id=project.chain_id,
type=OracleWebhookTypes.job_launcher,
event=ExchangeOracleEvent_JobCreationFailed(reason=str(e)),
event=ExchangeOracleEvent_EscrowFailed(reason=str(e)),
)

if completed:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from src.chain.kvstore import get_job_launcher_url
from src.core.config import Config, CronConfig
from src.core.oracle_events import (
ExchangeOracleEvent_EscrowCleaned,
ExchangeOracleEvent_JobCreationFailed,
ExchangeOracleEvent_EscrowFailed,
)
from src.core.types import JobLauncherEventTypes, Networks, OracleWebhookTypes, ProjectStatuses
from src.crons._cron_job import cron_job
from src.crons.webhooks._common import handle_webhook, process_outgoing_webhooks
from src.db.utils import ForUpdateParams
from src.handlers.completed_escrows import export_escrow_annotations
from src.handlers.escrow_cleanup import cleanup_escrow
from src.models.webhook import Webhook

Expand All @@ -36,7 +36,7 @@ def handle_failure(session: Session, webhook: Webhook, exc: Exception) -> None:
escrow_address=webhook.escrow_address,
chain_id=webhook.chain_id,
type=OracleWebhookTypes.job_launcher,
event=ExchangeOracleEvent_JobCreationFailed(reason=str(exc)),
event=ExchangeOracleEvent_EscrowFailed(reason=str(exc)),
)


Expand Down Expand Up @@ -98,11 +98,11 @@ def handle_job_launcher_event(webhook: Webhook, *, db_session: Session, logger:
)
raise

case JobLauncherEventTypes.escrow_canceled:
case JobLauncherEventTypes.cancellation_requested:
validate_escrow(
webhook.chain_id,
webhook.escrow_address,
accepted_states=[EscrowStatus.Pending, EscrowStatus.Cancelled],
accepted_states=[EscrowStatus.ToCancel],
)

projects = cvat_db_service.get_projects_by_escrow_address(
Expand Down Expand Up @@ -139,15 +139,11 @@ def handle_job_launcher_event(webhook: Webhook, *, db_session: Session, logger:
cvat_db_service.update_project_statuses_by_escrow_address(
db_session, webhook.escrow_address, webhook.chain_id, ProjectStatuses.canceled
)
cleanup_escrow(webhook.escrow_address, Networks(webhook.chain_id), projects)

oracle_db_service.outbox.create_webhook(
session=db_session,
escrow_address=webhook.escrow_address,
chain_id=webhook.chain_id,
type=OracleWebhookTypes.recording_oracle,
event=ExchangeOracleEvent_EscrowCleaned(),

export_escrow_annotations(
logger, webhook.chain_id, webhook.escrow_address, projects, db_session
)

case _:
raise AssertionError(f"Unknown job launcher event {webhook.event_type}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ def process_incoming_reputation_oracle_webhooks(logger: logging.Logger, session:
type=OracleWebhookTypes.recording_oracle,
event=ExchangeOracleEvent_EscrowCleaned(),
)

case ReputationOracleEventTypes.escrow_canceled:
projects = db_service.get_projects_by_escrow_address(
session, webhook.escrow_address
)
cleanup_escrow(webhook.escrow_address, Networks(webhook.chain_id), projects)

db_service.update_project_statuses_by_escrow_address(
session,
webhook.escrow_address,
webhook.chain_id,
status=ProjectStatuses.deleted,
)

case _:
raise TypeError(f"Unknown reputation oracle event {webhook.event_type}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def _download_with_retries(
raise
return None


def _export_escrow_annotations(
logger: logging.Logger,
chain_id: int,
Expand Down Expand Up @@ -138,13 +137,20 @@ def _export_escrow_annotations(
escrow_address=escrow_address,
)

oracle_db_service.outbox.create_webhook(
session,
escrow_address=escrow_address,
chain_id=chain_id,
type=OracleWebhookTypes.recording_oracle,
event=ExchangeOracleEvent_JobFinished(),
)
def export_escrow_annotations(
logger: logging.Logger,
chain_id: int,
escrow_address: str,
escrow_projects: Sequence[Project],
session: Session,
) -> None:
"""
Export annotations for the given escrow address.

This function fetches the manifest for the escrow, downloads the annotations
from CVAT, post-processes them, and uploads the results to cloud storage.
"""
_export_escrow_annotations(logger, chain_id, escrow_address, escrow_projects, session)

logger.info(
f"The escrow ({escrow_address=}) is completed, "
Expand Down Expand Up @@ -294,8 +300,8 @@ def _download_job_annotations(
job.cvat_project_id,
job.cvat_task_id,
job.cvat_id,
job_assignment.user.cvat_id,
job_assignment.id,
job_assignment.user.cvat_id if job_assignment else "NONE",
job_assignment.id if job_assignment else "NONE",
),
file=job_annotations_file,
)
Expand All @@ -315,6 +321,19 @@ def _handle_escrow_validation(
)
_export_escrow_annotations(logger, chain_id, escrow_address, escrow_projects, session)

oracle_db_service.outbox.create_webhook(
session,
escrow_address=escrow_address,
chain_id=chain_id,
type=OracleWebhookTypes.recording_oracle,
event=ExchangeOracleEvent_JobFinished(),
)

logger.info(
f"The escrow ({escrow_address=}) is completed, "
f"resulting annotations are processed successfully"
)


def handle_escrows_validations(logger: logging.Logger) -> None:
for _ in range(CronConfig.track_escrow_validations_chunk_size):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
import zipfile
import math
import string
import random
from collections import defaultdict
from collections.abc import Sequence
from dataclasses import dataclass
Expand Down Expand Up @@ -94,13 +96,14 @@ def prepare_annotation_metafile(
else:
jobs_start_time: dict[int, float] = {}

characters = string.ascii_letters + string.digits
meta = AnnotationMeta(
jobs=[
JobMeta(
job_id=job.cvat_id,
annotation_filename=job_annotations[job.cvat_id].filename,
annotator_wallet_address=job.latest_assignment.user_wallet_address,
assignment_id=job.latest_assignment.id,
annotator_wallet_address=job.latest_assignment.user_wallet_address if job.latest_assignment else "NONE",
assignment_id=job.latest_assignment.id if job.latest_assignment else ''.join(random.choices(characters, k=10)),
task_id=job.cvat_task_id,
start_frame=job.start_frame,
stop_frame=job.stop_frame,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from src.core.config import Config
from src.core.oracle_events import OracleEvent, validate_event
from src.core.types import OracleWebhookStatuses, OracleWebhookTypes
from src.core.types import OracleWebhookStatuses, OracleWebhookTypes,JobLauncherEventTypes
from src.db.utils import ForUpdateParams
from src.db.utils import maybe_for_update as _maybe_for_update
from src.models.webhook import Webhook
Expand Down Expand Up @@ -124,14 +124,19 @@ def handle_webhook_success(self, session: Session, webhook_id: str) -> None:
session.execute(upd)

def handle_webhook_fail(self, session: Session, webhook_id: str) -> None:
webhook = session.query(Webhook).filter(Webhook.id == webhook_id).first()
if webhook and webhook.event_type == JobLauncherEventTypes.cancellation_requested:
max_retries = 100 # hardcoded value
else:
max_retries = Config.webhook_max_retries
upd = (
update(Webhook)
.where(Webhook.id == webhook_id)
.values(
attempts=Webhook.attempts + 1,
status=case(
(
Webhook.attempts + 1 >= Config.webhook_max_retries,
Webhook.attempts + 1 >= max_retries,
OracleWebhookStatuses.failed.value,
),
else_=OracleWebhookStatuses.pending.value,
Expand Down
Loading