Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] use with env to reset start method after #935

Merged
merged 2 commits into from
Oct 3, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 119 additions & 103 deletions bittensor/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,22 @@ def get_block_with_retry(subtensor: 'bittensor.Subtensor') -> Tuple[int, int, by
raise Exception("Network error. Could not connect to substrate to get block hash")
return block_number, difficulty, block_hash

class UsingSpawnStartMethod():
def __init__(self, force: bool = False):
self._old_start_method = None
self._force = force

def __enter__(self):
self._old_start_method = multiprocessing.get_start_method(allow_none=True)
if self._old_start_method == None:
self._old_start_method = 'spawn' # default to spawn

multiprocessing.set_start_method('spawn', force=self._force)

def __exit__(self, *args):
# restore the old start method
multiprocessing.set_start_method(self._old_start_method, force=True)


def solve_for_difficulty_fast_cuda( subtensor: 'bittensor.Subtensor', wallet: 'bittensor.Wallet', update_interval: int = 50_000, TPB: int = 512, dev_id: Union[List[int], int] = 0, use_kernel_launch_optimization: bool = False ) -> Optional[POWSolution]:
"""
Expand Down Expand Up @@ -580,116 +596,116 @@ def solve_for_difficulty_fast_cuda( subtensor: 'bittensor.Subtensor', wallet: 'b
status = console.status("Solving")

# Set mp start to use spawn so CUDA doesn't complain
multiprocessing.set_start_method('spawn', force=True)

curr_block = multiprocessing.Array('h', 64, lock=True) # byte array
curr_block_num = multiprocessing.Value('i', 0, lock=True) # int
curr_diff = multiprocessing.Array('Q', [0, 0], lock=True) # [high, low]

def update_curr_block(block_number: int, block_bytes: bytes, diff: int, lock: multiprocessing.Lock):
with lock:
curr_block_num.value = block_number
for i in range(64):
curr_block[i] = block_bytes[i]
registration_diff_pack(diff, curr_diff)

status.start()

# Establish communication queues
stopEvent = multiprocessing.Event()
stopEvent.clear()
solution_queue = multiprocessing.Queue()
time_queue = multiprocessing.Queue()
check_block = multiprocessing.Lock()

# Start consumers
num_processes = len(dev_id)
## Create one consumer per GPU
solvers = [ CUDASolver(i, num_processes, update_interval, time_queue, solution_queue, stopEvent, curr_block, curr_block_num, curr_diff, check_block, limit, dev_id[i], TPB)
for i in range(num_processes) ]

# Get first block
block_number = subtensor.get_current_block()
difficulty = subtensor.difficulty
block_hash = subtensor.substrate.get_block_hash( block_number )
while block_hash == None:
block_hash = subtensor.substrate.get_block_hash( block_number )
block_bytes = block_hash.encode('utf-8')[2:]
old_block_number = block_number
# Set to current block
update_curr_block(block_number, block_bytes, difficulty, check_block)

# Set new block events for each solver to start
for w in solvers:
w.newBlockEvent.set()

for w in solvers:
w.start() # start the solver processes

start_time = time.time()
time_since = 0.0
solution = None
itrs_per_sec = 0
while not wallet.is_registered(subtensor):
# Wait until a solver finds a solution
try:
solution = solution_queue.get(block=True, timeout=0.15)
if solution is not None:
break
except Empty:
# No solution found, try again
pass
# Force the set start method in-case of re-register
with UsingSpawnStartMethod(force=True):
curr_block = multiprocessing.Array('h', 64, lock=True) # byte array
curr_block_num = multiprocessing.Value('i', 0, lock=True) # int
curr_diff = multiprocessing.Array('Q', [0, 0], lock=True) # [high, low]

def update_curr_block(block_number: int, block_bytes: bytes, diff: int, lock: multiprocessing.Lock):
with lock:
curr_block_num.value = block_number
for i in range(64):
curr_block[i] = block_bytes[i]
registration_diff_pack(diff, curr_diff)

status.start()

# Establish communication queues
stopEvent = multiprocessing.Event()
stopEvent.clear()
solution_queue = multiprocessing.Queue()
time_queue = multiprocessing.Queue()
check_block = multiprocessing.Lock()

# Start consumers
num_processes = len(dev_id)
## Create one consumer per GPU
solvers = [ CUDASolver(i, num_processes, update_interval, time_queue, solution_queue, stopEvent, curr_block, curr_block_num, curr_diff, check_block, limit, dev_id[i], TPB)
for i in range(num_processes) ]

# check for new block
# Get first block
block_number = subtensor.get_current_block()
if block_number != old_block_number:
old_block_number = block_number
# update block information
block_hash = subtensor.substrate.get_block_hash( block_number)
while block_hash == None:
block_hash = subtensor.substrate.get_block_hash( block_number)
block_bytes = block_hash.encode('utf-8')[2:]
difficulty = subtensor.difficulty

update_curr_block(block_number, block_bytes, difficulty, check_block)
# Set new block events for each solver
for w in solvers:
w.newBlockEvent.set()

# Get times for each solver
time_total = 0
num_time = 0
for _ in solvers:
difficulty = subtensor.difficulty
block_hash = subtensor.substrate.get_block_hash( block_number )
while block_hash == None:
block_hash = subtensor.substrate.get_block_hash( block_number )
block_bytes = block_hash.encode('utf-8')[2:]
old_block_number = block_number
# Set to current block
update_curr_block(block_number, block_bytes, difficulty, check_block)

# Set new block events for each solver to start
for w in solvers:
w.newBlockEvent.set()

for w in solvers:
w.start() # start the solver processes

start_time = time.time()
time_since = 0.0
solution = None
itrs_per_sec = 0
while not wallet.is_registered(subtensor):
# Wait until a solver finds a solution
try:
time_ = time_queue.get_nowait()
time_total += time_
num_time += 1

solution = solution_queue.get(block=True, timeout=0.15)
if solution is not None:
break
except Empty:
break

if num_time > 0:
time_avg = time_total / num_time
itrs_per_sec = TPB*update_interval*num_processes / time_avg
time_since = time.time() - start_time
# No solution found, try again
pass

# check for new block
block_number = subtensor.get_current_block()
if block_number != old_block_number:
old_block_number = block_number
# update block information
block_hash = subtensor.substrate.get_block_hash( block_number)
while block_hash == None:
block_hash = subtensor.substrate.get_block_hash( block_number)
block_bytes = block_hash.encode('utf-8')[2:]
difficulty = subtensor.difficulty

update_curr_block(block_number, block_bytes, difficulty, check_block)
# Set new block events for each solver
for w in solvers:
w.newBlockEvent.set()

# Get times for each solver
time_total = 0
num_time = 0
for _ in solvers:
try:
time_ = time_queue.get_nowait()
time_total += time_
num_time += 1

except Empty:
break

message = f"""Solving
time spent: {time_since}
Difficulty: [bold white]{millify(difficulty)}[/bold white]
Iters: [bold white]{get_human_readable(int(itrs_per_sec), 'H')}/s[/bold white]
Block: [bold white]{block_number}[/bold white]
Block_hash: [bold white]{block_hash.encode('utf-8')}[/bold white]"""
status.update(message.replace(" ", ""))

# exited while, found_solution contains the nonce or wallet is registered
if solution is not None:
stopEvent.set() # stop all other processes
status.stop()
if num_time > 0:
time_avg = time_total / num_time
itrs_per_sec = TPB*update_interval*num_processes / time_avg
time_since = time.time() - start_time

message = f"""Solving
time spent: {time_since}
Difficulty: [bold white]{millify(difficulty)}[/bold white]
Iters: [bold white]{get_human_readable(int(itrs_per_sec), 'H')}/s[/bold white]
Block: [bold white]{block_number}[/bold white]
Block_hash: [bold white]{block_hash.encode('utf-8')}[/bold white]"""
status.update(message.replace(" ", ""))

# exited while, found_solution contains the nonce or wallet is registered
if solution is not None:
stopEvent.set() # stop all other processes
status.stop()

return solution
return solution

status.stop()
return None
status.stop()
return None

def create_pow( subtensor, wallet, cuda: bool = False, dev_id: Union[List[int], int] = 0, tpb: int = 256, num_processes: int = None, update_interval: int = None) -> Optional[Dict[str, Any]]:
if cuda:
Expand Down