Skip to content
12 changes: 10 additions & 2 deletions tests/integration/it_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def fund_wallet(wallet: Wallet) -> None:
client.request(LEDGER_ACCEPT_REQUEST)


async_wallet_fund_lock = asyncio.Lock()


async def fund_wallet_async(
wallet: Wallet, client: AsyncClient = ASYNC_JSON_RPC_CLIENT
) -> None:
Expand All @@ -119,8 +122,13 @@ async def fund_wallet_async(
destination=wallet.address,
amount=FUNDING_AMOUNT,
)
await sign_and_submit_async(payment, client, MASTER_WALLET, check_fee=True)
await client.request(LEDGER_ACCEPT_REQUEST)

# concurrent wallet_fund operations will attempt to advance the ledger at the same
# time. Consequently, all the funding operations fail, except for the first one.
# using a lock will serialize the access to this critical operation
async with async_wallet_fund_lock:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mvadari I found that adding a lock to this section of the code eliminated the "Account Not Found" error.

await sign_and_submit_async(payment, client, MASTER_WALLET, check_fee=True)
await client.request(LEDGER_ACCEPT_REQUEST)


# just submits a transaction to the ledger, synchronously
Expand Down
10 changes: 6 additions & 4 deletions tests/integration/reusable_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
# faster)
async def _set_up_reusable_values():
wallet = Wallet.create()
await fund_wallet_async(wallet)
destination = Wallet.create()
await fund_wallet_async(destination)
door_wallet = Wallet.create()
await fund_wallet_async(door_wallet)
witness_wallet = Wallet.create()
await fund_wallet_async(witness_wallet)
await asyncio.gather(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please stick to one change per PR - this is unrelated to this PR.

fund_wallet_async(wallet),
fund_wallet_async(destination),
fund_wallet_async(door_wallet),
fund_wallet_async(witness_wallet),
)

offer = await sign_and_reliable_submission_async(
OfferCreate(
Expand Down
54 changes: 33 additions & 21 deletions tests/integration/transactions/test_set_oracle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import time

from tests.integration.integration_test_case import IntegrationTestCase
Expand All @@ -14,32 +15,43 @@
_PROVIDER = str_to_hex("provider")
_ASSET_CLASS = str_to_hex("currency")

lock = asyncio.Lock()


class TestSetOracle(IntegrationTestCase):
@test_async_and_sync(globals())
async def test_all_fields(self, client):
tx = OracleSet(
account=WALLET.address,
# if oracle_document_id is not modified, the (sync, async) +
# (json, websocket) combination of integration tests will update the same
# oracle object using identical "LastUpdateTime". Updates to an oracle must
# be more recent than its previous LastUpdateTime
# a unique value is obtained for each combination of test run within the
# implementation of the test_async_and_sync decorator.
oracle_document_id=self.value,
provider=_PROVIDER,
asset_class=_ASSET_CLASS,
last_update_time=int(time.time()),
price_data_series=[
PriceData(
base_asset="XRP", quote_asset="USD", asset_price=740, scale=1
),
PriceData(
base_asset="BTC", quote_asset="EUR", asset_price=100, scale=2
),
],
response = await sign_and_reliable_submission_async(
OracleSet(
account=WALLET.address,
# if oracle_document_id is not modified, the (sync, async) +
# (json, websocket) combination of integration tests will update the
# same oracle object using identical "LastUpdateTime".
# Updates to an oracle must be more recent than its previous
# LastUpdateTime a unique value is obtained for each combination
# of test run within the implementation of the
# test_async_and_sync decorator.
oracle_document_id=self.value,
provider=_PROVIDER,
asset_class=_ASSET_CLASS,
# contruct the OracleSet transaction in-place with submit-function,
# in order to obtain the most-recent timestamp.
# Otherwise, async execution of test cases might render this
# timestamp stale.
last_update_time=int(time.time()),
price_data_series=[
PriceData(
base_asset="XRP", quote_asset="USD", asset_price=740, scale=1
),
PriceData(
base_asset="BTC", quote_asset="EUR", asset_price=100, scale=2
),
],
),
WALLET,
client,
)
response = await sign_and_reliable_submission_async(tx, WALLET, client)

self.assertEqual(response.status, ResponseStatus.SUCCESS)
self.assertEqual(response.result["engine_result"], "tesSUCCESS")

Expand Down
11 changes: 7 additions & 4 deletions xrpl/asyncio/clients/websocket_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ async def _handler(self: WebsocketBase) -> None:
messages we check whether there is an outstanding future we need to resolve,
and if so do so.

Then we store the already-parsed JSON in our own queue for generic iteration.
If the message corresponds to a pending request, it is stored appropriately,
otherwise we store the already-parsed JSON in our own queue for generic
iteration.

As long as a given client remains open, this handler will be running as a Task.
"""
Expand All @@ -135,9 +137,10 @@ async def _handler(self: WebsocketBase) -> None:
# if this response corresponds to request, fulfill the Future
if "id" in response_dict and response_dict["id"] in self._open_requests:
self._open_requests[response_dict["id"]].set_result(response_dict)

# enqueue the response for the message queue
cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict)
else:
# a response that corresponds to a request is not enqueued again
# enqueue the response for the message queue
cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict)

def _set_up_future(self: WebsocketBase, request: Request) -> None:
"""
Expand Down