Skip to content

Commit

Permalink
Improved I2P Interface
Browse files Browse the repository at this point in the history
  • Loading branch information
markqvist committed Feb 24, 2022
1 parent 987ff06 commit 4818413
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 36 deletions.
118 changes: 85 additions & 33 deletions RNS/Interfaces/I2PInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def __init__(self, rns_storagepath):
import RNS.vendor.i2plib as i2plib
import RNS.vendor.i2plib.utils

self.client_tunnels = {}
self.server_tunnels = {}
self.loop = None
self.i2plib = i2plib
self.utils = i2plib.utils
Expand All @@ -68,27 +70,45 @@ def start(self):
finally:
self.loop.close()


def stop(self):
for task in asyncio.Task.all_tasks(loop=self.loop):
task.cancel()

self.loop.stop()


def get_free_port(self):
return self.i2plib.utils.get_free_port()


def client_tunnel(self, owner, i2p_destination):
try:
async def tunnel_up():
RNS.log("Bringing up I2P tunnel to "+str(owner)+" in background, this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address)
await tunnel.run()
RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE)
self.client_tunnels[i2p_destination] = False

while True:
if not self.client_tunnels[i2p_destination]:
try:
async def tunnel_up():
RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop)
await tunnel.run()
owner.awaiting_i2p_tunnel = False
RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE)

asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop)
try:
self.loop.ext_owner = self
future = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
self.client_tunnels[i2p_destination] = True

except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.")
except Exception as e:
RNS.log("Error while setting up I2P tunnel: "+str(e))
raise e


except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.")

time.sleep(5)


def server_tunnel(self, owner):
Expand All @@ -110,18 +130,23 @@ def server_tunnel(self, owner):

i2p_b32 = i2p_dest.base32

try:
async def tunnel_up():
RNS.log(str(owner)+" Bringing up I2P tunnel in background, this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
await tunnel.run()
owner.awaiting_i2p_tunnel = False
RNS.log(str(owner)+ " tunnel setup complete, instance reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)
self.server_tunnels[i2p_b32] = False

asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop)
while self.server_tunnels[i2p_b32] == False:
try:
async def tunnel_up():
RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
await tunnel.run()
RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)

except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.")
asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
self.server_tunnels[i2p_b32] = True

except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.")

time.sleep(5)

def get_loop(self):
return asyncio.get_event_loop()
Expand All @@ -148,6 +173,7 @@ def __init__(self, parent_interface, owner, name, target_i2p_dest=None, connecte
self.OUT = False
self.socket = None
self.parent_interface = parent_interface
self.parent_count = True
self.name = name
self.initiator = False
self.reconnecting = False
Expand Down Expand Up @@ -188,19 +214,34 @@ def __init__(self, parent_interface, owner, name, target_i2p_dest=None, connecte
self.target_port = self.bind_port

self.awaiting_i2p_tunnel = True
self.parent_interface.i2p.client_tunnel(self, target_i2p_dest)

if not self.connect(initial=True):
thread = threading.Thread(target=self.reconnect)
thread.setDaemon(True)
thread.start()
else:
thread = threading.Thread(target=self.read_loop)
thread.setDaemon(True)
thread.start()

def tunnel_job():
self.parent_interface.i2p.client_tunnel(self, target_i2p_dest)

thread = threading.Thread(target=tunnel_job)
thread.setDaemon(True)
thread.start()

def wait_job():
while self.awaiting_i2p_tunnel:
time.sleep(0.25)

if not self.kiss_framing:
self.wants_tunnel = True

if not self.connect(initial=True):
thread = threading.Thread(target=self.reconnect)
thread.setDaemon(True)
thread.start()
else:
thread = threading.Thread(target=self.read_loop)
thread.setDaemon(True)
thread.start()

thread = threading.Thread(target=wait_job)
thread.setDaemon(True)
thread.start()


def set_timeouts_linux(self):
if not self.i2p_tunneled:
Expand Down Expand Up @@ -274,6 +315,9 @@ def connect(self, initial=False):
self.writing = False
self.never_connected = False

if not self.kiss_framing and self.wants_tunnel:
RNS.Transport.synthesize_tunnel(self)

return True


Expand Down Expand Up @@ -316,7 +360,7 @@ def reconnect(self):

def processIncoming(self, data):
self.rxb += len(data)
if hasattr(self, "parent_interface") and self.parent_interface != None:
if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
self.parent_interface.rxb += len(data)

self.owner.inbound(data, self)
Expand All @@ -337,7 +381,7 @@ def processOutgoing(self, data):
self.socket.sendall(data)
self.writing = False
self.txb += len(data)
if hasattr(self, "parent_interface") and self.parent_interface != None:
if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
self.parent_interface.txb += len(data)

except Exception as e:
Expand Down Expand Up @@ -456,12 +500,13 @@ def __str__(self):

class I2PInterface(Interface):

def __init__(self, owner, name, rns_storagepath, peers):
def __init__(self, owner, name, rns_storagepath, peers, connectable = True):
self.rxb = 0
self.txb = 0
self.online = False
self.clients = 0
self.owner = owner
self.connectable = connectable
self.i2p_tunneled = True

self.i2p = I2PController(rns_storagepath)
Expand Down Expand Up @@ -492,7 +537,13 @@ def createHandler(*args, **keys):
thread.setDaemon(True)
thread.start()

self.i2p.server_tunnel(self)
if self.connectable:
def tunnel_job():
self.i2p.server_tunnel(self)

thread = threading.Thread(target=tunnel_job)
thread.setDaemon(True)
thread.start()

if peers != None:
for peer_addr in peers:
Expand All @@ -501,6 +552,7 @@ def createHandler(*args, **keys):
peer_interface.OUT = True
peer_interface.IN = True
peer_interface.parent_interface = self
peer_interface.parent_count = False
RNS.Transport.interfaces.append(peer_interface)

self.online = True
Expand Down
4 changes: 3 additions & 1 deletion RNS/Reticulum.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,14 @@ def __apply_config(self):

if c["type"] == "I2PInterface":
i2p_peers = c.as_list("peers") if "peers" in c else None
connectable = c.as_bool("connectable") if "connectable" in c else False

interface = I2PInterface.I2PInterface(
RNS.Transport,
name,
Reticulum.storagepath,
i2p_peers
i2p_peers,
connectable = connectable,
)

if "outgoing" in c and c.as_bool("outgoing") == True:
Expand Down
3 changes: 1 addition & 2 deletions RNS/vendor/i2plib/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ async def handle_client(client_reader, client_writer):
asyncio.ensure_future(proxy_data(client_reader, remote_writer),
loop=self.loop)

self.server = await asyncio.start_server(handle_client, *self.local_address,
loop=self.loop)
self.server = await asyncio.start_server(handle_client, *self.local_address, loop=self.loop)

def stop(self):
super().stop()
Expand Down

0 comments on commit 4818413

Please sign in to comment.