Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subtensor Registry #1562

Merged
merged 8 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bittensor/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class UnstakeError(ChainTransactionError):
pass


class IdentityError(ChainTransactionError):
r"""Error raised when an identity transaction fails."""
pass


class NominationError(ChainTransactionError):
r"""Error raised when a nomination transaction fails."""
pass
Expand Down
72 changes: 72 additions & 0 deletions bittensor/subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,78 @@ def root_set_weights(
prompt=prompt,
)

########################
#### Registry Calls ####
########################

""" Queries subtensor registry named storage with params and block. """

def query_identity(
self,
key: str,
block: Optional[int] = None,
) -> Optional[object]:
@retry(delay=2, tries=3, backoff=2, max_delay=4)
def make_substrate_call_with_retry():
with self.substrate as substrate:
return substrate.query(
module="Registry",
storage_function="IdentityOf",
params=[key],
block_hash=None
if block == None
else substrate.get_block_hash(block),
)

identity_info = make_substrate_call_with_retry()
return bittensor.utils.wallet_utils.decode_hex_identity_dict(
identity_info.value["info"]
)

def update_identity(
self,
wallet: "bittensor.wallet",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the wallet coldkey is not the subnet owner, this can essentially set identity for any coldkey on chain. Do we want to limit this to subnet owners only? (check the chain for current subnet coldkey owners)

E.g.:

owner_coldkeys = [sub.get_subnet_owner(i) for i in sub.get_all_subnet_netuids()]
if wallet.coldkey.ss58_address not in owner_coldkeys:
   raise ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, this is done by the chain, only subnet owners and root validators can set an identity

identified: str = None,
params: dict = {},
wait_for_inclusion: bool = True,
wait_for_finalization: bool = False,
) -> bool:
"""
Creates an identity extrinsics with the specific structure.
"""
if identified == None:
identified = wallet.coldkey.ss58_address

call_params = bittensor.utils.wallet_utils.create_identity_dict(**params)
call_params["identified"] = identified

@retry(delay=2, tries=3, backoff=2, max_delay=4)
def make_substrate_call_with_retry():
with self.substrate as substrate:
call = substrate.compose_call(
call_module="Registry",
call_function="set_identity",
call_params=call_params,
)
extrinsic = substrate.create_signed_extrinsic(
call=call, keypair=wallet.coldkey
)
response = substrate.submit_extrinsic(
extrinsic,
wait_for_inclusion=wait_for_inclusion,
wait_for_finalization=wait_for_finalization,
)
# We only wait here if we expect finalization.
if not wait_for_finalization and not wait_for_inclusion:
return True
response.process_events()
if response.is_success:
return True
else:
raise IdentityError(response.error_message)

return make_substrate_call_with_retry()

########################
#### Standard Calls ####
########################
Expand Down
66 changes: 65 additions & 1 deletion bittensor/utils/wallet_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# DEALINGS IN THE SOFTWARE.

from substrateinterface.utils import ss58
from typing import Union
from typing import Union, Optional

from .. import __ss58_format__
from substrateinterface import Keypair as Keypair
Expand Down Expand Up @@ -102,3 +102,67 @@ def is_valid_bittensor_address_or_public_key(address: Union[str, bytes]) -> bool
else:
# Invalid address type
return False


def create_identity_dict(
display: str = "",
legal: str = "",
web: str = "",
riot: str = "",
email: str = "",
pgp_fingerprint: Optional[str] = None,
image: str = "",
info: str = "",
twitter: str = "",
) -> dict:
"""
Creates a dictionary with structure for identity extrinsic. Must fit within 64 bits.

Args:
display (str): String to be converted and stored under 'display'.
legal (str): String to be converted and stored under 'legal'.
web (str): String to be converted and stored under 'web'.
riot (str): String to be converted and stored under 'riot'.
email (str): String to be converted and stored under 'email'.
pgp_fingerprint (str): String to be converted and stored under 'pgp_fingerprint'.
image (str): String to be converted and stored under 'image'.
info (str): String to be converted and stored under 'info'.
twitter (str): String to be converted and stored under 'twitter'.

Returns:
dict: A dictionary with the specified structure and byte string conversions.

Raises:
ValueError: If pgp_fingerprint is not exactly 20 bytes long when encoded.
"""
if pgp_fingerprint and len(pgp_fingerprint.encode()) != 20:
raise ValueError("pgp_fingerprint must be exactly 20 bytes long when encoded")

return {
"info": {
"additional": [[]],
"display": {f"Raw{len(display.encode())}": display.encode()},
"legal": {f"Raw{len(legal.encode())}": legal.encode()},
"web": {f"Raw{len(web.encode())}": web.encode()},
"riot": {f"Raw{len(riot.encode())}": riot.encode()},
"email": {f"Raw{len(email.encode())}": email.encode()},
"pgp_fingerprint": pgp_fingerprint.encode() if pgp_fingerprint else None,
"image": {f"Raw{len(image.encode())}": image.encode()},
"info": {f"Raw{len(info.encode())}": info.encode()},
"twitter": {f"Raw{len(twitter.encode())}": twitter.encode()},
}
}


def decode_hex_identity_dict(info_dictionary):
for key, value in info_dictionary.items():
if isinstance(value, dict):
item = list(value.values())[0]
if isinstance(item, str) and item.startswith("0x"):
try:
info_dictionary[key] = bytes.fromhex(item[2:]).decode()
except UnicodeDecodeError:
print(f"Could not decode: {key}: {item}")
else:
info_dictionary[key] = item
return info_dictionary