diff --git a/community.py b/community.py index 3ac6d0d3..06dbf61b 100644 --- a/community.py +++ b/community.py @@ -31,10 +31,10 @@ from .member import DummyMember, Member from .message import (BatchConfiguration, Message, Packet, DropMessage, DelayMessageByProof, DelayMessageByMissingMessage, DropPacket, DelayPacket, DelayMessage) -from .payload import (AuthorizePayload, RevokePayload, UndoPayload, DestroyCommunityPayload, DynamicSettingsPayload, - IdentityPayload, MissingIdentityPayload, IntroductionRequestPayload, IntroductionResponsePayload, - PunctureRequestPayload, PuncturePayload, MissingMessagePayload, MissingSequencePayload, - MissingProofPayload, SignatureRequestPayload, SignatureResponsePayload) +from .payload import (AuthorizePayload, RevokePayload, UndoPayload, CancelPayload, DestroyCommunityPayload, + DynamicSettingsPayload, IdentityPayload, MissingIdentityPayload, IntroductionRequestPayload, + IntroductionResponsePayload, PunctureRequestPayload, PuncturePayload, MissingMessagePayload, + MissingSequencePayload, MissingProofPayload, SignatureRequestPayload, SignatureResponsePayload) from .requestcache import RequestCache, SignatureRequestCache, IntroductionRequestCache from .resolution import PublicResolution, LinearResolution, DynamicResolution from .statistics import CommunityStatistics @@ -137,7 +137,8 @@ def create_community(cls, dispersy, my_member, *args, **kargs): # authorize MY_MEMBER permission_triplets = [] - message_names = (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other") + message_names = (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other", + u"dispersy-cancel-own", u"dispersy-cancel-other") for message in community.get_meta_messages(): # grant all permissions for messages that use LinearResolution or DynamicResolution if isinstance(message.resolution, (LinearResolution, DynamicResolution)): @@ -151,6 +152,14 @@ def create_community(cls, dispersy, my_member, *args, **kargs): if not message.name in message_names: permission_triplets.append((my_member, message, u"undo")) + # TODO(lfei): remove undo_callback part after we have completely removed the undo message + # TODO(lfei): make sure the comments are correct + # ensure that cancel_callback is available + if message.cancel_callback: + # we do not support cancel permissions for authorize, revoke, cancel-own, and cancel-other (yet) + if message.name not in message_names: + permission_triplets.append((my_member, message, u"cancel")) + # grant authorize, revoke, and undo permission for messages that use PublicResolution # and SyncDistribution. Why? The undo permission allows nodes to revoke a specific # message that was gossiped around. The authorize permission is required to grant other @@ -166,6 +175,16 @@ def create_community(cls, dispersy, my_member, *args, **kargs): for allowed in (u"authorize", u"revoke", u"undo"): permission_triplets.append((my_member, message, allowed)) + # TODO(lfei): remove undo_callback part after we have completely removed the undo message + # TODO(lfei): make sure the comments are correct + # ensure that cancel_callback is available + if message.cancel_callback: + # we do not support cancel permissions for authorize, revoke, cancel-own, and cancel-other (yet) + if message.name not in message_names: + for allowed in (u"authorize", u"revoke", u"cancel"): + if (my_member, message, allowed) not in permission_triplets: + permission_triplets.append((my_member, message, allowed)) + if permission_triplets: community.create_authorize(permission_triplets, sign_with_master=True, forward=False) @@ -1715,6 +1734,22 @@ def initiate_meta_messages(self): UndoPayload(), self.check_undo, self.on_undo), + Message(self, u"dispersy-cancel-own", + MemberAuthentication(), + PublicResolution(), + FullSyncDistribution(enable_sequence_number=False, synchronization_direction=u"ASC", priority=128), + CommunityDestination(node_count=10), + CancelPayload(), + self.check_cancel, + self.on_cancel), + Message(self, u"dispersy-cancel-other", + MemberAuthentication(), + LinearResolution(), + FullSyncDistribution(enable_sequence_number=False, synchronization_direction=u"ASC", priority=128), + CommunityDestination(node_count=10), + CancelPayload(), + self.check_cancel, + self.on_cancel), Message(self, u"dispersy-destroy-community", MemberAuthentication(), LinearResolution(), @@ -3169,7 +3204,7 @@ def create_authorize(self, permission_triplets, sign_with_master=False, store=Tr assert isinstance(triplet[0], Member) assert isinstance(triplet[1], Message) assert isinstance(triplet[2], unicode) - assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo") + assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel") meta = self.get_meta_message(u"dispersy-authorize") message = meta.impl(authentication=((self.master_member if sign_with_master else self.my_member),), @@ -3281,6 +3316,194 @@ def on_revoke(self, messages, initializing=False): for meta, globaltime_range in changes.iteritems(): self._update_timerange(meta, globaltime_range[0], globaltime_range[1]) + # TODO(lfei): make sure that the comments are correct + def create_cancel(self, message, sign_with_master=False, store=True, update=True, forward=True): + """ + Creates a cancel message to end the loop of undo messages. + + A CANCEL message simply cancels another message and that's it. If a + person wants to enable the message that has been cancelled, he has to + create a new message with exactly the same content again. + """ + if __debug__: + assert isinstance(message, Message.Implementation) + assert isinstance(sign_with_master, bool) + assert isinstance(store, bool) + assert isinstance(update, bool) + assert isinstance(forward, bool) + assert message.cancel_callback, "message does not allow cancel" + assert message.name not in (u"dispersy-cancel-own", u"dispersy-cancel-other", + u"dispersy-undo-own", u"dispersy-undo-other", + u"dispersy-authorize", u"dispersy-revoke"),\ + "Currently we do NOT support canceling any of these, as it has consequences for other messages" + + # check and make sure that we cancel a message only once. + try: + undone, = self._dispersy._database.execute( + u"SELECT undone FROM sync WHERE community = ? AND member = ? AND global_time = ?", + (self.database_id, message.authentication.member.database_id, message.distribution.global_time)).next() + + except StopIteration: + assert False, "The message that we want to cancel does not exist. Programming error" + return + + if undone: + self._logger.error(u"Attempting to CANCEL the same message twice. Do nothing.") + raise RuntimeError(u"Attempting to CANCEL the same message twice. Do nothing.") + return + + # create the cancel message + meta = self.get_meta_message(u"dispersy-cancel-own" if self.my_member == message.authentication.member + and not sign_with_master else u"dispersy-cancel-other") + msg = meta.impl(authentication=((self.master_member if sign_with_master else self.my_member),), + distribution=(self.claim_global_time(),), + payload=(message.authentication.member, message.distribution.global_time, message)) + + if __debug__: + assert msg.distribution.global_time > message.distribution.global_time + allowed, _ = self.timeline.check(msg) + assert allowed, "create_cancel was called without having the permission to cancel" + + self._dispersy.store_update_forward([msg], store, update, forward) + return msg + + # TODO(lfei): make sure that the comments are correct + def check_cancel(self, messages): + # Note: previously all MESSAGES have been checked to ensure that the sequence numbers are + # correct. this check takes into account the messages in the batch. hence, if one of these + # messages is dropped or delayed it can invalidate the sequence numbers of the other + # messages in this batch! + + assert all(message.name in (u"dispersy-cancel-own", u"dispersy-cancel-other") for message in messages) + + for message in messages: + if message.payload.packet is None: + # obtain the packet that we are attempting to cancel + try: + packet_id, message_name, packet_data = self._dispersy._database.execute( + u"SELECT sync.id, meta_message.name, sync.packet FROM sync" + u" JOIN meta_message ON meta_message.id = sync.meta_message" + u" WHERE sync.community = ? AND sync.member = ? AND sync.global_time = ?", + (self.database_id, message.payload.member.database_id, message.payload.global_time)).next() + except StopIteration: + delay = DelayMessageByMissingMessage(message, message.payload.member, message.payload.global_time) + yield delay + continue + + message.payload.packet = Packet(self.get_meta_message(message_name), str(packet_data), packet_id) + + # ensure that the message in the payload allows cancel + if not message.payload.packet.meta.cancel_callback: + drop = DropMessage(message, "message does not allow cancel") + yield drop + continue + + # check the timeline + allowed, _ = message.community.timeline.check(message) + if not allowed: + delay = DelayMessageByProof(message) + yield delay + continue + + yield message + + # TODO(lfei): make sure that the comments are correct + def on_cancel(self, messages): + """ + Cancels a single message. + """ + assert all(message.name in (u"dispersy-cancel-own", u"dispersy-cancel-other") for message in messages) + + # Check the cancel messages we have got and only update the database with the latest one + # (which is the message with the biggest mid and global_time) + # check the cancel messages grouped by the message they cancel + categoried_msg_dict = dict() + for message in messages: + if isinstance(message, Message.Implementation) and message.payload.process_cancel: + # That's a normal cancel message + #parameters.append((message.packet_id, self.database_id, message.payload.member.database_id, + # message.payload.global_time)) + key = (message.payload.global_time, message.payload.member.mid) + if key not in categoried_msg_dict: + categoried_msg_dict[key] = dict() + categoried_msg_dict[key]["messages"] = list() + + # try to load the cancel message we have in the database + undone, = self._dispersy._database.execute( + u"SELECT undone FROM sync WHERE community = ? AND member = ? AND global_time = ?", + (self.database_id, message.payload.member.database_id, message.payload.global_time)).next() + if undone != 0: + try: + packet, = self._dispersy._database.execute( + u"SELECT sync.packet FROM sync" + u" JOIN meta_message ON meta_message.id == sync.meta_message" + u" WHERE meta_message.name LIKE 'dispersy-cancel-%' AND sync.id == ?", (undone,)).next() + except StopIteration as ex: + # no previous cancel for this message + pass + else: + categoried_msg_dict[key]["cancel_in_db"] = (str(packet), undone) + + categoried_msg_dict[key]["messages"].append(message) + + # for each message to be cancelled, check the cancel messages member global_time and mid + # (the larger the later), and we only use the latest one. + # The logic is that: + # (1) if the latest message is already in our database: + # (a) we will store the messages we have received, + # (b) and send the latest message to all the others because they don't have the latest message. + # (2) if the latest message is one of the messages we have received: + # (a) we will store the messages we have received, + # (b) update the undone with the latest message id, + # (c) and send the latest message to the others whose don't have have the latest message. + parameters = list() + real_messages = list() + for key, item in categoried_msg_dict.iteritems(): + all_item_list = [(msg.packet, msg.packet_id) for msg in item["messages"]] + + # add the database msg if it exists + if item.get("cancel_in_db", None): + assert (item["cancel_in_db"][0], item["cancel_in_db"][1], None) not in all_item_list,\ + "There are duplicate cancel messages with the one in DB!!!" + all_item_list.append(item["cancel_in_db"]) + + # sort and get the latest one + all_item_list.sort() + the_latest_one = all_item_list[-1] + if not item.get("cancel_in_db", None) or the_latest_one[-1] != item["cancel_in_db"][-1]: + # need to update the undone pointer in database + undone_ptr = the_latest_one[-1] + for msg in item["messages"]: + if msg.packet_id == undone_ptr: + the_latest_msg = msg + break + + parameters.append((undone_ptr, self.database_id, the_latest_msg.payload.member.database_id, + the_latest_msg.payload.global_time)) + real_messages.append(the_latest_msg) + + # send the latest packet to other people + the_latest_packet_id = the_latest_one[-1] + try: + the_latest_packet, = self._dispersy._database.execute(u"SELECT packet FROM sync WHERE id = ?", + (the_latest_one[-1],)).next() + except StopIteration: + pass + else: + for msg in item["messages"]: + if msg.packet_id != the_latest_packet_id: + self._dispersy._send_packets([msg.candidate], [str(the_latest_packet)], + self, "-caused by on_cancel-") + + if parameters: + self._dispersy._database.executemany(u"UPDATE sync SET undone = ? " + u"WHERE community = ? AND member = ? AND global_time = ?", parameters) + + for meta, sub_messages in groupby(real_messages, key=lambda x: x.payload.packet.meta): + meta.cancel_callback([(message.payload.member, message.payload.global_time, message.payload.packet) + for message in sub_messages]) + + # TODO(lfei): remove this after we have completely removed the undo message def create_undo(self, message, sign_with_master=False, store=True, update=True, forward=True): """ Create a dispersy-undo-own or dispersy-undo-other message to undo MESSAGE. @@ -3301,7 +3524,10 @@ def create_undo(self, message, sign_with_master=False, store=True, update=True, assert isinstance(update, bool) assert isinstance(forward, bool) assert message.undo_callback, "message does not allow undo" - assert not message.name in (u"dispersy-undo-own", u"dispersy-undo-other", u"dispersy-authorize", u"dispersy-revoke"), "Currently we do NOT support undoing any of these, as it has consequences for other messages" + assert not message.name in (u"dispersy-undo-own", u"dispersy-undo-other", + u"dispersy-authorize", u"dispersy-revoke", + u"dispersy-cancel-own", u"dispersy-cancel-other"),\ + "Currently we do NOT support undoing any of these, as it has consequences for other messages" # creating a second dispersy-undo for the same message is malicious behavior (it can cause # infinate data traffic). nodes that notice this behavior must blacklist the offending @@ -3350,6 +3576,7 @@ def create_undo(self, message, sign_with_master=False, store=True, update=True, self._dispersy.store_update_forward([msg], store, update, forward) return msg + # TODO(lfei): remove this after we have completely removed the undo message def check_undo(self, messages): # Note: previously all MESSAGES have been checked to ensure that the sequence numbers are # correct. this check takes into account the messages in the batch. hence, if one of these @@ -3448,6 +3675,7 @@ def check_undo(self, messages): yield message + # TODO(lfei): remove this after we have completely removed the undo message def on_undo(self, messages): """ Undo a single message. diff --git a/conversion.py b/conversion.py index cd01e4df..12918dd3 100644 --- a/conversion.py +++ b/conversion.py @@ -425,7 +425,8 @@ def _encode_authorize(self, message): ] ] """ - permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)} + permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2), + u"undo": int("01000", 2), u"cancel": int("10000", 2)} members = {} for member, message, permission in message.payload.permission_triplets: public_key = member.public_key @@ -454,7 +455,8 @@ def _encode_authorize(self, message): return tuple(data) def _decode_authorize(self, placeholder, offset, data): - permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)} + permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2), + u"undo": int("01000", 2), u"cancel": int("10000", 2)} permission_triplets = [] while offset < len(data): @@ -521,7 +523,8 @@ def _encode_revoke(self, message): ] ] """ - permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)} + permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2), + u"undo": int("01000", 2), u"cancel": int("10000", 2)} members = {} for member, message, permission in message.payload.permission_triplets: public_key = member.public_key @@ -550,7 +553,8 @@ def _encode_revoke(self, message): return tuple(data) def _decode_revoke(self, placeholder, offset, data): - permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)} + permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2), + u"undo": int("01000", 2), u"cancel": int("10000", 2)} permission_triplets = [] while offset < len(data): @@ -599,6 +603,57 @@ def _decode_revoke(self, placeholder, offset, data): return offset, placeholder.meta.payload.Implementation(placeholder.meta.payload, permission_triplets) + def _encode_cancel_own(self, message): + return self._struct_Q.pack(message.payload.global_time), + + def _decode_cancel_own(self, placeholder, offset, data): + # use the member in the Authentication policy + member = placeholder.authentication.member + + if len(data) < offset + 8: + raise DropPacket("Insufficient packet size") + + global_time, = self._struct_Q.unpack_from(data, offset) + offset += 8 + + if not global_time < placeholder.distribution.global_time: + raise DropPacket("Invalid global time (trying to apply undo to the future)") + + return offset, placeholder.meta.payload.Implementation(placeholder.meta.payload, member, global_time) + + def _encode_cancel_other(self, message): + public_key = message.payload.member.public_key + assert message.payload.member.public_key + return self._struct_H.pack(len(public_key)), public_key, self._struct_Q.pack(message.payload.global_time) + + def _decode_cancel_other(self, placeholder, offset, data): + if len(data) < offset + 2: + raise DropPacket("Insufficient packet size") + + key_length, = self._struct_H.unpack_from(data, offset) + offset += 2 + + if len(data) < offset + key_length: + raise DropPacket("Insufficient packet size") + + public_key = data[offset:offset + key_length] + try: + member = self._community.dispersy.get_member(public_key=public_key) + except: + raise DropPacket("Invalid cryptographic key (_decode_revoke)") + offset += key_length + + if len(data) < offset + 8: + raise DropPacket("Insufficient packet size") + + global_time, = self._struct_Q.unpack_from(data, offset) + offset += 8 + + if not global_time < placeholder.distribution.global_time: + raise DropPacket("Invalid global time (trying to apply undo to the future)") + + return offset, placeholder.meta.payload.Implementation(placeholder.meta.payload, member, global_time) + def _encode_undo_own(self, message): return (self._struct_Q.pack(message.payload.global_time),) @@ -1322,6 +1377,8 @@ def define(value, name, encode, decode): define(237, u"dispersy-undo-other", self._encode_undo_other, self._decode_undo_other) define(236, u"dispersy-dynamic-settings", self._encode_dynamic_settings, self._decode_dynamic_settings) # 235 for obsolete dispersy-missing-last-message + define(234, u"dispersy-cancel-own", self._encode_cancel_own, self._decode_cancel_own) + define(233, u"dispersy-cancel-other", self._encode_cancel_other, self._decode_cancel_other) if __debug__: if debug_non_available: diff --git a/dispersy.py b/dispersy.py index 7846d2ea..7ce02927 100644 --- a/dispersy.py +++ b/dispersy.py @@ -1274,7 +1274,7 @@ def check_double_member_and_global_time(unique, times, message): # refuse messages that have been pruned (or soon will be) messages = [DropMessage(message, "message has been pruned") if isinstance(message, Message.Implementation) and not message.distribution.pruning.is_active() else message for message in messages] - # for meta data messages + # for metadata messages if meta.distribution.custom_callback: unique = set() times = {} diff --git a/message.py b/message.py index 1c9091c4..1cc981d1 100644 --- a/message.py +++ b/message.py @@ -281,6 +281,10 @@ def handle_callback(self): def undo_callback(self): return self._meta._undo_callback + @property + def cancel_callback(self): + return self._meta._cancel_callback + @property def priority(self): return self._meta._priority @@ -422,7 +426,8 @@ def regenerate_packet(self, packet=""): def __str__(self): return "<%s.%s %s>" % (self._meta.__class__.__name__, self.__class__.__name__, self._meta._name) - def __init__(self, community, name, authentication, resolution, distribution, destination, payload, check_callback, handle_callback, undo_callback=None, batch=None): + def __init__(self, community, name, authentication, resolution, distribution, destination, payload, + check_callback, handle_callback, undo_callback=None, cancel_callback=None, batch=None): from .community import Community assert isinstance(community, Community), "COMMUNITY has invalid type '%s'" % type(community) assert isinstance(name, unicode), "NAME has invalid type '%s'" % type(name) @@ -434,6 +439,8 @@ def __init__(self, community, name, authentication, resolution, distribution, de assert callable(check_callback), type(check_callback) assert callable(handle_callback), type(handle_callback) assert undo_callback is None or callable(undo_callback), undo_callback + assert cancel_callback is None or callable(cancel_callback),\ + "cancel_callback is not a callable: %s" % type(cancel_callback) if isinstance(resolution, DynamicResolution): assert callable(undo_callback), "UNDO_CALLBACK must be specified when using the DynamicResolution policy" assert batch is None or isinstance(batch, BatchConfiguration), type(batch) @@ -448,6 +455,7 @@ def __init__(self, community, name, authentication, resolution, distribution, de self._check_callback = check_callback self._handle_callback = handle_callback self._undo_callback = undo_callback + self._cancel_callback = cancel_callback self._batch = BatchConfiguration() if batch is None else batch self._logger = logging.getLogger(self.__class__.__name__) @@ -506,6 +514,10 @@ def handle_callback(self): def undo_callback(self): return self._undo_callback + @property + def cancel_callback(self): + return self._cancel_callback + @property def batch(self): return self._batch diff --git a/payload.py b/payload.py index fcc3fc1d..82a5d6d6 100644 --- a/payload.py +++ b/payload.py @@ -353,7 +353,7 @@ def __init__(self, meta, permission_triplets): assert isinstance(triplet[1].resolution, (PublicResolution, LinearResolution, DynamicResolution)), triplet[1] assert isinstance(triplet[1].authentication, (MemberAuthentication, DoubleMemberAuthentication)), triplet[1] assert isinstance(triplet[2], unicode), triplet[2] - assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo"), triplet[2] + assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel"), triplet[2] super(AuthorizePayload.Implementation, self).__init__(meta) self._permission_triplets = permission_triplets @@ -386,7 +386,7 @@ def __init__(self, meta, permission_triplets): assert isinstance(triplet[1].resolution, (PublicResolution, LinearResolution, DynamicResolution)), triplet assert isinstance(triplet[1].authentication, (MemberAuthentication, DoubleMemberAuthentication)), triplet assert isinstance(triplet[2], unicode), triplet - assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo"), triplet + assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel"), triplet super(RevokePayload.Implementation, self).__init__(meta) self._permission_triplets = permission_triplets @@ -439,6 +439,50 @@ def packet(self, packet): self._packet = packet +class CancelPayload(Payload): + + class Implementation(Payload.Implementation): + + def __init__(self, meta, member, global_time, packet=None): + from .member import Member + from .message import Packet + assert isinstance(member, Member) + assert isinstance(global_time, (int, long)) + assert packet is None or isinstance(packet, Packet) + assert global_time > 0 + super(CancelPayload.Implementation, self).__init__(meta) + self._member = member + self._global_time = global_time + self._packet = packet + self._process_cancel = True + + @property + def process_cancel(self): + return self._process_cancel + + @process_cancel.setter + def process_cancel(self, enabled=True): + self._process_cancel = enabled + + @property + def member(self): + return self._member + + @property + def global_time(self): + return self._global_time + + @property + def packet(self): + return self._packet + + @packet.setter + def packet(self, packet): + from .message import Packet + assert isinstance(packet, Packet), type(packet) + self._packet = packet + + class MissingSequencePayload(Payload): class Implementation(Payload.Implementation): diff --git a/taskmanager.py b/taskmanager.py index 8d939d0d..69f2f42b 100644 --- a/taskmanager.py +++ b/taskmanager.py @@ -31,7 +31,7 @@ def replace_task(self, name, task): def register_task(self, name, task): """ - Register a task so it can be canceled at shutdown time or by name. + Register a task so it can be cancelled at shutdown time or by name. """ assert not self.is_pending_task_active(name), name assert isinstance(task, (Deferred, DelayedCall, LoopingCall)), task diff --git a/tests/debugcommunity/community.py b/tests/debugcommunity/community.py index 43b215b7..cb0c7c15 100644 --- a/tests/debugcommunity/community.py +++ b/tests/debugcommunity/community.py @@ -76,7 +76,8 @@ def initiate_meta_messages(self): TextPayload(), self._generic_timeline_check, self.on_text, - self.undo_text), + undo_callback=self.undo_text, + cancel_callback=self.cancel_text), Message(self, u"bin-key-text", MemberAuthentication(encoding="bin"), PublicResolution(), @@ -85,7 +86,8 @@ def initiate_meta_messages(self): TextPayload(), self._generic_timeline_check, self.on_text, - self.undo_text), + undo_callback=self.undo_text, + cancel_callback=self.cancel_text), Message(self, u"ASC-text", MemberAuthentication(), PublicResolution(), @@ -119,7 +121,8 @@ def initiate_meta_messages(self): TextPayload(), self._generic_timeline_check, self.on_text, - self.undo_text), + undo_callback=self.undo_text, + cancel_callback=self.cancel_text), Message(self, u"sequence-text", MemberAuthentication(), PublicResolution(), @@ -128,7 +131,8 @@ def initiate_meta_messages(self): TextPayload(), self._generic_timeline_check, self.on_text, - self.undo_text), + undo_callback=self.undo_text, + cancel_callback=self.cancel_text), Message(self, u"full-sync-global-time-pruning-text", MemberAuthentication(), PublicResolution(), @@ -137,7 +141,8 @@ def initiate_meta_messages(self): TextPayload(), self._generic_timeline_check, self.on_text, - self.undo_text), + undo_callback=self.undo_text, + cancel_callback=self.cancel_text), Message(self, u"high-priority-text", MemberAuthentication(), PublicResolution(), @@ -221,6 +226,14 @@ def undo_text(self, descriptors): message = packet.load_message() self._logger.debug("undo \"%s\" @%d", message.payload.text, global_time) + def cancel_text(self, descriptors): + """ + Received a cancel for a text message. + """ + for member, global_time, packet in descriptors: + message = packet.load_message() + self._logger.debug("cancel \"%s\" @%d", message.payload.text, global_time) + def dispersy_cleanup_community(self, message): if message.payload.is_soft_kill: raise NotImplementedError() diff --git a/tests/debugcommunity/node.py b/tests/debugcommunity/node.py index 201665b1..1ac886cd 100644 --- a/tests/debugcommunity/node.py +++ b/tests/debugcommunity/node.py @@ -354,6 +354,28 @@ def assert_not_stored(self, message=None, messages=None): assert_is_done = assert_is_stored + @blocking_call_on_reactor_thread + def assert_is_cancelled(self, message=None, messages=None, cancelled_by=None): + if messages is None: + messages = [message] + + for message in messages: + try: + undone, = self._dispersy.database.execute( + u"SELECT undone FROM sync, member" + u" WHERE sync.member = member.id AND community = ? AND mid = ? AND global_time = ?", + (self._community.database_id, buffer(message.authentication.member.mid), + message.distribution.global_time)).next() + self._testclass.assertGreater(undone, 0, "Message is not cancelled") + if cancelled_by: + undone, = self._dispersy.database.execute( + u"SELECT packet FROM sync WHERE id = ? ", + (undone,)).next() + self._testclass.assertEqual(str(undone), cancelled_by.packet) + + except StopIteration: + self._testclass.fail("Message is not stored") + @blocking_call_on_reactor_thread def assert_is_undone(self, message=None, messages=None, undone_by=None): if messages == None: @@ -473,6 +495,36 @@ def create_identity(self, global_time=None): return meta.impl(authentication=(self._my_member,), distribution=(global_time,)) + @blocking_call_on_reactor_thread + def create_cancel_own(self, message, global_time=None): + """ + Returns a new dispersy-cancel-own message. + """ + assert message.authentication.member == self._my_member, "use create_dispersy_cancel_other" + meta = self._community.get_meta_message(u"dispersy-cancel-own") + + if global_time is None: + global_time = self.claim_global_time() + + return meta.impl(authentication=(self._my_member,), + distribution=(global_time,), + payload=(message.authentication.member, message.distribution.global_time, message)) + + @blocking_call_on_reactor_thread + def create_cancel_other(self, message, global_time=None): + """ + Returns a new dispersy-cancel-other message. + """ + meta = self._community.get_meta_message(u"dispersy-cancel-other") + + if global_time is None: + global_time = self.claim_global_time() + + return meta.impl(authentication=(self._my_member,), + distribution=(global_time,), + payload=(message.authentication.member, message.distribution.global_time, message)) + + @blocking_call_on_reactor_thread def create_undo_own(self, message, global_time=None, sequence_number=None): """ diff --git a/tests/test_cancel.py b/tests/test_cancel.py new file mode 100644 index 00000000..b8f0d809 --- /dev/null +++ b/tests/test_cancel.py @@ -0,0 +1,236 @@ +from time import sleep + +from .dispersytestclass import DispersyTestFunc +from ..util import blocking_call_on_reactor_thread + + +class TestCancel(DispersyTestFunc): + + def test_self_cancel_own(self): + """ + NODE generates a few messages and then cancels them. + + This is always allowed. In fact, no check is made since only externally received packets + will be checked. + """ + node, = self.create_nodes(1) + + # create messages + messages = [node.create_full_sync_text("Should cancel #%d" % i, i + 10) for i in xrange(10)] + node.give_messages(messages, node) + + # check that they are in the database and are NOT cancelled + node.assert_is_stored(messages=messages) + + # cancel all messages + cancels = [node.create_cancel_own(message, i + 100) for i, message in enumerate(messages)] + + node.give_messages(cancels, node) + + # check that they are in the database and ARE cancelled + node.assert_is_cancelled(messages=messages) + node.assert_is_stored(messages=cancels) + + def test_node_cancel_other(self): + """ + MM gives NODE permission to cancel, OTHER generates a few messages and then NODE cancels + them. + """ + node, other = self.create_nodes(2) + other.send_identity(node) + + # MM grants cancel permission to NODE + authorize = self._mm.create_authorize([(node.my_member, self._community.get_meta_message(u"full-sync-text"), + u"cancel")], self._mm.claim_global_time()) + node.give_message(authorize, self._mm) + other.give_message(authorize, self._mm) + + # OTHER creates messages + messages = [other.create_full_sync_text("Should cancel #%d" % i, i + 10) for i in xrange(10)] + node.give_messages(messages, other) + + # check that they are in the database and are NOT cancelled + node.assert_is_stored(messages=messages) + + # NODE cancels all messages + cancels = [node.create_cancel_other(message, message.distribution.global_time + 100) + for i, message in enumerate(messages)] + node.give_messages(cancels, node) + + # check that they are in the database and ARE cancelled + node.assert_is_cancelled(messages=messages) + node.assert_is_stored(messages=cancels) + + def test_self_attempt_cancel_twice(self): + """ + NODE generated a message and then cancels it twice. The dispersy core should ensure that + that the second cancel is refused and the first cancel message should be returned instead. + """ + node, = self.create_nodes(1) + + # create message + message = node.create_full_sync_text("Should cancel @%d" % 1, 1) + node.give_message(message, node) + + # cancel twice + @blocking_call_on_reactor_thread + def create_cancel(): + return node._community.create_cancel(message) + + cancel1 = node.call(create_cancel) + self.assertIsNotNone(cancel1.packet) + + self.assertRaises(RuntimeError, create_cancel) + + def test_node_resolve_cancel_twice(self): + """ + If we receive two cancel messages canceling the same message, the higher one should be stored as the + latest cancel messages. + + The latest cancel message is the one that has the highest packet binary. + + In this test, we first same the original message C0, then we send the higher cancel message C2, + and finally the lower cancel message C1. + + The receiver first updates its database with C2 as the latest cancel message of C0. + When it receives C1, it will recognise that it has C2 > C1, and it sends C2 to the sender + to inform him with the latest cancel message of C0. + """ + node, other = self.create_nodes(2) + node.send_identity(other) + + # MM grants cancel permission to NODE + authorize = self._mm.create_authorize([(node.my_member, self._community.get_meta_message(u"full-sync-text"), + u"cancel")], self._mm.claim_global_time()) + node.give_message(authorize, self._mm) + other.give_message(authorize, self._mm) + + # create message + message = node.create_full_sync_text("Should cancel @%d" % 10, 10) + + # create cancels + cancel1 = node.create_cancel_own(message, 11) + cancel2 = node.create_cancel_own(message, 12) + low_message, high_message = sorted([cancel1, cancel2], key=lambda msg: msg.packet) + other.give_message(message, node) + other.give_message(high_message, node) + other.give_message(low_message, node) + # OTHER should send the first message back when receiving + # the second one (its "higher" than the one just received) + cancel_packets = list() + + for candidate, b in node.receive_packets(): + self._logger.debug(candidate) + self._logger.debug(type(b)) + self._logger.debug("%d", len(b)) + self._logger.debug("before %d", len(cancel_packets)) + cancel_packets.append(b) + self._logger.debug("packets amount: %d", len(cancel_packets)) + self._logger.debug("first cancel %d", len(cancel_packets[0])) + self._logger.debug("%d", len(b)) + + for x in cancel_packets: + self._logger.debug("loop%d", len(x)) + + def fetch_all_messages(): + for row in list(other._dispersy.database.execute(u"SELECT * FROM sync")): + self._logger.debug("_______ %s", row) + other.call(fetch_all_messages) + + self._logger.debug("%d", len(high_message.packet)) + + self.assertEqual(len(cancel_packets), len([high_message.packet])) + + # NODE should have both messages on the database and the lowest one should be cancelled by the highest. + messages = other.fetch_messages((u"dispersy-cancel-own",)) + self.assertEquals(len(messages), 2) + other.assert_is_stored(low_message) + other.assert_is_stored(high_message) + other.assert_is_cancelled(message, cancelled_by=high_message) + + def test_missing_message(self): + """ + NODE generates a few messages without sending them to OTHER. Following, NODE cancels the + messages and sends the cancel messages to OTHER. OTHER must now use a dispersy-missing-message + to request the messages that are about to be cancelled. The messages need to be processed and + subsequently cancelled. + """ + node, other = self.create_nodes(2) + node.send_identity(other) + + # create messages + messages = [node.create_full_sync_text("Should cancel @%d" % i, i + 10) for i in xrange(10)] + + # cancel all messages + cancels = [node.create_cancel_own(message, message.distribution.global_time + 100) + for i, message in enumerate(messages)] + + # send cancels to OTHER + other.give_messages(cancels, node) + + # receive the dispersy-missing-message messages + global_times = [message.distribution.global_time for message in messages] + global_time_requests = list() + + for _, message in node.receive_messages(names=[u"dispersy-missing-message"]): + self.assertEqual(message.payload.member.public_key, node.my_member.public_key) + global_time_requests.extend(message.payload.global_times) + + self.assertEqual(sorted(global_times), sorted(global_time_requests)) + + # give all 'delayed' messages + other.give_messages(messages, node) + + # check that they are in the database and ARE cancelled + other.assert_is_cancelled(messages=messages) + other.assert_is_stored(messages=cancels) + + def test_revoke_causing_cancel(self): + """ + 3 peers: SELF, p1, p2 + steps: + (1) SELF authorizes p1 to have cancel permission. + (2) p2 creates message m1. + (3) p1 creates message c1 to cancel m1. + (4) SELF revokes the cancel permission from p1. + (5) After (4), m1 should still be cancelled by c1. + """ + p1, p2 = self.create_nodes(2) + p1.send_identity(p2) + + # MM grants cancel permission to NODE + authorize = self._mm.create_authorize([(p1.my_member, self._community.get_meta_message(u"full-sync-text"), + u"cancel")], self._mm.claim_global_time()) + self._mm.give_message(authorize, self._mm) + p1.give_message(authorize, self._mm) + p2.give_message(authorize, self._mm) + sleep(1) + + # OTHER creates a message + m1 = p2.create_full_sync_text("will be cancelled", 10) + self._mm.give_message(m1, p2) + p2.give_message(m1, p2) + p1.give_message(m1, p2) + sleep(1) + p2.assert_is_stored(m1) + p1.assert_is_stored(m1) + + # NODE cancels the message + cancel = p1.create_cancel_other(m1, m1.distribution.global_time + 1) + p1.give_message(cancel, p1) + p2.give_message(cancel, p1) + self._mm.give_message(cancel, p1) + sleep(1) + p2.assert_is_cancelled(m1, cancelled_by=cancel) + p2.assert_is_stored(cancel) + + # SELF revoke cancel permission from NODE + revoke = self._mm.create_revoke([(p1.my_member, self._community.get_meta_message(u"full-sync-text"), + u"cancel")], self._mm.claim_global_time()) + self._mm.give_message(revoke, self._mm) + p1.give_message(revoke, self._mm) + p2.give_message(revoke, self._mm) + sleep(1) + + p1.assert_is_cancelled(m1, cancelled_by=cancel) + p2.assert_is_cancelled(m1, cancelled_by=cancel) diff --git a/timeline.py b/timeline.py index d0944aaa..1d2e963e 100644 --- a/timeline.py +++ b/timeline.py @@ -69,7 +69,7 @@ def check(self, message, permission=u"permit"): assert isinstance(message, Message.Implementation), message assert isinstance(message.authentication, (MemberAuthentication.Implementation, DoubleMemberAuthentication.Implementation)), message.authentication assert isinstance(permission, unicode) - assert permission in (u"permit", u"authorize", u"revoke", u"undo") + assert permission in (u"permit", u"authorize", u"revoke", u"undo", u"cancel") if isinstance(message.authentication, MemberAuthentication.Implementation): # MemberAuthentication @@ -122,6 +122,24 @@ def check(self, message, permission=u"permit"): return self._check(message.authentication.member, message.distribution.global_time, message.resolution, [(message.payload.packet.meta, u"undo")]) + elif message.name == u"dispersy-cancel-other": + assert isinstance(message.resolution, LinearResolution.Implementation), message + if __debug__: + self._logger.debug("collecting proof for container message dispersy-cancel-other") + self._logger.debug("master-member: %d; my-member: %d", + message.community.master_member.database_id, + message.community.my_member.database_id) + self._logger.debug("dispersy-cancel-other created by %d@%d", + message.authentication.member.database_id, + message.distribution.global_time) + self._logger.debug(" canceling message by %d@%d (%s, %s)", + message.payload.member.database_id, message.payload.global_time, + message.payload.packet.name, message.payload.packet.resolution) + self.printer() + + return self._check(message.authentication.member, message.distribution.global_time, message.resolution, + [(message.payload.packet.meta, u"cancel")]) + else: return self._check(message.authentication.member, message.distribution.global_time, message.resolution, [(message.meta, permission)]) else: @@ -143,7 +161,7 @@ def allowed(self, meta, global_time=0, permission=u"permit"): assert isinstance(global_time, (int, long)) assert global_time >= 0 assert isinstance(permission, unicode) - assert permission in (u"permit", u"authorize", u"revoke", u"undo") + assert permission in (u"permit", u"authorize", u"revoke", u"undo", u"cancel") return self._check(self._community.my_member, global_time if global_time else self._community.global_time, meta.resolution, [(meta, permission)]) def _check(self, member, global_time, resolution, permission_pairs): @@ -165,7 +183,7 @@ def _check(self, member, global_time, resolution, permission_pairs): assert len(pair) == 2 assert isinstance(pair[0], Message), "Requires meta message" assert isinstance(pair[1], unicode) - assert pair[1] in (u"permit", u"authorize", u"revoke", u"undo") + assert pair[1] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel") assert isinstance(resolution, (PublicResolution.Implementation, LinearResolution.Implementation, DynamicResolution.Implementation, PublicResolution, LinearResolution, DynamicResolution)), resolution # TODO: we can make this more efficient by changing the loop a bit. make a shallow copy of @@ -272,9 +290,10 @@ def authorize(self, author, global_time, permission_triplets, proof): assert isinstance(triplet[1].resolution, (PublicResolution, LinearResolution, DynamicResolution)) assert isinstance(triplet[1].authentication, (MemberAuthentication, DoubleMemberAuthentication)) assert isinstance(triplet[2], unicode) - assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo") + assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel") assert isinstance(proof, Message.Implementation) - assert proof.name in (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other") + assert proof.name in (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other", + u"dispersy-cancel-own", u"dispersy-cancel-other") # check that AUTHOR is allowed to perform authorizations for these messages messages = set(message for _, message, _ in permission_triplets) @@ -354,9 +373,10 @@ def revoke(self, author, global_time, permission_triplets, proof): assert isinstance(triplet[1].resolution, (PublicResolution, LinearResolution, DynamicResolution)) assert isinstance(triplet[1].authentication, (MemberAuthentication, DoubleMemberAuthentication)) assert isinstance(triplet[2], unicode) - assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo") + assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel") assert isinstance(proof, Message.Implementation) - assert proof.name in (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other") + assert proof.name in (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other", + u"dispersy-cancel-own", u"dispersy-cancel-other") # TODO: we must remove duplicates in the below permission_pairs list # check that AUTHOR is allowed to perform these authorizations