Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removal of dendrite multiprocessing #1017

Merged
merged 3 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions bittensor/_dendrite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def __new__(
config.dendrite.timeout = timeout if timeout != None else config.dendrite.timeout
config.dendrite.requires_grad = requires_grad if requires_grad != None else config.dendrite.requires_grad
config.dendrite.max_active_receptors = max_active_receptors if max_active_receptors != None else config.dendrite.max_active_receptors
config.dendrite.multiprocessing = multiprocess if multiprocess != None else config.dendrite.multiprocessing
config.dendrite.compression = compression if compression != None else config.dendrite.compression
config.dendrite._mock = _mock if _mock != None else config.dendrite._mock
dendrite.check_config( config )
Expand All @@ -92,24 +91,6 @@ def __new__(
config = config,
wallet = wallet
)
elif config.dendrite.multiprocessing:
authkey = wallet.hotkey.ss58_address.encode('UTF-8')
try:
manager_client = dendrite.manager_connect(authkey = authkey)
logger.success('Receptor Pool Server Connected')

except:
dendrite.manager_serve(config, wallet, receptor_pool, authkey = authkey)
logger.success('Receptor Pool Server Started')
manager_client = dendrite.manager_connect(authkey = authkey)
logger.success('Receptor Pool Server Connected')

return dendrite_impl.Dendrite (
config = config,
wallet = wallet,
receptor_pool = manager_client.get_receptorpool(),
manager = manager_client,
)
else:
return dendrite_impl.Dendrite (
config = config,
Expand Down Expand Up @@ -144,7 +125,6 @@ def add_args( cls, parser: argparse.ArgumentParser, prefix: str = None ):
parser.add_argument('--' + prefix_str + 'dendrite.timeout', type=int, help='''Default request timeout.''', default = bittensor.defaults.dendrite.timeout)
parser.add_argument('--' + prefix_str + 'dendrite.requires_grad', action='store_true', help='''If true, the dendrite passes gradients on the wire.''', default = bittensor.defaults.dendrite.requires_grad)
parser.add_argument('--' + prefix_str + 'dendrite.no_requires_grad', dest = prefix_str + 'dendrite.requires_grad', action='store_false', help='''If set, the dendrite will not passes gradients on the wire.''')
parser.add_argument('--' + prefix_str + 'dendrite.multiprocessing', dest = prefix_str + 'dendrite.multiprocessing', action='store_true', help='''If set, the dendrite will initialize multiprocessing''', default=bittensor.defaults.dendrite.multiprocessing)
parser.add_argument('--' + prefix_str + 'dendrite.compression', type=str, help='''Which compression algorithm to use for compression (gzip, deflate, NoCompression) ''', default = bittensor.defaults.dendrite.compression)
parser.add_argument('--' + prefix_str + 'dendrite._mock', action='store_true', help='To turn on dendrite mocking for testing purposes.', default=False)
parser.add_argument('--' + prefix_str + 'dendrite.prometheus.level',
Expand All @@ -166,7 +146,6 @@ def add_defaults(cls, defaults):
defaults.dendrite.max_active_receptors = os.getenv('BT_DENDRITE_MAX_ACTIVE_RECEPTORS') if os.getenv('BT_DENDRITE_MAX_ACTIVE_RECEPTORS') != None else 4096
defaults.dendrite.timeout = os.getenv('BT_DENDRITE_TIMEOUT') if os.getenv('BT_DENDRITE_TIMEOUT') != None else bittensor.__blocktime__ + 2
defaults.dendrite.requires_grad = os.getenv('BT_DENDRITE_REQUIRES_GRAD') if os.getenv('BT_DENDRITE_REQUIRES_GRAD') != None else True
defaults.dendrite.multiprocessing = os.getenv('BT_DENDRITE_MULTIPROCESSING') if os.getenv('BT_DENDRITE_MULTIPROCESSING') != None else False
defaults.dendrite.compression = os.getenv('BT_DENDRITE_COMPRESSION') if os.getenv('BT_DENDRITE_COMPRESSION') != None else 'NoCompression'
# Prometheus
defaults.dendrite.prometheus = bittensor.config()
Expand All @@ -183,30 +162,3 @@ def check_config( cls, config: 'bittensor.Config' ):
assert config.dendrite.max_active_receptors >= 0, 'max_active_receptors must be larger or eq to 0'
assert config.dendrite.prometheus.level in [l.name for l in list(bittensor.prometheus.level)], "dendrite.prometheus.level must be in: {}".format([l.name for l in list(bittensor.prometheus.level)])
bittensor.wallet.check_config( config )

@classmethod
def manager_connect(cls, authkey = b'abracadabra'):
r"""Creates a custom manager class and connects it to the local server.
"""
BaseManager.register('get_receptorpool')
BaseManager.register('add_connection_count')
BaseManager.register('deduct_connection_count')
BaseManager.register('get_total_requests')
manager = BaseManager(address=('', 4098), authkey=authkey)
manager.connect()
manager.add_connection_count()
return manager

@classmethod
def manager_serve(cls, config, wallet, receptor_pool = None, authkey = b'abracadabra'):
r"""Creates/Uses a receptor pool to create a local server for receptor pool
"""
if receptor_pool == None:
receptor_pool = bittensor.receptor_pool(
wallet = wallet,
max_active_receptors = config.dendrite.max_active_receptors
)
ManagerServer.register('get_receptorpool', callable=lambda:receptor_pool,exposed=['forward','backward','get_receptors_state', 'get_total_requests'])
manager = ManagerServer(address=('', 4098), authkey=authkey)

return manager
50 changes: 0 additions & 50 deletions tests/integration_tests/test_dendrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,56 +207,6 @@ def test_dendrite_backoff():
check_resp_shape(out, 1, 3, 3)
del _dendrite

def test_dendrite_multiple():
endpoint_obj = bittensor.endpoint(
version = bittensor.__version_as_int__,
uid = 0,
ip = '0.0.0.0',
ip_type = 4,
port = 12345,
hotkey = wallet.hotkey.ss58_address,
coldkey = wallet.coldkey.ss58_address,
modality = 0
)
x = torch.tensor( [ 1,2,3 ] )

config = bittensor.dendrite.config()
receptor_pool = bittensor.receptor_pool(
wallet = wallet,
max_active_receptors = config.dendrite.max_active_receptors,
compression = config.dendrite.compression,
)

authkey = wallet.hotkey.ss58_address.encode('UTF-8')
manager_server = bittensor.dendrite.manager_serve(config, wallet, receptor_pool, authkey = authkey)

dend1 = bittensor.dendrite( wallet = wallet, multiprocess=True)
dend2 = bittensor.dendrite( wallet = wallet, multiprocess=True)
dend3 = bittensor.dendrite( wallet = wallet, multiprocess=True)
dend4 = bittensor.dendrite( wallet = wallet, multiprocess=True)

out, ops, times = dend1.text( endpoints = endpoint_obj, inputs = x, synapses = synapses )
assert list(ops[0]) == [bittensor.proto.ReturnCode.Unavailable] * len(synapses)

out, ops, times = dend2.text( endpoints = endpoint_obj, inputs = x, synapses = synapses )
assert list(ops[0]) == [bittensor.proto.ReturnCode.Unavailable] * len(synapses)

out, ops, times = dend3.text( endpoints = endpoint_obj, inputs = x, synapses = synapses )
assert list(ops[0]) == [bittensor.proto.ReturnCode.Unavailable] * len(synapses)

out, ops, times = dend4.text( endpoints = endpoint_obj, inputs = x, synapses = synapses )
assert list(ops[0]) == [bittensor.proto.ReturnCode.Unavailable] * len(synapses)

assert len(receptor_pool.receptors) == 1
assert manager_server.connected_count == 4
dend4.__del__()
assert manager_server.connected_count == 3
dend3.__del__()
assert manager_server.connected_count == 2
dend2.__del__()
assert manager_server.connected_count == 1
dend1.__del__()


def test_dendrite_to_df():
dendrite.to_dataframe(bittensor.metagraph(_mock=True).sync())
Expand Down