Skip to content

Commit 2187524

Browse files
committed
feat: Better handling of REMOVING status + balance update fixes + cost deletion on FORGOTTEN / REMOVED messages
1 parent 42e2a3b commit 2187524

File tree

15 files changed

+293
-177
lines changed

15 files changed

+293
-177
lines changed

src/aleph/config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ def get_defaults():
4141
"cron": {
4242
# Interval between cron job trackers runs, expressed in hours.
4343
"period": 0.5, # 30 mins
44-
# "period": 0.002, # 10 seconds
4544
},
4645
},
4746
"cache": {

src/aleph/db/accessors/balances.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from sqlalchemy.sql import Select
99

1010
from aleph.db.models import AlephBalanceDb
11+
from aleph.toolkit.timestamp import utc_now
1112
from aleph.types.db_session import DbSession
1213

1314

@@ -141,6 +142,8 @@ def update_balances(
141142
table from the temporary one.
142143
"""
143144

145+
last_update = utc_now()
146+
144147
session.execute(
145148
"CREATE TEMPORARY TABLE temp_balances AS SELECT * FROM balances WITH NO DATA" # type: ignore[arg-type]
146149
)
@@ -152,21 +155,21 @@ def update_balances(
152155
csv_balances = StringIO(
153156
"\n".join(
154157
[
155-
f"{address};{chain.value};{dapp or ''};{balance};{eth_height}"
158+
f"{address};{chain.value};{dapp or ''};{balance};{eth_height};{last_update}"
156159
for address, balance in balances.items()
157160
]
158161
)
159162
)
160163
cursor.copy_expert(
161-
"COPY temp_balances(address, chain, dapp, balance, eth_height) FROM STDIN WITH CSV DELIMITER ';'",
164+
"COPY temp_balances(address, chain, dapp, balance, eth_height, last_update) FROM STDIN WITH CSV DELIMITER ';'",
162165
csv_balances,
163166
)
164167
session.execute(
165168
"""
166-
INSERT INTO balances(address, chain, dapp, balance, eth_height)
167-
(SELECT address, chain, dapp, balance, eth_height FROM temp_balances)
169+
INSERT INTO balances(address, chain, dapp, balance, eth_height, last_update)
170+
(SELECT address, chain, dapp, balance, eth_height, last_update FROM temp_balances)
168171
ON CONFLICT ON CONSTRAINT balances_address_chain_dapp_uindex DO UPDATE
169-
SET balance = excluded.balance, eth_height = excluded.eth_height
172+
SET balance = excluded.balance, eth_height = excluded.eth_height, last_update = (CASE WHEN excluded.balance <> balances.balance THEN excluded.last_update ELSE balances.last_update END)
170173
WHERE excluded.eth_height > balances.eth_height
171174
""" # type: ignore[arg-type]
172175
)
@@ -177,8 +180,10 @@ def update_balances(
177180
session.execute("DROP TABLE temp_balances") # type: ignore[arg-type]
178181

179182

180-
def get_updated_balances(session: DbSession, last_update: dt.datetime):
181-
select_stmt = select(AlephBalanceDb.address, AlephBalanceDb.balance).filter(
182-
AlephBalanceDb.last_update >= last_update
183+
def get_updated_balance_accounts(session: DbSession, last_update: dt.datetime):
184+
select_stmt = (
185+
select(AlephBalanceDb.address)
186+
.where(AlephBalanceDb.last_update >= last_update)
187+
.distinct()
183188
)
184-
return (session.execute(select_stmt)).all()
189+
return (session.execute(select_stmt)).scalars().all()

src/aleph/db/accessors/cost.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
from typing import Iterable, List, Optional
33

44
from aleph_message.models import PaymentType
5-
from sqlalchemy import asc, func, select
5+
from sqlalchemy import asc, delete, func, select
66
from sqlalchemy.dialects.postgresql import insert
77
from sqlalchemy.sql import Insert
88

99
from aleph.db.models import ChainTxDb, message_confirmations
1010
from aleph.db.models.account_costs import AccountCostsDb
11+
from aleph.db.models.messages import MessageStatusDb
1112
from aleph.toolkit.costs import format_cost
1213
from aleph.types.db_session import DbSession
14+
from aleph.types.message_status import MessageStatus
1315

1416

1517
def get_total_cost_for_address(
@@ -94,3 +96,21 @@ def make_costs_upsert_query(costs: List[AccountCostsDb]) -> Insert:
9496
"cost_stream": upsert_stmt.excluded.cost_stream,
9597
},
9698
)
99+
100+
101+
def delete_costs_for_message(session: DbSession, item_hash: str) -> None:
102+
delete_stmt = delete(AccountCostsDb).where(AccountCostsDb.item_hash == item_hash)
103+
session.execute(delete_stmt)
104+
105+
106+
def delete_costs_for_forgotten_and_deleted_messages(session: DbSession) -> None:
107+
delete_stmt = (
108+
delete(AccountCostsDb)
109+
.where(AccountCostsDb.item_hash == MessageStatusDb.item_hash)
110+
.where(
111+
(MessageStatusDb.status == MessageStatus.FORGOTTEN)
112+
| (MessageStatusDb.status == MessageStatus.REMOVED)
113+
)
114+
.execution_options(synchronize_session=False)
115+
)
116+
session.execute(delete_stmt)

src/aleph/db/accessors/files.py

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from aleph.types.db_session import DbSession
99
from aleph.types.files import FileTag, FileType
1010
from aleph.types.sort_order import SortOrder
11+
from aleph.utils import make_file_tag
1112

1213
from ..models.files import (
1314
ContentFilePinDb,
@@ -112,36 +113,90 @@ def insert_grace_period_file_pin(
112113
file_hash: str,
113114
created: dt.datetime,
114115
delete_by: dt.datetime,
116+
item_hash: Optional[str] = None,
117+
owner: Optional[str] = None,
118+
ref: Optional[str] = None,
115119
) -> None:
116120
insert_stmt = insert(GracePeriodFilePinDb).values(
121+
item_hash=item_hash,
117122
file_hash=file_hash,
123+
owner=owner,
124+
ref=ref,
118125
created=created,
119126
type=FilePinType.GRACE_PERIOD,
120127
delete_by=delete_by,
121128
)
122129
session.execute(insert_stmt)
123130

124131

125-
def upsert_grace_period_file_pin(
132+
# TODO: Improve performance
133+
def update_file_pin_grace_period(
126134
session: DbSession,
127-
file_hash: str,
128-
created: dt.datetime,
135+
item_hash: str,
129136
delete_by: Union[dt.datetime, None],
130137
) -> None:
131-
insert_stmt = insert(GracePeriodFilePinDb).values(
132-
file_hash=file_hash,
133-
created=created,
134-
type=FilePinType.GRACE_PERIOD,
135-
delete_by=delete_by,
136-
)
137-
upsert_stmt = insert_stmt.on_conflict_do_update(
138-
constraint="file_pins_item_hash_type_key",
139-
set_={
140-
"created": created,
141-
"delete_by": delete_by,
142-
},
138+
if delete_by is None:
139+
delete_stmt = (
140+
delete(GracePeriodFilePinDb)
141+
.where(GracePeriodFilePinDb.item_hash == item_hash)
142+
.returning(
143+
GracePeriodFilePinDb.file_hash,
144+
GracePeriodFilePinDb.owner,
145+
GracePeriodFilePinDb.ref,
146+
GracePeriodFilePinDb.created,
147+
)
148+
)
149+
150+
grace_period = session.execute(delete_stmt).first()
151+
if grace_period is None:
152+
return
153+
154+
file_hash, owner, ref, created = grace_period
155+
156+
insert_message_file_pin(
157+
session=session,
158+
item_hash=item_hash,
159+
file_hash=file_hash,
160+
owner=owner,
161+
ref=ref,
162+
created=created,
163+
)
164+
else:
165+
delete_stmt = (
166+
delete(MessageFilePinDb)
167+
.where(MessageFilePinDb.item_hash == item_hash)
168+
.returning(
169+
MessageFilePinDb.file_hash,
170+
MessageFilePinDb.owner,
171+
MessageFilePinDb.ref,
172+
MessageFilePinDb.created,
173+
)
174+
)
175+
176+
message_pin = session.execute(delete_stmt).first()
177+
if message_pin is None:
178+
return
179+
180+
file_hash, owner, ref, created = message_pin
181+
182+
insert_grace_period_file_pin(
183+
session=session,
184+
item_hash=item_hash,
185+
file_hash=file_hash,
186+
owner=owner,
187+
ref=ref,
188+
created=created,
189+
delete_by=delete_by,
190+
)
191+
192+
refresh_file_tag(
193+
session=session,
194+
tag=make_file_tag(
195+
owner=owner,
196+
ref=ref,
197+
item_hash=item_hash,
198+
),
143199
)
144-
session.execute(upsert_stmt)
145200

146201

147202
def delete_grace_period_file_pins(session: DbSession, datetime: dt.datetime) -> None:
@@ -235,6 +290,10 @@ def get_file_tag(session: DbSession, tag: FileTag) -> Optional[FileTagDb]:
235290
return session.execute(select_stmt).scalar()
236291

237292

293+
def file_pin_exists(session: DbSession, item_hash: str) -> bool:
294+
return FilePinDb.exists(session=session, where=FilePinDb.item_hash == item_hash)
295+
296+
238297
def file_tag_exists(session: DbSession, tag: FileTag) -> bool:
239298
return FileTagDb.exists(session=session, where=FileTagDb.tag == tag)
240299

src/aleph/db/accessors/messages.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from sqlalchemy.sql import Insert, Select
1010
from sqlalchemy.sql.elements import literal
1111

12+
from aleph.db.accessors.cost import delete_costs_for_message
1213
from aleph.toolkit.timestamp import coerce_to_datetime, utc_now
1314
from aleph.types.channel import Channel
1415
from aleph.types.db_session import DbSession
@@ -49,6 +50,13 @@ def message_exists(session: DbSession, item_hash: str) -> bool:
4950
)
5051

5152

53+
def get_one_message_by_item_hash(
54+
session: DbSession, item_hash: str
55+
) -> Optional[RejectedMessageDb]:
56+
select_stmt = select(MessageDb).where(MessageDb.item_hash == item_hash)
57+
return session.execute(select_stmt).scalar_one_or_none()
58+
59+
5260
def make_matching_messages_query(
5361
hashes: Optional[Sequence[ItemHash]] = None,
5462
addresses: Optional[Sequence[str]] = None,
@@ -413,6 +421,11 @@ def forget_message(
413421
)
414422
session.execute(delete(MessageDb).where(MessageDb.item_hash == item_hash))
415423

424+
delete_costs_for_message(
425+
session=session,
426+
item_hash=item_hash,
427+
)
428+
416429

417430
def append_to_forgotten_by(
418431
session: DbSession, forgotten_message_hash: str, forget_message_hash: str

src/aleph/db/models/files.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ class FilePinDb(Base):
7777
owner = Column(String, nullable=True, index=True)
7878
item_hash = Column(String, nullable=True)
7979

80+
# Allow to recover MESSAGE pins refs marked for removing from grace period entries
81+
ref = Column(String, nullable=True)
82+
8083
file: StoredFileDb = relationship(StoredFileDb, back_populates="pins")
8184

8285
__mapper_args__: Dict[str, Any] = {
@@ -94,7 +97,7 @@ class TxFilePinDb(FilePinDb):
9497

9598

9699
class MessageFilePinDb(FilePinDb):
97-
ref = Column(String, nullable=True)
100+
# ref = Column(String, nullable=True)
98101

99102
__mapper_args__ = {
100103
"polymorphic_identity": FilePinType.MESSAGE.value,

src/aleph/handlers/content/forget.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ async def _forget_item_hash(
179179

180180
if message_status.status == MessageStatus.REJECTED:
181181
logger.info("Message %s was rejected, nothing to do.", item_hash)
182+
if message_status.status == MessageStatus.REMOVED:
183+
logger.info("Message %s was removed, nothing to do.", item_hash)
182184
if message_status.status == MessageStatus.FORGOTTEN:
183185
logger.info("Message %s is already forgotten, nothing to do.", item_hash)
184186
append_to_forgotten_by(

src/aleph/handlers/content/store.py

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import asyncio
99
import datetime as dt
1010
import logging
11-
from typing import List, Optional, Set
11+
from typing import List, Set
1212

1313
import aioipfs
1414
from aleph_message.models import ItemHash, ItemType, StoreContent
@@ -37,7 +37,7 @@
3737
from aleph.toolkit.costs import are_store_and_program_free
3838
from aleph.toolkit.timestamp import timestamp_to_datetime, utc_now
3939
from aleph.types.db_session import DbSession
40-
from aleph.types.files import FileTag, FileType
40+
from aleph.types.files import FileType
4141
from aleph.types.message_status import (
4242
FileUnavailable,
4343
InsufficientBalanceException,
@@ -46,7 +46,7 @@
4646
StoreCannotUpdateStoreWithRef,
4747
StoreRefNotFound,
4848
)
49-
from aleph.utils import item_type_from_hash
49+
from aleph.utils import item_type_from_hash, make_file_tag
5050

5151
LOGGER = logging.getLogger(__name__)
5252

@@ -60,36 +60,6 @@ def _get_store_content(message: MessageDb) -> StoreContent:
6060
return content
6161

6262

63-
def make_file_tag(owner: str, ref: Optional[str], item_hash: str) -> FileTag:
64-
"""
65-
Builds the file tag corresponding to a STORE message.
66-
67-
The file tag can be set to two different values:
68-
* if the `ref` field is not set, the tag will be set to <item_hash>.
69-
* if the `ref` field is set, two cases: if `ref` is an item hash, the tag is
70-
the value of the ref field. If it is a user-defined value, the tag is
71-
<owner>/<ref>.
72-
73-
:param owner: Owner of the file.
74-
:param ref: Value of the `ref` field of the message content.
75-
:param item_hash: Item hash of the message.
76-
:return: The computed file tag.
77-
"""
78-
79-
# When the user does not specify a ref, we use the item hash.
80-
if ref is None:
81-
return FileTag(item_hash)
82-
83-
# If ref is an item hash, return it as is
84-
try:
85-
_item_hash = ItemHash(ref)
86-
return FileTag(ref)
87-
except ValueError:
88-
pass
89-
90-
return FileTag(f"{owner}/{ref}")
91-
92-
9363
class StoreMessageHandler(ContentHandler):
9464
def __init__(self, storage_service: StorageService, grace_period: int):
9565
self.storage_service = storage_service

0 commit comments

Comments
 (0)