Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""empty message

Revision ID: 8ece21fbeb47
Revises: 1c06d0ade60c
Create Date: 2025-03-18 09:58:57.469799

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import func


# revision identifiers, used by Alembic.
revision = '8ece21fbeb47'
down_revision = '1c06d0ade60c'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"balances", sa.Column("last_update", sa.TIMESTAMP(
timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())
)

op.create_table(
"cron_jobs",
sa.Column("id", sa.String(), nullable=False),
# Interval is specified in seconds
sa.Column("interval", sa.Integer(), nullable=False, default=24),
sa.Column("last_run", sa.TIMESTAMP(timezone=True), nullable=False),
sa.PrimaryKeyConstraint("id"),
)

op.execute(
"""
INSERT INTO cron_jobs(id, interval, last_run) VALUES ('balance', 3600, '2025-01-01 00:00:00')
"""
)

pass


def downgrade() -> None:
op.drop_column("balances", "last_update")

op.drop_table("cron_jobs")

pass
10 changes: 10 additions & 0 deletions src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from aleph.db.connection import make_db_url, make_engine, make_session_factory
from aleph.exceptions import InvalidConfigException, KeyNotFoundException
from aleph.jobs import start_jobs
from aleph.jobs.cron.balance_job import BalanceCronJob
from aleph.jobs.cron.cron_job import CronJob, cron_job_task
from aleph.network import listener_tasks
from aleph.services import p2p
from aleph.services.cache.materialized_views import refresh_cache_materialized_views
Expand Down Expand Up @@ -147,6 +149,10 @@ async def main(args: List[str]) -> None:
garbage_collector = GarbageCollector(
session_factory=session_factory, storage_service=storage_service
)
cron_job = CronJob(
session_factory=session_factory,
jobs={"balance": BalanceCronJob(session_factory=session_factory)},
)
chain_data_service = ChainDataService(
session_factory=session_factory,
storage_service=storage_service,
Expand Down Expand Up @@ -203,6 +209,10 @@ async def main(args: List[str]) -> None:
)
LOGGER.debug("Initialized garbage collector task")

LOGGER.debug("Initializing cron job task")
tasks.append(cron_job_task(config=config, cron_job=cron_job))
LOGGER.debug("Initialized cron job task")

LOGGER.debug("Running event loop")
await asyncio.gather(*tasks)

Expand Down
4 changes: 4 additions & 0 deletions src/aleph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def get_defaults():
# Maximum number of chain/sync events processed at the same time.
"max_concurrency": 20,
},
"cron": {
# Interval between cron job trackers runs, expressed in hours.
"period": 0.5, # 30 mins
},
},
"cache": {
"ttl": {
Expand Down
23 changes: 18 additions & 5 deletions src/aleph/db/accessors/balances.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime as dt
from decimal import Decimal
from io import StringIO
from typing import Dict, Mapping, Optional, Sequence
Expand All @@ -7,6 +8,7 @@
from sqlalchemy.sql import Select

from aleph.db.models import AlephBalanceDb
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSession


Expand Down Expand Up @@ -140,6 +142,8 @@ def update_balances(
table from the temporary one.
"""

last_update = utc_now()

session.execute(
"CREATE TEMPORARY TABLE temp_balances AS SELECT * FROM balances WITH NO DATA" # type: ignore[arg-type]
)
Expand All @@ -151,21 +155,21 @@ def update_balances(
csv_balances = StringIO(
"\n".join(
[
f"{address};{chain.value};{dapp or ''};{balance};{eth_height}"
f"{address};{chain.value};{dapp or ''};{balance};{eth_height};{last_update}"
for address, balance in balances.items()
]
)
)
cursor.copy_expert(
"COPY temp_balances(address, chain, dapp, balance, eth_height) FROM STDIN WITH CSV DELIMITER ';'",
"COPY temp_balances(address, chain, dapp, balance, eth_height, last_update) FROM STDIN WITH CSV DELIMITER ';'",
csv_balances,
)
session.execute(
"""
INSERT INTO balances(address, chain, dapp, balance, eth_height)
(SELECT address, chain, dapp, balance, eth_height FROM temp_balances)
INSERT INTO balances(address, chain, dapp, balance, eth_height, last_update)
(SELECT address, chain, dapp, balance, eth_height, last_update FROM temp_balances)
ON CONFLICT ON CONSTRAINT balances_address_chain_dapp_uindex DO UPDATE
SET balance = excluded.balance, eth_height = excluded.eth_height
SET balance = excluded.balance, eth_height = excluded.eth_height, last_update = (CASE WHEN excluded.balance <> balances.balance THEN excluded.last_update ELSE balances.last_update END)
WHERE excluded.eth_height > balances.eth_height
""" # type: ignore[arg-type]
)
Expand All @@ -174,3 +178,12 @@ def update_balances(
# tends to reuse connections. Dropping the table here guarantees it will not be present
# on the next run.
session.execute("DROP TABLE temp_balances") # type: ignore[arg-type]


def get_updated_balance_accounts(session: DbSession, last_update: dt.datetime):
select_stmt = (
select(AlephBalanceDb.address)
.where(AlephBalanceDb.last_update >= last_update)
.distinct()
)
return (session.execute(select_stmt)).scalars().all()
57 changes: 56 additions & 1 deletion src/aleph/db/accessors/cost.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from typing import Iterable, List, Optional

from aleph_message.models import PaymentType
from sqlalchemy import func, select
from sqlalchemy import asc, delete, func, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql import Insert

from aleph.db.models import ChainTxDb, message_confirmations
from aleph.db.models.account_costs import AccountCostsDb
from aleph.db.models.messages import MessageStatusDb
from aleph.toolkit.costs import format_cost
from aleph.types.db_session import DbSession
from aleph.types.message_status import MessageStatus


def get_total_cost_for_address(
Expand All @@ -35,6 +38,40 @@ def get_total_cost_for_address(
return format_cost(Decimal(total_cost or 0))


def get_total_costs_for_address_grouped_by_message(
session: DbSession,
address: str,
payment_type: Optional[PaymentType] = PaymentType.hold,
):
total_prop = (
AccountCostsDb.cost_hold
if payment_type == PaymentType.hold
else AccountCostsDb.cost_stream
)

id_field = func.min(AccountCostsDb.id)

select_stmt = (
select(
AccountCostsDb.item_hash, ChainTxDb.height, func.sum(total_prop), id_field
)
.select_from(AccountCostsDb)
.join(
message_confirmations,
message_confirmations.c.item_hash == AccountCostsDb.item_hash,
)
.join(ChainTxDb, message_confirmations.c.tx_hash == ChainTxDb.hash)
.where(
(AccountCostsDb.owner == address)
& (AccountCostsDb.payment_type == payment_type)
)
.group_by(AccountCostsDb.item_hash, ChainTxDb.height)
.order_by(asc(id_field))
)

return (session.execute(select_stmt)).all()


def get_message_costs(session: DbSession, item_hash: str) -> Iterable[AccountCostsDb]:
select_stmt = select(AccountCostsDb).where(AccountCostsDb.item_hash == item_hash)
return (session.execute(select_stmt)).scalars().all()
Expand All @@ -59,3 +96,21 @@ def make_costs_upsert_query(costs: List[AccountCostsDb]) -> Insert:
"cost_stream": upsert_stmt.excluded.cost_stream,
},
)


def delete_costs_for_message(session: DbSession, item_hash: str) -> None:
delete_stmt = delete(AccountCostsDb).where(AccountCostsDb.item_hash == item_hash)
session.execute(delete_stmt)


def delete_costs_for_forgotten_and_deleted_messages(session: DbSession) -> None:
delete_stmt = (
delete(AccountCostsDb)
.where(AccountCostsDb.item_hash == MessageStatusDb.item_hash)
.where(
(MessageStatusDb.status == MessageStatus.FORGOTTEN)
| (MessageStatusDb.status == MessageStatus.REMOVED)
)
.execution_options(synchronize_session=False)
)
session.execute(delete_stmt)
31 changes: 31 additions & 0 deletions src/aleph/db/accessors/cron_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import datetime as dt
from typing import List, Optional

from sqlalchemy import delete, select, update

from aleph.db.models.cron_jobs import CronJobDb
from aleph.types.db_session import DbSession


def get_cron_jobs(session: DbSession) -> List[CronJobDb]:
select_stmt = select(CronJobDb)

return (session.execute(select_stmt)).scalars().all()


def get_cron_job(session: DbSession, id: str) -> Optional[CronJobDb]:
select_stmt = select(CronJobDb).where(CronJobDb.id == id)

return (session.execute(select_stmt)).scalar_one_or_none()


def update_cron_job(session: DbSession, id: str, last_run: dt.datetime) -> None:
update_stmt = update(CronJobDb).values(last_run=last_run).where(CronJobDb.id == id)

session.execute(update_stmt)


def delete_cron_job(session: DbSession, id: str) -> None:
delete_stmt = delete(CronJobDb).where(CronJobDb.id == id)

session.execute(delete_stmt)
83 changes: 82 additions & 1 deletion src/aleph/db/accessors/files.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import datetime as dt
from typing import Collection, Iterable, Optional, Tuple
from typing import Collection, Iterable, Optional, Tuple, Union

from sqlalchemy import delete, func, select
from sqlalchemy.dialects.postgresql import insert
Expand All @@ -8,6 +8,7 @@
from aleph.types.db_session import DbSession
from aleph.types.files import FileTag, FileType
from aleph.types.sort_order import SortOrder
from aleph.utils import make_file_tag

from ..models.files import (
ContentFilePinDb,
Expand Down Expand Up @@ -112,16 +113,92 @@ def insert_grace_period_file_pin(
file_hash: str,
created: dt.datetime,
delete_by: dt.datetime,
item_hash: Optional[str] = None,
owner: Optional[str] = None,
ref: Optional[str] = None,
) -> None:
insert_stmt = insert(GracePeriodFilePinDb).values(
item_hash=item_hash,
file_hash=file_hash,
owner=owner,
ref=ref,
created=created,
type=FilePinType.GRACE_PERIOD,
delete_by=delete_by,
)
session.execute(insert_stmt)


# TODO: Improve performance
def update_file_pin_grace_period(
session: DbSession,
item_hash: str,
delete_by: Union[dt.datetime, None],
) -> None:
if delete_by is None:
delete_stmt = (
delete(GracePeriodFilePinDb)
.where(GracePeriodFilePinDb.item_hash == item_hash)
.returning(
GracePeriodFilePinDb.file_hash,
GracePeriodFilePinDb.owner,
GracePeriodFilePinDb.ref,
GracePeriodFilePinDb.created,
)
)

grace_period = session.execute(delete_stmt).first()
if grace_period is None:
return

file_hash, owner, ref, created = grace_period

insert_message_file_pin(
session=session,
item_hash=item_hash,
file_hash=file_hash,
owner=owner,
ref=ref,
created=created,
)
else:
delete_stmt = (
delete(MessageFilePinDb)
.where(MessageFilePinDb.item_hash == item_hash)
.returning(
MessageFilePinDb.file_hash,
MessageFilePinDb.owner,
MessageFilePinDb.ref,
MessageFilePinDb.created,
)
)

message_pin = session.execute(delete_stmt).first()
if message_pin is None:
return

file_hash, owner, ref, created = message_pin

insert_grace_period_file_pin(
session=session,
item_hash=item_hash,
file_hash=file_hash,
owner=owner,
ref=ref,
created=created,
delete_by=delete_by,
)

refresh_file_tag(
session=session,
tag=make_file_tag(
owner=owner,
ref=ref,
item_hash=item_hash,
),
)


def delete_grace_period_file_pins(session: DbSession, datetime: dt.datetime) -> None:
delete_stmt = delete(GracePeriodFilePinDb).where(
GracePeriodFilePinDb.delete_by < datetime
Expand Down Expand Up @@ -213,6 +290,10 @@ def get_file_tag(session: DbSession, tag: FileTag) -> Optional[FileTagDb]:
return session.execute(select_stmt).scalar()


def file_pin_exists(session: DbSession, item_hash: str) -> bool:
return FilePinDb.exists(session=session, where=FilePinDb.item_hash == item_hash)


def file_tag_exists(session: DbSession, tag: FileTag) -> bool:
return FileTagDb.exists(session=session, where=FileTagDb.tag == tag)

Expand Down
Loading
Loading