Skip to content

Commit 969a742

Browse files
authored
Feat: Add automated balance checking and message lifecycle management (#798)
* WIP feat: Add a job that check changes in account balances to perform storage release and vm execution detaching * Balance tracker WIP * fix: set balance job interval to 1 hour * feat: Better handling of REMOVING status + balance update fixes + cost deletion on FORGOTTEN / REMOVED messages * fix: lint fixes + allow to forget messages that are in a removing state * fix: update balance clean cutoff height to 22196000 * fix: consider REMOVING status as PROCESSED for price calculation endpoint
1 parent fdbb730 commit 969a742

File tree

27 files changed

+1340
-83
lines changed

27 files changed

+1340
-83
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""empty message
2+
3+
Revision ID: 8ece21fbeb47
4+
Revises: 1c06d0ade60c
5+
Create Date: 2025-03-18 09:58:57.469799
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
from sqlalchemy.sql import func
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = '8ece21fbeb47'
15+
down_revision = '1c06d0ade60c'
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade() -> None:
    """Add ``balances.last_update`` and create and seed the ``cron_jobs`` table.

    Only arguments that affect the emitted DDL are passed to ``sa.Column``.
    Python-side ``default=`` / ``onupdate=`` values are applied by SQLAlchemy
    when it builds INSERT/UPDATE statements and are not rendered in
    ``ALTER TABLE`` / ``CREATE TABLE``, so they have no effect in a migration.
    """
    # The server-side default lets the NOT NULL column be added to a table
    # that already contains rows.
    op.add_column(
        "balances",
        sa.Column(
            "last_update",
            sa.TIMESTAMP(timezone=True),
            nullable=False,
            server_default=func.now(),
        ),
    )

    op.create_table(
        "cron_jobs",
        sa.Column("id", sa.String(), nullable=False),
        # Interval is specified in seconds
        sa.Column("interval", sa.Integer(), nullable=False),
        sa.Column("last_run", sa.TIMESTAMP(timezone=True), nullable=False),
        sa.PrimaryKeyConstraint("id"),
    )

    # Seed the 'balance' job with a 3600-second (1 hour) interval.
    # NOTE(review): last_run predates this revision's creation date, presumably
    # so the job is due on first start -- confirm. The literal carries no time
    # zone, so it is interpreted in the session time zone.
    op.execute(
        """
        INSERT INTO cron_jobs(id, interval, last_run) VALUES ('balance', 3600, '2025-01-01 00:00:00')
        """
    )
42+
43+
44+
def downgrade() -> None:
    """Revert this revision: drop ``balances.last_update`` and the ``cron_jobs`` table."""
    op.drop_column("balances", "last_update")
    op.drop_table("cron_jobs")

src/aleph/commands.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
from aleph.db.connection import make_db_url, make_engine, make_session_factory
3030
from aleph.exceptions import InvalidConfigException, KeyNotFoundException
3131
from aleph.jobs import start_jobs
32+
from aleph.jobs.cron.balance_job import BalanceCronJob
33+
from aleph.jobs.cron.cron_job import CronJob, cron_job_task
3234
from aleph.network import listener_tasks
3335
from aleph.services import p2p
3436
from aleph.services.cache.materialized_views import refresh_cache_materialized_views
@@ -147,6 +149,10 @@ async def main(args: List[str]) -> None:
147149
garbage_collector = GarbageCollector(
148150
session_factory=session_factory, storage_service=storage_service
149151
)
152+
cron_job = CronJob(
153+
session_factory=session_factory,
154+
jobs={"balance": BalanceCronJob(session_factory=session_factory)},
155+
)
150156
chain_data_service = ChainDataService(
151157
session_factory=session_factory,
152158
storage_service=storage_service,
@@ -203,6 +209,10 @@ async def main(args: List[str]) -> None:
203209
)
204210
LOGGER.debug("Initialized garbage collector task")
205211

212+
LOGGER.debug("Initializing cron job task")
213+
tasks.append(cron_job_task(config=config, cron_job=cron_job))
214+
LOGGER.debug("Initialized cron job task")
215+
206216
LOGGER.debug("Running event loop")
207217
await asyncio.gather(*tasks)
208218

src/aleph/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ def get_defaults():
3838
# Maximum number of chain/sync events processed at the same time.
3939
"max_concurrency": 20,
4040
},
41+
"cron": {
42+
# Interval between cron job tracker runs, expressed in hours.
43+
"period": 0.5, # 30 mins
44+
},
4145
},
4246
"cache": {
4347
"ttl": {

src/aleph/db/accessors/balances.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime as dt
12
from decimal import Decimal
23
from io import StringIO
34
from typing import Dict, Mapping, Optional, Sequence
@@ -7,6 +8,7 @@
78
from sqlalchemy.sql import Select
89

910
from aleph.db.models import AlephBalanceDb
11+
from aleph.toolkit.timestamp import utc_now
1012
from aleph.types.db_session import DbSession
1113

1214

@@ -140,6 +142,8 @@ def update_balances(
140142
table from the temporary one.
141143
"""
142144

145+
last_update = utc_now()
146+
143147
session.execute(
144148
"CREATE TEMPORARY TABLE temp_balances AS SELECT * FROM balances WITH NO DATA" # type: ignore[arg-type]
145149
)
@@ -151,21 +155,21 @@ def update_balances(
151155
csv_balances = StringIO(
152156
"\n".join(
153157
[
154-
f"{address};{chain.value};{dapp or ''};{balance};{eth_height}"
158+
f"{address};{chain.value};{dapp or ''};{balance};{eth_height};{last_update}"
155159
for address, balance in balances.items()
156160
]
157161
)
158162
)
159163
cursor.copy_expert(
160-
"COPY temp_balances(address, chain, dapp, balance, eth_height) FROM STDIN WITH CSV DELIMITER ';'",
164+
"COPY temp_balances(address, chain, dapp, balance, eth_height, last_update) FROM STDIN WITH CSV DELIMITER ';'",
161165
csv_balances,
162166
)
163167
session.execute(
164168
"""
165-
INSERT INTO balances(address, chain, dapp, balance, eth_height)
166-
(SELECT address, chain, dapp, balance, eth_height FROM temp_balances)
169+
INSERT INTO balances(address, chain, dapp, balance, eth_height, last_update)
170+
(SELECT address, chain, dapp, balance, eth_height, last_update FROM temp_balances)
167171
ON CONFLICT ON CONSTRAINT balances_address_chain_dapp_uindex DO UPDATE
168-
SET balance = excluded.balance, eth_height = excluded.eth_height
172+
SET balance = excluded.balance, eth_height = excluded.eth_height, last_update = (CASE WHEN excluded.balance <> balances.balance THEN excluded.last_update ELSE balances.last_update END)
169173
WHERE excluded.eth_height > balances.eth_height
170174
""" # type: ignore[arg-type]
171175
)
@@ -174,3 +178,12 @@ def update_balances(
174178
# tends to reuse connections. Dropping the table here guarantees it will not be present
175179
# on the next run.
176180
session.execute("DROP TABLE temp_balances") # type: ignore[arg-type]
181+
182+
183+
def get_updated_balance_accounts(session: DbSession, last_update: dt.datetime):
    """Return the distinct addresses whose balance row was updated at or after `last_update`.

    :param session: DB session used to run the query.
    :param last_update: Inclusive lower bound on ``AlephBalanceDb.last_update``.
    :return: All matching addresses, each listed once.
    """
    updated_since = AlephBalanceDb.last_update >= last_update
    query = select(AlephBalanceDb.address).distinct().where(updated_since)
    return session.execute(query).scalars().all()

src/aleph/db/accessors/cost.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22
from typing import Iterable, List, Optional
33

44
from aleph_message.models import PaymentType
5-
from sqlalchemy import func, select
5+
from sqlalchemy import asc, delete, func, select
66
from sqlalchemy.dialects.postgresql import insert
77
from sqlalchemy.sql import Insert
88

9+
from aleph.db.models import ChainTxDb, message_confirmations
910
from aleph.db.models.account_costs import AccountCostsDb
11+
from aleph.db.models.messages import MessageStatusDb
1012
from aleph.toolkit.costs import format_cost
1113
from aleph.types.db_session import DbSession
14+
from aleph.types.message_status import MessageStatus
1215

1316

1417
def get_total_cost_for_address(
@@ -35,6 +38,40 @@ def get_total_cost_for_address(
3538
return format_cost(Decimal(total_cost or 0))
3639

3740

41+
def get_total_costs_for_address_grouped_by_message(
    session: DbSession,
    address: str,
    payment_type: Optional[PaymentType] = PaymentType.hold,
):
    """Sum the costs owned by `address`, grouped by message and confirmation height.

    Cost rows are inner-joined to their message confirmations and the matching
    chain transactions, so only messages with a confirmation appear in the result.

    :param session: DB session used to run the query.
    :param address: Owner of the cost rows to aggregate.
    :param payment_type: Payment type to filter on. The ``cost_hold`` column is
        summed for ``PaymentType.hold`` and ``cost_stream`` for any other value.
    :return: Rows of (item_hash, tx height, summed cost, lowest cost row id),
        ordered by the lowest cost row id in ascending order.
    """
    if payment_type == PaymentType.hold:
        cost_column = AccountCostsDb.cost_hold
    else:
        cost_column = AccountCostsDb.cost_stream

    # The smallest cost row id of each group is both returned and used for ordering.
    first_cost_id = func.min(AccountCostsDb.id)

    confirmation_matches_cost = (
        message_confirmations.c.item_hash == AccountCostsDb.item_hash
    )
    tx_matches_confirmation = message_confirmations.c.tx_hash == ChainTxDb.hash
    owned_by_address = AccountCostsDb.owner == address
    paid_with_type = AccountCostsDb.payment_type == payment_type

    query = (
        select(
            AccountCostsDb.item_hash,
            ChainTxDb.height,
            func.sum(cost_column),
            first_cost_id,
        )
        .select_from(AccountCostsDb)
        .join(message_confirmations, confirmation_matches_cost)
        .join(ChainTxDb, tx_matches_confirmation)
        .where(owned_by_address & paid_with_type)
        .group_by(AccountCostsDb.item_hash, ChainTxDb.height)
        .order_by(asc(first_cost_id))
    )

    return session.execute(query).all()
73+
74+
3875
def get_message_costs(session: DbSession, item_hash: str) -> Iterable[AccountCostsDb]:
    """Return every cost row recorded for the message identified by `item_hash`."""
    belongs_to_message = AccountCostsDb.item_hash == item_hash
    query = select(AccountCostsDb).where(belongs_to_message)
    result = session.execute(query)
    return result.scalars().all()
@@ -59,3 +96,21 @@ def make_costs_upsert_query(costs: List[AccountCostsDb]) -> Insert:
5996
"cost_stream": upsert_stmt.excluded.cost_stream,
6097
},
6198
)
99+
100+
101+
def delete_costs_for_message(session: DbSession, item_hash: str) -> None:
    """Delete every cost row attached to the message identified by `item_hash`."""
    belongs_to_message = AccountCostsDb.item_hash == item_hash
    session.execute(delete(AccountCostsDb).where(belongs_to_message))
104+
105+
106+
def delete_costs_for_forgotten_and_deleted_messages(session: DbSession) -> None:
    """Delete the cost rows of every message whose status is FORGOTTEN or REMOVED.

    Cost rows are matched to message statuses on ``item_hash``.
    """
    same_message = AccountCostsDb.item_hash == MessageStatusDb.item_hash
    is_forgotten = MessageStatusDb.status == MessageStatus.FORGOTTEN
    is_removed = MessageStatusDb.status == MessageStatus.REMOVED

    stmt = delete(AccountCostsDb).where(same_message).where(is_forgotten | is_removed)
    # Bulk delete: do not try to synchronize objects already loaded in the session.
    stmt = stmt.execution_options(synchronize_session=False)
    session.execute(stmt)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import datetime as dt
2+
from typing import List, Optional
3+
4+
from sqlalchemy import delete, select, update
5+
6+
from aleph.db.models.cron_jobs import CronJobDb
7+
from aleph.types.db_session import DbSession
8+
9+
10+
def get_cron_jobs(session: DbSession) -> List[CronJobDb]:
    """Return all `CronJobDb` rows."""
    result = session.execute(select(CronJobDb))
    return result.scalars().all()
14+
15+
16+
def get_cron_job(session: DbSession, id: str) -> Optional[CronJobDb]:
    """Return the cron job whose id equals `id`, or None when there is no such row."""
    matches_id = CronJobDb.id == id
    result = session.execute(select(CronJobDb).where(matches_id))
    return result.scalar_one_or_none()
20+
21+
22+
def update_cron_job(session: DbSession, id: str, last_run: dt.datetime) -> None:
    """Set the `last_run` timestamp of the cron job identified by `id`."""
    matches_id = CronJobDb.id == id
    stmt = update(CronJobDb).where(matches_id).values(last_run=last_run)
    session.execute(stmt)
26+
27+
28+
def delete_cron_job(session: DbSession, id: str) -> None:
    """Delete the cron job row identified by `id`."""
    matches_id = CronJobDb.id == id
    session.execute(delete(CronJobDb).where(matches_id))

src/aleph/db/accessors/files.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import datetime as dt
2-
from typing import Collection, Iterable, Optional, Tuple
2+
from typing import Collection, Iterable, Optional, Tuple, Union
33

44
from sqlalchemy import delete, func, select
55
from sqlalchemy.dialects.postgresql import insert
@@ -8,6 +8,7 @@
88
from aleph.types.db_session import DbSession
99
from aleph.types.files import FileTag, FileType
1010
from aleph.types.sort_order import SortOrder
11+
from aleph.utils import make_file_tag
1112

1213
from ..models.files import (
1314
ContentFilePinDb,
@@ -112,16 +113,92 @@ def insert_grace_period_file_pin(
112113
file_hash: str,
113114
created: dt.datetime,
114115
delete_by: dt.datetime,
116+
item_hash: Optional[str] = None,
117+
owner: Optional[str] = None,
118+
ref: Optional[str] = None,
115119
) -> None:
116120
insert_stmt = insert(GracePeriodFilePinDb).values(
121+
item_hash=item_hash,
117122
file_hash=file_hash,
123+
owner=owner,
124+
ref=ref,
118125
created=created,
119126
type=FilePinType.GRACE_PERIOD,
120127
delete_by=delete_by,
121128
)
122129
session.execute(insert_stmt)
123130

124131

132+
# TODO: Improve performance
def update_file_pin_grace_period(
    session: DbSession,
    item_hash: str,
    delete_by: Union[dt.datetime, None],
) -> None:
    """Convert the file pin of a message between a message pin and a grace-period pin.

    When `delete_by` is None, the `GracePeriodFilePinDb` pin for `item_hash` is
    deleted and re-inserted as a message pin. Otherwise the `MessageFilePinDb`
    pin is deleted and re-inserted as a grace-period pin expiring at `delete_by`.
    The file hash, owner, ref and creation date of the deleted pin are carried
    over, and the file tag built from owner/ref/item_hash is refreshed afterwards.

    If no pin of the expected kind exists for `item_hash`, nothing else is done.
    Only the first row returned by the DELETE is re-inserted.

    :param session: DB session used for all statements.
    :param item_hash: Item hash of the message owning the pin.
    :param delete_by: Deletion deadline for the grace period, or None to turn a
        grace-period pin back into a message pin.
    """
    restoring = delete_by is None
    # The pin is removed from whichever model currently holds it.
    source_model = GracePeriodFilePinDb if restoring else MessageFilePinDb

    removal_stmt = (
        delete(source_model)
        .where(source_model.item_hash == item_hash)
        .returning(
            source_model.file_hash,
            source_model.owner,
            source_model.ref,
            source_model.created,
        )
    )
    removed_pin = session.execute(removal_stmt).first()
    if removed_pin is None:
        return

    file_hash, owner, ref, created = removed_pin
    carried_over = dict(
        session=session,
        item_hash=item_hash,
        file_hash=file_hash,
        owner=owner,
        ref=ref,
        created=created,
    )

    if restoring:
        insert_message_file_pin(**carried_over)
    else:
        insert_grace_period_file_pin(**carried_over, delete_by=delete_by)

    tag = make_file_tag(owner=owner, ref=ref, item_hash=item_hash)
    refresh_file_tag(session=session, tag=tag)
200+
201+
125202
def delete_grace_period_file_pins(session: DbSession, datetime: dt.datetime) -> None:
126203
delete_stmt = delete(GracePeriodFilePinDb).where(
127204
GracePeriodFilePinDb.delete_by < datetime
@@ -213,6 +290,10 @@ def get_file_tag(session: DbSession, tag: FileTag) -> Optional[FileTagDb]:
213290
return session.execute(select_stmt).scalar()
214291

215292

293+
def file_pin_exists(session: DbSession, item_hash: str) -> bool:
    """Tell whether a `FilePinDb` row exists for `item_hash`."""
    matches_item = FilePinDb.item_hash == item_hash
    return FilePinDb.exists(session=session, where=matches_item)
295+
296+
216297
def file_tag_exists(session: DbSession, tag: FileTag) -> bool:
    """Tell whether a `FileTagDb` row exists for `tag`."""
    matches_tag = FileTagDb.tag == tag
    return FileTagDb.exists(session=session, where=matches_tag)
218299

0 commit comments

Comments
 (0)