Skip to content

Commit

Permalink
Added sync backoff for unresponsive peers. Improved sync peer selection.
Browse files Browse the repository at this point in the history
  • Loading branch information
markqvist committed Dec 20, 2022
1 parent b10755e commit 1440a0b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 44 deletions.
107 changes: 68 additions & 39 deletions LXMF/LXMPeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ class LXMPeer:
# be unreachable before it is removed
MAX_UNREACHABLE = 4*24*60*60

# Everytime consecutive time a sync
# link fails to establish, add this
# amount off time to wait before the
# next sync is attempted.
SYNC_BACKOFF_STEP = 12*60

# How long to wait for an answer to
# peer path requests before deferring
# sync to later.
PATH_REQUEST_GRACE = 7.5

@staticmethod
def from_bytes(peer_bytes, router):
dictionary = msgpack.unpackb(peer_bytes)
Expand Down Expand Up @@ -66,6 +77,9 @@ def to_bytes(self):
def __init__(self, router, destination_hash):
self.alive = False
self.last_heard = 0
self.next_sync_attempt = 0
self.last_sync_attempt = 0
self.sync_backoff = 0
self.peering_timebase = 0

self.link = None
Expand All @@ -80,48 +94,61 @@ def __init__(self, router, destination_hash):
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")

def sync(self):

RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
self.last_sync_attempt = time.time()

if not RNS.Transport.has_path(self.destination_hash):
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.destination_hash)
RNS.log("Path requested, retrying sync later", RNS.LOG_DEBUG)

else:
if self.identity == None:
self.identity = RNS.Identity.recall(destination_hash)
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")

if self.identity != None:
if len(self.unhandled_messages) > 0:
if self.state == LXMPeer.IDLE:
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed)
self.state = LXMPeer.LINK_ESTABLISHING

else:
if self.state == LXMPeer.LINK_READY:
self.alive = True
self.last_heard = time.time()

RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
else:
purged_ids.append(transient_id)

for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.unhandled_messages.pop(transient_id)

RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
if time.time() > self.next_sync_attempt:
if not RNS.Transport.has_path(self.destination_hash):
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.destination_hash)
time.sleep(LXMPeer.PATH_REQUEST_GRACE)

if not RNS.Transport.has_path(self.destination_hash):
RNS.log("Path request was not answered, retrying sync with peer "+RNS.prettyhexrep(self.destination_hash)+" later", RNS.LOG_DEBUG)

else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
if self.identity == None:
self.identity = RNS.Identity.recall(destination_hash)
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")

if self.identity != None:
if len(self.unhandled_messages) > 0:
if self.state == LXMPeer.IDLE:
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP
self.next_sync_attempt = time.time() + self.sync_backoff
self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed)
self.state = LXMPeer.LINK_ESTABLISHING

else:
if self.state == LXMPeer.LINK_READY:
self.alive = True
self.last_heard = time.time()
self.sync_backoff = 0

RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
else:
purged_ids.append(transient_id)

for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.unhandled_messages.pop(transient_id)

RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
else:
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
if self.last_sync_attempt > self.last_heard:
self.alive = False

def request_failed(self, request_receipt):
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
Expand Down Expand Up @@ -191,6 +218,7 @@ def offer_response(self, request_receipt):
resource.transferred_messages = wanted_message_ids
self.state = LXMPeer.RESOURCE_TRANSFERRING
else:
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
self.state = LXMPeer.IDLE

except Exception as e:
Expand Down Expand Up @@ -226,6 +254,7 @@ def resource_concluded(self, resource):
def link_established(self, link):
self.link.identify(self.router.identity)
self.state = LXMPeer.LINK_READY
self.next_sync_attempt = 0
self.sync()

def link_closed(self, link):
Expand Down
27 changes: 23 additions & 4 deletions LXMF/LXMRouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,11 @@ def delivery_resource_concluded(self, resource):
def peer(self, destination_hash, timestamp):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
peer.alive = True
if timestamp > peer.peering_timebase:
peer.alive = True
peer.sync_backoff = 0
peer.next_sync_attempt = 0

peer.peering_timebase = timestamp
peer.last_heard = time.time()
else:
Expand All @@ -934,18 +938,33 @@ def unpeer(self, destination_hash, timestamp = None):
def sync_peers(self):
culled_peers = []
waiting_peers = []
unresponsive_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
culled_peers.append(peer_id)
else:
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
waiting_peers.append(peer)
if peer.alive:
waiting_peers.append(peer)
else:
if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt:
unresponsive_peers.append(peer)
else:
pass
# RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG)

peer_pool = []
if len(waiting_peers) > 0:
RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
selected_index = random.randint(0,len(waiting_peers)-1)
selected_peer = waiting_peers[selected_index]
peer_pool = waiting_peers
elif len(unresponsive_peers) > 0:
RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG)
peer_pool = unresponsive_peers

if len(peer_pool) > 0:
selected_index = random.randint(0,len(peer_pool)-1)
selected_peer = peer_pool[selected_index]
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
selected_peer.sync()

Expand Down
2 changes: 1 addition & 1 deletion LXMF/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.6"
__version__ = "0.2.7"

0 comments on commit 1440a0b

Please sign in to comment.