diff --git a/RNS/Interfaces/I2PInterface.py b/RNS/Interfaces/I2PInterface.py index 290f8bbf..8c21a87c 100644 --- a/RNS/Interfaces/I2PInterface.py +++ b/RNS/Interfaces/I2PInterface.py @@ -48,6 +48,8 @@ def __init__(self, rns_storagepath): import RNS.vendor.i2plib as i2plib import RNS.vendor.i2plib.utils + self.client_tunnels = {} + self.server_tunnels = {} self.loop = None self.i2plib = i2plib self.utils = i2plib.utils @@ -68,27 +70,45 @@ def start(self): finally: self.loop.close() + def stop(self): for task in asyncio.Task.all_tasks(loop=self.loop): task.cancel() self.loop.stop() + def get_free_port(self): return self.i2plib.utils.get_free_port() + def client_tunnel(self, owner, i2p_destination): - try: - async def tunnel_up(): - RNS.log("Bringing up I2P tunnel to "+str(owner)+" in background, this may take a while...", RNS.LOG_INFO) - tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address) - await tunnel.run() - RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE) + self.client_tunnels[i2p_destination] = False + + while True: + if not self.client_tunnels[i2p_destination]: + try: + async def tunnel_up(): + RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO) + tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop) + await tunnel.run() + owner.awaiting_i2p_tunnel = False + RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE) - asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop) + try: + self.loop.ext_owner = self + future = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result() + self.client_tunnels[i2p_destination] = True - except Exception as e: - raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.") + except Exception as e: + RNS.log("Error while setting up I2P tunnel: "+str(e)) + raise e + + + except Exception as e: + raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.") + + time.sleep(5) def server_tunnel(self, owner): @@ -110,18 +130,23 @@ def server_tunnel(self, owner): i2p_b32 = i2p_dest.base32 - try: - async def tunnel_up(): - RNS.log(str(owner)+" Bringing up I2P tunnel in background, this may take a while...", RNS.LOG_INFO) - tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address) - await tunnel.run() - owner.awaiting_i2p_tunnel = False - RNS.log(str(owner)+ " tunnel setup complete, instance reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE) + self.server_tunnels[i2p_b32] = False - asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop) + while self.server_tunnels[i2p_b32] == False: + try: + async def tunnel_up(): + RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO) + tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address) + await tunnel.run() + RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE) - except Exception as e: - raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.") + asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result() + self.server_tunnels[i2p_b32] = True + + except Exception as e: + raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.") + + time.sleep(5) def get_loop(self): return asyncio.get_event_loop() @@ -148,6 +173,7 @@ def __init__(self, parent_interface, owner, name, target_i2p_dest=None, connecte self.OUT = False self.socket = None self.parent_interface = parent_interface + self.parent_count = True self.name = name self.initiator = False self.reconnecting = False @@ -188,19 +214,34 @@ def __init__(self, parent_interface, owner, name, target_i2p_dest=None, connecte self.target_port = self.bind_port self.awaiting_i2p_tunnel = True - self.parent_interface.i2p.client_tunnel(self, target_i2p_dest) - - if not self.connect(initial=True): - thread = threading.Thread(target=self.reconnect) - thread.setDaemon(True) - thread.start() - else: - thread = threading.Thread(target=self.read_loop) - thread.setDaemon(True) - thread.start() + + def tunnel_job(): + self.parent_interface.i2p.client_tunnel(self, target_i2p_dest) + + thread = threading.Thread(target=tunnel_job) + thread.setDaemon(True) + thread.start() + + def wait_job(): + while self.awaiting_i2p_tunnel: + time.sleep(0.25) + if not self.kiss_framing: self.wants_tunnel = True + if not self.connect(initial=True): + thread = threading.Thread(target=self.reconnect) + thread.setDaemon(True) + thread.start() + else: + thread = threading.Thread(target=self.read_loop) + thread.setDaemon(True) + thread.start() + + thread = threading.Thread(target=wait_job) + thread.setDaemon(True) + thread.start() + def set_timeouts_linux(self): if not self.i2p_tunneled: @@ -274,6 +315,9 @@ def connect(self, initial=False): self.writing = False self.never_connected = False + if not self.kiss_framing and self.wants_tunnel: + RNS.Transport.synthesize_tunnel(self) + return True @@ -316,7 +360,7 @@ def reconnect(self): def processIncoming(self, data): self.rxb += len(data) - if hasattr(self, "parent_interface") and self.parent_interface != None: + if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count: self.parent_interface.rxb += len(data) self.owner.inbound(data, self) @@ -337,7 +381,7 @@ def processOutgoing(self, data): self.socket.sendall(data) self.writing = False self.txb += len(data) - if hasattr(self, "parent_interface") and self.parent_interface != None: + if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count: self.parent_interface.txb += len(data) except Exception as e: @@ -456,12 +500,13 @@ def __str__(self): class I2PInterface(Interface): - def __init__(self, owner, name, rns_storagepath, peers): + def __init__(self, owner, name, rns_storagepath, peers, connectable = True): self.rxb = 0 self.txb = 0 self.online = False self.clients = 0 self.owner = owner + self.connectable = connectable self.i2p_tunneled = True self.i2p = I2PController(rns_storagepath) @@ -492,7 +537,13 @@ def createHandler(*args, **keys): thread.setDaemon(True) thread.start() - self.i2p.server_tunnel(self) + if self.connectable: + def tunnel_job(): + self.i2p.server_tunnel(self) + + thread = threading.Thread(target=tunnel_job) + thread.setDaemon(True) + thread.start() if peers != None: for peer_addr in peers: @@ -501,6 +552,7 @@ def createHandler(*args, **keys): peer_interface.OUT = True peer_interface.IN = True peer_interface.parent_interface = self + peer_interface.parent_count = False RNS.Transport.interfaces.append(peer_interface) self.online = True diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 89bd4711..e2e123d8 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -399,12 +399,14 @@ def __apply_config(self): if c["type"] == "I2PInterface": i2p_peers = c.as_list("peers") if "peers" in c else None + connectable = c.as_bool("connectable") if "connectable" in c else False interface = I2PInterface.I2PInterface( RNS.Transport, name, Reticulum.storagepath, - i2p_peers + i2p_peers, + connectable = connectable, ) if "outgoing" in c and c.as_bool("outgoing") == True: diff --git a/RNS/vendor/i2plib/tunnel.py b/RNS/vendor/i2plib/tunnel.py index 30d5d9a0..96eb4c15 100644 --- a/RNS/vendor/i2plib/tunnel.py +++ b/RNS/vendor/i2plib/tunnel.py @@ -95,8 +95,7 @@ async def handle_client(client_reader, client_writer): asyncio.ensure_future(proxy_data(client_reader, remote_writer), loop=self.loop) - self.server = await asyncio.start_server(handle_client, *self.local_address, - loop=self.loop) + self.server = await asyncio.start_server(handle_client, *self.local_address, loop=self.loop) def stop(self): super().stop()