Skip to content

Commit 80df885

Browse files
authored
Fix RabbitMQ connection timeouts during long operations. (#856)
1 parent cf60ba7 commit 80df885

File tree

5 files changed

+12
-1
lines changed

5 files changed

+12
-1
lines changed

src/aleph/chains/chain_data_service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ async def make_pending_tx_exchange(config: Config) -> aio_pika.abc.AbstractExcha
231231
port=config.rabbitmq.port.value,
232232
login=config.rabbitmq.username.value,
233233
password=config.rabbitmq.password.value,
234+
heartbeat=config.rabbitmq.heartbeat.value,
234235
)
235236
channel = await mq_conn.channel()
236237
pending_tx_exchange = await channel.declare_exchange(

src/aleph/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ def get_defaults():
204204
"pending_message_exchange": "aleph-pending-messages",
205205
# Name of the RabbitMQ exchange used for sync/message events (input of the TX processor).
206206
"pending_tx_exchange": "aleph-pending-txs",
207+
# Heartbeat interval in seconds to prevent connection timeouts during long operations.
208+
"heartbeat": 600,
207209
},
208210
"redis": {
209211
# Hostname of the Redis service.

src/aleph/jobs/job_utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ async def _make_pending_queue(
4141
port=config.rabbitmq.port.value,
4242
login=config.rabbitmq.username.value,
4343
password=config.rabbitmq.password.value,
44+
heartbeat=config.rabbitmq.heartbeat.value,
4445
)
4546
channel = await mq_conn.channel()
4647

src/aleph/jobs/process_pending_messages.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,14 @@ async def new(
6363
mq_password: str,
6464
message_exchange_name: str,
6565
pending_message_exchange_name: str,
66+
mq_heartbeat: int,
6667
):
6768
mq_conn = await aio_pika.connect_robust(
68-
host=mq_host, port=mq_port, login=mq_username, password=mq_password
69+
host=mq_host,
70+
port=mq_port,
71+
login=mq_username,
72+
password=mq_password,
73+
heartbeat=mq_heartbeat,
6974
)
7075
channel = await mq_conn.channel()
7176
mq_message_exchange = await channel.declare_exchange(
@@ -179,6 +184,7 @@ async def fetch_and_process_messages_task(config: Config):
179184
mq_password=config.rabbitmq.password.value,
180185
message_exchange_name=config.rabbitmq.message_exchange.value,
181186
pending_message_exchange_name=config.rabbitmq.pending_message_exchange.value,
187+
mq_heartbeat=config.rabbitmq.heartbeat.value,
182188
)
183189

184190
async with pending_message_processor:

src/aleph/toolkit/rabbitmq.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ async def make_mq_conn(config) -> aio_pika.abc.AbstractConnection:
77
port=config.rabbitmq.port.value,
88
login=config.rabbitmq.username.value,
99
password=config.rabbitmq.password.value,
10+
heartbeat=config.rabbitmq.heartbeat.value,
1011
)
1112
return mq_conn

0 commit comments

Comments
 (0)