Skip to content
41 changes: 41 additions & 0 deletions bittensor/core/async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,47 @@ async def get_block_hash(self, block: Optional[int] = None) -> str:
else:
return await self.substrate.get_chain_head()

async def get_parents(
self,
hotkey: str,
netuid: int,
block: Optional[int] = None,
block_hash: Optional[str] = None,
reuse_block: bool = False,
) -> list[tuple[float, str]]:
"""
This method retrieves the parent of a given hotkey and netuid. It queries the SubtensorModule's ParentKeys
storage function to get the children and formats them before returning as a tuple.

Arguments:
hotkey: The child hotkey SS58.
netuid: The netuid value.
block: The block number for which the children are to be retrieved.
block_hash: The hash of the block to retrieve the subnet unique identifiers from.
reuse_block: Whether to reuse the last-used block hash.

Returns:
A list of formatted parents [(proportion, parent)]
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
parents = await self.substrate.query(
module="SubtensorModule",
storage_function="ParentKeys",
params=[hotkey, netuid],
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
if parents:
formatted_parents = []
for proportion, parent in parents.value:
# Convert U64 to int
formatted_child = decode_account_id(parent[0])
normalized_proportion = u64_normalized_float(proportion)
formatted_parents.append((normalized_proportion, formatted_child))
return formatted_parents

return []

async def get_children(
self,
hotkey: str,
Expand Down
32 changes: 32 additions & 0 deletions bittensor/core/subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,38 @@ def get_hyperparameter(

return getattr(result, "value", result)

def get_parents(
self, hotkey: str, netuid: int, block: Optional[int] = None
) -> list[tuple[float, str]]:
"""
This method retrieves the parent of a given hotkey and netuid. It queries the SubtensorModule's ParentKeys
storage function to get the children and formats them before returning as a tuple.

Arguments:
hotkey: The child hotkey SS58.
netuid: The netuid.
block: The block number for which the children are to be retrieved.

Returns:
A list of formatted parents [(proportion, parent)]
"""
parents = self.substrate.query(
module="SubtensorModule",
storage_function="ParentKeys",
params=[hotkey, netuid],
block_hash=self.determine_block_hash(block),
)
if parents:
formatted_parents = []
for proportion, parent in parents.value:
# Convert U64 to int
formatted_child = decode_account_id(parent[0])
normalized_proportion = u64_normalized_float(proportion)
formatted_parents.append((normalized_proportion, formatted_child))
return formatted_parents

return []

def get_children(
self, hotkey: str, netuid: int, block: Optional[int] = None
) -> tuple[bool, list[tuple[float, str]], str]:
Expand Down
1 change: 1 addition & 0 deletions bittensor/core/subtensor_api/subnets.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def __init__(self, subtensor: Union["_Subtensor", "_AsyncSubtensor"]):
self.bonds = subtensor.bonds
self.difficulty = subtensor.difficulty
self.get_all_subnets_info = subtensor.get_all_subnets_info
self.get_parents = subtensor.get_parents
self.get_children = subtensor.get_children
self.get_children_pending = subtensor.get_children_pending
self.get_current_weight_commit_info = subtensor.get_current_weight_commit_info
Expand Down
1 change: 1 addition & 0 deletions bittensor/core/subtensor_api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def add_legacy_methods(subtensor: "SubtensorApi"):
subtensor.get_balance = subtensor._subtensor.get_balance
subtensor.get_balances = subtensor._subtensor.get_balances
subtensor.get_block_hash = subtensor._subtensor.get_block_hash
subtensor.get_parents = subtensor._subtensor.get_parents
subtensor.get_children = subtensor._subtensor.get_children
subtensor.get_children_pending = subtensor._subtensor.get_children_pending
subtensor.get_commitment = subtensor._subtensor.get_commitment
Expand Down
6 changes: 5 additions & 1 deletion tests/e2e_tests/test_hotkeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_hotkeys(subtensor, alice_wallet, dave_wallet):
)
is True
)
logging.console.success(f"✅ Test [green]test_hotkeys[/green] passed")
logging.console.success("✅ Test [green]test_hotkeys[/green] passed")


@pytest.mark.asyncio
Expand Down Expand Up @@ -276,6 +276,10 @@ async def test_children(local_chain, subtensor, alice_wallet, bob_wallet, dave_w
assert success is True
assert children == [(1.0, bob_wallet.hotkey.ss58_address)]

parent_ = subtensor.get_parents(bob_wallet.hotkey.ss58_address, dave_subnet_netuid)

assert parent_ == [(1.0, alice_wallet.hotkey.ss58_address)]

# pending queue is empty
pending, cooldown = subtensor.get_children_pending(
alice_wallet.hotkey.ss58_address,
Expand Down
84 changes: 84 additions & 0 deletions tests/unit_tests/test_async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1981,6 +1981,90 @@ async def test_get_children_substrate_request_exception(subtensor, mocker):
assert result == (False, [], "Formatted error message")


@pytest.mark.asyncio
async def test_get_parents_success(subtensor, mocker):
"""Tests get_parents when parents are successfully retrieved and formatted."""
# Preps
fake_hotkey = "valid_hotkey"
fake_netuid = 1
fake_parents = mocker.Mock(
value=[
(1000, ["parent_key_1"]),
(2000, ["parent_key_2"]),
]
)

mocked_query = mocker.AsyncMock(return_value=fake_parents)
subtensor.substrate.query = mocked_query

mocked_decode_account_id = mocker.Mock(
side_effect=["decoded_parent_key_1", "decoded_parent_key_2"]
)
mocker.patch.object(async_subtensor, "decode_account_id", mocked_decode_account_id)

expected_formatted_parents = [
(u64_normalized_float(1000), "decoded_parent_key_1"),
(u64_normalized_float(2000), "decoded_parent_key_2"),
]

# Call
result = await subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid)

# Asserts
mocked_query.assert_called_once_with(
block_hash=None,
module="SubtensorModule",
storage_function="ParentKeys",
params=[fake_hotkey, fake_netuid],
reuse_block_hash=False,
)
mocked_decode_account_id.assert_has_calls(
[mocker.call("parent_key_1"), mocker.call("parent_key_2")]
)
assert result == expected_formatted_parents


@pytest.mark.asyncio
async def test_get_parents_no_parents(subtensor, mocker):
"""Tests get_parents when there are no parents to retrieve."""
# Preps
fake_hotkey = "valid_hotkey"
fake_netuid = 1
fake_parents = []

mocked_query = mocker.AsyncMock(return_value=fake_parents)
subtensor.substrate.query = mocked_query

# Call
result = await subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid)

# Asserts
mocked_query.assert_called_once_with(
block_hash=None,
module="SubtensorModule",
storage_function="ParentKeys",
params=[fake_hotkey, fake_netuid],
reuse_block_hash=False,
)
assert result == []


@pytest.mark.asyncio
async def test_get_parents_substrate_request_exception(subtensor, mocker):
"""Tests get_parents when SubstrateRequestException is raised."""
# Preps
fake_hotkey = "valid_hotkey"
fake_netuid = 1
fake_exception = async_subtensor.SubstrateRequestException("Test Exception")

mocked_query = mocker.AsyncMock(side_effect=fake_exception)
subtensor.substrate.query = mocked_query

# Call
with pytest.raises(async_subtensor.SubstrateRequestException):
await subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid)


@pytest.mark.asyncio
async def test_get_children_pending(mock_substrate, subtensor):
mock_substrate.query.return_value.value = [
Expand Down
64 changes: 64 additions & 0 deletions tests/unit_tests/test_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3746,3 +3746,67 @@ def test_get_next_epoch_start_block(mocker, subtensor, call_return, expected):
)
subtensor.tempo.assert_called_once_with(netuid=netuid, block=block)
assert result == expected


def test_get_parents_success(subtensor, mocker):
"""Tests get_parents when parents are successfully retrieved and formatted."""
# Preps
fake_hotkey = "valid_hotkey"
fake_netuid = 1
fake_parents = mocker.Mock(
value=[
(1000, ["parent_key_1"]),
(2000, ["parent_key_2"]),
]
)

mocked_query = mocker.MagicMock(return_value=fake_parents)
subtensor.substrate.query = mocked_query

mocked_decode_account_id = mocker.Mock(
side_effect=["decoded_parent_key_1", "decoded_parent_key_2"]
)
mocker.patch.object(subtensor_module, "decode_account_id", mocked_decode_account_id)

expected_formatted_parents = [
(u64_normalized_float(1000), "decoded_parent_key_1"),
(u64_normalized_float(2000), "decoded_parent_key_2"),
]

# Call
result = subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid)

# Asserts
mocked_query.assert_called_once_with(
block_hash=None,
module="SubtensorModule",
storage_function="ParentKeys",
params=[fake_hotkey, fake_netuid],
)
mocked_decode_account_id.assert_has_calls(
[mocker.call("parent_key_1"), mocker.call("parent_key_2")]
)
assert result == expected_formatted_parents


def test_get_parents_no_parents(subtensor, mocker):
"""Tests get_parents when there are no parents to retrieve."""
# Preps
fake_hotkey = "valid_hotkey"
fake_netuid = 1
fake_parents = []

mocked_query = mocker.MagicMock(return_value=fake_parents)
subtensor.substrate.query = mocked_query

# Call
result = subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid)

# Asserts
mocked_query.assert_called_once_with(
block_hash=None,
module="SubtensorModule",
storage_function="ParentKeys",
params=[fake_hotkey, fake_netuid],
)
assert result == []