Skip to content

Commit

Permalink
feat(register_api.py and container.py): support restart and exchange …
Browse files Browse the repository at this point in the history
…ssh key

1. add restart docker in API
2. add key exchange in API
  • Loading branch information
thomas-chu123 committed Jul 31, 2024
1 parent 6e52234 commit b5a1cce
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 7 deletions.
5 changes: 5 additions & 0 deletions compute/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ class Allocate(bt.Synapse):
"volume_path": "/tmp",
"dockerfile": ""
}
docker_change: bool = False
docker_action: dict = {
"action": "",
"ssh_key": "",
}

def deserialize(self) -> dict:
"""
Expand Down
47 changes: 47 additions & 0 deletions neurons/Miner/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,50 @@ def build_sample_container():
except Exception as e:
bt.logging.info(f"Error build sample container {e}")
return {"status": False}


def restart_container():
try:
client, containers = get_docker()
running_container = None
for container in containers:
if container_name in container.name:
running_container = container
break
if running_container:
# stop and remove the container by using the SIGTERM signal to PID 1 (init) process in the container
if running_container.status == "running":
running_container.exec_run(cmd="kill -15 1")
running_container.wait()
running_container.restart()
return {"status": True}
else:
bt.logging.info("Unable to find container")
return {"status": False}
except Exception as e:
bt.logging.info(f"Error restart container {e}")
return {"status": False}


def exchange_key_container(new_ssh_key: str):
try:
client, containers = get_docker()
running_container = None
for container in containers:
if container_name in container.name:
running_container = container
break
if running_container:
# stop and remove the container by using the SIGTERM signal to PID 1 (init) process in the container
if running_container.status == "running":
running_container.exec_run(cmd=f"bash -c \"echo '{new_ssh_key}' > /root/.ssh/authorized_keys & sync & sleep 1\"")
running_container.exec_run(cmd="kill -15 1")
running_container.wait()
running_container.restart()
return {"status": True}
else:
bt.logging.info("Unable to find container")
return {"status": False}
except Exception as e:
bt.logging.info(f"Error changing SSH key on container {e}")
return {"status": False}
34 changes: 28 additions & 6 deletions neurons/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@
get_remote_version,
)
from neurons.Miner.allocate import check_allocation, register_allocation, deregister_allocation, check_if_allocated
from neurons.Miner.container import build_check_container, build_sample_container, check_container, kill_container
from neurons.Miner.container import (
build_check_container,
build_sample_container,
check_container,
kill_container,
restart_container,
exchange_key_container,
)
from compute.wandb.wandb import ComputeWandb
from neurons.Miner.allocate import check_allocation, register_allocation
from neurons.Miner.pow import check_cuda_availability, run_miner_pow
Expand Down Expand Up @@ -358,6 +365,8 @@ def allocate(self, synapse: Allocate) -> Allocate:
checking = synapse.checking
docker_requirement = synapse.docker_requirement
docker_requirement['ssh_port'] = int(self.config.ssh.port)
docker_change = synapse.docker_change
docker_action = synapse.docker_action

if checking is True:
if timeline > 0:
Expand All @@ -368,12 +377,25 @@ def allocate(self, synapse: Allocate) -> Allocate:
result = check_if_allocated(public_key=public_key)
synapse.output = result
else:
public_key = synapse.public_key
if timeline > 0:
result = register_allocation(timeline, device_requirement, public_key, docker_requirement)
synapse.output = result
if docker_change is True:
if docker_action['action'] == 'exchange_key':
public_key = synapse.public_key
new_ssh_key = docker_action['ssh_key']
result = exchange_key_container(new_ssh_key)
synapse.output = result
elif docker_action['action'] == 'restart':
public_key = synapse.public_key
result = restart_container()
synapse.output = result
else:
bt.logging.info(f"Unknown action: {docker_action['action']}")
else:
result = deregister_allocation(public_key)
public_key = synapse.public_key
if timeline > 0:
result = register_allocation(timeline, device_requirement, public_key, docker_requirement)
synapse.output = result
else:
result = deregister_allocation(public_key)
synapse.output = result
self.update_allocation(synapse)
return synapse
Expand Down
210 changes: 209 additions & 1 deletion neurons/register_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ async def deallocate(hotkey: str, uuid_key: str, request: Request, notify_flag:
},
)
else:
bt.logging.error(f"API: Invalid UUID key")
bt.logging.error(f"API: Invalid UUID key for {hotkey}")
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={
Expand Down Expand Up @@ -798,6 +798,213 @@ async def check_miner_status(hotkey_list: List[str]) -> JSONResponse:
},
)

@self.app.post(path="/service/restart_docker",
tags=["Allocation"],
response_model=SuccessResponse | ErrorResponse,
responses={
200: {
"model": SuccessResponse,
"description": "Resource restart successfully.",
},
403: {
"model": ErrorResponse,
"description": "An error occurred while restarting docker.",
},
})
async def restart_docker(hotkey: str, uuid_key: str) -> JSONResponse:
# Instantiate the connection to the db
db = ComputeDb()
cursor = db.get_cursor()

try:
# Retrieve the allocation details for the given hotkey
cursor.execute(
"SELECT details, hotkey FROM allocation WHERE hotkey = ?",
(hotkey,),
)
row = cursor.fetchone()

if row:
# Parse the JSON string in the 'details' column
info = json.loads(row[0])
result_hotkey = row[1]

username = info["username"]
password = info["password"]
port = info["port"]
ip = info["ip"]
regkey = info["regkey"]
uuid_key_db = info["uuid"]

docker_action = {
"action": "restart",
"ssh_key": "",
}

if uuid_key_db == uuid_key:
index = self.metagraph.hotkeys.index(hotkey)
axon = self.metagraph.axons[index]
run_start = time.time()
allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey,
docker_change=True, docker_action=docker_action)
response = await run_in_threadpool(
self.dendrite.query, axon, allocate_class, timeout=60
)
run_end = time.time()
time_eval = run_end - run_start
# bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds")

if response and response["status"] is True:
bt.logging.info(f"API: Resource {hotkey} docker restart successfully")
else:
bt.logging.error(f"API: Resource {hotkey} docker restart without response.")

return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"success": True,
"message": "Resource restarted successfully.",
},
)
else:
bt.logging.error(f"API: Invalid UUID key for {hotkey}")
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={
"success": False,
"message": "Restart not successfully, please try again.",
"err_detail": "Invalid UUID key",
},
)

else:
bt.logging.info(f"API: No allocation details found for the provided hotkey")
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={
"success": False,
"message": "No allocation details found for the provided hotkey.",
"err_detail": "No allocation details found for the provided hotkey.",
},
)
except Exception as e:
bt.logging.error(f"API: An error occurred during restart operation {e.__repr__()}")
return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN,
content={
"success": False,
"message": "An error occurred during restart operation.",
"err_detail": e.__repr__(),
},
)
finally:
cursor.close()
db.close()


@self.app.post("/service/exchange_docker_key",
tags=["Allocation"],
response_model=SuccessResponse | ErrorResponse,
responses={
200: {
"model": SuccessResponse,
"description": "Resource ssh_key was changed successfully.",
},
403: {
"model": ErrorResponse,
"description": "An error occurred while exchanging docker key.",
},
})
async def exchange_docker_key(hotkey: str, uuid_key: str, ssh_key: str) -> JSONResponse:
# Instantiate the connection to the db
db = ComputeDb()
cursor = db.get_cursor()

try:
# Retrieve the allocation details for the given hotkey
cursor.execute(
"SELECT details, hotkey FROM allocation WHERE hotkey = ?",
(hotkey,),
)
row = cursor.fetchone()

if row:
# Parse the JSON string in the 'details' column
info = json.loads(row[0])
result_hotkey = row[1]

username = info["username"]
password = info["password"]
port = info["port"]
ip = info["ip"]
regkey = info["regkey"]
uuid_key_db = info["uuid"]

docker_action = {
"action": "exchange_key",
"ssh_key": ssh_key,
}

if uuid_key_db == uuid_key:
index = self.metagraph.hotkeys.index(hotkey)
axon = self.metagraph.axons[index]
run_start = time.time()
allocate_class = Allocate(timeline=0, device_requirement={}, checking=False, public_key=regkey,
docker_change=True, docker_action=docker_action)
response = await run_in_threadpool(
self.dendrite.query, axon, allocate_class, timeout=60
)
run_end = time.time()
time_eval = run_end - run_start
# bt.logging.info(f"API: Stop docker container in: {run_end - run_start:.2f} seconds")

if response and response["status"] is True:
bt.logging.info(f"API: Resource {hotkey} docker ssh_key exchange successfully")
else:
bt.logging.error(f"API: Resource {hotkey} docker ssh_key exchange without response.")

return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"success": True,
"message": "Resource ssh_key is exchanged successfully.",
},
)
else:
bt.logging.error(f"API: Invalid UUID key for {hotkey}")
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={
"success": False,
"message": "Exchange ssh_key not successfully, please try again.",
"err_detail": "Invalid UUID key",
},
)

else:
bt.logging.info(f"API: No allocation details found for the provided hotkey")
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={
"success": False,
"message": "No allocation details found for the provided hotkey.",
"err_detail": "No allocation details found for the provided hotkey.",
},
)
except Exception as e:
bt.logging.error(f"API: An error occurred during exchange ssh_key operation {e.__repr__()}")
return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN,
content={
"success": False,
"message": "An error occurred during exchange ssh_key operation.",
"err_detail": e.__repr__(),
},
)
finally:
cursor.close()
db.close()

@self.app.post(
"/list/allocations_sql",
tags=["SQLite"],
Expand Down Expand Up @@ -1719,6 +1926,7 @@ async def test_notify(hotkey: str = None, uuid_key: str = None, event: str = Non
},
)


@staticmethod
def _init_config():
"""
Expand Down

0 comments on commit b5a1cce

Please sign in to comment.