Skip to content

Commit

Permalink
fix(register_api.py): add deallocation retry and change the notify URL
Browse files Browse the repository at this point in the history
1. add the deallocation retry.
2. add the load notify URL from .env file
  • Loading branch information
thomas-chu123 committed Aug 2, 2024
1 parent b5a1cce commit 511011e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
WANDB_API_KEY="your_api_key"
DEALLOCATION_NOTIFY_URL="https://dev.neuralinternet.ai/api/gpus/webhook/deallocation"
STATUS_NOTIFY_URL="https://dev.neuralinternet.ai/api/gpus/webhook/status-change-warning"
49 changes: 30 additions & 19 deletions neurons/register_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from urllib3.exceptions import InsecureRequestWarning
import urllib3
urllib3.disable_warnings(InsecureRequestWarning)
from dotenv import load_dotenv

# Import Compute Subnet Libraries
import RSAEncryption as rsa
Expand Down Expand Up @@ -74,10 +75,7 @@
NOTIFY_RETRY_PERIOD = 10 # notify retry interval
PUBLIC_WANDB_NAME = "opencompute"
PUBLIC_WANDB_ENTITY = "neuralinternet"
DEALLOCATION_NOTIFY_URL = "https://dev.neuralinternet.ai/api/gpus/webhook/deallocation"
STATUS_NOTIFY_URL = "https://dev.neuralinternet.ai/api/gpus/webhook/status-change-warning"
# DEALLOCATION_NOTIFY_URL = "https://127.0.0.1:3000/api/gpus/webhook/deallocation"
# STATUS_NOTIFY_URL = "https://127.0.0.1:3000/api/gpus/webhook/status-change-warning"


class UserConfig(BaseModel):
netuid: str = Field(default="15")
Expand Down Expand Up @@ -256,12 +254,16 @@ def __init__(
self.app = FastAPI(debug=False)
else:
self.app = FastAPI(debug=False, docs_url=None, redoc_url=None)

load_dotenv()
self._setup_routes()
self.process = None
self.websocket_connection = None
self.allocation_table = []
self.checking_allocated = []
self.notify_retry_table = []
self.deallocation_notify_url = os.getenv("DEALLOCATION_NOTIFY_URL")
self.status_notify_url = os.getenv("STATUS_NOTIFY_URL")

def _setup_routes(self):
# Define a custom validation error handler
Expand Down Expand Up @@ -664,17 +666,26 @@ async def deallocate(hotkey: str, uuid_key: str, request: Request, notify_flag:
index = self.metagraph.hotkeys.index(hotkey)
axon = self.metagraph.axons[index]
run_start = time.time()
allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey)
deregister_response = await run_in_threadpool(
self.dendrite.query, axon, allocate_class, timeout=60
)
run_end = time.time()
time_eval = run_end - run_start
# bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds")
retry_count = 0

if deregister_response and deregister_response["status"] is True:
bt.logging.info(f"API: Resource {hotkey} deallocated successfully")
else:
while retry_count < MAX_NOTIFY_RETRY:
allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey)
deregister_response = await run_in_threadpool(
self.dendrite.query, axon, allocate_class, timeout=60
)
run_end = time.time()
time_eval = run_end - run_start
# bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds")

if deregister_response and deregister_response["status"] is True:
bt.logging.info(f"API: Resource {hotkey} deallocated successfully")
break
else:
retry_count += 1
bt.logging.info(f"API: Resource {hotkey} no response to deallocated signal - retry {retry_count}")
await asyncio.sleep(1)

if retry_count == MAX_NOTIFY_RETRY:
bt.logging.error(f"API: Resource {hotkey} deallocated successfully without response.")

deallocated_at = datetime.now(timezone.utc)
Expand Down Expand Up @@ -1174,7 +1185,7 @@ async def list_resources(query: ResourceQuery = None,
# Extract GPU details
gpu_miner = details["gpu"]
gpu_capacity = "{:.2f}".format(
(gpu_miner["capacity"] / 1000)
(gpu_miner["capacity"] / 1024)
)
gpu_name = str(gpu_miner["details"][0]["name"]).lower()
gpu_count = gpu_miner["count"]
Expand Down Expand Up @@ -2175,8 +2186,8 @@ async def _refresh_allocation(self):
bt.logging.info(f"API: Allocation refreshed: {self.allocation_table}")
await asyncio.sleep(DATA_SYNC_PERIOD)

@staticmethod
async def _notify_allocation_status(event_time: datetime, hotkey: str,

async def _notify_allocation_status(self, event_time: datetime, hotkey: str,
uuid: str, event: str, details: str | None = ""):
"""
Notify the allocation by hotkey and status. <br>
Expand All @@ -2193,7 +2204,7 @@ async def _notify_allocation_status(event_time: datetime, hotkey: str,
"status": event,
"uuid": uuid,
}
notify_url = DEALLOCATION_NOTIFY_URL
notify_url = self.deallocation_notify_url
elif event == "OFFLINE" or event == "ONLINE":
msg = {
"time": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
Expand All @@ -2202,7 +2213,7 @@ async def _notify_allocation_status(event_time: datetime, hotkey: str,
"status": event,
"uuid": uuid,
}
notify_url = STATUS_NOTIFY_URL
notify_url = self.status_notify_url

retries = 0
while retries < MAX_NOTIFY_RETRY:
Expand Down

0 comments on commit 511011e

Please sign in to comment.