Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions docs/subscriptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ As of v7.7.0, web3.py includes some additional convenient subscription managemen
1.) The subscription_manager
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

First, your ``w3`` (``AsyncWeb3``) instance now includes a new module, ``subscription_manager``. While you may still use the ``w3.eth.subscribe`` method from the previous example, the ``subscription_manager`` offers an additional way to start one or more subscriptions. We're going to pass in a list of events we want to subscribe to within the ``w3.subscription_manager.subscribe`` method.
The ``w3`` (``AsyncWeb3``) instance has a ``subscription_manager`` module. While you may
still use the ``w3.eth.subscribe`` method from the previous example, the
``subscription_manager`` offers an additional way to start one or more subscriptions and
provides better management of those subscriptions. We're going to pass in a list of
events we want to subscribe to within the ``w3.subscription_manager.subscribe`` method.

.. code-block:: python

Expand All @@ -71,13 +75,15 @@ To aid in defining those subscriptions, subscription type classes have been intr

sub1 = NewHeadsSubscription(
label="new-heads-mainnet", # optional label
handler=new_heads_handler
handler=new_heads_handler,
)

sub2 = PendingTxSubscription(
label="pending-tx-mainnet", # optional label
full_transactions=True,
handler=pending_tx_handler,
# optional parallelization flag (see Parallelizing subscriptions section below)
parallelize=True,
)

sub3 = LogsSubscription(
Expand Down Expand Up @@ -215,6 +221,92 @@ Let's put all the pieces together. This example will subscribe to new block head
asyncio.run(sub_manager())


Parallelizing subscriptions
---------------------------

.. important::

Parallelizing subscriptions does not guarantee that events will be processed in the
order they are received. Most events should still be processed in the order they are
received, but if a particular handler takes a long time to execute, newer events may
be processed first. It is recommended to set the ``parallelize`` flag to ``False``
(default behavior) for subscriptions that depend on the order of events.


If you have multiple subscriptions that can be processed in parallel, you can set the
``parallelize`` flag to ``True`` - either globally on the subscription manager, or
individually on each subscription. This control allows the subscription manager to
handle subscription processing concurrently. This flag can be set on the manager, as a
global setting, or on individual subscriptions. This can help with performance if
subscriptions are independent of each other, or do not rely on some external shared
state (no race conditions are present).

Global parallelization is off by default, meaning all subscriptions will be processed
sequentially unless you set the ``parallelize`` flag to ``True`` on the subscription
manager or individual subscriptions.

.. code-block:: python

sub1 = NewHeadsSubscription(
label="new-heads-mainnet",
handler=new_heads_handler,
parallelize=True, # process this subscription in parallel
)

sub2 = LogsSubscription(
label="WETH transfers",
address=weth_contract.address,
topics=[weth_contract.events.Transfer().topic],
handler=log_handler,
parallelize=False, # process sequentially (this is the default behavior)
)

sub3 = LogsSubscription(
label="WETH approvals",
address=weth_contract.address,
topics=[weth_contract.events.Approval().topic],
handler=approval_handler,
parallelize=True, # process this subscription in parallel
)

await w3.subscription_manager.subscribe([sub1, sub2])

Global parallelization can also be set on the subscription manager, which will apply to
all subscriptions unless overridden by an individual subscription's ``parallelize``
flag:

.. code-block:: python

# or set the parallelize flag globally on the subscription manager:
w3.subscription_manager.parallelize = True

# parallelize is set globally, so this will be processed in parallel
sub1 = NewHeadsSubscription(
label="new-heads-mainnet",
handler=new_heads_handler,
)

# this will be processed sequentially since ``parallelize`` is set to ``False``,
# overriding the global setting
sub2 = LogsSubscription(
label="WETH transfers",
address=weth_contract.address,
topics=[weth_contract.events.Transfer().topic],
handler=log_handler,
parallelize=False, # process sequentially
)

# this will also be processed in parallel
sub3 = LogsSubscription(
label="WETH approvals",
address=weth_contract.address,
topics=[weth_contract.events.Approval().topic],
handler=approval_handler,
)

await w3.subscription_manager.subscribe([sub1, sub2, sub3])


FAQ
---

Expand Down
1 change: 1 addition & 0 deletions newsfragments/3709.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support parallelization of subscription handling globally via the subscription manager ``parallelize`` flag, and on a per-subscription basis via the ``parallelize`` flag on the subscription itself.
Loading