diff --git a/.env.example b/.env.example index 0b5d19fa..82a6a169 100644 --- a/.env.example +++ b/.env.example @@ -1 +1,3 @@ WANDB_API_KEY="your_api_key" +DEALLOCATION_NOTIFY_URL="https://dev.neuralinternet.ai/api/gpus/webhook/deallocation" +STATUS_NOTIFY_URL="https://dev.neuralinternet.ai/api/gpus/webhook/status-change-warning" diff --git a/cert/gen_ca.sh b/cert/gen_ca.sh new file mode 100644 index 00000000..5a6e7671 --- /dev/null +++ b/cert/gen_ca.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +############################################################# +# for generating root CA, server, client private key and cert +############################################################# + +ca_key_bits="4096" +ca_cert_expire_days="365" +pem_password="bittensor" +local_ip=$1 + +if [ "$local_ip" = "" ]; then + echo "Usage: ./gen_ca.sh " + exit 1 +fi + +echo "1.1. generate root CA" +openssl req -newkey rsa:"$ca_key_bits" -keyform PEM -keyout ca.key -x509 --days "$ca_cert_expire_days" -outform PEM -passout pass:"$pem_password" -out ca.cer -subj "/C=US/ST=NY/CN=ca.neuralinternet.ai/O=NI" + +#for register_api server +echo "2.1 Generate a server private key." +openssl genrsa -out server.key "$ca_key_bits" + +echo "2.2 Use the server private key to generate a certificate generation request." +openssl req -new -key server.key -out server.req -sha256 -subj "/C=US/ST=NY/CN=server.neuralinternet.ai/O=NI" + +echo "2.3 Use the certificate generation request and the CA cert to generate the server cert." +openssl x509 -req -in server.req -CA ca.cer -CAkey ca.key -CAcreateserial -set_serial 100 -days "$ca_cert_expire_days" -outform PEM -passin pass:"$pem_password" -out server.cer -sha256 -extensions v3_req -extfile <( +cat << EOF +[ v3_req ] +subjectAltName = @alt_names + +[ alt_names ] +IP.1 = 127.0.0.1 +IP.2 = 0.0.0.0 +IP.3 = "$local_ip" +EOF +) + + +echo "2.4 Convert the cer to PEM CRT format" +openssl x509 -inform PEM -in server.cer -out server.crt + +echo "2.5 Clean up – now that the cert has been created, we no longer need the request" +rm server.req + +#for frontend server +echo "3.1 Generate a client private key." +openssl genrsa -out client.key "$ca_key_bits" + +echo "3.2 Use the client private key to generate a certificate generation request." +openssl req -new -key client.key -out client.req -subj "/C=US/ST=NY/CN=client.neuralinternet.ai/O=NI" + +echo "3.3 Use the certificate generation request and the CA cert to generate the client cert." +openssl x509 -req -in client.req -CA ca.cer -CAkey ca.key -CAcreateserial -set_serial 101 -days "$ca_cert_expire_days" -outform PEM -out client.cer -passin pass:"$pem_password" -extensions v3_req -extfile <( +cat << EOF +[ v3_req ] +subjectAltName = @alt_names + +[ alt_names ] +IP.1 = 127.0.0.1 +IP.2 = 0.0.0.0 +IP.3 = "$local_ip" +EOF +) + +echo "3.4 Convert the client certificate and private key to pkcs#12 format for use by browsers." +openssl pkcs12 -export -inkey client.key -in client.cer -out client.p12 -passout pass:"$pem_password" + +echo "3.5. Convert the cer to PEM CRT format" +openssl x509 -inform PEM -in client.cer -out client.crt + +echo "3.6. Clean up – now that the cert has been created, we no longer need the request." +rm client.req \ No newline at end of file diff --git a/compute/__init__.py b/compute/__init__.py index f511d71b..c329a228 100644 --- a/compute/__init__.py +++ b/compute/__init__.py @@ -18,9 +18,9 @@ import string # Define the version of the template module. 
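The two webhook URLs added to .env.example are read by the new register API (neurons/register_api.py, further down in this patch) through python-dotenv. A minimal sketch of loading them; only the variable names come from this patch, the rest is illustrative:

import os
from dotenv import load_dotenv  # python-dotenv, the same loader register_api.py uses

load_dotenv()  # pick up .env from the working directory
deallocation_notify_url = os.getenv("DEALLOCATION_NOTIFY_URL")
status_notify_url = os.getenv("STATUS_NOTIFY_URL")
# Both values may be None if the keys are missing; the register API stores whatever it finds at startup.
print(deallocation_notify_url, status_notify_url)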
-__version__ = "1.4.4" -__minimal_miner_version__ = "1.4.2" -__minimal_validator_version__ = "1.4.4" +__version__ = "1.4.5" +__minimal_miner_version__ = "1.4.5" +__minimal_validator_version__ = "1.4.5" version_split = __version__.split(".") __version_as_int__ = (100 * int(version_split[0])) + (10 * int(version_split[1])) + (1 * int(version_split[2])) diff --git a/compute/protocol.py b/compute/protocol.py index c32b3d21..22ce3133 100644 --- a/compute/protocol.py +++ b/compute/protocol.py @@ -76,6 +76,18 @@ class Allocate(bt.Synapse): checking: bool = True output: dict = {} public_key: str = "" + docker_requirement: dict = { + "base_image": "ubuntu", + "ssh_key": "", + "ssh_port": 4444, + "volume_path": "/tmp", + "dockerfile": "" + } + docker_change: bool = False + docker_action: dict = { + "action": "", + "ssh_key": "", + } def deserialize(self) -> dict: """ diff --git a/compute/utils/parser.py b/compute/utils/parser.py index 0b01ac07..ce95f9a4 100644 --- a/compute/utils/parser.py +++ b/compute/utils/parser.py @@ -55,7 +55,6 @@ def __init__(self, description=None): help="List of coldkeys to whitelist. Default: [].", default=[], ) - self.add_validator_argument() self.add_miner_argument() @@ -109,9 +108,9 @@ def add_validator_argument(self): self.add_argument( "--validator.whitelist.updated.threshold", dest="validator_whitelist_updated_threshold", - help="Total quorum before starting the whitelist. Default: 70.", + help="Total quorum before starting the whitelist. Default: 90.", type=int, - default=60, + default=90, ) def add_miner_argument(self): @@ -147,15 +146,22 @@ def add_miner_argument(self): "--miner.whitelist.not.updated", action="store_true", dest="miner_whitelist_not_updated", - help="Whitelist validators not using the last version of the code. Default: False.", - default=False, + help="Whitelist validators not using the last version of the code. Default: True.", + default=True, ) self.add_argument( "--miner.whitelist.updated.threshold", dest="miner_whitelist_updated_threshold", - help="Total quorum before starting the whitelist. Default: 50.", + help="Total quorum before starting the whitelist. 
Default: 90.", + type=int, + default=90, + ) + # add ssh port argument + self.add_argument( + "--ssh.port", type=int, - default=60, + default=4444, + help="The ssh port for the allocation service.", ) @staticmethod diff --git a/compute/utils/version.py b/compute/utils/version.py index af860481..105b22d3 100644 --- a/compute/utils/version.py +++ b/compute/utils/version.py @@ -26,20 +26,21 @@ import git import requests import sys +from packaging import version as packaging_version def get_remote_version_to_number(pattern: str = "__version__"): latest_version = version2number(get_remote_version(pattern=pattern)) if not latest_version: - bt.logging.error(f"Github API call failed or version string is incorrect!") + bt.logging.error("Github API call failed or version string is incorrect!") return latest_version def version2number(version: str): try: - if version and type(version) is str: - version = version.split(".") - return (100 * int(version[0])) + (10 * int(version[1])) + (1 * int(version[2])) + if version and isinstance(version, str): + version_parts = version.split(".") + return (100 * int(version_parts[0])) + (10 * int(version_parts[1])) + (1 * int(version_parts[2])) except Exception as _: pass return None @@ -49,7 +50,6 @@ def get_remote_version(pattern: str = "__version__"): url = "https://raw.githubusercontent.com/neuralinternet/compute-subnet/main/compute/__init__.py" try: response = requests.get(url, timeout=30) - if response.status_code == 200: lines = response.text.split("\n") for line in lines: @@ -57,27 +57,29 @@ def get_remote_version(pattern: str = "__version__"): version_info = line.split("=")[1].strip(" \"'").replace('"', "") return version_info else: - print("Failed to get file content with status code:", response.status_code) + bt.logging.error(f"Failed to get file content with status code: {response.status_code}") return None except requests.exceptions.Timeout: - print("The request timed out after 30 seconds.") + bt.logging.error("The request timed out after 30 seconds.") return None except requests.exceptions.RequestException as e: - print("There was an error while handling the request:", e) + bt.logging.error(f"There was an error while handling the request: {e}") return None def get_local_version(): try: - # loading version from __init__.py here = path.abspath(path.dirname(__file__)) parent = here.rsplit("/", 1)[0] - with codecs.open(os.path.join(parent, "__init__.py"), encoding="utf-8") as init_file: - version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", init_file.read(), re.M) + init_file_path = os.path.join(parent, "__init__.py") + + with codecs.open(init_file_path, encoding="utf-8") as init_file: + content = init_file.read() + version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", content, re.M) version_string = version_match.group(1) return version_string except Exception as e: - bt.logging.error(f"Error getting local version. 
: {e}") + bt.logging.error(f"Error getting local version: {e}") return "" @@ -86,8 +88,8 @@ def check_version_updated(): local_version = get_local_version() bt.logging.info(f"Version check - remote_version: {remote_version}, local_version: {local_version}") - if version2number(local_version) < version2number(remote_version): - bt.logging.info(f"👩‍👦Update to the latest version is required") + if packaging_version.parse(local_version) < packaging_version.parse(remote_version): + bt.logging.info("👩‍👦Update to the latest version is required") return True else: return False @@ -96,24 +98,43 @@ def check_version_updated(): def update_repo(): try: repo = git.Repo(search_parent_directories=True) - origin = repo.remotes.origin + bt.logging.info(f"Current repository path: {repo.working_dir}") + + # Check for detached HEAD state + if repo.head.is_detached: + bt.logging.info("Repository is in a detached HEAD state. Switching to the main branch.") + repo.git.checkout('main') + + bt.logging.info(f"Current branch: {repo.active_branch.name}") + + stashed = False if repo.is_dirty(untracked_files=True): - bt.logging.error("Update failed: Uncommited changes detected. Please commit changes or run `git stash`") - return False + bt.logging.info("Stashing uncommitted changes") + repo.git.stash('push', '-m', 'Auto stash before updating') + stashed = True + try: bt.logging.info("Try pulling remote repository") - origin.pull() - bt.logging.info("pulling success") + origin.pull(rebase=True) + bt.logging.info("Pulling success") + + if stashed: + bt.logging.info("Applying stashed changes") + repo.git.stash('apply', '--index') + + # Restore the specific file from remote to ensure it is not overwritten by stash + repo.git.checkout('origin/main', '--', 'compute/__init__.py') + return True except git.exc.GitCommandError as e: - bt.logging.info(f"update : Merge conflict detected: {e} Recommend you manually commit changes and update") + bt.logging.info(f"Update: Merge conflict detected: {e}. Recommend you manually commit changes and update") + if stashed: + repo.git.stash('pop') return handle_merge_conflict(repo) - except Exception as e: - bt.logging.error(f"update failed: {e} Recommend you manually commit changes and update") - + bt.logging.error(f"Update failed: {e}. Recommend you manually commit changes and update") return False @@ -129,16 +150,16 @@ def handle_merge_conflict(repo): bt.logging.info(f"Resolving conflict in file: {file_path}") repo.git.checkout("--theirs", file_path) repo.index.commit("Resolved merge conflicts automatically") - bt.logging.info(f"Merge conflicts resolved, repository updated to remote state.") - bt.logging.info(f"✅ Repo update success") + bt.logging.info("Merge conflicts resolved, repository updated to remote state.") + bt.logging.info("✅ Repo update success") return True except git.GitCommandError as e: - bt.logging.error(f"update failed: {e} Recommend you manually commit changes and update") + bt.logging.error(f"Update failed: {e}. 
Recommend you manually commit changes and update") return False def restart_app(): - bt.logging.info("👩‍🦱app restarted due to the update") + bt.logging.info("👩‍🦱App restarted due to the update") python = sys.executable os.execl(python, python, *sys.argv) @@ -153,44 +174,55 @@ def try_update_packages(force=False): requirements_path = os.path.join(repo_path, "requirements.txt") + if not os.path.exists(requirements_path): + bt.logging.error("Requirements file does not exist.") + return + python_executable = sys.executable if force: - subprocess.check_call( - [python_executable], "-m", "pip", "install", "--force-reinstall", "--ignore-installed", "--no-deps", "-r", requirements_path - ) - subprocess.check_call([python_executable], "-m", "pip", "install", "--force-reinstall", "--ignore-installed", "--no-deps", "-e", ".") + subprocess.check_call([ + python_executable, "-m", "pip", "install", "--force-reinstall", "--ignore-installed", "--no-deps", "-r", requirements_path + ]) + subprocess.check_call([ + python_executable, "-m", "pip", "install", "--force-reinstall", "--ignore-installed", "--no-deps", "-e", repo_path + ]) else: - subprocess.check_call([python_executable], "-m", "pip", "install", "-r", requirements_path) - subprocess.check_call([python_executable], "-m", "pip", "install", "-e", ".") + subprocess.check_call([python_executable, "-m", "pip", "install", "-r", requirements_path]) + subprocess.check_call([python_executable, "-m", "pip", "install", "-e", repo_path]) bt.logging.info("📦Updating packages finished.") - - except Exception as e: + except subprocess.CalledProcessError as e: + bt.logging.error(f"Updating packages failed: {e}") if not force: try_update_packages(force=True) - bt.logging.info(f"Updating packages failed {e}") def try_update(): try: - if check_version_updated() is True: - bt.logging.info("found the latest version in the repo. try ♻️update...") - if update_repo() is True: + if check_version_updated(): + bt.logging.info("Found the latest version in the repo. Try ♻️update...") + if update_repo(): try_update_packages() - restart_app() + # Check if the update was successful by comparing the version again + if not check_version_updated(): + bt.logging.info("Update process completed successfully.") + # Restart the app only if necessary (for example, after all processes are done) + restart_app() + else: + bt.logging.info("Update process failed to update the version.") except Exception as e: - bt.logging.info(f"Try updating failed {e}") + bt.logging.error(f"Try updating failed: {e}") def check_hashcat_version(hashcat_path: str = "hashcat"): try: process = subprocess.run([hashcat_path, "--version"], capture_output=True, check=True) if process and process.stdout: - bt.logging.info(f"Version of hashcat found: {process.stdout.decode()}".strip("\n")) + bt.logging.info(f"Version of hashcat found: {process.stdout.decode().strip()}") return True except subprocess.CalledProcessError: bt.logging.error( - f"Hashcat is not available nor installed on the machine. Please make sure hashcat is available in your PATH or give the explicit location using the following argument: --miner.hashcat.path" + "Hashcat is not available nor installed on the machine. 
Please make sure hashcat is available in your PATH or give the explicit location using the following argument: --miner.hashcat.path" ) - exit() + exit() \ No newline at end of file diff --git a/compute/wandb/wandb.py b/compute/wandb/wandb.py index c0d5d69c..0f25a431 100644 --- a/compute/wandb/wandb.py +++ b/compute/wandb/wandb.py @@ -47,15 +47,36 @@ def __init__(self, config: bt.config, wallet: bt.wallet, role: str): # Try to get an existing run_id for the hotkey self.run_id = self.get_run_id(self.hotkey) - try: if self.run_id is None: - # No existing run_id, so initialize a new run - run = wandb.init(project=PUBLIC_WANDB_NAME, entity=PUBLIC_WANDB_ENTITY, name=self.run_name ) - self.run_id = run.id - # Store the new run_id in the database - self.save_run_id(self.hotkey, self.run_id) - wandb.finish() + filter_rule = { + "$and": [ + {"config.config.netuid": self.config.netuid}, + {"display_name": self.run_name}, + ] + } + # Get all runs with the run_name + runs = self.api.runs(f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", filters=filter_rule) + # Get the latest run and init from the found run on wandb + if len(runs)>=1: + latest_run = runs[0] + self.run_id = latest_run.id + # Store the new run_id in the database + self.save_run_id(self.hotkey, self.run_id) + # Remove the unused run_id from the database + if len(runs) > 1: + for run in runs: + if run.id != self.run_id and run.state != "running": + run.delete(delete_artifacts=(True)) + wandb.finish() + # run can't be found on wandb either, so initialize a new run + elif len(runs)==0: + # No existing run_id, so initialize a new run + run = wandb.init(project=self.project.name, entity=self.entity, name=self.run_name) + self.run_id = run.id + # Store the new run_id in the database + self.save_run_id(self.hotkey, self.run_id) + wandb.finish() self.run = wandb.init(project=self.project.name, entity=self.entity, id=self.run_id, resume="allow") except Exception as e: @@ -192,18 +213,19 @@ def get_allocated_hotkeys(self, valid_validator_hotkeys, flag): This function gets all allocated hotkeys from all validators. Only relevant for validators. 
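The query added just below restricts the W&B scan to validator runs on the configured netuid that expose an allocated_hotkeys list. Roughly the same lookup from a standalone script (entity, project, and field names as used in this file; the netuid value is only an example):

import wandb

api = wandb.Api()
runs = api.runs(
    "neuralinternet/opencompute",  # PUBLIC_WANDB_ENTITY/PUBLIC_WANDB_NAME
    filters={
        "$and": [
            {"config.role": "validator"},
            {"config.config.netuid": 27},  # example netuid
            {"config.allocated_hotkeys": {"$exists": True}},
        ]
    },
)
for run in runs:
    # Each matching validator run publishes the hotkeys it has allocated in its run config.
    print(run.name, run.config.get("allocated_hotkeys"))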
""" - # Query all runs in the project + # Query all runs in the project and Filter runs where the role is 'validator' self.api.flush() - runs = self.api.runs(f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}") + validator_runs = self.api.runs(path=f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", + filters={"$and": [{"config.role": "validator"}, + {"config.config.netuid": self.config.netuid}, + {"config.allocated_hotkeys": {"$exists": True}},] + }) # Check if the runs list is empty - if not runs: + if not validator_runs: bt.logging.info("No validator info found in the project opencompute.") return [] - # Filter runs where the role is 'validator' - validator_runs = [run for run in runs if run.config.get('role') == 'validator'] - # Initialize an empty list to store allocated keys from runs with a valid signature allocated_keys_list = [] @@ -238,8 +260,11 @@ def get_miner_specs(self, queryable_uids): db_specs_dict = {} self.api.flush() - runs = self.api.runs(f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}") - + runs = self.api.runs(f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", + filters={"$and": [{"config.role": "miner"}, + {"config.config.netuid": self.config.netuid}, + {"state": "running"}] + }) try: # Iterate over all runs in the opencompute project for index, run in enumerate(runs, start=1): @@ -303,3 +328,15 @@ def verify_run(self, run): bt.logging.info(f"Error verifying signature for Run ID: {run_id_str}, Name: {run.name}: {e}") return False + + def sync_allocated(self, hotkey): + """ + This function syncs the allocated status of the miner with the wandb run. + """ + # Fetch allocated hotkeys + allocated_hotkeys = self.get_allocated_hotkeys([], False) + + if hotkey in allocated_hotkeys: + return True + else: + return False diff --git a/neurons/Miner/allocate.py b/neurons/Miner/allocate.py index 374d9d0c..f72bd200 100644 --- a/neurons/Miner/allocate.py +++ b/neurons/Miner/allocate.py @@ -26,7 +26,7 @@ # Register for given timeline and device_requirement -def register_allocation(timeline, device_requirement, public_key): +def register_allocation(timeline, device_requirement, public_key, docker_requirement: dict): try: kill_status = kill_container() @@ -47,7 +47,7 @@ def register_allocation(timeline, device_requirement, public_key): ram_usage = {"capacity": str(int(ram_capacity / 1073741824)) + "g"} hard_disk_usage = {"capacity": str(int(hard_disk_capacity / 1073741824)) + "g"} - run_status = run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key) + run_status = run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key, docker_requirement) if run_status["status"]: bt.logging.info("Successfully allocated container.") diff --git a/neurons/Miner/container.py b/neurons/Miner/container.py index 195e7b2b..f459d44f 100644 --- a/neurons/Miner/container.py +++ b/neurons/Miner/container.py @@ -59,20 +59,25 @@ def kill_container(): running_container = container break if running_container: - running_container.stop() + # stop and remove the container by using the SIGTERM signal to PID 1 (init) process in the container + if running_container.status == "running": + running_container.exec_run(cmd="kill -15 1") + running_container.wait() + # running_container.stop() running_container.remove() - # bt.logging.info("Container was killed successfully") - return True + # Remove all dangling images + client.images.prune(filters={"dangling": True}) + bt.logging.info("Container was killed successfully") else: - # bt.logging.info("Unable to find container") - return False + 
bt.logging.info("Unable to find container") + return True except Exception as e: - # bt.logging.info(f"Error killing container {e}") + bt.logging.info(f"Error killing container {e}") return False # Run a new docker container with the given docker_name, image_name and device information -def run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key): +def run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key, docker_requirement: dict): try: client, containers = get_docker() # Configuration @@ -82,18 +87,29 @@ def run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key): hard_disk_capacity = hard_disk_usage["capacity"] # e.g : 100g gpu_capacity = gpu_usage["capacity"] # e.g : all + docker_image = docker_requirement.get("base_image") + docker_volume = docker_requirement.get("volume_path") + docker_ssh_key = docker_requirement.get("ssh_key") + docker_ssh_port = docker_requirement.get("ssh_port") + docker_appendix = docker_requirement.get("dockerfile") + + if docker_appendix is None or docker_appendix == "": + docker_appendix = "echo 'Hello World!'" + # Step 1: Build the Docker image with an SSH server dockerfile_content = ( """ - FROM ubuntu + FROM {} RUN apt-get update && apt-get install -y openssh-server - RUN mkdir -p /run/sshd # Create the /run/sshd directory - RUN echo 'root:'{}'' | chpasswd - RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config - RUN sed -i 's/#PasswordAuthentication yes/PasswordAuthentication yes/' /etc/ssh/sshd_config - RUN sed -i 's/#ListenAddress 0.0.0.0/ListenAddress 0.0.0.0/' /etc/ssh/sshd_config + RUN mkdir -p /run/sshd && echo 'root:'{}'' | chpasswd + RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config && \ + sed -i 's/#PasswordAuthentication yes/PasswordAuthentication yes/' /etc/ssh/sshd_config && \ + sed -i 's/#PubkeyAuthentication yes/PubkeyAuthentication yes/' /etc/ssh/sshd_config && \ + sed -i 's/#ListenAddress 0.0.0.0/ListenAddress 0.0.0.0/' /etc/ssh/sshd_config + RUN {} + RUN mkdir -p /root/.ssh/ && echo '{}' > /root/.ssh/authorized_keys && chmod 600 /root/.ssh/authorized_keys CMD ["/usr/sbin/sshd", "-D"] - """.format(password) + """.format(docker_image, password, docker_appendix, docker_ssh_key) ) # Ensure the tmp directory exists within the current directory @@ -105,8 +121,9 @@ def run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key): with open(dockerfile_path, "w") as dockerfile: dockerfile.write(dockerfile_content) - # Build the Docker image - client.images.build(path=os.path.dirname(dockerfile_path), dockerfile=os.path.basename(dockerfile_path), tag=image_name) + # Build the Docker image and remove the intermediate containers + client.images.build(path=os.path.dirname(dockerfile_path), dockerfile=os.path.basename(dockerfile_path), tag=image_name, + rm=True) # Create the Docker volume with the specified size # client.volumes.create(volume_name, driver = 'local', driver_opts={'size': hard_disk_capacity}) @@ -120,14 +137,16 @@ def run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key): detach=True, device_requests=device_requests, environment=["NVIDIA_VISIBLE_DEVICES=all"], - ports={22: ssh_port}, + ports={22: docker_ssh_port}, + init=True, + restart_policy={"Name": "on-failure", "MaximumRetryCount": 3}, +# volumes={ docker_volume: {'bind': '/root/workspace/', 'mode': 'rw'}}, ) # Check the status to determine if the container ran successfully - if container.status == "created": 
- #bt.logging.info("Container was created successfully.") - info = {"username": "root", "password": password, "port": ssh_port} + bt.logging.info("Container was created successfully.") + info = {"username": "root", "password": password, "port": docker_ssh_port} info_str = json.dumps(info) public_key = public_key.encode("utf-8") encrypted_info = rsa.encrypt_data(public_key, info_str) @@ -143,10 +162,10 @@ def run_container(cpu_usage, ram_usage, hard_disk_usage, gpu_usage, public_key): return {"status": True, "info": encrypted_info} else: - # bt.logging.info(f"Container falied with status : {container.status}") + bt.logging.info(f"Container falied with status : {container.status}") return {"status": False} except Exception as e: - # bt.logging.info(f"Error running container {e}") + bt.logging.info(f"Error running container {e}") return {"status": False} @@ -155,11 +174,11 @@ def check_container(): try: client, containers = get_docker() for container in containers: - if container_name in container.name: + if container_name in container.name and container.status == "running": return True return False except Exception as e: - # bt.logging.info(f"Error checking container {e}") + bt.logging.info(f"Error checking container {e}") return False @@ -206,3 +225,103 @@ def build_check_container(image_name: str, container_name: str): pass finally: client.close() + + +def build_sample_container(): + """ + Build a sample container to speed up the process of building the container + """ + try: + client = docker.from_env() + images = client.images.list(all=True) + + for image in images: + if image.tags: + if image_name in image.tags[0]: + bt.logging.info("Sample container image already exists.") + return {"status": True} + + password = password_generator(10) + + # Step 1: Build the Docker image with an SSH server + dockerfile_content = ( + """ + FROM ubuntu + RUN apt-get update && apt-get install -y openssh-server + RUN mkdir -p /run/sshd && echo 'root:'{}'' | chpasswd + RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config && \ + sed -i 's/#PasswordAuthentication yes/PasswordAuthentication yes/' /etc/ssh/sshd_config && \ + sed -i 's/#PubkeyAuthentication yes/PubkeyAuthentication yes/' /etc/ssh/sshd_config && \ + sed -i 's/#ListenAddress 0.0.0.0/ListenAddress 0.0.0.0/' /etc/ssh/sshd_config + RUN mkdir -p /root/.ssh/ && echo '{}' > /root/.ssh/authorized_keys && chmod 600 /root/.ssh/authorized_keys + CMD ["/usr/sbin/sshd", "-D"] + """.format(password, "") + ) + + # Ensure the tmp directory exists within the current directory + tmp_dir_path = os.path.join('.', 'tmp') + os.makedirs(tmp_dir_path, exist_ok=True) + + # Path for the Dockerfile within the tmp directory + dockerfile_path = os.path.join(tmp_dir_path, 'dockerfile') + with open(dockerfile_path, "w") as dockerfile: + dockerfile.write(dockerfile_content) + + # Build the Docker image and remove the intermediate containers + client.images.build(path=os.path.dirname(dockerfile_path), dockerfile=os.path.basename(dockerfile_path), + tag=image_name, rm=True) + # Create the Docker volume with the specified size + # client.volumes.create(volume_name, driver = 'local', driver_opts={'size': hard_disk_capacity}) + + bt.logging.info("Sample container image was created successfully.") + return {"status": True} + except Exception as e: + bt.logging.info(f"Error build sample container {e}") + return {"status": False} + + +def restart_container(): + try: + client, containers = get_docker() + running_container = None + for container in 
containers: + if container_name in container.name: + running_container = container + break + if running_container: + # stop and remove the container by using the SIGTERM signal to PID 1 (init) process in the container + if running_container.status == "running": + running_container.exec_run(cmd="kill -15 1") + running_container.wait() + running_container.restart() + return {"status": True} + else: + bt.logging.info("Unable to find container") + return {"status": False} + except Exception as e: + bt.logging.info(f"Error restart container {e}") + return {"status": False} + + +def exchange_key_container(new_ssh_key: str): + try: + client, containers = get_docker() + running_container = None + for container in containers: + if container_name in container.name: + running_container = container + break + if running_container: + # stop and remove the container by using the SIGTERM signal to PID 1 (init) process in the container + if running_container.status == "running": + running_container.exec_run(cmd=f"bash -c \"echo '{new_ssh_key}' > /root/.ssh/authorized_keys & sync & sleep 1\"") + running_container.exec_run(cmd="kill -15 1") + running_container.wait() + running_container.restart() + return {"status": True} + else: + bt.logging.info("Unable to find container") + return {"status": False} + except Exception as e: + bt.logging.info(f"Error changing SSH key on container {e}") + return {"status": False} \ No newline at end of file diff --git a/neurons/Validator/calculate_pow_score.py b/neurons/Validator/calculate_pow_score.py index ff554eff..0d6ee0a8 100644 --- a/neurons/Validator/calculate_pow_score.py +++ b/neurons/Validator/calculate_pow_score.py @@ -35,7 +35,7 @@ def prevent_none(val): # Calculate score based on the performance information -def calc_score(response, hotkey, allocated_hotkeys, mock=False): +def calc_score(response, hotkey, allocated_hotkeys, max_score_uid, mock=False): """ Method to calculate the score attributed to this miner dual uid - hotkey :param response: @@ -60,15 +60,12 @@ def calc_score(response, hotkey, allocated_hotkeys, mock=False): challenge_difficulty_avg = prevent_none(response["last_20_difficulty_avg"]) has_docker = response.get("has_docker", False) - if last_20_challenge_failed >= 10 or challenge_successes == 0: - return 0 - # Define base weights for the PoW - success_weight = 1 - difficulty_weight = 1 - time_elapsed_weight = 0.3 - failed_penalty_weight = 0.4 - allocation_weight = 0.21 + success_weight = 1.0 + difficulty_weight = 1.0 + time_elapsed_weight = 0.4 + failed_penalty_weight = 0.35 + allocation_weight = 0.10 # Just in case but in theory, it is not possible to fake the difficulty as it is sent by the validator # Same occurs for the time, it's calculated by the validator so miners cannot fake it @@ -99,17 +96,23 @@ def calc_score(response, hotkey, allocated_hotkeys, mock=False): allocation_score = difficulty_modifier * allocation_weight allocation_status = hotkey in allocated_hotkeys + if last_20_challenge_failed >= 19 or challenge_successes == 0 and not allocation_status: + return 0 + # Calculate the score max_score_challenge = 100 * (success_weight + difficulty_weight + time_elapsed_weight) - max_score_allocation = 100 * allocation_weight + max_score_allocation = max_score_challenge * allocation_weight max_score = max_score_challenge + max_score_allocation final_score = difficulty + successes + time_elapsed - failed_penalty + # denormalize max score + max_score_uid_dn = max_score_uid * max_score + # Docker and specs penalty penalty = not(has_docker) if 
allocation_status: - final_score = max_score_challenge * (1-allocation_weight) + allocation_score + final_score = max_score_uid_dn * (1 + allocation_score/100) else: final_score = difficulty + successes + time_elapsed - failed_penalty if penalty: diff --git a/neurons/miner.py b/neurons/miner.py index 48678947..3a2a9b68 100644 --- a/neurons/miner.py +++ b/neurons/miner.py @@ -21,7 +21,8 @@ import time import traceback import typing - +import multiprocessing +import base64 import bittensor as bt from compute import ( @@ -48,7 +49,14 @@ get_remote_version, ) from neurons.Miner.allocate import check_allocation, register_allocation, deregister_allocation, check_if_allocated -from neurons.Miner.container import build_check_container +from neurons.Miner.container import ( + build_check_container, + build_sample_container, + check_container, + kill_container, + restart_container, + exchange_key_container, +) from compute.wandb.wandb import ComputeWandb from neurons.Miner.allocate import check_allocation, register_allocation from neurons.Miner.pow import check_cuda_availability, run_miner_pow @@ -115,9 +123,6 @@ def __init__(self): self._wallet = bt.wallet(config=self.config) bt.logging.info(f"Wallet: {self.wallet}") - self.wandb = ComputeWandb(self.config, self.wallet, os.path.basename(__file__)) - self.wandb.update_specs() - # Subtensor manages the blockchain connection, facilitating interaction with the Bittensor blockchain. self._subtensor = ComputeSubnetSubtensor(config=self.config) bt.logging.info(f"Subtensor: {self.subtensor}") @@ -129,6 +134,10 @@ def __init__(self): build_check_container('my-compute-subnet','sn27-check-container') has_docker, msg = check_docker_availability() + # Build sample container image to speed up the allocation process + sample_docker = multiprocessing.Process(target=build_sample_container) + sample_docker.start() + if not has_docker: bt.logging.error(msg) exit(1) @@ -149,6 +158,29 @@ def __init__(self): self.sync_status() self.init_axon() + # Step 4: Initialize wandb + self.wandb = ComputeWandb(self.config, self.wallet, os.path.basename(__file__)) + self.wandb.update_specs() + + # check allocation status + file_path = 'allocation_key' + if os.path.exists(file_path): + # Open the file in read mode ('r') and read the data + with open(file_path, 'r') as file: + allocation_key_encoded = file.read() + + if not self.wandb.sync_allocated(self.wallet.hotkey.ss58_address) and allocation_key_encoded: + # Decode the base64-encoded public key from the file + public_key = base64.b64decode(allocation_key_encoded).decode('utf-8') + deregister_allocation(public_key) + self.wandb.update_allocated(None) + bt.logging.info("Allocation is not exist in wandb. Resetting the allocation status.") + + if check_container() and not allocation_key_encoded: + kill_container() + self.wandb.update_allocated(None) + bt.logging.info("Container is already running without allocated. 
Killing the container.") + self.request_specs_processor = RequestSpecsProcessor() self.last_updated_block = self.current_block - (self.current_block % 100) @@ -331,6 +363,10 @@ def allocate(self, synapse: Allocate) -> Allocate: timeline = synapse.timeline device_requirement = synapse.device_requirement checking = synapse.checking + docker_requirement = synapse.docker_requirement + docker_requirement['ssh_port'] = int(self.config.ssh.port) + docker_change = synapse.docker_change + docker_action = synapse.docker_action if checking is True: if timeline > 0: @@ -341,12 +377,25 @@ def allocate(self, synapse: Allocate) -> Allocate: result = check_if_allocated(public_key=public_key) synapse.output = result else: - public_key = synapse.public_key - if timeline > 0: - result = register_allocation(timeline, device_requirement, public_key) - synapse.output = result + if docker_change is True: + if docker_action['action'] == 'exchange_key': + public_key = synapse.public_key + new_ssh_key = docker_action['ssh_key'] + result = exchange_key_container(new_ssh_key) + synapse.output = result + elif docker_action['action'] == 'restart': + public_key = synapse.public_key + result = restart_container() + synapse.output = result + else: + bt.logging.info(f"Unknown action: {docker_action['action']}") else: - result = deregister_allocation(public_key) + public_key = synapse.public_key + if timeline > 0: + result = register_allocation(timeline, device_requirement, public_key, docker_requirement) + synapse.output = result + else: + result = deregister_allocation(public_key) synapse.output = result self.update_allocation(synapse) return synapse @@ -479,11 +528,11 @@ async def start(self): # Log chain data to wandb chain_data = { "Block": self.current_block, - "Stake": float(self.metagraph.S[self.miner_subnet_uid].numpy()), - "Trust": float(self.metagraph.T[self.miner_subnet_uid].numpy()), - "Consensus": float(self.metagraph.C[self.miner_subnet_uid].numpy()), - "Incentive": float(self.metagraph.I[self.miner_subnet_uid].numpy()), - "Emission": float(self.metagraph.E[self.miner_subnet_uid].numpy()), + "Stake": float(self.metagraph.S[self.miner_subnet_uid]), + "Trust": float(self.metagraph.T[self.miner_subnet_uid]), + "Consensus": float(self.metagraph.C[self.miner_subnet_uid]), + "Incentive": float(self.metagraph.I[self.miner_subnet_uid]), + "Emission": float(self.metagraph.E[self.miner_subnet_uid]), } self.wandb.log_chain_data(chain_data) diff --git a/neurons/register.py b/neurons/register.py index be821036..f895606e 100644 --- a/neurons/register.py +++ b/neurons/register.py @@ -400,11 +400,13 @@ def deallocate(wandb): timeout=60, ) if deregister_response and deregister_response["status"] is True: - update_allocation_db(result_hotkey, info, False) - update_allocation_wandb(wandb) print("Resource de-allocated successfully.") else: - print("De-allocation not successfull, please try again.") + print("No Response from axon server, Resource de-allocated successfully .") + + update_allocation_db(result_hotkey, info, False) + update_allocation_wandb(wandb) + else: print("No allocation details found for the provided hotkey.") diff --git a/neurons/register_api.py b/neurons/register_api.py new file mode 100644 index 00000000..629140e9 --- /dev/null +++ b/neurons/register_api.py @@ -0,0 +1,2420 @@ +# The MIT License (MIT) +# Copyright © 2023 Crazydevlegend +# Copyright © 2023 Rapiiidooo +# Copyright @ 2024 Skynet +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# 
documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. +# +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. +# Step 1: Import necessary libraries and modules + +import argparse +import base64 +import os +import json +import bittensor as bt +import torch +import time +from datetime import datetime, timezone +import asyncio +import multiprocessing +import uuid +import requests +import socket +from urllib3.exceptions import InsecureRequestWarning +import urllib3 +urllib3.disable_warnings(InsecureRequestWarning) +from dotenv import load_dotenv + +# Import Compute Subnet Libraries +import RSAEncryption as rsa +from compute.axon import ComputeSubnetSubtensor +from compute.protocol import Allocate +from compute.utils.db import ComputeDb +from compute.utils.parser import ComputeArgPaser +from compute.wandb.wandb import ComputeWandb +from neurons.Validator.database.allocate import ( + select_allocate_miners_hotkey, + update_allocation_db, + get_miner_details, +) + +# Import FastAPI Libraries +import uvicorn +from fastapi import ( + FastAPI, + status, + Request, + WebSocket, + WebSocketDisconnect, +) +from fastapi.encoders import jsonable_encoder +from fastapi.responses import JSONResponse +from fastapi.exceptions import RequestValidationError +from fastapi.concurrency import run_in_threadpool +from pydantic import BaseModel, Field +from typing import Optional, Union, List + +# Constants +DEFAULT_SSL_MODE = 2 # 1 for client CERT optional, 2 for client CERT_REQUIRED +DEFAULT_API_PORT = 8903 # default port for the API +DATA_SYNC_PERIOD = 600 # metagraph resync time +ALLOCATE_CHECK_PERIOD = 300 # timeout check period +ALLOCATE_CHECK_COUNT = 6 # maximum timeout count +MAX_NOTIFY_RETRY = 3 # maximum notify count +NOTIFY_RETRY_PERIOD = 10 # notify retry interval +PUBLIC_WANDB_NAME = "opencompute" +PUBLIC_WANDB_ENTITY = "neuralinternet" + + +class UserConfig(BaseModel): + netuid: str = Field(default="15") + subtensor_network: str = Field(default="test", alias="subtensor.network") + subtensor_chain_endpoint: Union[str, None] = Field( + default="", alias="subtensor.chain_endpoint" + ) + wallet_name: str = Field(default="validator", alias="wallet.name") + wallet_hotkey: str = Field(default="default", alias="wallet.hotkey") + logging_debug: Union[str, None] = Field(default="", alias="logging.debug") + + +class DeviceRequirement(BaseModel): + cpu_count: int = Field(default=1, description="CPU count") + gpu_type: str = Field(default="gpu", description="GPU Name") + gpu_size: int = Field(default=3, description="GPU size in GB") + ram: int = Field(default=1, description="RAM size in GB") + hard_disk: int = Field(default=1, description="Hard disk size in GB") + timeline: int = Field(default=90, 
description="Rent Timeline in day") # timeline=90 day by spec, 30 day by hotkey + + +class Allocation(BaseModel): + resource: str = "" + hotkey: str = "" + regkey: str = "" + ssh_ip: str = "" + ssh_port: int = 4444 + ssh_username: str = "" + ssh_password: str = "" + ssh_command: str = "" + ssh_key: str = "" + uuid_key: str = "" + + +class DockerRequirement(BaseModel): + base_image: str = "ubuntu" + ssh_key: str = "" + volume_path: str = "/tmp" + dockerfile: str = "" + + +class UserInfo(BaseModel): + user_id: str = "" # wallet.hokey.ss58address + user_pass: str = "" # wallet.public_key hashed value + jwt_token: str = "" # jwt token + + +class ResourceGPU(BaseModel): + gpu_name: str = "" + gpu_capacity: int = 0 + gpu_count: int = 1 + + +class Resource(BaseModel): + hotkey: str = "" + cpu_count: int = 1 + gpu_name: str = "" + gpu_capacity: str = "" + gpu_count: int = 1 + ram: str = "0" + hard_disk: str = "0" + allocate_status: str = "" # "Avail." or "Res." + + +class Specs(BaseModel): + details: str = "" + + +class ResourceQuery(BaseModel): + gpu_name: Optional[str] = None + cpu_count_min: Optional[int] = None + cpu_count_max: Optional[int] = None + gpu_capacity_min: Optional[float] = None + gpu_capacity_max: Optional[float] = None + hard_disk_total_min: Optional[float] = None + hard_disk_total_max: Optional[float] = None + ram_total_min: Optional[float] = None + ram_total_max: Optional[float] = None + + +# Response Models +class SuccessResponse(BaseModel): + success: bool = True + message: str + data: Optional[dict] = None + + +class ErrorResponse(BaseModel): + success: bool = False + message: str + err_detail: Optional[str] = None + + +class RegisterAPI: + def __init__( + self, + config: Optional[bt.config] = None, + wallet: Optional[bt.wallet] = None, + subtensor: Optional[bt.subtensor] = None, + dendrite: Optional[bt.dendrite] = None, + metagraph: Optional[bt.metagraph] = None, + wandb: Optional[ComputeWandb] = None, + ): + + # Compose User Config Data with bittensor config + # Get the config from the user config + if config is None: + # Step 1: Parse the bittensor and compute subnet config + self.config = self._init_config() + + # Set up logging with the provided configuration and directory. + bt.logging.set_debug(self.config.logging.debug) + bt.logging.set_trace(self.config.logging.trace) + bt.logging(config=self.config, logging_dir=self.config.full_path) + bt.logging.info( + f"Running validator register for subnet: {self.config.netuid} " + f"on network: {self.config.subtensor.chain_endpoint} with config:") + + # Log the configuration for reference. + bt.logging.info(self.config) + bt.logging.info("Setting up bittensor objects.") + + # The wallet holds the cryptographic key pairs for the validator. + self.wallet = bt.wallet(config=self.config) + bt.logging.info(f"Wallet: {self.wallet}") + + self.wandb = ComputeWandb(self.config, self.wallet, "validator.py") + + # The subtensor is our connection to the Bittensor blockchain. + self.subtensor = ComputeSubnetSubtensor(config=self.config) + bt.logging.info(f"Subtensor: {self.subtensor}") + + # Dendrite is the RPC client; it lets us send messages to other nodes (axons) in the network. + self.dendrite = bt.dendrite(wallet=self.wallet) + bt.logging.info(f"Dendrite: {self.dendrite}") + + # The metagraph holds the state of the network, letting us know about other miners. 
+ self.metagraph = self.subtensor.metagraph(self.config.netuid) + bt.logging.info(f"Metagraph: {self.metagraph}") + + # Set the IP address and port for the API server + if self.config.axon.ip == "[::]": + self.ip_addr = "0.0.0.0" + else: + self.ip_addr = self.config.axon.ip + + if self.config.axon.port is None: + self.port = DEFAULT_API_PORT + else: + self.port = self.config.axon.port + + else: + self.config = config.copy() + # Wallet is the keypair that lets us sign messages to the blockchain. + self.wallet = wallet + # The subtensor is our connection to the Bittensor blockchain. + self.subtensor = subtensor + # Dendrite is the RPC client; it lets us send messages to other nodes (axons) in the network. + self.dendrite = dendrite + # The metagraph holds the state of the network, letting us know about other miners. + self.metagraph = metagraph + # Initialize the W&B logging + self.wandb = wandb + + if self.config.axon.ip == "[::]": + self.ip_addr = "0.0.0.0" + else: + self.ip_addr = self.config.axon.ip + + if self.config.axon.port is None: + self.port = DEFAULT_API_PORT + else: + self.port = self.config.axon.port + + if self.config.logging.trace: + self.app = FastAPI(debug=False) + else: + self.app = FastAPI(debug=False, docs_url=None, redoc_url=None) + + load_dotenv() + self._setup_routes() + self.process = None + self.websocket_connection = None + self.allocation_table = [] + self.checking_allocated = [] + self.notify_retry_table = [] + self.deallocation_notify_url = os.getenv("DEALLOCATION_NOTIFY_URL") + self.status_notify_url = os.getenv("STATUS_NOTIFY_URL") + + def _setup_routes(self): + # Define a custom validation error handler + @self.app.exception_handler(RequestValidationError) + async def validation_exception_handler(request: Request, exc: RequestValidationError): + # Customize the error response + errors = exc.errors() + custom_errors = [{"field": err['loc'][-1], "message": err['msg']} for err in errors] + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content={ + "success": False, + "message": "Validation Error, Please check the request body.", + "err_detail": custom_errors, + }, + ) + + @self.app.on_event("startup") + async def startup_event(): + """ + This function is called when the application starts.
+ It initializes the database connection and other necessary components.
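The two asyncio tasks created just below poll on the intervals defined at the top of this file (DATA_SYNC_PERIOD and ALLOCATE_CHECK_PERIOD); their bodies are defined further down in register_api.py and are not part of this hunk. A reduced sketch of the pattern, with placeholder loop bodies:

import asyncio

DATA_SYNC_PERIOD = 600       # metagraph resync interval in seconds, as defined above
ALLOCATE_CHECK_PERIOD = 300  # allocation check interval in seconds, as defined above

async def refresh_metagraph_sketch():
    while True:
        # placeholder: the real task resyncs the metagraph from the subtensor
        await asyncio.sleep(DATA_SYNC_PERIOD)

async def check_allocation_sketch():
    while True:
        # placeholder: the real task re-checks current allocations on a timer
        await asyncio.sleep(ALLOCATE_CHECK_PERIOD)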
+ """ + # Setup the repeated task + self.metagraph_task = asyncio.create_task(self._refresh_metagraph()) + self.allocate_check_task = asyncio.create_task(self._check_allocation()) + bt.logging.info(f"Register API server is started on https://{self.ip_addr}:{self.port}") + + @self.app.on_event("shutdown") + async def shutdown_event(): + """ + This function is called when the application stops.
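The shutdown handler below is currently a no-op. If cleanup were ever needed, one illustrative option (not part of this patch) would be to cancel the tasks created at startup:

async def shutdown_event_sketch(metagraph_task, allocate_check_task):
    # Illustrative only: cancel the background tasks created in startup_event.
    metagraph_task.cancel()
    allocate_check_task.cancel()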
+ """ + pass + + # Entry point for the API + @self.app.get("/", tags=["Root"]) + async def read_root(): + return { + "message": "Welcome to Compute Subnet Allocation API, Please access the API via endpoint." + } + + @self.app.websocket(path="/connect", name="websocket") + async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + self.websocket_connection = websocket + bt.logging.info("API: Websocket connection established") + while True: + try: + # data = await websocket.receive_text() + msg = { + "type": "keepalive", + "payload": { + "time": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + } + } + await websocket.send_text(json.dumps(msg)) + await asyncio.sleep(30) + except WebSocketDisconnect: + bt.logging.info(f"API: Websocket connection closed") + await self.websocket_connection.close() + self.websocket_connection = None + break + + @self.app.post( + "/service/allocate_spec", + tags=["Allocation"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "Resource was successfully allocated", + }, + 400: { + "model": ErrorResponse, + "description": "Invalid allocation request", + }, + 401: { + "model": ErrorResponse, + "description": "Missing authorization", + }, + 404: { + "model": ErrorResponse, + "description": "Fail to allocate resource", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + }, + ) + async def allocate_spec(requirements: DeviceRequirement, docker_requirement: DockerRequirement) -> JSONResponse: + """ + The GPU resource allocate API endpoint.
+ requirements: The GPU resource requirements which contain the GPU type, GPU size, ram, hard_disk + and booking timeline.
+ """ + # client_host = request.client.host + if requirements: + device_requirement = { + "cpu": {"count": requirements.cpu_count}, + "gpu": {}, + "hard_disk": {"capacity": requirements.hard_disk * 1024.0 ** 3}, + "ram": {"capacity": requirements.ram * 1024.0 ** 3}, + } + if requirements.gpu_type != "" and int(requirements.gpu_size) != 0: + device_requirement["gpu"] = { + "count": 1, + "capacity": int(requirements.gpu_size) * 1000, + "type": requirements.gpu_type, + } + + # Generate UUID + uuid_key = str(uuid.uuid1()) + + timeline = int(requirements.timeline) + private_key, public_key = rsa.generate_key_pair() + run_start = time.time() + result = await run_in_threadpool(self._allocate_container, device_requirement, + timeline, public_key, docker_requirement.dict()) + + if result["status"] is False: + bt.logging.info(f"API: Allocation Failed : {result['msg']}") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "Fail to allocate resource", + "err_detail": result["msg"], + }, + ) + + run_end = time.time() + time_eval = run_end - run_start + # bt.logging.info(f"API: Create docker container in: {run_end - run_start:.2f} seconds") + + result_hotkey = result["hotkey"] + result_info = result["info"] + private_key = private_key.encode("utf-8") + decrypted_info_str = rsa.decrypt_data( + private_key, base64.b64decode(result_info) + ) + + # Iterate through the miner specs details to get gpu_name + db = ComputeDb() + specs_details = await run_in_threadpool(get_miner_details, db) + db.close() + + for key, details in specs_details.items(): + if str(key) == str(result_hotkey) and details: + try: + gpu_miner = details["gpu"] + gpu_name = str( + gpu_miner["details"][0]["name"] + ).lower() + break + except (KeyError, IndexError, TypeError): + gpu_name = "Invalid details" + else: + gpu_name = "No details available" + + info = json.loads(decrypted_info_str) + info["ip"] = result["ip"] + info["resource"] = gpu_name + info["regkey"] = public_key + info["ssh_key"] = docker_requirement.ssh_key + info["uuid"] = uuid_key + + await asyncio.sleep(1) + + allocated = Allocation() + allocated.resource = info["resource"] + allocated.hotkey = result_hotkey + # allocated.regkey = info["regkey"] + allocated.ssh_key = info["ssh_key"] + allocated.ssh_ip = info["ip"] + allocated.ssh_port = info["port"] + allocated.ssh_username = info["username"] + allocated.ssh_password = info["password"] + allocated.uuid_key = info["uuid"] + allocated.ssh_command = f"ssh {info['username']}@{result['ip']} -p {str(info['port'])}" + + update_allocation_db(result_hotkey, info, True) + await self._update_allocation_wandb() + bt.logging.info(f"API: Resource {result_hotkey} was successfully allocated") + + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "Resource was successfully allocated", + "data": jsonable_encoder(allocated), + }, + ) + + else: + bt.logging.error(f"API: Invalid allocation request") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "success": False, + "message": "Invalid allocation request", + "err_detail": "Invalid requirement, please check the requirements", + }, + ) + + @self.app.post( + "/service/allocate_hotkey", + tags=["Allocation"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "Resource was successfully allocated", + }, + 400: { + "model": ErrorResponse, + "description": "Invalid allocation request", + }, + 401: { 
+ "model": ErrorResponse, + "description": "Missing authorization", + }, + 404: { + "model": ErrorResponse, + "description": "Fail to allocate resource", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + }, + ) + async def allocate_hotkey(hotkey: str, ssh_key: Optional[str] = None, + docker_requirement: Optional[DockerRequirement] = None) -> JSONResponse: + """ + The GPU allocate by hotkey API endpoint.
+ Use this API to book a specific miner.
+ hotkey: The hotkey of the miner to allocate the GPU resource on.
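For orientation, a client-side sketch of calling this endpoint with the requests library, assuming the API is served on the default port 8903 behind the mutual-TLS material produced by cert/gen_ca.sh; the hotkey, SSH key, and file paths are placeholders:

import requests

API_URL = "https://127.0.0.1:8903"  # DEFAULT_API_PORT; replace with the validator host

response = requests.post(
    f"{API_URL}/service/allocate_hotkey",
    params={"hotkey": "5F...miner-hotkey...", "ssh_key": "ssh-ed25519 AAAA... user@host"},
    # JSON body mirrors the DockerRequirement model defined above
    json={"base_image": "ubuntu", "ssh_key": "", "volume_path": "/tmp", "dockerfile": ""},
    cert=("cert/client.crt", "cert/client.key"),  # client certificate pair from gen_ca.sh
    verify="cert/ca.cer",                         # root CA from gen_ca.sh
    timeout=120,
)
print(response.status_code, response.json())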
+ """ + if hotkey: + # client_host = request.client.host + requirements = DeviceRequirement() + requirements.gpu_type = "" + requirements.gpu_size = 0 + requirements.timeline = 30 + + # Generate UUID + uuid_key = str(uuid.uuid1()) + + private_key, public_key = rsa.generate_key_pair() + if ssh_key: + if docker_requirement is None: + docker_requirement = DockerRequirement() + docker_requirement.ssh_key = ssh_key + else: + docker_requirement.ssh_key = ssh_key + + run_start = time.time() + result = await run_in_threadpool(self._allocate_container_hotkey, requirements, hotkey, + requirements.timeline, public_key, docker_requirement.dict()) + if result["status"] is False: + bt.logging.error(f"API: Allocation {hotkey} Failed : {result['msg']}") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "Fail to allocate resource", + "err_detail": result["msg"], + }, + ) + + run_end = time.time() + time_eval = run_end - run_start + # bt.logging.info(f"API: Create docker container in: {run_end - run_start:.2f} seconds") + + # Iterate through the miner specs details to get gpu_name + db = ComputeDb() + specs_details = await run_in_threadpool(get_miner_details, db) + for key, details in specs_details.items(): + if str(key) == str(hotkey) and details: + try: + gpu_miner = details["gpu"] + gpu_name = str(gpu_miner["details"][0]["name"]).lower() + break + except (KeyError, IndexError, TypeError): + gpu_name = "Invalid details" + else: + gpu_name = "No details available" + + result_hotkey = result["hotkey"] + result_info = result["info"] + private_key = private_key.encode("utf-8") + decrypted_info_str = rsa.decrypt_data( + private_key, base64.b64decode(result_info) + ) + + info = json.loads(decrypted_info_str) + info["ip"] = result["ip"] + info["resource"] = gpu_name + info["regkey"] = public_key + info["ssh_key"] = docker_requirement.ssh_key + info["uuid"] = uuid_key + + await asyncio.sleep(1) + allocated = Allocation() + allocated.resource = info["resource"] + allocated.hotkey = result_hotkey + allocated.ssh_key = info["ssh_key"] + # allocated.regkey = info["regkey"] + allocated.ssh_ip = info["ip"] + allocated.ssh_port = info["port"] + allocated.ssh_username = info["username"] + allocated.ssh_password = info["password"] + allocated.uuid_key = info["uuid"] + allocated.ssh_command = f"ssh {info['username']}@{result['ip']} -p {str(info['port'])}" + + update_allocation_db(result_hotkey, info, True) + await self._update_allocation_wandb() + + bt.logging.info(f"API: Resource {allocated.hotkey} was successfully allocated") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "Resource was successfully allocated", + "data": jsonable_encoder(allocated), + }, + ) + else: + bt.logging.error(f"API: Invalid allocation request") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "success": False, + "message": "Invalid allocation request", + "err_detail": "Invalid hotkey, please check the hotkey", + }, + ) + + @self.app.post( + "/service/deallocate", + tags=["Allocation"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "Resource deallocated successfully.", + }, + 400: { + "model": ErrorResponse, + "description": "Deallocation not successfully, please try again.", + }, + 401: {"model": ErrorResponse, "description": "Missing authorization"}, + 403: { + "model": ErrorResponse, + "description": "An error occurred during 
de-allocation", + }, + 404: { + "model": ErrorResponse, + "description": "No allocation details found for the provided hotkey.", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + }, + ) + async def deallocate(hotkey: str, uuid_key: str, request: Request, notify_flag: bool = False) -> JSONResponse: + """ + The GPU deallocate API endpoint.
+ hotkey: The miner hotkey whose GPU resource should be deallocated.
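+ uuid_key: The UUID issued when the resource was allocated; it must match the UUID stored for this hotkey.
+ notify_flag: If True, a deallocation notification is sent to the configured webhook after the resource is released.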
+ """ + client_host = request.client.host + # Instantiate the connection to the db + db = ComputeDb() + cursor = db.get_cursor() + + try: + # Retrieve the allocation details for the given hotkey + cursor.execute( + "SELECT details, hotkey FROM allocation WHERE hotkey = ?", + (hotkey,), + ) + row = cursor.fetchone() + + if row: + # Parse the JSON string in the 'details' column + info = json.loads(row[0]) + result_hotkey = row[1] + + username = info["username"] + password = info["password"] + port = info["port"] + ip = info["ip"] + regkey = info["regkey"] + uuid_key_db = info["uuid"] + + if uuid_key_db == uuid_key: + index = self.metagraph.hotkeys.index(hotkey) + axon = self.metagraph.axons[index] + run_start = time.time() + retry_count = 0 + + while retry_count < MAX_NOTIFY_RETRY: + allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey) + deregister_response = await run_in_threadpool( + self.dendrite.query, axon, allocate_class, timeout=60 + ) + run_end = time.time() + time_eval = run_end - run_start + # bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds") + + if deregister_response and deregister_response["status"] is True: + bt.logging.info(f"API: Resource {hotkey} deallocated successfully") + break + else: + retry_count += 1 + bt.logging.info(f"API: Resource {hotkey} no response to deallocated signal - retry {retry_count}") + await asyncio.sleep(1) + + if retry_count == MAX_NOTIFY_RETRY: + bt.logging.error(f"API: Resource {hotkey} deallocated successfully without response.") + + deallocated_at = datetime.now(timezone.utc) + update_allocation_db(result_hotkey, info, False) + await self._update_allocation_wandb() + + # Notify the deallocation event when the client is localhost + if notify_flag: + response = await self._notify_allocation_status( + event_time=deallocated_at, + hotkey=hotkey, + uuid=uuid_key, + event="DEALLOCATION", + details=f"deallocate trigger via API interface" + ) + + if response: + bt.logging.info(f"API: Notify deallocation event is success on {hotkey} ") + else: + bt.logging.info(f"API: Notify deallocation event is failed on {hotkey} ") + self.notify_retry_table.append( + { + "deallocated_at": deallocated_at, + "hotkey": hotkey, + "uuid": uuid_key, + "event": "DEALLOCATION", + "details": "deallocate trigger via API interface" + } + ) + + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "Resource deallocated successfully.", + }, + ) + else: + bt.logging.error(f"API: Invalid UUID key for {hotkey}") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "success": False, + "message": "Deallocation not successfully, please try again.", + "err_detail": "Invalid UUID key", + }, + ) + + else: + bt.logging.info(f"API: No allocation details found for the provided hotkey") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "No allocation details found for the provided hotkey.", + "err_detail": "No allocation details found for the provided hotkey.", + }, + ) + except Exception as e: + bt.logging.error(f"API: An error occurred during deallocation {e.__repr__()}") + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "success": False, + "message": "An error occurred during deallocation.", + "err_detail": e.__repr__(), + }, + ) + finally: + cursor.close() + db.close() + + @self.app.post( + path="/service/check_miner_status", + tags=["Allocation"], + 
response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "Resource deallocated successfully.", + }, + 403: { + "model": ErrorResponse, + "description": "An error occurred while retrieving hotkey status.", + }, + } + ) + async def check_miner_status(hotkey_list: List[str]) -> JSONResponse: + checking_list = [] + for hotkey in hotkey_list: + checking_result = { + "hotkey": hotkey, + "status": "Not Found" + } + for axon in self.metagraph.axons: + if axon.hotkey == hotkey: + try: + register_response = await run_in_threadpool(self.dendrite.query, + axon, Allocate(timeline=1, checking=True, ), + timeout=60) + if register_response: + if register_response["status"] is True: + checking_result = {"hotkey": hotkey, "status": "Docker OFFLINE"} + else: + checking_result = {"hotkey": hotkey, "status": "Docker ONLINE"} + else: + checking_result = {"hotkey": hotkey, "status": "Miner NO_RESPONSE"} + except Exception as e: + bt.logging.error( + f"API: An error occur during the : {e}" + ) + checking_result = {"hotkey": hotkey, "status": "Unknown"} + checking_list.append(checking_result) + + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List hotkey status successfully.", + "data": jsonable_encoder(checking_list), + }, + ) + + @self.app.post(path="/service/restart_docker", + tags=["Allocation"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "Resource restart successfully.", + }, + 403: { + "model": ErrorResponse, + "description": "An error occurred while restarting docker.", + }, + }) + async def restart_docker(hotkey: str, uuid_key: str) -> JSONResponse: + # Instantiate the connection to the db + db = ComputeDb() + cursor = db.get_cursor() + + try: + # Retrieve the allocation details for the given hotkey + cursor.execute( + "SELECT details, hotkey FROM allocation WHERE hotkey = ?", + (hotkey,), + ) + row = cursor.fetchone() + + if row: + # Parse the JSON string in the 'details' column + info = json.loads(row[0]) + result_hotkey = row[1] + + username = info["username"] + password = info["password"] + port = info["port"] + ip = info["ip"] + regkey = info["regkey"] + uuid_key_db = info["uuid"] + + docker_action = { + "action": "restart", + "ssh_key": "", + } + + if uuid_key_db == uuid_key: + index = self.metagraph.hotkeys.index(hotkey) + axon = self.metagraph.axons[index] + run_start = time.time() + allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey, + docker_change=True, docker_action=docker_action) + response = await run_in_threadpool( + self.dendrite.query, axon, allocate_class, timeout=60 + ) + run_end = time.time() + time_eval = run_end - run_start + # bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds") + + if response and response["status"] is True: + bt.logging.info(f"API: Resource {hotkey} docker restart successfully") + else: + bt.logging.error(f"API: Resource {hotkey} docker restart without response.") + + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "Resource restarted successfully.", + }, + ) + else: + bt.logging.error(f"API: Invalid UUID key for {hotkey}") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "success": False, + "message": "Restart not successfully, please try again.", + "err_detail": "Invalid UUID key", + }, + ) + + else: + bt.logging.info(f"API: 
No allocation details found for the provided hotkey") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "No allocation details found for the provided hotkey.", + "err_detail": "No allocation details found for the provided hotkey.", + }, + ) + except Exception as e: + bt.logging.error(f"API: An error occurred during restart operation {e.__repr__()}") + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "success": False, + "message": "An error occurred during restart operation.", + "err_detail": e.__repr__(), + }, + ) + finally: + cursor.close() + db.close() + + + @self.app.post("/service/exchange_docker_key", + tags=["Allocation"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "Resource ssh_key was changed successfully.", + }, + 403: { + "model": ErrorResponse, + "description": "An error occurred while exchanging docker key.", + }, + }) + async def exchange_docker_key(hotkey: str, uuid_key: str, ssh_key: str) -> JSONResponse: + # Instantiate the connection to the db + db = ComputeDb() + cursor = db.get_cursor() + + try: + # Retrieve the allocation details for the given hotkey + cursor.execute( + "SELECT details, hotkey FROM allocation WHERE hotkey = ?", + (hotkey,), + ) + row = cursor.fetchone() + + if row: + # Parse the JSON string in the 'details' column + info = json.loads(row[0]) + result_hotkey = row[1] + + username = info["username"] + password = info["password"] + port = info["port"] + ip = info["ip"] + regkey = info["regkey"] + uuid_key_db = info["uuid"] + + docker_action = { + "action": "exchange_key", + "ssh_key": ssh_key, + } + + if uuid_key_db == uuid_key: + index = self.metagraph.hotkeys.index(hotkey) + axon = self.metagraph.axons[index] + run_start = time.time() + allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey, + docker_change=True, docker_action=docker_action) + response = await run_in_threadpool( + self.dendrite.query, axon, allocate_class, timeout=60 + ) + run_end = time.time() + time_eval = run_end - run_start + # bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds") + + if response and response["status"] is True: + bt.logging.info(f"API: Resource {hotkey} docker ssh_key exchange successfully") + else: + bt.logging.error(f"API: Resource {hotkey} docker ssh_key exchange without response.") + + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "Resource ssh_key is exchanged successfully.", + }, + ) + else: + bt.logging.error(f"API: Invalid UUID key for {hotkey}") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "success": False, + "message": "Exchange ssh_key not successfully, please try again.", + "err_detail": "Invalid UUID key", + }, + ) + + else: + bt.logging.info(f"API: No allocation details found for the provided hotkey") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "No allocation details found for the provided hotkey.", + "err_detail": "No allocation details found for the provided hotkey.", + }, + ) + except Exception as e: + bt.logging.error(f"API: An error occurred during exchange ssh_key operation {e.__repr__()}") + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "success": False, + "message": "An error occurred during exchange ssh_key operation.", + "err_detail": 
e.__repr__(), + }, + ) + finally: + cursor.close() + db.close() + + @self.app.post( + "/list/allocations_sql", + tags=["SQLite"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "List allocations successfully.", + }, + 401: { + "model": ErrorResponse, + "description": "Missing authorization token", + }, + 403: { + "model": ErrorResponse, + "description": "An error occurred while retrieving allocation details", + }, + 404: { + "model": ErrorResponse, + "description": "There is no allocation available", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + }, + ) + async def list_allocations() -> JSONResponse: + """ + The list allocation API endpoint.
+ The API returns the allocations currently recorded on the validator.
+ """ + db = ComputeDb() + cursor = db.get_cursor() + allocation_list = [] + + try: + # Retrieve all records from the allocation table + cursor.execute("SELECT id, hotkey, details FROM allocation") + rows = cursor.fetchall() + + bt.logging.info(f"API: List Allocation Resources") + + if not rows: + bt.logging.info( + f"API: No resources allocated. Allocate a resource with validator" + ) + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "No resources found.", + "data": "No allocated resources found. Allocate a resource with validator.", + }, + ) + + for row in rows: + id, hotkey, details = row + info = json.loads(details) + entry = Allocation() + + entry.hotkey = hotkey + # entry.regkey = info["regkey"] + entry.resource = info["resource"] + entry.ssh_username = info["username"] + entry.ssh_password = info["password"] + entry.ssh_port = info["port"] + entry.ssh_ip = info["ip"] + entry.ssh_command = ( + f"ssh {info['username']}@{info['ip']} -p {info['port']}" + ) + entry.uuid_key = info["uuid"] + entry.ssh_key = info["ssh_key"] + allocation_list.append(entry) + + except Exception as e: + bt.logging.error( + f"API: An error occurred while retrieving allocation details: {e}" + ) + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "success": False, + "message": "An error occurred while retrieving allocation details.", + "err_detail": e.__repr__(), + }, + ) + finally: + cursor.close() + db.close() + + bt.logging.info(f"API: List allocations successfully") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List allocations successfully.", + "data": jsonable_encoder(allocation_list), + }, + ) + + @self.app.post( + "/list/resources_sql", + tags=["SQLite"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "List resources successfully.", + }, + 401: {"model": ErrorResponse, "description": "Missing authorization"}, + 404: { + "model": ErrorResponse, + "description": "There is no resource available", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + }, + ) + async def list_resources(query: ResourceQuery = None, + stats: bool = False, + page_size: Optional[int] = None, + page_number: Optional[int] = None) -> JSONResponse: + """ + The list resources API endpoint.
+ The API returns the current miner resources and their detailed specs as known to the validator.
+ query: The query parameter to filter the resources.
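+ stats: If True, return only aggregate counts of available, reserved, and total resources instead of the full list.
+ page_size: Optional number of items per page (defaults to 50 when pagination is requested).
+ page_number: Optional page number; when omitted, all matching resources are returned as a single page.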
+ """ + db = ComputeDb() + specs_details = await run_in_threadpool(get_miner_details, db) + bt.logging.info(f"API: List resources on compute subnet") + + # check wandb for available hotkeys + # self.wandb.api.flush() + running_hotkey = [] + filter_rule = { + "$and": [ + {"config.config.netuid": self.config.netuid}, + {"config.role": "miner"}, + {"state": "running"}, + ] + } + runs = await run_in_threadpool(self.wandb.api.runs, + f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", filter_rule) + for run in runs: + run_config = run.config + run_hotkey = run_config.get("hotkey") + running_hotkey.append(run_hotkey) + + # Initialize a dictionary to keep track of GPU instances + resource_list = [] + gpu_instances = {} + total_gpu_counts = {} + + # Get the allocated hotkeys from wandb + allocated_hotkeys = await run_in_threadpool(self.wandb.get_allocated_hotkeys, [], False) + + if specs_details: + # Iterate through the miner specs details and print the table + for hotkey, details in specs_details.items(): + if hotkey in running_hotkey: + if details: # Check if details are not empty + resource = Resource() + try: + # Extract GPU details + gpu_miner = details["gpu"] + gpu_capacity = "{:.2f}".format( + (gpu_miner["capacity"] / 1024) + ) + gpu_name = str(gpu_miner["details"][0]["name"]).lower() + gpu_count = gpu_miner["count"] + + # Extract CPU details + cpu_miner = details["cpu"] + cpu_count = cpu_miner["count"] + + # Extract RAM details + ram_miner = details["ram"] + ram = "{:.2f}".format( + ram_miner["available"] / 1024.0 ** 3 + ) + + # Extract Hard Disk details + hard_disk_miner = details["hard_disk"] + hard_disk = "{:.2f}".format( + hard_disk_miner["free"] / 1024.0 ** 3 + ) + + # Update the GPU instances count + gpu_key = (gpu_name, gpu_count) + gpu_instances[gpu_key] = ( + gpu_instances.get(gpu_key, 0) + 1 + ) + total_gpu_counts[gpu_name] = ( + total_gpu_counts.get(gpu_name, 0) + gpu_count + ) + + except (KeyError, IndexError, TypeError): + gpu_name = "Invalid details" + gpu_capacity = "N/A" + gpu_count = "N/A" + cpu_count = "N/A" + ram = "N/A" + hard_disk = "N/A" + else: + gpu_name = "No details available" + gpu_capacity = "N/A" + gpu_count = "N/A" + cpu_count = "N/A" + ram = "N/A" + hard_disk = "N/A" + + # Allocation status + # allocate_status = "N/A" + + if hotkey in allocated_hotkeys: + allocate_status = "reserved" + else: + allocate_status = "available" + + add_resource = False + # Print the row with column separators + resource.hotkey = hotkey + + try: + if gpu_name != "Invalid details" and gpu_name != "No details available": + if query is None or query == {}: + add_resource = True + else: + if query.gpu_name is not None and query.gpu_name not in gpu_name: + continue + if (query.gpu_capacity_max is not None and + float(gpu_capacity) > query.gpu_capacity_max): + continue + if (query.gpu_capacity_min is not None and + float(gpu_capacity) < query.gpu_capacity_min): + continue + if (query.cpu_count_max is not None and + int(cpu_count) > query.cpu_count_max): + continue + if (query.cpu_count_min is not None and + int(cpu_count) < query.cpu_count_min): + continue + if (query.ram_total_max is not None and + float(ram) > query.ram_total_max): + continue + if (query.ram_total_min is not None and + float(ram) < query.ram_total_min): + continue + if (query.hard_disk_total_max is not None and + float(hard_disk) > query.hard_disk_total_max): + continue + if (query.hard_disk_total_min is not None and + float(hard_disk) < query.hard_disk_total_min): + continue + add_resource = True + + if add_resource: + 
resource.cpu_count = int(cpu_count) + resource.gpu_name = gpu_name + resource.gpu_capacity = float(gpu_capacity) + resource.gpu_count = int(gpu_count) + resource.ram = float(ram) + resource.hard_disk = float(hard_disk) + resource.allocate_status = allocate_status + resource_list.append(resource) + except (KeyError, IndexError, TypeError, ValueError) as e: + bt.logging.error(f"API: Error occurred while filtering resources: {e}") + continue + + if stats: + status_counts = {"available": 0, "reserved": 0, "total": 0} + try: + for item in resource_list: + status_code = item.dict()["allocate_status"] + if status_code in status_counts: + status_counts[status_code] += 1 + status_counts["total"] += 1 + except Exception as e: + bt.logging.error(f"API: Error occurred while counting status: {e}") + status_counts = {"available": 0, "reserved": 0, "total": 0} + + bt.logging.info(f"API: List resources successfully") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List resources successfully", + "data": jsonable_encoder({"stats": status_counts}), + }, + ) + else: + if page_number: + page_size = page_size if page_size else 50 + result = self._paginate_list(resource_list, page_number, page_size) + else: + result = { + "page_items": resource_list, + "page_number": 1, + "page_size": len(resource_list), + "next_page_number": None, + } + + bt.logging.info(f"API: List resources successfully") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List resources successfully", + "data": jsonable_encoder(result), + }, + ) + + else: + bt.logging.info(f"API: There is no resource available") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "There is no resource available", + "err_detail": "No resources found.", + }, + ) + + @self.app.post("/list/all_runs", + tags=["WandB"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "List run resources successfully.", + }, + 401: {"model": ErrorResponse, "description": "Missing authorization"}, + 404: { + "model": ErrorResponse, + "description": "Error occurred while getting runs from wandb", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + } + ) + async def list_all_runs(hotkey: Optional[str] = None, page_size: Optional[int] = None, + page_number: Optional[int] = None) -> JSONResponse: + """ + This function gets all run resources. 
+ """ + db_list = [] + try: + # self.wandb.api.flush() + if hotkey: + filter_rule = { + "$and": [ + {"config.config.netuid": self.config.netuid}, + {"config.hotkey": hotkey}, + {"state": "running"}, + ] + } + else: + filter_rule = { + "$and": [ + {"config.config.netuid": self.config.netuid}, + {"state": "running"}, + ] + } + runs = await run_in_threadpool(self.wandb.api.runs, + f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", filter_rule) + + if runs: + # Iterate over all runs in the opencompute project + for index, run in enumerate(runs, start=1): + # Access the run's configuration + run_id = run.id + run_name = run.name + run_description = run.description + run_config = run.config + run_state = run.state + # run_start_at = datetime.strptime(run.created_at, '%Y-%m-%dT%H:%M:%S') + configs = run_config.get("config") + append_entry = True + + # append the data to the db_list + if configs and append_entry: + db_specs_dict = {index: { + "id": run_id, + "name": run_name, + "description": run_description, + "configs": configs, + "state": run_state, + "start_at": run.created_at + }} + db_list.append(db_specs_dict) + + if page_number: + page_size = page_size if page_size else 50 + result = self._paginate_list(db_list, page_number, page_size) + else: + result = { + "page_items": db_list, + "page_number": 1, + "page_size": len(db_list), + "next_page_number": None, + } + + bt.logging.info(f"API: List run resources successfully") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List run resources successfully.", + "data": jsonable_encoder(result), + }, + ) + + else: + bt.logging.info(f"API: no runs available") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "No runs available", + "data": {}, + }, + ) + + except Exception as e: + # Handle the exception by logging an error message + bt.logging.error(f"API: An error occurred while getting specs from wandb: {e}") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "Error occurred while getting runs from wandb", + "err_detail": e.__repr__(), + }, + ) + + @self.app.post( + "/list/specs", + tags=["WandB"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "List spec resources successfully.", + }, + 401: {"model": ErrorResponse, "description": "Missing authorization"}, + 404: { + "model": ErrorResponse, + "description": "Error occurred while getting specs from wandb", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + }, + ) + async def list_specs(hotkey: Optional[str] = None, + page_size: Optional[int] = None, + page_number: Optional[int] = None) -> JSONResponse: + """ + The list specs API endpoint.
+ """ + db_list = [] + + try: + # self.wandb.api.flush() + if hotkey: + filter_rule = { + "$and": [ + {"config.role": "miner"}, + {"config.config.netuid": self.config.netuid}, + {"state": "running"}, + {"config.hotkey": hotkey}, + {"config.specs": {"$exists": True}}, + ] + } + else: + filter_rule = { + "$and": [ + {"config.role": "miner"}, + {"config.config.netuid": self.config.netuid}, + {"config.specs": {"$exists": True}}, + {"state": "running"}, + ] + } + + runs = await run_in_threadpool(self.wandb.api.runs, + f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", filter_rule) + + if runs: + # Iterate over all runs in the opencompute project + for index, run in enumerate(runs, start=1): + # Access the run's configuration + run_config = run.config + run_state = run.state + hotkey = run_config.get("hotkey") + specs = run_config.get("specs") + configs = run_config.get("config") + + # check the signature + if hotkey and specs: + db_specs_dict = {index: {"hotkey": hotkey, "configs": configs, + "specs": specs, "state": run_state}} + db_list.append(db_specs_dict) + + if page_number: + page_size = page_size if page_size else 50 + result = self._paginate_list(db_list, page_number, page_size) + else: + result = { + "page_items": db_list, + "page_number": 1, + "page_size": len(db_list), + "next_page_number": None, + } + + # Return the db_specs_dict for further use or inspection + bt.logging.info(f"API: List specs successfully") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List specs successfully", + "data": jsonable_encoder(result), + }, + ) + + else: + bt.logging.info(f"API: no specs available") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "No specs available", + "data": {}, + }, + ) + + except Exception as e: + # Handle the exception by logging an error message + bt.logging.error( + f"API: An error occurred while getting specs from wandb: {e}" + ) + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "Error occurred while getting specs from wandb", + "err_detail": e.__repr__(), + }, + ) + + @self.app.post("/list/run_by_name", + tags=["WandB"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "List run resources successfully.", + }, + 401: {"model": ErrorResponse, "description": "Missing authorization"}, + 404: { + "model": ErrorResponse, + "description": "Error occurred while getting run from wandb", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + } + ) + async def list_run_name(run_name: str) -> JSONResponse: + """ + This function gets runs by name. 
+ """ + db_specs_dict = {} + try: + # self.wandb.api.flush() + filter_rule = { + "$and": [ + {"config.config.netuid": self.config.netuid}, + {"display_name": run_name}, + {"state": "running"}, + ] + } + + runs = await run_in_threadpool(self.wandb.api.runs, + f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", filter_rule) + + if runs: + # Iterate over all runs in the opencompute project + for index, run in enumerate(runs, start=1): + # Access the run's configuration + run_id = run.id + run_name = run.name + run_description = run.description + run_config = run.config + hotkey = run_config.get("hotkey") + configs = run_config.get("config") + + # check the signature + if hotkey and configs: + db_specs_dict[index] = {"id": run_id, "name": run_name, "description": run_description, + "config": configs} + + bt.logging.info(f"API: list run by name is success") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List run by name", + "data": jsonable_encoder(db_specs_dict), + }, + ) + + else: + bt.logging.info(f"API: no run available") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "No run available", + "data": {}, + }, + ) + + except Exception as e: + # Handle the exception by logging an error message + bt.logging.error(f"API: An error occurred while getting specs from wandb: {e}") + + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "Error occurred while run from wandb", + "err_detail": e.__repr__(), + }, + ) + + @self.app.post("/list/available", + tags=["WandB"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "List available resources successfully.", + }, + 401: {"model": ErrorResponse, "description": "Missing authorization"}, + 404: { + "model": ErrorResponse, + "description": "Error occurred while fetch available miner from wandb", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + } + ) + async def list_available_miner(rent_status: bool = False, + page_size: Optional[int] = None, + page_number: Optional[int] = None) -> JSONResponse: + """ + This function gets all available miners. 
+ """ + db_list = [] + try: + self.wandb.api.flush() + if rent_status: + filter_rule = { + "config.allocated": {"$regex": "\\d.*"}, + "config.config.netuid": self.config.netuid, + "config.role": "miner", + "state": "running", + } + else: + filter_rule = { + "$or": [ + {"config.allocated": {"$regex": "null"}}, + {"config.allocated": {"$exists": False}}, + ], + "config.config.netuid": self.config.netuid, + "config.role": "miner", + "state": "running", + } + + runs = await run_in_threadpool(self.wandb.api.runs, + f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", filter_rule) + + if runs: + # Iterate over all runs in the opencompute project + for index, run in enumerate(runs, start=1): + # Access the run's configuration + run_config = run.config + hotkey = run_config.get("hotkey") + specs = run.config.get("specs") + configs = run_config.get("config") + + # check the signature + if hotkey and configs: + db_specs_dict = {index: {"hotkey": hotkey, "details": specs}} + db_list.append(db_specs_dict) + + if page_number: + page_size = page_size if page_size else 50 + result = self._paginate_list(db_list, page_number, page_size) + else: + result = { + "page_items": db_list, + "page_number": 1, + "page_size": len(db_list), + "next_page_number": None, + } + else: + bt.logging.info(f"API: No available miners") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "No available miner", + "data": {}, + }, + ) + + if rent_status: + bt.logging.info(f"API: List rented miners is success") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List rented miners", + "data": jsonable_encoder(result), + }, + ) + else: + bt.logging.info(f"API: List available miners is success") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List available miners", + "data": jsonable_encoder(result), + }, + ) + + except Exception as e: + # Handle the exception by logging an error message + bt.logging.error(f"API: An error occurred while fetching available miner from wandb: {e}") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "Error occurred while fetching available miner from wandb", + "err_detail": e.__repr__(), + }, + ) + + @self.app.post("/list/allocated_hotkeys", + tags=["WandB"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "List available resources successfully.", + }, + 401: {"model": ErrorResponse, "description": "Missing authorization"}, + 404: { + "model": ErrorResponse, + "description": "Error occurred while fetch allocated hotkey from wandb", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + } + ) + async def list_allocated_hotkeys() -> JSONResponse: + """ + This function gets all allocated hotkeys from all validators. + Only relevant for validators. 
+ """ + try: + self.wandb.api.flush() + filter_rule = { + "$and": [ + {"config.role": "validator"}, + {"config.config.netuid": self.config.netuid}, + {"config.allocated_hotkeys": {"$regex": "\\d.*"}}, + {"state": "running"}, + ] + } + + # Query all runs in the project and Filter runs where the role is 'validator' + validator_runs = await run_in_threadpool(self.wandb.api.runs, + f"{PUBLIC_WANDB_ENTITY}/{PUBLIC_WANDB_NAME}", filter_rule) + + # Check if the runs list is empty + if not validator_runs: + bt.logging.info(f"API: No validator with allocated info in the project opencompute.") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "No validator with allocated info in the project opencompute.", + "data": {}, + }, + ) + + except Exception as e: + bt.logging.error(f"API: list_allocated_hotkeys error with {e.__repr__()}") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "success": False, + "message": "Error occurred while fetching allocated hotkey from wandb", + "err_detail": e.__repr__(), + }, + ) + + # Initialize an empty list to store allocated keys from runs with a valid signature + allocated_keys_list = [] + + # Verify the signature for each validator run + for run in validator_runs: + try: + # Access the run's configuration + run_config = run.config + # hotkey = run_config.get("hotkey") + allocated_keys = run_config.get("allocated_hotkeys") + # id = run_config.get("id") + # name = run_config.get("name") + + # valid_validator_hotkey = hotkey in valid_validator_hotkeys + # Allow all validator hotkeys for data retrieval only + # if verify_run(id,name, hotkey, allocated_keys) and allocated_keys and valid_validator_hotkey: + allocated_keys_list.extend(allocated_keys) # Add the keys to the list + + except Exception as e: + bt.logging.error(f"API: Run ID: {run.id}, Name: {run.name}, Error: {e}") + + bt.logging.info(f"API: List allocated hotkeys is success") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "List allocated hotkeys", + "data": jsonable_encoder(allocated_keys_list), + }, + ) + + @self.app.post("/test/notify", + tags=["Testing"], + response_model=SuccessResponse | ErrorResponse, + responses={ + 200: { + "model": SuccessResponse, + "description": "Notify allocation event testing is success", + }, + 400: { + "model": ErrorResponse, + "description": "Notify allocation event testing is failed", + }, + 422: { + "model": ErrorResponse, + "description": "Validation Error, Please check the request body.", + }, + } + ) + async def test_notify(hotkey: str = None, uuid_key: str = None, event: str = None) -> JSONResponse: + """ + This function is used to test the notification system. 
+ """ + try: + if not hotkey: + hotkey = "test_hotkey" + if not uuid_key: + uuid_key = str(uuid.uuid1()) + if not event: + event = "DEALLOCATION" + # Notify the allocation event + response = await self._notify_allocation_status( + event_time=datetime.now(timezone.utc), + hotkey=hotkey, + uuid=uuid_key, + event=event, + details="test notify event message", + ) + + if response: + bt.logging.info(f"API: Notify allocation event testing is success") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "success": True, + "message": "Notify allocation event testing is success", + }, + ) + else: + bt.logging.error(f"API: Notify allocation event testing is failed") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "success": False, + "message": "Notify allocation event testing is failed", + }, + ) + + except Exception as e: + bt.logging.error(f"API: An error occurred while testing notify: {e}") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "success": False, + "message": "Error occurred while testing notify", + "err_detail": e.__repr__(), + }, + ) + + + @staticmethod + def _init_config(): + """ + This function is responsible for setting up and parsing command-line arguments. + :return: config + """ + parser = ComputeArgPaser(description="This script aims to help allocation with the compute subnet.") + config = parser.config + + # Step 3: Set up logging directory + # Logging is crucial for monitoring and debugging purposes. + config.full_path = os.path.expanduser( + "{}/{}/{}/netuid{}/{}/{}/".format( + config.logging.logging_dir, + config.wallet.name, + config.wallet.hotkey, + config.netuid, + "validator", + "register" + ) + ) + # Ensure the logging directory exists. + if not os.path.exists(config.full_path): + os.makedirs(config.full_path, exist_ok=True) + + # Return the parsed config. + return config + + @staticmethod + def _get_config(user_config: UserConfig, requirements: Union[DeviceRequirement, None] = None): + """ + Get the config from user config and spec requirement for the API.
+ user_config: The user configuration, which contains the validator's hotkey and wallet information.
+ requirements: The device requirements.
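+ The parsed bittensor config is returned with the logging directory created.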
+ """ + parser = argparse.ArgumentParser() + # Adds bittensor specific arguments + parser.add_argument( + "--netuid", type=int, default=27, help="The chain subnet uid." + ) + # parser.add_argument("--gpu_type", type=str, help="The GPU type.") + # parser.add_argument("--gpu_size", type=int, help="The GPU memory in MB.") + # parser.add_argument("--timeline", type=int, help="The allocation timeline.") + bt.subtensor.add_args(parser) + bt.logging.add_args(parser) + bt.wallet.add_args(parser) + + if not user_config.subtensor_chain_endpoint: + if user_config.subtensor_network == "finney": + user_config.subtensor_chain_endpoint = ( + "wss://entrypoint-finney.opentensor.ai:443" + ) + elif user_config.subtensor_network == "test": + user_config.subtensor_chain_endpoint = ( + "wss://test.finney.opentensor.ai:443" + ) + + # Add user configuration and requirement to list for the bt config parser + args_list = [] + for entry in [user_config, requirements]: + if entry: + for k, v in entry.__fields__.items(): + args_list.append(f"--{v.alias}") + args_list.append(getattr(entry, k)) + + # Parse the initial config to check for provided arguments + config = bt.config(parser=parser, args=args_list) + + # Set up logging directory + config.full_path = os.path.expanduser( + "{}/{}/{}/netuid{}/{}".format( + config.logging.logging_dir, + config.wallet.name, + config.wallet.hotkey, + config.netuid, + "validator", + ) + ) + if not os.path.exists(config.full_path): + os.makedirs(config.full_path, exist_ok=True) + + return config + + def _allocate_container(self, device_requirement, timeline, public_key, docker_requirement: dict): + """ + Allocate the container with the given device requirement.
+ """ + # Generate ssh connection for given device requirements and timeline + # Instantiate the connection to the db + db = ComputeDb() + + # Find out the candidates + candidates_hotkey = select_allocate_miners_hotkey(db, device_requirement) + + axon_candidates = [] + for axon in self.metagraph.axons: + if axon.hotkey in candidates_hotkey: + axon_candidates.append(axon) + + responses = self.dendrite.query( + axon_candidates, + Allocate( + timeline=timeline, device_requirement=device_requirement, checking=True + ), + ) + + final_candidates_hotkey = [] + + for index, response in enumerate(responses): + hotkey = axon_candidates[index].hotkey + if response and response["status"] is True: + final_candidates_hotkey.append(hotkey) + + # Check if there are candidates + if len(final_candidates_hotkey) <= 0: + return {"status": False, "msg": "Requested resource is not available."} + + # Sort the candidates with their score + scores = torch.ones_like(self.metagraph.S, dtype=torch.float32) + + score_dict = { + hotkey: score + for hotkey, score in zip( + [axon.hotkey for axon in self.metagraph.axons], scores + ) + } + sorted_hotkeys = sorted( + final_candidates_hotkey, + key=lambda hotkey: score_dict.get(hotkey, 0), + reverse=True, + ) + + # Loop the sorted candidates and check if one can allocate the device + for hotkey in sorted_hotkeys: + index = self.metagraph.hotkeys.index(hotkey) + axon = self.metagraph.axons[index] + register_response = self.dendrite.query( + axon, + Allocate( + timeline=timeline, + device_requirement=device_requirement, + checking=False, + public_key=public_key, + docker_requirement=docker_requirement, + ), + timeout=100, + ) + if register_response and register_response["status"] is True: + register_response["ip"] = axon.ip + register_response["hotkey"] = axon.hotkey + return register_response + + # Close the db connection + db.close() + + return {"status": False, "msg": "Requested resource is not available."} + + def _allocate_container_hotkey(self, requirements, hotkey, timeline, public_key, docker_requirement: dict): + """ + Allocate the container with the given hotkey.
+ Generate an SSH connection for the given device requirements and timeline.
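+ requirements: The GPU type and size used to build the device requirement.
+ hotkey: The target miner hotkey; only this miner's axon is queried.
+ timeline: The allocation timeline forwarded to the miner.
+ public_key: RSA public key used by the miner to encrypt the returned connection info.
+ docker_requirement: The docker container settings forwarded to the miner.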
+ """ + device_requirement = {"cpu": {"count": 1}, "gpu": { + "count": 1, + "capacity": int(requirements.gpu_size) * 1000, + "type": requirements.gpu_type, + }, "hard_disk": {"capacity": 1073741824}, "ram": {"capacity": 1073741824}} + + # Instantiate the connection to the db + for axon in self.metagraph.axons: + if axon.hotkey == hotkey: + check_allocation = self.dendrite.query( + axon, + Allocate( + timeline=timeline, + device_requirement=device_requirement, + checking=True, + ), + timeout=60, + ) + if check_allocation and check_allocation["status"] is True: + register_response = self.dendrite.query( + axon, + Allocate( + timeline=timeline, + device_requirement=device_requirement, + checking=False, + public_key=public_key, + docker_requirement=docker_requirement, + ), + timeout=100, + ) + if register_response and register_response["status"] is True: + register_response["ip"] = axon.ip + register_response["hotkey"] = axon.hotkey + return register_response + + return {"status": False, "msg": "Requested resource is not available."} + + async def _update_allocation_wandb(self, ): + """ + Update the allocated hotkeys in wandb.
+ """ + hotkey_list = [] + + # Instantiate the connection to the db + db = ComputeDb() + cursor = db.get_cursor() + + try: + # Retrieve all records from the allocation table + cursor.execute("SELECT id, hotkey, details FROM allocation") + rows = cursor.fetchall() + + for row in rows: + id, hotkey, details = row + hotkey_list.append(hotkey) + + except Exception as e: + print(f"An error occurred while retrieving allocation details: {e}") + finally: + cursor.close() + db.close() + try: + await run_in_threadpool(self.wandb.update_allocated_hotkeys, hotkey_list) + except Exception as e: + bt.logging.info(f"API: Error updating wandb : {e}") + return + + async def _refresh_metagraph(self): + """ + Refresh the metagraph by resync_period.
+ """ + while True: + if self.metagraph: + self.metagraph.sync(lite=True, subtensor=self.subtensor) + # bt.logging.info(f"API: Metagraph refreshed") + await asyncio.sleep(DATA_SYNC_PERIOD) + + async def _refresh_allocation(self): + """ + Refresh the allocation by resync_period.
+ """ + while True: + self.allocation_table = self.wandb.get_allocated_hotkeys([], False) + bt.logging.info(f"API: Allocation refreshed: {self.allocation_table}") + await asyncio.sleep(DATA_SYNC_PERIOD) + + + async def _notify_allocation_status(self, event_time: datetime, hotkey: str, + uuid: str, event: str, details: str | None = ""): + """ + Notify the allocation by hotkey and status.
+ """ + headers = { + 'accept': '*/*', + 'Content-Type': 'application/json', + } + if event == "DEALLOCATION": + msg = { + "time": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + "deallocated_at": event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + "hotkey": hotkey, + "status": event, + "uuid": uuid, + } + notify_url = self.deallocation_notify_url + elif event == "OFFLINE" or event == "ONLINE": + msg = { + "time": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + "status_change_at": event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + "hotkey": hotkey, + "status": event, + "uuid": uuid, + } + notify_url = self.status_notify_url + + retries = 0 + while retries < MAX_NOTIFY_RETRY: + try: + # Send the POST request + data = json.dumps(msg) + response = await run_in_threadpool( + requests.post, notify_url, headers=headers, data=data, timeout=3, json=True, verify=False, + cert=("cert/server.cer", "cert/server.key"), + ) + # Check for the expected ACK in the response + if response.status_code == 200 or response.status_code == 201: + response_data = response.json() + # if response_data.get("status") == "success": # ACK response + # return response + # else: + # print(f"ACK not received, response: {response_data}") + # bt.logging.info(f"API: Notify success with {hotkey} status code: " + # {response.status_code}, response: {response.text}") + return response_data + else: + bt.logging.info(f"API: Notify failed with {hotkey} status code: " + f"{response.status_code}, response: {response.text}") + return None + + except requests.exceptions.RequestException as e: + bt.logging.info(f"API: Notify {hotkey} failed: {e}") + + # Increment the retry counter and wait before retrying + retries += 1 + await asyncio.sleep(NOTIFY_RETRY_PERIOD) + + return None + + async def _check_allocation(self): + """ + Check the allocation by resync_period.
+ """ + while True: + db = ComputeDb() + cursor = db.get_cursor() + try: + # Retrieve all records from the allocation table + cursor.execute("SELECT id, hotkey, details FROM allocation") + rows = cursor.fetchall() + for row in rows: + id, hotkey, details = row + info = json.loads(details) + + index = self.metagraph.hotkeys.index(hotkey) + axon = self.metagraph.axons[index] + uuid_key = info.get("uuid") + + register_response = await run_in_threadpool(self.dendrite.query, axon, + Allocate(timeline=1, checking=True, ), timeout=60) + if register_response and register_response["status"] is False: + + if hotkey in self.checking_allocated: + response = await self._notify_allocation_status( + event_time=deallocated_at, + hotkey=hotkey, + uuid=uuid_key, + event="ONLINE", + details=f"GPU Resume for {ALLOCATE_CHECK_PERIOD} seconds" + ) + self.checking_allocated = [x for x in self.checking_allocated if x != hotkey] + + # bt.logging.info(f"API: Allocation is still running for hotkey: {hotkey}") + else: + # handle the case when no response is received or the docker is not running + self.checking_allocated.append(hotkey) + # bt.logging.info(f"API: No response timeout is triggered for hotkey: {hotkey}") + deallocated_at = datetime.now(timezone.utc) + response = await self._notify_allocation_status( + event_time=deallocated_at, + hotkey=hotkey, + uuid=uuid_key, + event="OFFLINE", + details=f"No response timeout for {ALLOCATE_CHECK_PERIOD} seconds" + ) + if not response: + pass + + if self.checking_allocated.count(hotkey) >= ALLOCATE_CHECK_COUNT: + deallocated_at = datetime.now(timezone.utc) + # update the allocation table + update_allocation_db(hotkey, info, False) + await self._update_allocation_wandb() + response = await self._notify_allocation_status( + event_time=deallocated_at, + hotkey=hotkey, + uuid=uuid_key, + event="DEALLOCATION", + details=f"No response timeout for {ALLOCATE_CHECK_COUNT} times" + ) + bt.logging.info(f"API: deallocate event triggered due to {hotkey} " + f"is timeout for {ALLOCATE_CHECK_COUNT} times") + + # remove the hotkey from checking table + if not response: + self.notify_retry_table.append({"event_time": deallocated_at, + "hotkey": hotkey, + "uuid": uuid_key, + "event": "DEALLOCATION", + "details": "Retry deallocation notify event triggered"}) + + for entry in self.notify_retry_table: + response = await self._notify_allocation_status(event_time=entry["event_time"], + hotkey=entry["hotkey"], + uuid=entry["uuid"], + event=entry["event"], + details=entry["details"]) + if response: + self.notify_retry_table.remove(entry) + bt.logging.info(f"API: Notify {entry['event']} retry event is success on {entry['hotkey']} ") + else: + bt.logging.info(f"API: Notify {entry['event']} retry event is failed on {entry['hotkey']} ") + + except Exception as e: + bt.logging.error(f"API: Error occurred while checking allocation: {e}") + finally: + # bt.logging.info(f"API: Allocation checking triggered") + await asyncio.sleep(ALLOCATE_CHECK_PERIOD) + + @staticmethod + def _paginate_list(items, page_number, page_size): + # Calculate the start and end indices of the items on the current page + start_index = (page_number - 1) * page_size + end_index = start_index + page_size + + # Get the items on the current page + page_items = items[start_index:end_index] + + # Determine if there are more pages + has_next_page = end_index < len(items) + next_page_number = page_number + 1 if has_next_page else None + + return { + "page_items": page_items, + "page_number": page_number, + "page_size": page_size, + 
"next_page_number": next_page_number + } + + @staticmethod + def check_port_open(host, port, hotkey): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(1) # Set a timeout for the connection attempt + result = sock.connect_ex((host, port)) + if result == 0: + bt.logging.info(f"API: Port {port} on {host} is open for {hotkey}") + return True + else: + bt.logging.info(f"API: Port {port} on {host} is closed for {hotkey}") + return False + except socket.gaierror: + bt.logging.warning(f"API: Hostname {host} could not be resolved") + return False + except socket.error: + bt.logging.error(f"API: Couldn't connect to server {host}") + return False + + def run(self): + """ + Run the FastAPI app.
+ """ + if os.path.exists("cert/ca.cer") and os.path.exists("cert/server.key") and os.path.exists("cert/server.cer"): + uvicorn.run( + self.app, + host=self.ip_addr, + port=self.port, + log_level="critical", + ssl_keyfile="cert/server.key", + ssl_certfile="cert/server.cer", + ssl_cert_reqs=DEFAULT_SSL_MODE, # 1 for client CERT optional, 2 for client CERT_REQUIRED + ssl_ca_certs="cert/ca.cer", + ) + else: + bt.logging.error(f"API: No SSL certificate found, please generate one with /cert/gen_ca.sh") + exit(1) + + def start(self): + """ + Start the FastAPI app in the process.
+ """ + self.process = multiprocessing.Process( + target=self.run, args=(), daemon=True + ).start() + + def stop(self): + """ + Stop the FastAPI app in the process.
+ """ + if self.process: + self.process.terminate() + self.process.join() + + +# Run the FastAPI app +if __name__ == "__main__": + os.environ["WANDB_SILENT"] = "true" + register_app = RegisterAPI() + register_app.run() diff --git a/neurons/validator.py b/neurons/validator.py index 31a43331..e09b12b7 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -158,8 +158,6 @@ def __init__(self): self._wallet = bt.wallet(config=self.config) bt.logging.info(f"Wallet: {self.wallet}") - self.wandb = ComputeWandb(self.config, self.wallet, os.path.basename(__file__)) - # The subtensor is our connection to the Bittensor blockchain. self._subtensor = ComputeSubnetSubtensor(config=self.config) bt.logging.info(f"Subtensor: {self.subtensor}") @@ -176,6 +174,9 @@ def __init__(self): self.db = ComputeDb() self.miners: dict = select_miners(self.db) + # Initialize wandb + self.wandb = ComputeWandb(self.config, self.wallet, os.path.basename(__file__)) + # Step 3: Set up initial scoring weights for validation bt.logging.info("Building validation weights.") self.uids: list = self.metagraph.uids.tolist() @@ -185,6 +186,9 @@ def __init__(self): self.last_updated_block = self.current_block - (self.current_block % 100) + # Initialize allocated_hotkeys as an empty list + self.allocated_hotkeys = [] + # Init the thread. self.lock = threading.Lock() self.threads: List[threading.Thread] = [] @@ -269,7 +273,7 @@ def sync_scores(self): self.stats = select_challenge_stats(self.db) # Fetch allocated hotkeys - allocated_hotkeys = self.wandb.get_allocated_hotkeys(self.get_valid_validator_hotkeys(), True) + self.allocated_hotkeys = self.wandb.get_allocated_hotkeys(self.get_valid_validator_hotkeys(), True) # Fetch docker requirement has_docker_hotkeys = select_has_docker_miners_hotkey(self.db) @@ -288,7 +292,14 @@ def sync_scores(self): else: self.stats[uid]["has_docker"] = False - score = calc_score(self.stats[uid], hotkey=hotkey, allocated_hotkeys=allocated_hotkeys) + # Find the maximum score of all uids excluding allocated uids + max_score_uids = max( + self.stats[uid]["score"] + for uid in self.stats + if "score" in self.stats[uid] and self.stats[uid].get("ss58_address") not in self.allocated_hotkeys + ) + + score = calc_score(self.stats[uid], hotkey=hotkey, allocated_hotkeys=self.allocated_hotkeys, max_score_uid=max_score_uids) self.stats[uid]["score"] = score except (ValueError, KeyError): score = 0 @@ -678,6 +689,8 @@ async def start(self): for _uid in self.uids[i : i + self.validator_challenge_batch_size]: try: axon = self._queryable_uids[_uid] + if axon.hotkey in self.allocated_hotkeys: + continue difficulty = self.calc_difficulty(_uid) password, _hash, _salt, mode, chars, mask = run_validator_pow(length=difficulty) self.pow_requests[_uid] = (password, _hash, _salt, mode, chars, mask, difficulty) @@ -733,10 +746,10 @@ async def start(self): # Log chain data to wandb chain_data = { "Block": self.current_block, - "Stake": float(self.metagraph.S[self.validator_subnet_uid].numpy()), - "Rank": float(self.metagraph.R[self.validator_subnet_uid].numpy()), - "vTrust": float(self.metagraph.validator_trust[self.validator_subnet_uid].numpy()), - "Emission": float(self.metagraph.E[self.validator_subnet_uid].numpy()), + "Stake": float(self.metagraph.S[self.validator_subnet_uid]), + "Rank": float(self.metagraph.R[self.validator_subnet_uid]), + "vTrust": float(self.metagraph.validator_trust[self.validator_subnet_uid]), + "Emission": float(self.metagraph.E[self.validator_subnet_uid]), } self.wandb.log_chain_data(chain_data) 
diff --git a/requirements.txt b/requirements.txt index 92f39c57..8bac0209 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,5 @@ torch==2.1.2 wandb==0.16.6 pyfiglet==1.0.2 python-dotenv==1.0.1 +requests==2.31.0 diff --git a/test-scripts/benchmark.py b/test-scripts/benchmark.py index bfba3640..66d06ec0 100644 --- a/test-scripts/benchmark.py +++ b/test-scripts/benchmark.py @@ -3,6 +3,7 @@ # Copyright © 2023 Crazydevlegend # Copyright © 2023 GitPhantomman # Copyright © 2024 Andrew O'Flaherty +# Copyright © 2024 Thomas Chu # # Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated # documentation files (the “Software”), to deal in the Software without restriction, including without limitation @@ -30,31 +31,46 @@ import subprocess import time from typing import List, Union, Tuple +import threading import bittensor as bt from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa +import base64 +import struct challenges_solved = {} challenge_solve_durations = {} challenge_totals = {} +challenge_allocated = {} min_diff = compute.pow_min_difficulty max_diff = compute.pow_max_difficulty +pow_quantity = 3 + +pow_mode_list = ["610", "8900", "1410", "10810", "1710", "7801", "19500"] +pow_mode_blake2b = "610" +pow_mode_scrypt = "8900" +pow_mode_sha256 = "1410" +pow_mode_sha384 = "10810" +pow_mode_sha512 = "1710" +pow_mode_sap = "7801" +pow_mode_ruby = "19500" + class Challenge: """Store challenge object properties.""" - def __init__(self, - _hash: str = "", - salt: str = "", - mode: str = "", - chars: str = "", - mask: str = "", - difficulty: int = min_diff, - run_id: str = "", - ): + def __init__(self, + _hash: str = "", + salt: str = "", + mode: str = "", + chars: str = "", + mask: str = "", + difficulty: int = min_diff, + run_id: str = "", + ): self._hash = _hash self.salt = salt self.mode = mode @@ -93,16 +109,17 @@ def check_docker_availability() -> Tuple[bool, str]: try: # Run 'docker --version' command - result = subprocess.run(["docker", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) + result = subprocess.run(["docker", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, + check=True) # If the command was successful, Docker is installed docker_version = result.stdout.strip() - + if check_docker_container('sn27-benchmark-container') is True: return True, docker_version else: error_message = "Docker is installed, but is unable to create or run a container. Please verify your system's permissions." return False, error_message - + except Exception as e: # Catch all exceptions # If the command failed, Docker is not installed error_message = ( @@ -111,23 +128,23 @@ def check_docker_availability() -> Tuple[bool, str]: "Note: running a miner within containerized instances is not supported." 
) return False, error_message - + def check_docker_container(container_id_or_name: str): """Confirm the benchmark container can be created and returns the correct output in its logs.""" try: # Start the container - subprocess.run(["docker", "start", container_id_or_name], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL) + subprocess.run(["docker", "start", container_id_or_name], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) # Wait for the container to finish running subprocess.run(["docker", "wait", container_id_or_name], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL) - + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + # Get the logs from the container logs_result = subprocess.run( ["docker", "logs", container_id_or_name], @@ -146,7 +163,7 @@ def check_docker_container(container_id_or_name: str): except subprocess.CalledProcessError as e: # Handle errors from the Docker CLI return False - + def check_cuda_availability(): """Verify the number of available CUDA devices (Nvidia GPUs)""" @@ -158,14 +175,114 @@ def check_cuda_availability(): else: print(Fore.RED + "CUDA is not available or not properly configured on this system.") -def gen_hash(password, salt=None): - """Generate the hash and salt for a challenge.""" - salt = secrets.token_hex(8) if salt is None else salt - salted_password = password + salt - data = salted_password.encode("utf-8") - hash_result = hashlib.blake2b(data).hexdigest() - return f"$BLAKE2${hash_result}", salt +def random_numeric_string(length): + numbers = '0123456789' + return ''.join([numbers[random.randint(0, 9)] for _ in range(length)]) + + +def gen_hash(password, salt=None, mode: str = compute.pow_default_mode): + """ + Generate the hash and salt for a challenge. + :param password: The password to hash. + :param salt: The salt to use for the hash. + :param mode: The hashcat mode to use for the hash. + :return: The hash and salt for the challenge. 
+ """ + if mode == pow_mode_scrypt: # For Scrypt + salt = secrets.token_bytes(24) if salt is None else base64.b64decode(salt.encode("utf-8")) + password_bytes = password.encode('ascii') + hashed_password = hashlib.scrypt(password_bytes, salt=salt, n=1024, r=1, p=1, dklen=32) + hash_result = str(base64.b64encode(hashed_password).decode('utf-8')) + salt = str(base64.b64encode(salt).decode('utf-8')) + return f"SCRYPT:1024:1:1:{hash_result}", salt + elif mode== pow_mode_blake2b or mode== pow_mode_sha256 or mode== pow_mode_sha384 or mode== pow_mode_sha512: # For Blake2b-512, SHA-256, SHA-384, SHA-512 + salt = secrets.token_hex(8) if salt is None else salt + salted_password = password + salt + data = salted_password.encode("utf-8") + padding = "" + if mode == pow_mode_blake2b: + hash_result = hashlib.blake2b(data).hexdigest() + padding = "$BLAKE2$" + elif mode == pow_mode_sha256: + hash_result = hashlib.sha256(data).hexdigest() + elif mode == pow_mode_sha384: + hash_result = hashlib.sha384(data).hexdigest() + elif mode == pow_mode_sha512: + hash_result = hashlib.sha512(data).hexdigest() + return f"{padding}{hash_result}", salt + elif mode == pow_mode_sap: # For SAP CODVN F/G (PASSCODE) + if not salt: + salt = random_numeric_string(8) + + theMagicArray_s = ( + b"\x91\xac\x51\x14\x9f\x67\x54\x43\x24\xe7\x3b\xe0\x28\x74\x7b\xc2" + b"\x86\x33\x13\xeb\x5a\x4f\xcb\x5c\x08\x0a\x73\x37\x0e\x5d\x1c\x2f" + b"\x33\x8f\xe6\xe5\xf8\x9b\xae\xdd\x16\xf2\x4b\x8d\x2c\xe1\xd4\xdc" + b"\xb0\xcb\xdf\x9d\xd4\x70\x6d\x17\xf9\x4d\x42\x3f\x9b\x1b\x11\x94" + b"\x9f\x5b\xc1\x9b\x06\x05\x9d\x03\x9d\x5e\x13\x8a\x1e\x9a\x6a\xe8" + b"\xd9\x7c\x14\x17\x58\xc7\x2a\xf6\xa1\x99\x63\x0a\xd7\xfd\x70\xc3" + b"\xf6\x5e\x74\x13\x03\xc9\x0b\x04\x26\x98\xf7\x26\x8a\x92\x93\x25" + b"\xb0\xa2\x0d\x23\xed\x63\x79\x6d\x13\x32\xfa\x3c\x35\x02\x9a\xa3" + b"\xb3\xdd\x8e\x0a\x24\xbf\x51\xc3\x7c\xcd\x55\x9f\x37\xaf\x94\x4c" + b"\x29\x08\x52\x82\xb2\x3b\x4e\x37\x9f\x17\x07\x91\x11\x3b\xfd\xcd" + ) + + salt = salt.upper() + word_salt = (password + salt).encode('utf-8') + digest = hashlib.sha1(word_salt).digest() + + a, b, c, d, e = struct.unpack("IIIII", digest) + + length_magic_array = 0x20 + offset_magic_array = 0 + + length_magic_array += ((a >> 0) & 0xff) % 6 + length_magic_array += ((a >> 8) & 0xff) % 6 + length_magic_array += ((a >> 16) & 0xff) % 6 + length_magic_array += ((a >> 24) & 0xff) % 6 + length_magic_array += ((b >> 0) & 0xff) % 6 + length_magic_array += ((b >> 8) & 0xff) % 6 + length_magic_array += ((b >> 16) & 0xff) % 6 + length_magic_array += ((b >> 24) & 0xff) % 6 + length_magic_array += ((c >> 0) & 0xff) % 6 + length_magic_array += ((c >> 8) & 0xff) % 6 + offset_magic_array += ((c >> 16) & 0xff) % 8 + offset_magic_array += ((c >> 24) & 0xff) % 8 + offset_magic_array += ((d >> 0) & 0xff) % 8 + offset_magic_array += ((d >> 8) & 0xff) % 8 + offset_magic_array += ((d >> 16) & 0xff) % 8 + offset_magic_array += ((d >> 24) & 0xff) % 8 + offset_magic_array += ((e >> 0) & 0xff) % 8 + offset_magic_array += ((e >> 8) & 0xff) % 8 + offset_magic_array += ((e >> 16) & 0xff) % 8 + offset_magic_array += ((e >> 24) & 0xff) % 8 + + hash_str = (password.encode('utf-8') + + theMagicArray_s[offset_magic_array:offset_magic_array + length_magic_array] + + salt.encode('utf-8')) + + hash_buf = hashlib.sha1(hash_str).hexdigest() + hash_val = salt + "$" + hash_buf.upper()[:20] + "0" * 20 + return hash_val, salt + + elif mode == pow_mode_ruby: # For Ruby on Rails Restful-Authentication + if not salt: + salt = random_numeric_string(12) + site_key = 
random_numeric_string(12) + # Construct the base string with separators + base_string = f"{site_key}--{salt}--{password}--{site_key}" + # Apply SHA-1 iteratively for 10 rounds (including the initial one) + digest = hashlib.sha1(base_string.encode('utf-8')).hexdigest() + for _ in range(9): + digest = hashlib.sha1(f"{digest}--{salt}--{password}--{site_key}".encode('utf-8')).hexdigest() + # Format the final hash string + return f"{digest}", f"{salt}:{site_key}" + + else: + bt.logging.error("Not recognized hash mode") + return + def gen_hash_password(available_chars=compute.pow_default_chars, length=min_diff): """Generate a random string to be used as the challenge hash password.""" @@ -183,32 +300,38 @@ def gen_hash_password(available_chars=compute.pow_default_chars, length=min_diff random.seed(seed) return "".join(random.choice(available_chars) for _ in range(length)) -def gen_challenge_details(available_chars=compute.pow_default_chars, length=min_diff): +def gen_challenge_details(available_chars=compute.pow_default_chars, length=min_diff, mode: str = compute.pow_default_mode): """Generate the hashing details for a challenge.""" try: password = gen_hash_password(available_chars=available_chars, length=length) _mask = "".join(["?1" for _ in range(length)]) - _hash, _salt = gen_hash(password) + _hash, _salt = gen_hash(password=password, mode=mode) return password, _hash, _salt, _mask except Exception as e: print(f"Error during PoW generation (gen_challenge_details): {e}") return None def gen_challenge( - mode = compute.pow_default_mode, - length = min_diff, - run_id: str = "" + length=min_diff, + run_id: str = "", + device_list: List[str] = [], + random_challenge: str = "N", ) -> Challenge: """Generate a challenge from a given hashcat mode, difficulty, and identifier.""" - + challenge = Challenge() + if random_challenge=="Y" or random_challenge=="y": + challenge.mode = random.choice(pow_mode_list) + else: + challenge.mode = compute.pow_default_mode available_chars = compute.pow_default_chars available_chars = list(available_chars) random.shuffle(available_chars) available_chars = "".join(available_chars) - password, challenge._hash, challenge.salt, challenge.mask = gen_challenge_details(available_chars=available_chars[:10], length=length) - challenge.mode, challenge.chars, challenge.difficulty, challenge.run_id = mode, available_chars[:10], length, run_id + password, challenge._hash, challenge.salt, challenge.mask = gen_challenge_details( + available_chars=available_chars[:10], length=length, mode=challenge.mode) + challenge.chars, challenge.difficulty, challenge.run_id = available_chars[:10], length, run_id return challenge def hashcat_verify(_hash, output) -> Union[str, None]: @@ -220,13 +343,19 @@ def hashcat_verify(_hash, output) -> Union[str, None]: return None def run_hashcat( - challenges: List[Challenge], - timeout: int = compute.pow_timeout, - hashcat_path: str = compute.miner_hashcat_location, - hashcat_workload_profile: str = "3", - hashcat_extended_options: str = "", -) -> bool : + challenges: List[Challenge], + timeout: int = compute.pow_timeout, + hashcat_path: str = compute.miner_hashcat_location, + hashcat_workload_profile: str = "3", + hashcat_extended_options: str = "", + device_list: List[str] = [], + run_sequence: bool = False, + +) -> bool: """Solve a list of challenges and output the results.""" + threading_list = [] + max_device_id = len(device_list) + device_id = 1 for challenge in challenges: _hash = challenge._hash @@ -237,83 +366,168 @@ def run_hashcat( run_id = 
challenge.run_id difficulty = challenge.difficulty - unknown_error_message = f"Difficulty {difficulty} challenge ID {run_id}: ❌ run_hashcat execution failed" - start_time = time.time() - - try: - - command = [ - hashcat_path, - f"{_hash}:{salt}", - "-a", - "3", - "-D", - "2", - "--session", - f"{run_id}", - "-m", - mode, - "-1", - str(chars), - mask, - "-w", - hashcat_workload_profile, - hashcat_extended_options, - ] - - process = subprocess.run( - command, - capture_output=True, - text=True, - timeout=30, - ) - - execution_time = time.time() - start_time - - if process.returncode == 0: - if process.stdout: - result = hashcat_verify(_hash, process.stdout) - bt.logging.success( - f"Difficulty {difficulty} challenge ID {run_id}: ✅ Result {result} found in {execution_time:0.2f} seconds !" - ) - - if difficulty in challenges_solved: - challenges_solved[difficulty] += 1 - challenge_solve_durations[difficulty] += execution_time - else: - challenges_solved[difficulty] = 1 - challenge_solve_durations[difficulty] = execution_time - continue + bt.logging.info(f"Running hash:{_hash} with id:{run_id} with mode:{mode} on #{device_id} GPU ") + if device_id in challenge_allocated: + challenge_allocated[device_id] += 1 + else: + challenge_allocated[device_id] = 1 + + if run_sequence: + # hashcat thread function + threading_list.append( + threading.Thread(target=hashcat_thread, args=(difficulty, hashcat_path, _hash, salt, run_id, mode, + chars, mask, hashcat_workload_profile, + hashcat_extended_options, device_id))) + if device_id < max_device_id: + device_id += 1 else: - error_message = f"Difficulty {difficulty} challenge ID {run_id}: ❌ Hashcat execution failed with code {process.returncode}: {process.stderr}" - bt.logging.warning(error_message) - continue + device_id = 1 + else: + hashcat_thread(difficulty, hashcat_path, _hash, salt, run_id, mode, chars, mask, hashcat_workload_profile, + hashcat_extended_options, device_id) - except subprocess.TimeoutExpired: - #execution_time = time.time() - start_time - error_message = f"Difficulty {difficulty} challenge ID {run_id}: ❌ Hashcat execution timed out" - bt.logging.warning(error_message) - continue + if run_sequence: + for task in threading_list: + task.start() - except Exception as e: - #execution_time = time.time() - start_time - bt.logging.warning(f"{unknown_error_message}: {e}") - continue + for task in threading_list: + task.join() - bt.logging.warning(f"{unknown_error_message}: no exceptions") -def format_difficulties(text: str = "") -> List[str]: +def format_difficulties(text: str = "") -> List[int]: """Format the challenge difficulty input text.""" text = text.replace(" ", ",") text = text.replace(" ", ",") text = text.replace(",,", ",") + # Use ":" to easy generate multiple challenges of the same difficulty + if ":" in text: + return [int(text.split(":")[0]) for _ in range(int(text.split(":")[1]))] + if text.lower() == "all" or not text: return list(range(min_diff, max_diff + 1, 1)) else: return [int(x) for x in text.split(",")] if "," in text else [int(text)] + +# get the list of cuda devices +def get_cuda_device_list() -> List: + device_list = [] + command = ["nvidia-smi", "--query-gpu=gpu_name,gpu_bus_id,vbios_version,memory.total", "--format=csv"] + result = subprocess.run(command, capture_output=True, text=True).stdout + device_list = [item for item in result.split("\n")[1:-1]] + print(f"Found Devices:",device_list) + return device_list + +# Hashcat thread to handle send multiple hashcat commands to the GPU devices +def 
hashcat_thread(difficulty: int, hashcat_path: str, _hash: str, salt: str, run_id: str, mode: str, chars: str, mask: str, + hashcat_workload_profile: str, hashcat_extended_options: str, device_id: int): + start_time = time.time() + unknown_error_message = f"Difficulty {difficulty} challenge ID {run_id}: ❌ run_hashcat execution failed" + + # Check the hash algorithm and construct the hash and salt string accordingly + if mode == pow_mode_scrypt: + _hash_str = ":".join(_hash.split(":")[0:4]) + ":" + salt + ":" + _hash.split(":")[4] + elif mode == pow_mode_sap: + _hash_str = f"{_hash}" + else: + _hash_str = f"{_hash}:{salt}" + + try: + command = [ + hashcat_path, + _hash_str, + "-a", + "3", + "-d", + str(device_id), + "--session", + f"{run_id}", + "-m", + mode, + "-1", + str(chars), + mask, + "-w", + hashcat_workload_profile, + hashcat_extended_options, + "--potfile-disable", + "--runtime", + "30", + ] + + execute_command = " ".join(command) + process = subprocess.run( + command, + capture_output=True, + text=True, + timeout=30, + ) + + execution_time = time.time() - start_time + + if process.returncode == 0: + if process.stdout: + if mode == pow_mode_scrypt: + _hash = ":".join(_hash.split(":")[0:4]) + ":" + salt + _hash.split(":")[4] + result = hashcat_verify(_hash, process.stdout) + # Convert hashcat output the $HEX[] format + if "$HEX" in result: + result = decode_hex(result) + bt.logging.info(f"{run_id}: ✅ Convert $HEX format to {result}") + bt.logging.success( + f"Difficulty {difficulty} challenge ID {run_id}: ✅ Result {result} found in {execution_time:0.2f} seconds !" + ) + + # check the challenges_solved dictionary with each GPU device + if device_id in challenges_solved: + if difficulty in challenges_solved[device_id]: + challenges_solved[device_id][difficulty] = challenges_solved[device_id][difficulty] + 1 + challenge_solve_durations[device_id][difficulty] = (challenge_solve_durations[device_id][difficulty] + + execution_time) + else: + challenges_solved[device_id].update({difficulty: 1}) + challenge_solve_durations[device_id].update({difficulty: execution_time}) + else: + challenges_solved[device_id] = {difficulty: 1} + challenge_solve_durations[device_id] = {difficulty: execution_time} + + else: + error_message = f"Difficulty {difficulty} challenge ID {run_id}: ❌ Hashcat execution failed with code {process.returncode}: {process.stderr}" + bt.logging.warning(error_message) + + except subprocess.TimeoutExpired: + # execution_time = time.time() - start_time + error_message = f"Difficulty {difficulty} challenge ID {run_id}: ❌ Hashcat execution timed out" + bt.logging.warning(error_message) + + except Exception as e: + # execution_time = time.time() - start_time + bt.logging.warning(f"{unknown_error_message}: {e}") + + # bt.logging.warning(f"{unknown_error_message}: no exceptions") + +def decode_hex(password): + decoded = [] + pwd = password + if "$HEX" in password: + multihex = list(filter(None, password.split("$"))) + for x in multihex: + if "HEX[" in x: + endhex = x.find("]") + try: + decoded.append((bytes.fromhex(x[4:endhex]).decode("utf-8"))) + except: + decoded.append((bytes.fromhex(x[4:endhex]).decode("cp1252"))) + else: + decoded.append(x) + if len(decoded) != 0: + pwd = ''.join(decoded) + return (pwd) + else: + return (pwd) + def main(): """Handle the core benchmarking logic.""" @@ -323,12 +537,16 @@ def main(): benchmark_quantity: int hashcat_workload_profile: str = "3" hashcat_extended_options: str = "-O" + cuda_list = get_cuda_device_list() + # For the random challenge + 
random_challenge: str = "N" os.system('clear') # Check CUDA devices and docker availability check_cuda_availability() - build_benchmark_container('compute-subnet-benchmark','sn27-benchmark-container') + + build_benchmark_container('compute-subnet-benchmark', 'sn27-benchmark-container') has_docker, msg = check_docker_availability() if not has_docker: @@ -344,11 +562,13 @@ def main(): print("Example 1: 6") print("Example 2: 7 8 9") print("Example 3: 10, 11, 12") - print("Example 4: all" + "\n") + print("Example 4: 6:7") # For generate 7 challenges of difficulty 6 + print("Example 5: all" + "\n") while True: try: - selected_difficulties = input("What challenge difficulties would you like to benchmark? Some examples are listed above. (all): ") + selected_difficulties = input( + "What challenge difficulties would you like to benchmark? Some examples are listed above. (all): ") challenge_difficulty_list = format_difficulties(selected_difficulties) break except: @@ -371,13 +591,33 @@ def main(): except: print("Invalid entry. Defaulting to workload profile 3.") hashcat_workload_profile = "3" - - hashcat_extended_options = input("Enter any extra hashcat options to use. Leave this empty to use the recommended -O option. Enter None for no extended options. (-O): ") + + hashcat_extended_options = input( + "Enter any extra hashcat options to use. Leave this empty to use the recommended -O option. Enter None for no extended options. (-O): ") if hashcat_extended_options.lower() == "none": hashcat_extended_options = "" elif not hashcat_extended_options: hashcat_extended_options = "-O" + # Input selection for random hashcat challenge algorithm + try: + random_challenge = input("Would you like to use random hashcat challenge algorithm? (Y/N, default: N): ") + if random_challenge == "Y" or random_challenge == "y": + random_challenge = "Y" + else: + random_challenge = "N" + except: + random_challenge = "N" + + try: + run_sequence = input("Would you like to run hashcat challenge in parallel? (Y/N, default: N): ") + if run_sequence == "Y" or run_sequence == "y": + run_sequence = True + else: + run_sequence = False + except: + run_sequence = False + if benchmark_quantity < 1: benchmark_quantity = 1 @@ -400,7 +640,8 @@ def main(): else: challenge_totals[current_diff] = 1 - challenge = gen_challenge(length=current_diff, run_id=f"{current_diff}-{challenge_totals[current_diff]}") + challenge = gen_challenge(length=current_diff, + run_id=f"{current_diff}-{challenge_totals[current_diff]}", random_challenge=random_challenge) challenges.append(challenge) print(Style.RESET_ALL) @@ -408,22 +649,35 @@ def main(): # Run the benchmarks and output the results print(f"Hashcat profile set to {hashcat_workload_profile} with the following extended options: {'None' if not hashcat_extended_options else hashcat_extended_options}") print(f"Running {benchmark_quantity} benchmark(s) for the following challenge difficulties: {challenge_difficulty_list}" + "\n") - run_hashcat(challenges, hashcat_workload_profile=hashcat_workload_profile, hashcat_extended_options=hashcat_extended_options) + run_hashcat(challenges=challenges, hashcat_workload_profile=hashcat_workload_profile, + hashcat_extended_options=hashcat_extended_options, device_list=cuda_list, run_sequence=run_sequence) time.sleep(1) - + # print(challenges_solved) + # print(challenge_solve_durations) + print("\n" + "Completed benchmarking with the following results:") # Convert the difficulty list to a set to prevent printing duplicate results. 
Sort the set to print the results in ascending difficulty order
-    for difficulty in sorted(set(challenge_difficulty_list)):
-        total = challenge_totals[difficulty]
-
-        if difficulty in challenges_solved:
-            solved = challenges_solved[difficulty]
-            success_percentage = solved / total * 100
-            solve_time = challenge_solve_durations[difficulty] / solved
-
-            print(f"Difficulty {difficulty} | Successfully solved {solved}/{total} challenge(s) ({success_percentage:0.2f}%) with an average solve time of {solve_time:0.2f} seconds.")
-        else:
-            print(f"Difficulty {difficulty} | Failed all {total} challenge(s) with a 0% success rate.")
+    # Loop over the device ids to print the results for each GPU device
+    for dev_id in range(1, len(cuda_list) + 1):
+        if dev_id in challenge_allocated:
+            print(f"GPU #{dev_id} results:")
+            for difficulty in sorted(set(challenge_difficulty_list)):
+                total = challenge_totals[difficulty]
+                total_by_device = challenge_allocated[dev_id]
+
+                if dev_id in challenges_solved and difficulty in challenges_solved[dev_id]:
+                    solved = challenges_solved[dev_id][difficulty]
+                    success_percentage = solved / total * 100
+                    success_percentage_device = solved / total_by_device * 100
+                    solve_time = challenge_solve_durations[dev_id][difficulty] / solved
+
+                    print(
+                        f"Difficulty {difficulty} | Successfully solved {solved}/{total} challenge(s) ({success_percentage:0.2f}%) with an average solve time of {solve_time:0.2f} seconds.")
+                    print(
+                        f"Total: Difficulty {difficulty} | Successfully solved {solved}/{total_by_device} challenge(s) ({success_percentage_device:0.2f}%) on GPU #{dev_id} with an average solve time of {solve_time:0.2f} seconds.")
+                else:
+                    print(f"Difficulty {difficulty} | Failed all {total} challenge(s) with a 0% success rate.")
+            print("")

 if __name__ == "__main__":
     main()
\ No newline at end of file
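
The sketches below are editorial illustrations and are not part of the patch itself. First, the new hash formats: gen_hash() now covers scrypt, the SHA-2 family, SAP CODVN F/G and Ruby Restful-Authentication in addition to Blake2b. A minimal standard-library sketch of the two simplest new formats, assuming the same parameters the diff uses (scrypt with n=1024, r=1, p=1 and a 24-byte salt, an 8-character hex salt for the SHA modes); the helper names are hypothetical.

import base64
import hashlib
import secrets

def make_scrypt_challenge(password: str):
    # mode 8900: hashcat consumes SCRYPT:N:r:p:<b64 salt>:<b64 digest>; the patch stores the
    # salt in a separate field and re-joins the pieces when the command line is built
    salt = secrets.token_bytes(24)
    digest = hashlib.scrypt(password.encode("ascii"), salt=salt, n=1024, r=1, p=1, dklen=32)
    return f"SCRYPT:1024:1:1:{base64.b64encode(digest).decode()}", base64.b64encode(salt).decode()

def make_sha512_challenge(password: str):
    # modes 1410/10810/1710 hash password+salt and are passed to hashcat as "<hex digest>:<salt>"
    salt = secrets.token_hex(8)
    return hashlib.sha512((password + salt).encode("utf-8")).hexdigest(), salt

print(make_scrypt_challenge("s3cret"))
print(make_sha512_challenge("s3cret"))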
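
run_hashcat() now pins each challenge to a GPU in round-robin order and, when parallel execution is selected, starts one worker thread per challenge before joining them all. A small sketch of that dispatch pattern, with solve_on_device() standing in for the real hashcat_thread() call; all names here are illustrative.

import threading
from typing import List

def solve_on_device(challenge_id: str, device_id: int) -> None:
    # stand-in for hashcat_thread(); the real call runs hashcat with "-d <device_id>"
    print(f"challenge {challenge_id} -> GPU #{device_id}")

def dispatch(challenges: List[str], device_count: int) -> None:
    threads, device_id = [], 1
    for challenge_id in challenges:
        threads.append(threading.Thread(target=solve_on_device, args=(challenge_id, device_id)))
        device_id = device_id + 1 if device_id < device_count else 1  # round-robin wrap
    for t in threads:
        t.start()
    for t in threads:
        t.join()

dispatch([f"6-{i}" for i in range(1, 5)], device_count=2)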
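
format_difficulties() gains a "difficulty:count" shorthand, so "6:7" queues seven challenges of difficulty 6 while the existing comma/space lists and "all" keep working. A self-contained sketch of the parsing rule; the min/max bounds below are placeholders for compute.pow_min_difficulty and compute.pow_max_difficulty.

from typing import List

def expand_difficulties(text: str, min_diff: int = 6, max_diff: int = 12) -> List[int]:
    text = text.strip().replace(" ", ",").replace(",,", ",")
    if ":" in text:
        difficulty, count = text.split(":")
        return [int(difficulty)] * int(count)
    if not text or text.lower() == "all":
        return list(range(min_diff, max_diff + 1))
    return [int(x) for x in text.split(",") if x]

print(expand_difficulties("6:7"))    # [6, 6, 6, 6, 6, 6, 6]
print(expand_difficulties("7 8 9"))  # [7, 8, 9]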
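
get_cuda_device_list() enumerates GPUs by parsing the CSV output of nvidia-smi: one header row followed by one row per device, so the device count is simply the number of non-empty data rows. A sketch of the same idea (hypothetical function name):

import subprocess
from typing import List

def list_cuda_devices() -> List[str]:
    result = subprocess.run(
        ["nvidia-smi", "--query-gpu=gpu_name,gpu_bus_id,vbios_version,memory.total", "--format=csv"],
        capture_output=True, text=True,
    )
    # drop the CSV header and any trailing blank line; one entry remains per GPU
    return [row for row in result.stdout.splitlines()[1:] if row.strip()]

# print(list_cuda_devices())  # requires an Nvidia driver to be installed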
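
hashcat_thread() builds the hashcat command with three notable additions: -d pins the session to one GPU, --potfile-disable stops previously cracked hashes from being skipped on repeated benchmark runs, and --runtime 30 caps each attempt at 30 seconds. The values below are placeholders showing the shape of the command only.

# placeholder hash, salt, charset and mask; only the flag layout matters here
command = [
    "hashcat", "hash_value:salt_value",
    "-a", "3",               # mask (brute-force) attack
    "-d", "1",               # run on GPU device 1 only
    "--session", "6-1",
    "-m", "1710",            # sha512($pass.$salt)
    "-1", "abcdefghij",      # custom charset drawn from the challenge
    "?1?1?1?1?1?1",          # one ?1 per difficulty character
    "-w", "3",               # workload profile
    "-O",                    # optimized kernels
    "--potfile-disable",
    "--runtime", "30",
]
print(" ".join(command))
# subprocess.run(command, capture_output=True, text=True, timeout=30) would launch it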
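
decode_hex() exists because hashcat wraps plaintexts it cannot print verbatim in $HEX[...]; the helper converts that form back into the recovered password before it is logged. A minimal sketch of the same conversion (hypothetical function name):

def decode_hex_example(candidate: str) -> str:
    if "$HEX" not in candidate:
        return candidate
    out = []
    for part in filter(None, candidate.split("$")):
        if part.startswith("HEX["):
            raw = bytes.fromhex(part[4:part.find("]")])
            try:
                out.append(raw.decode("utf-8"))
            except UnicodeDecodeError:
                out.append(raw.decode("cp1252"))
        else:
            out.append(part)
    return "".join(out)

print(decode_hex_example("$HEX[70617373776f7264]"))  # -> password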