Skip to content

Commit fdbb730

Browse files
nesitorAndres D. Molins
andauthored
Added balance pre-check on file pins (#799)
* Feature: To avoid saving unneeded files on IPFS, we added a balance pre-check on STORE messages that runs before the message is processed. * Split the `verify_and_fetch` method in two and run the balance pre-check before downloading the file, in the IPFS case only. * Fix: Replaced the remaining references to the old method `verify_and_fetch` with the new one, `verify_message`. * Fix: Corrected the way the message_cost is calculated. * Fix: Solved errors and code quality issues. * Fix: Changed the way the model class is instantiated. * Fix: Allow small files to be pinned for free. * Fix: Added type compatibility. * Added missing test cases for IPFS and the content handler. * Fix: Solved test issues and improved typing. * Fix: Removed a redundant `if`. --------- Co-authored-by: Andres D. Molins <[email protected]>
1 parent 6f703fe commit fdbb730

File tree

9 files changed

+874
-23
lines changed

9 files changed

+874
-23
lines changed

src/aleph/handlers/content/content_handler.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@ async def process(self, session: DbSession, messages: List[MessageDb]) -> None:
4848
"""
4949
pass
5050

51+
async def pre_check_balance(self, session: DbSession, message: MessageDb) -> None:
    """
    Early balance hook, meant to run before the message itself is processed.

    This default implementation accepts every message. Handlers that need an
    early check override it and raise InsufficientBalanceException when the
    balance of the user is too low.

    :param session: DB session.
    :param message: Pending Message being processed.
    """
    # Nothing to verify by default.
    return None
61+
5162
async def check_balance(
5263
self, session: DbSession, message: MessageDb
5364
) -> List[AccountCostsDb] | None:

src/aleph/handlers/content/store.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import datetime as dt
1010
import logging
11+
from decimal import Decimal
1112
from typing import List, Optional, Set
1213

1314
import aioipfs
@@ -31,6 +32,7 @@
3132
from aleph.db.models.account_costs import AccountCostsDb
3233
from aleph.exceptions import AlephStorageException, UnknownHashError
3334
from aleph.handlers.content.content_handler import ContentHandler
35+
from aleph.schemas.cost_estimation_messages import CostEstimationStoreContent
3436
from aleph.services.cost import calculate_storage_size, get_total_and_detailed_costs
3537
from aleph.storage import StorageService
3638
from aleph.toolkit.constants import MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE, MiB
@@ -206,6 +208,67 @@ async def fetch_related_content(
206208
size=size,
207209
)
208210

211+
async def pre_check_balance(self, session: DbSession, message: MessageDb):
    """
    Check the sender's balance before the file behind a STORE message is fetched.

    The cost is only estimated for IPFS items, and only when IPFS is enabled in
    the node configuration; in every other case the message itself is counted
    as costing nothing. Files no larger than
    MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE are accepted without a balance check.

    :param session: DB session.
    :param message: STORE message being processed.
    :raises InsufficientBalanceException: if the balance of the address is lower
        than its current costs plus the estimated cost of this message.
    """
    content = _get_store_content(message)
    assert isinstance(content, StoreContent)

    if are_store_and_program_free(message):
        return None

    # This check is essential to ensure that files are not added to the system
    # on the current node when the configuration disables storing of files.
    ipfs_enabled = get_config().ipfs.enabled.value

    current_balance = get_total_balance(session=session, address=content.address)
    current_cost = get_total_cost_for_address(
        session=session, address=content.address
    )

    # Stays at zero unless an IPFS size can be obtained below.
    message_cost = Decimal(0)

    # Initially only do that balance pre-check for ipfs files.
    if content.item_type == ItemType.ipfs and ipfs_enabled:
        ipfs_byte_size = await self.storage_service.ipfs_service.get_ipfs_size(
            content.item_hash
        )
        if ipfs_byte_size:
            storage_mib = Decimal(ipfs_byte_size / MiB)
            free_limit_mib = MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE / MiB

            # Allow users to pin small files
            if storage_mib and storage_mib <= free_limit_mib:
                LOGGER.debug(
                    f"Cost for {message.item_hash} supposed to be free as size is {storage_mib}"
                )
                return None

            # The size estimate is truncated to whole MiB by int().
            estimation_payload = {
                **content.model_dump(),
                "estimated_size_mib": int(storage_mib),
            }
            estimation_content = CostEstimationStoreContent.model_validate(
                estimation_payload
            )
            message_cost, _ = get_total_and_detailed_costs(
                session, estimation_content, message.item_hash
            )

    required_balance = current_cost + message_cost
    if current_balance < required_balance:
        raise InsufficientBalanceException(
            balance=current_balance,
            required_balance=required_balance,
        )

    return None
271+
209272
async def check_balance(
210273
self, session: DbSession, message: MessageDb
211274
) -> List[AccountCostsDb]:

src/aleph/handlers/message_handler.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -376,14 +376,12 @@ async def insert_costs(
376376
insert_stmt = make_costs_upsert_query(costs)
377377
session.execute(insert_stmt)
378378

379-
async def verify_and_fetch(
380-
self, session: DbSession, pending_message: PendingMessageDb
381-
) -> MessageDb:
379+
async def verify_message(self, pending_message: PendingMessageDb) -> MessageDb:
    """
    Verify the signature of a pending message, then return the message
    produced by `fetch_pending_message`.

    Related content is not fetched here.

    :param pending_message: Pending message to verify.
    :return: The validated message.
    """
    await self.verify_signature(pending_message=pending_message)
    return await self.fetch_pending_message(pending_message=pending_message)
388386

389387
async def process(
@@ -430,11 +428,16 @@ async def process(
430428
error_code=ErrorCode.FORGOTTEN_DUPLICATE,
431429
)
432430

433-
message = await self.verify_and_fetch(
434-
session=session, pending_message=pending_message
435-
)
431+
# First check the message content and verify it
432+
message = await self.verify_message(pending_message=pending_message)
436433

434+
# Do a balance pre-check to avoid saving related data
437435
content_handler = self.get_content_handler(message.type)
436+
await content_handler.pre_check_balance(session=session, message=message)
437+
438+
# Fetch related content like the IPFS associated file
439+
await self.fetch_related_content(session=session, message=message)
440+
438441
await content_handler.check_dependencies(session=session, message=message)
439442
await content_handler.check_permissions(session=session, message=message)
440443
costs = await content_handler.check_balance(session=session, message=message)

src/aleph/jobs/fetch_pending_messages.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def __init__(
5555
async def fetch_pending_message(self, pending_message: PendingMessageDb):
5656
with self.session_factory() as session:
5757
try:
58-
message = await self.message_handler.verify_and_fetch(
59-
session=session, pending_message=pending_message
58+
message = await self.message_handler.verify_message(
59+
pending_message=pending_message
6060
)
6161
session.execute(
6262
make_pending_message_fetched_statement(

src/aleph/services/cost.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,10 @@ def _get_product_price_type(
152152
settings: Settings,
153153
price_aggregate: Union[AggregateDb, dict],
154154
) -> ProductPriceType:
155-
if isinstance(content, StoreContent):
155+
if isinstance(content, (StoreContent, CostEstimationStoreContent)):
156156
return ProductPriceType.STORAGE
157157

158-
if isinstance(content, ProgramContent):
158+
if isinstance(content, (ProgramContent, CostEstimationProgramContent)):
159159
is_on_demand = not content.on.persistent
160160
return (
161161
ProductPriceType.PROGRAM
@@ -225,7 +225,7 @@ def _get_nb_compute_units(
225225
def _get_compute_unit_multiplier(content: CostComputableContent) -> int:
226226
compute_unit_multiplier = 1
227227
if (
228-
isinstance(content, ProgramContent)
228+
isinstance(content, (ProgramContent, CostEstimationProgramContent))
229229
and not content.on.persistent
230230
and content.environment.internet
231231
):
@@ -289,7 +289,7 @@ def _get_execution_volumes_costs(
289289
) -> List[AccountCostsDb]:
290290
volumes: List[RefVolume | SizedVolume] = []
291291

292-
if isinstance(content, InstanceContent):
292+
if isinstance(content, (InstanceContent, CostEstimationInstanceContent)):
293293
volumes.append(
294294
SizedVolume(
295295
CostType.EXECUTION_INSTANCE_VOLUME_ROOTFS,
@@ -298,7 +298,7 @@ def _get_execution_volumes_costs(
298298
)
299299
)
300300

301-
elif isinstance(content, ProgramContent):
301+
elif isinstance(content, (ProgramContent, CostEstimationProgramContent)):
302302
if (
303303
isinstance(content, CostEstimationProgramContent)
304304
and content.code.estimated_size_mib
@@ -365,7 +365,7 @@ def _get_execution_volumes_costs(
365365
# or with same values for different volumes causing unique key constraint errors
366366
name_prefix = f"#{i}"
367367

368-
if isinstance(volume, ImmutableVolume):
368+
if isinstance(volume, (ImmutableVolume, CostEstimationImmutableVolume)):
369369
name = (
370370
f"{name_prefix}:{volume.mount or CostType.EXECUTION_VOLUME_INMUTABLE}"
371371
)
@@ -582,7 +582,7 @@ def get_detailed_costs(
582582
settings = settings or _get_settings(session)
583583
pricing = pricing or _get_product_price(session, content, settings)
584584

585-
if isinstance(content, StoreContent):
585+
if isinstance(content, (StoreContent, CostEstimationStoreContent)):
586586
return _calculate_storage_costs(session, content, pricing, item_hash)
587587
else:
588588
return _calculate_executable_costs(session, content, pricing, item_hash)

src/aleph/services/ipfs/service.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,73 @@ async def get_public_address(self):
5757
if "127.0.0.1" in address and "/tcp" in address and "/p2p" in address:
5858
return address.replace("127.0.0.1", public_ip)
5959

60+
async def get_ipfs_size(
    self, hash: str, timeout: int = 1, tries: int = 1
) -> Optional[int]:
    """
    Look up the size of an IPFS object without downloading its content.

    The DAG node is fetched with ``dag.get`` and the size is taken from the
    first of these that is present:

    1. ``Data["filesize"]``
    2. ``Data["Tsize"]``
    3. top-level ``Tsize``
    4. the sum of the integer ``Tsize`` of every entry of ``Links``
    5. top-level ``Size``

    When ``dag.get`` does not return a dict, the ``Size`` reported by
    ``block.stat`` is used instead.

    :param hash: CID of the object to inspect.
    :param timeout: Timeout, in seconds, applied to each IPFS API call.
    :param tries: Maximum number of attempts. Attempts that end with a
        cancellation or a connection error are not counted.
    :return: The size reported by the node (presumably in bytes -- confirm
        against the IPFS API), or None if it could not be determined.
    """
    try_count = 0
    result = None
    while (result is None) and (try_count < tries):
        try_count += 1
        try:
            dag_node = await asyncio.wait_for(
                self.ipfs_client.dag.get(hash), timeout=timeout
            )
            if isinstance(dag_node, dict):
                data = dag_node.get("Data")
                links = dag_node.get("Links")

                # A "Data" dict without size information must not hide the
                # top-level fallbacks below, so each source is tried in turn
                # instead of stopping at the first matching container.
                if isinstance(data, dict) and "filesize" in data:
                    # This is the common structure for UnixFS nodes after aioipfs parsing
                    result = data["filesize"]
                elif isinstance(data, dict) and "Tsize" in data:
                    # Less common, but good to check
                    result = data["Tsize"]
                elif "Tsize" in dag_node:
                    # Sometimes it might be at the top level directly
                    result = dag_node["Tsize"]
                elif isinstance(links, list):
                    total_size = 0
                    for link in links:
                        # In case it's a link list, get the Tsize property if exists
                        if "Tsize" in link and isinstance(link["Tsize"], int):
                            total_size += link["Tsize"]
                        else:
                            # NOTE(review): this is logged for a link without an
                            # integer Tsize; the wording is kept unchanged.
                            LOGGER.error(
                                f"Error: CID {hash} did not return a list structure. Type: {type(link)}"
                            )
                    result = total_size
                elif "Size" in dag_node:
                    # Occasionally, 'Size' might refer to total size, but often it's block
                    # size
                    result = dag_node["Size"]
            else:
                # For raw blocks, dag_node might be bytes. block.stat is better for those.
                # For other codecs, the structure will vary.
                LOGGER.info(
                    f"Warning: CID {hash} did not return a dictionary structure. Type: {type(dag_node)}"
                )
                block_stat = await asyncio.wait_for(
                    self.ipfs_client.block.stat(hash), timeout=timeout
                )
                result = block_stat["Size"]
        except aioipfs.APIError:
            result = None
            await asyncio.sleep(0.5)
        except asyncio.TimeoutError:
            result = None
            await asyncio.sleep(0.5)
        except (
            concurrent.futures.CancelledError,
            aiohttp.client_exceptions.ClientConnectorError,
        ):
            # NOTE(review): because the attempt is not counted, the loop keeps
            # retrying for as long as this error persists.
            try_count -= 1  # do not count as a try.
            await asyncio.sleep(0.1)

    return result
126+
60127
async def get_ipfs_content(
61128
self, hash: str, timeout: int = 1, tries: int = 1
62129
) -> Optional[bytes]:

0 commit comments

Comments
 (0)