Skip to content

Commit 2acce93

Browse files
authored
Add ipfs.pinning configuration section for separate pinning service with backward compatibility (#859)
1 parent 3cf3fbc commit 2acce93

File tree

4 files changed

+114
-24
lines changed

4 files changed

+114
-24
lines changed

src/aleph/config.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,12 @@ def get_defaults():
171171
"ipfs": {
172172
# Whether to enable storage and communication on IPFS.
173173
"enabled": True,
174-
# Hostname of the IPFS service.
174+
# Hostname of the IPFS.
175175
"host": "ipfs",
176176
# Port of the IPFS service.
177177
"port": 5001,
178+
# scheme of the IPFS service
179+
"scheme": "http",
178180
# IPFS pubsub topic used for liveness checks.
179181
"alive_topic": "ALEPH_ALIVE",
180182
# Delay between connection attempts to other nodes on the network.
@@ -184,6 +186,17 @@ def get_defaults():
184186
"/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2",
185187
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF",
186188
],
189+
# Pinning service configuration
190+
"pinning": {
191+
# Hostname of the IPFS pinning service (if different from main IPFS).
192+
"host": None,
193+
# Port of the IPFS pinning service (if different from main IPFS).
194+
"port": 5001,
195+
# Scheme of the ipfs pinning service.
196+
"scheme": "http",
197+
# Timeout for pinning operations (seconds)
198+
"timeout": 60,
199+
},
187200
},
188201
"rabbitmq": {
189202
# Hostname of the RabbitMQ service.

src/aleph/network.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from aleph.handlers.message_handler import MessagePublisher
1010
from aleph.services.cache.node_cache import NodeCache
1111
from aleph.services.ipfs import IpfsService
12-
from aleph.services.ipfs.common import make_ipfs_client
12+
from aleph.services.ipfs.common import make_ipfs_p2p_client
1313
from aleph.services.ipfs.pubsub import incoming_channel as incoming_ipfs_channel
1414
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
1515
from aleph.storage import StorageService
@@ -46,7 +46,7 @@ async def listener_tasks(
4646
from aleph.services.p2p.protocol import incoming_channel as incoming_p2p_channel
4747

4848
# TODO: these should be passed as parameters. This module could probably be a class instead?
49-
ipfs_client = make_ipfs_client(config)
49+
ipfs_client = make_ipfs_p2p_client(config)
5050
ipfs_service = IpfsService(ipfs_client=ipfs_client)
5151
storage_service = StorageService(
5252
storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),

src/aleph/services/ipfs/common.py

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,61 @@ async def get_base_url(config):
88
return "http://{}:{}".format(config.ipfs.host.value, config.ipfs.port.value)
99

1010

11-
def make_ipfs_client(config: Config, timeout: int = 60) -> aioipfs.AsyncIPFS:
12-
host = config.ipfs.host.value
13-
port = config.ipfs.port.value
14-
11+
def make_ipfs_client(
12+
host: str,
13+
port: int,
14+
timeout: int = 60,
15+
scheme: str = "http",
16+
debug_level: int = logging.WARNING,
17+
) -> aioipfs.AsyncIPFS:
1518
return aioipfs.AsyncIPFS(
1619
host=host,
1720
port=port,
21+
scheme=scheme,
1822
read_timeout=timeout,
1923
conns_max=25,
2024
conns_max_per_host=10,
21-
debug=(config.logging.level.value <= logging.DEBUG),
25+
debug=debug_level,
2226
)
2327

2428

29+
def make_ipfs_p2p_client(config: Config, timeout: int = 60) -> aioipfs.AsyncIPFS:
30+
"""Create IPFS client for P2P operations (pubsub, content retrieval)."""
31+
# Always use main IPFS config for P2P operations
32+
host = config.ipfs.host.value
33+
port = int(config.ipfs.port.value)
34+
scheme = config.ipfs.scheme.value
35+
debug_level = config.logging.level.value <= logging.DEBUG
36+
37+
return make_ipfs_client(host, port, timeout, scheme, debug_level)
38+
39+
40+
def make_ipfs_pinning_client(config: Config, timeout: int = 60) -> aioipfs.AsyncIPFS:
41+
"""Create IPFS client for pinning operations."""
42+
# Use pinning specific config if provided, otherwise use main config
43+
if (
44+
hasattr(config.ipfs, "pinning")
45+
and config.ipfs.pinning.host.value
46+
and config.ipfs.pinning.port.value
47+
):
48+
host = config.ipfs.pinning.host.value
49+
port = int(config.ipfs.pinning.port.value)
50+
scheme = config.ipfs.pinning.scheme.value
51+
else:
52+
# Use main IPFS config as fallback
53+
host = config.ipfs.host.value
54+
port = int(config.ipfs.port.value)
55+
scheme = config.ipfs.scheme.value
56+
57+
# Get pinning-specific timeout if available
58+
if hasattr(config.ipfs, "pinning") and hasattr(config.ipfs.pinning, "timeout"):
59+
timeout = int(config.ipfs.pinning.timeout.value)
60+
61+
debug_level = config.logging.level.value <= logging.DEBUG
62+
63+
return make_ipfs_client(host, port, timeout, scheme, debug_level)
64+
65+
2566
def get_cid_version(ipfs_hash: str) -> int:
2667
if ipfs_hash.startswith("Qm") and 44 <= len(ipfs_hash) <= 46: # CIDv0
2768
return 0

src/aleph/services/ipfs/service.py

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import aioipfs
99
from configmanager import Config
1010

11-
from aleph.services.ipfs.common import make_ipfs_client
11+
from aleph.services.ipfs.common import make_ipfs_p2p_client, make_ipfs_pinning_client
1212
from aleph.services.utils import get_IP
1313
from aleph.types.message_status import FileUnavailable
1414
from aleph.utils import run_in_executor
@@ -19,19 +19,36 @@
1919

2020

2121
class IpfsService:
22-
def __init__(self, ipfs_client: aioipfs.AsyncIPFS):
23-
self.ipfs_client = ipfs_client
22+
def __init__(
23+
self,
24+
ipfs_client: aioipfs.AsyncIPFS,
25+
pinning_client: Optional[aioipfs.AsyncIPFS] = None,
26+
):
27+
self.ipfs_client = ipfs_client # For P2P operations
28+
self.pinning_client = pinning_client or ipfs_client # For pinning operations
2429

2530
@classmethod
2631
def new(cls, config: Config) -> Self:
27-
ipfs_client = make_ipfs_client(config)
28-
return cls(ipfs_client=ipfs_client)
32+
# Create P2P client (for content retrieval, pubsub)
33+
p2p_client = make_ipfs_p2p_client(config)
34+
35+
# Create separate pinning client if configured differently
36+
if _should_use_separate_pinning_client(config):
37+
LOGGER.info("Using separate IPFS client for pinning operations")
38+
pinning_client = make_ipfs_pinning_client(config)
39+
else:
40+
pinning_client = p2p_client
41+
42+
return cls(ipfs_client=p2p_client, pinning_client=pinning_client)
2943

3044
async def __aenter__(self):
3145
return self
3246

3347
async def close(self):
3448
await self.ipfs_client.close()
49+
# Only close pinning client if it's different from main client
50+
if self.pinning_client != self.ipfs_client:
51+
await self.pinning_client.close()
3552

3653
async def __aexit__(self, exc_type, exc_val, exc_tb):
3754
await self.close()
@@ -183,11 +200,11 @@ async def get_json(self, hash, timeout=1, tries=1):
183200
return result
184201

185202
async def add_json(self, value: bytes) -> str:
186-
result = self.ipfs_client.add_json(value)
203+
result = self.pinning_client.add_json(value)
187204
return result["Hash"]
188205

189206
async def add_bytes(self, value: bytes, cid_version: int = 0) -> str:
190-
result = await self.ipfs_client.add_bytes(value, cid_version=cid_version)
207+
result = await self.pinning_client.add_bytes(value, cid_version=cid_version)
191208
return result["Hash"]
192209

193210
async def _pin_add(self, cid: str, timeout: int = 30):
@@ -201,7 +218,8 @@ async def _pin_add(self, cid: str, timeout: int = 30):
201218
tick_timeout = timeout * 2
202219
last_progress = None
203220

204-
async for status in self.ipfs_client.pin.add(cid):
221+
# Use pinning client instead of main client
222+
async for status in self.pinning_client.pin.add(cid):
205223
# If the Pins key appears, the file is pinned.
206224
if "Pins" in status:
207225
break
@@ -235,15 +253,13 @@ async def pin_add(self, cid: str, timeout: int = 30, tries: int = 1):
235253
break
236254

237255
async def add_file(self, file_content: bytes):
238-
url = f"{self.ipfs_client.api_url}add"
256+
"""
257+
Add a file to IPFS using bytes as data.
239258
240-
async with aiohttp.ClientSession() as session:
241-
data = aiohttp.FormData()
242-
data.add_field("path", file_content)
243-
244-
resp = await session.post(url, data=data)
245-
resp.raise_for_status()
246-
return await resp.json()
259+
This is a backward-compatible wrapper around add_bytes().
260+
Uses the pinning client for write operations.
261+
"""
262+
return await self.add_bytes(file_content)
247263

248264
async def sub(self, topic: str):
249265
ipfs_client = self.ipfs_client
@@ -269,3 +285,23 @@ async def pub(self, topic: str, message: Union[str, bytes]):
269285

270286
ipfs_client = self.ipfs_client
271287
await ipfs_client.pubsub.pub(topic, message_str)
288+
289+
290+
def _should_use_separate_pinning_client(config: Config) -> bool:
291+
"""
292+
Determine if we should use a separate IPFS client for pinning operations.
293+
Returns True if pinning configuration is different from main IPFS configuration.
294+
"""
295+
if not hasattr(config.ipfs, "pinning"):
296+
return False
297+
298+
# Check if pinning host/port are specifically configured and different
299+
pinning_host = config.ipfs.pinning.host.value
300+
pinning_port = config.ipfs.pinning.port.value
301+
302+
if pinning_host and pinning_port:
303+
main_host = config.ipfs.host.value
304+
main_port = config.ipfs.port.value
305+
return (pinning_host != main_host) or (pinning_port != main_port)
306+
307+
return False

0 commit comments

Comments
 (0)