Skip to content

Commit cc8a90f

Browse files
committed
Add documentation for parallelize flag
1 parent 7b3efc8 commit cc8a90f

File tree

1 file changed

+94
-2
lines changed

1 file changed

+94
-2
lines changed

docs/subscriptions.rst

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ As of v7.7.0, web3.py includes some additional convenient subscription managemen
4949
1.) The subscription_manager
5050
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5151

52-
First, your ``w3`` (``AsyncWeb3``) instance now includes a new module, ``subscription_manager``. While you may still use the ``w3.eth.subscribe`` method from the previous example, the ``subscription_manager`` offers an additional way to start one or more subscriptions. We're going to pass in a list of events we want to subscribe to within the ``w3.subscription_manager.subscribe`` method.
52+
The ``w3`` (``AsyncWeb3``) instance has a ``subscription_manager`` module. While you may
53+
still use the ``w3.eth.subscribe`` method from the previous example, the
54+
``subscription_manager`` offers an additional way to start one or more subscriptions and
55+
provides better management of those subscriptions. We're going to pass in a list of
56+
events we want to subscribe to within the ``w3.subscription_manager.subscribe`` method.
5357

5458
.. code-block:: python
5559
@@ -71,13 +75,15 @@ To aid in defining those subscriptions, subscription type classes have been intr
7175
7276
sub1 = NewHeadsSubscription(
7377
label="new-heads-mainnet", # optional label
74-
handler=new_heads_handler
78+
handler=new_heads_handler,
7579
)
7680
7781
sub2 = PendingTxSubscription(
7882
label="pending-tx-mainnet", # optional label
7983
full_transactions=True,
8084
handler=pending_tx_handler,
85+
# optional parallelization flag (see Parallelizing subscriptions section below)
86+
parallelize=True,
8187
)
8288
8389
sub3 = LogsSubscription(
@@ -215,6 +221,92 @@ Let's put all the pieces together. This example will subscribe to new block head
215221
asyncio.run(sub_manager())
216222
217223
224+
Parallelizing subscriptions
225+
---------------------------
226+
227+
.. important::
228+
229+
Parallelizing subscriptions does not guarantee that events will be processed in the
230+
order they are received. Most events should still be processed in the order they are
231+
received, but if a particular handler takes a long time to execute, newer events may
232+
be processed first. It is recommended to set the ``parallelize`` flag to ``False``
233+
(default behavior) for subscriptions that depend on the order of events.
234+
235+
236+
If you have multiple subscriptions that can be processed in parallel, you can set the
237+
``parallelize`` flag to ``True`` - either globally on the subscription manager, or
238+
individually on each subscription. This control allows the subscription manager to
239+
handle subscription processing concurrently. This flag can be set on the manager, as a
240+
global setting, or on individual subscriptions. This can help with performance if
241+
subscriptions are independent of each other, or do not rely on some external shared
242+
state (no race conditions are present).
243+
244+
Global parallelization is off by default, meaning all subscriptions will be processed
245+
sequentially unless you set the ``parallelize`` flag to ``True`` on the subscription
246+
manager or individual subscriptions.
247+
248+
.. code-block:: python
249+
250+
sub1 = NewHeadsSubscription(
251+
label="new-heads-mainnet",
252+
handler=new_heads_handler,
253+
parallelize=True, # process this subscription in parallel
254+
)
255+
256+
sub2 = LogsSubscription(
257+
label="WETH transfers",
258+
address=weth_contract.address,
259+
topics=[weth_contract.events.Transfer().topic],
260+
handler=log_handler,
261+
parallelize=False, # process sequentially (this is the default behavior)
262+
)
263+
264+
sub3 = LogsSubscription(
265+
label="WETH approvals",
266+
address=weth_contract.address,
267+
topics=[weth_contract.events.Approval().topic],
268+
handler=approval_handler,
269+
parallelize=True, # process this subscription in parallel
270+
)
271+
272+
await w3.subscription_manager.subscribe([sub1, sub2])
273+
274+
Global parallelization can also be set on the subscription manager, which will apply to
275+
all subscriptions unless overridden by an individual subscription's ``parallelize``
276+
flag:
277+
278+
.. code-block:: python
279+
280+
# or set the parallelize flag globally on the subscription manager:
281+
w3.subscription_manager.parallelize = True
282+
283+
# parallelize is set globally, so this will be processed in parallel
284+
sub1 = NewHeadsSubscription(
285+
label="new-heads-mainnet",
286+
handler=new_heads_handler,
287+
)
288+
289+
# this will be processed sequentially since ``parallelize`` is set to ``False``,
290+
# overriding the global setting
291+
sub2 = LogsSubscription(
292+
label="WETH transfers",
293+
address=weth_contract.address,
294+
topics=[weth_contract.events.Transfer().topic],
295+
handler=log_handler,
296+
parallelize=False, # process sequentially
297+
)
298+
299+
# this will also be processed in parallel
300+
sub3 = LogsSubscription(
301+
label="WETH approvals",
302+
address=weth_contract.address,
303+
topics=[weth_contract.events.Approval().topic],
304+
handler=approval_handler,
305+
)
306+
307+
await w3.subscription_manager.subscribe([sub1, sub2, sub3])
308+
309+
218310
FAQ
219311
---
220312

0 commit comments

Comments
 (0)