Skip to content

Commit

Permalink
Merge pull request #171 from thomas-chu123/dev_api_new
Browse files: browse the repository at this point in the history
fix(register_api.py): add deallocation retry and change the notify URL
  • Loading branch information
thomas-chu123 authored Aug 2, 2024
2 parents ffc9106 + 511011e commit f15a257
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
WANDB_API_KEY="your_api_key"
DEALLOCATION_NOTIFY_URL="https://dev.neuralinternet.ai/api/gpus/webhook/deallocation"
STATUS_NOTIFY_URL="https://dev.neuralinternet.ai/api/gpus/webhook/status-change-warning"
49 changes: 30 additions & 19 deletions neurons/register_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from urllib3.exceptions import InsecureRequestWarning
import urllib3
urllib3.disable_warnings(InsecureRequestWarning)
from dotenv import load_dotenv

# Import Compute Subnet Libraries
import RSAEncryption as rsa
Expand Down Expand Up @@ -74,10 +75,7 @@
NOTIFY_RETRY_PERIOD = 10 # notify retry interval
PUBLIC_WANDB_NAME = "opencompute"
PUBLIC_WANDB_ENTITY = "neuralinternet"
DEALLOCATION_NOTIFY_URL = "https://dev.neuralinternet.ai/api/gpus/webhook/deallocation"
STATUS_NOTIFY_URL = "https://dev.neuralinternet.ai/api/gpus/webhook/status-change-warning"
# DEALLOCATION_NOTIFY_URL = "https://127.0.0.1:3000/api/gpus/webhook/deallocation"
# STATUS_NOTIFY_URL = "https://127.0.0.1:3000/api/gpus/webhook/status-change-warning"


class UserConfig(BaseModel):
netuid: str = Field(default="15")
Expand Down Expand Up @@ -256,12 +254,16 @@ def __init__(
self.app = FastAPI(debug=False)
else:
self.app = FastAPI(debug=False, docs_url=None, redoc_url=None)

load_dotenv()
self._setup_routes()
self.process = None
self.websocket_connection = None
self.allocation_table = []
self.checking_allocated = []
self.notify_retry_table = []
self.deallocation_notify_url = os.getenv("DEALLOCATION_NOTIFY_URL")
self.status_notify_url = os.getenv("STATUS_NOTIFY_URL")

def _setup_routes(self):
# Define a custom validation error handler
Expand Down Expand Up @@ -664,17 +666,26 @@ async def deallocate(hotkey: str, uuid_key: str, request: Request, notify_flag:
index = self.metagraph.hotkeys.index(hotkey)
axon = self.metagraph.axons[index]
run_start = time.time()
allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey)
deregister_response = await run_in_threadpool(
self.dendrite.query, axon, allocate_class, timeout=60
)
run_end = time.time()
time_eval = run_end - run_start
# bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds")
retry_count = 0

if deregister_response and deregister_response["status"] is True:
bt.logging.info(f"API: Resource {hotkey} deallocated successfully")
else:
while retry_count < MAX_NOTIFY_RETRY:
allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey)
deregister_response = await run_in_threadpool(
self.dendrite.query, axon, allocate_class, timeout=60
)
run_end = time.time()
time_eval = run_end - run_start
# bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds")

if deregister_response and deregister_response["status"] is True:
bt.logging.info(f"API: Resource {hotkey} deallocated successfully")
break
else:
retry_count += 1
bt.logging.info(f"API: Resource {hotkey} no response to deallocated signal - retry {retry_count}")
await asyncio.sleep(1)

if retry_count == MAX_NOTIFY_RETRY:
bt.logging.error(f"API: Resource {hotkey} deallocated successfully without response.")

deallocated_at = datetime.now(timezone.utc)
Expand Down Expand Up @@ -1174,7 +1185,7 @@ async def list_resources(query: ResourceQuery = None,
# Extract GPU details
gpu_miner = details["gpu"]
gpu_capacity = "{:.2f}".format(
(gpu_miner["capacity"] / 1000)
(gpu_miner["capacity"] / 1024)
)
gpu_name = str(gpu_miner["details"][0]["name"]).lower()
gpu_count = gpu_miner["count"]
Expand Down Expand Up @@ -2175,8 +2186,8 @@ async def _refresh_allocation(self):
bt.logging.info(f"API: Allocation refreshed: {self.allocation_table}")
await asyncio.sleep(DATA_SYNC_PERIOD)

@staticmethod
async def _notify_allocation_status(event_time: datetime, hotkey: str,

async def _notify_allocation_status(self, event_time: datetime, hotkey: str,
uuid: str, event: str, details: str | None = ""):
"""
Notify the allocation by hotkey and status. <br>
Expand All @@ -2193,7 +2204,7 @@ async def _notify_allocation_status(event_time: datetime, hotkey: str,
"status": event,
"uuid": uuid,
}
notify_url = DEALLOCATION_NOTIFY_URL
notify_url = self.deallocation_notify_url
elif event == "OFFLINE" or event == "ONLINE":
msg = {
"time": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
Expand All @@ -2202,7 +2213,7 @@ async def _notify_allocation_status(event_time: datetime, hotkey: str,
"status": event,
"uuid": uuid,
}
notify_url = STATUS_NOTIFY_URL
notify_url = self.status_notify_url

retries = 0
while retries < MAX_NOTIFY_RETRY:
Expand Down

0 comments on commit f15a257

Please sign in to comment.