Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Cancel message #370

Open
wants to merge 13 commits into
base: devel
Choose a base branch
from
242 changes: 235 additions & 7 deletions community.py

Large diffs are not rendered by default.

65 changes: 61 additions & 4 deletions conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ def _encode_authorize(self, message):
]
]
"""
permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)}
permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2),
u"undo": int("01000", 2), u"cancel": int("10000", 2)}
members = {}
for member, message, permission in message.payload.permission_triplets:
public_key = member.public_key
Expand Down Expand Up @@ -454,7 +455,8 @@ def _encode_authorize(self, message):
return tuple(data)

def _decode_authorize(self, placeholder, offset, data):
permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)}
permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2),
u"undo": int("01000", 2), u"cancel": int("10000", 2)}
permission_triplets = []

while offset < len(data):
Expand Down Expand Up @@ -521,7 +523,8 @@ def _encode_revoke(self, message):
]
]
"""
permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)}
permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2),
u"undo": int("01000", 2), u"cancel": int("10000", 2)}
members = {}
for member, message, permission in message.payload.permission_triplets:
public_key = member.public_key
Expand Down Expand Up @@ -550,7 +553,8 @@ def _encode_revoke(self, message):
return tuple(data)

def _decode_revoke(self, placeholder, offset, data):
permission_map = {u"permit": int("0001", 2), u"authorize": int("0010", 2), u"revoke": int("0100", 2), u"undo": int("1000", 2)}
permission_map = {u"permit": int("00001", 2), u"authorize": int("00010", 2), u"revoke": int("00100", 2),
u"undo": int("01000", 2), u"cancel": int("10000", 2)}
permission_triplets = []

while offset < len(data):
Expand Down Expand Up @@ -599,6 +603,57 @@ def _decode_revoke(self, placeholder, offset, data):

return offset, placeholder.meta.payload.Implementation(placeholder.meta.payload, permission_triplets)

def _encode_cancel_own(self, message):
return self._struct_Q.pack(message.payload.global_time),

def _decode_cancel_own(self, placeholder, offset, data):
# use the member in the Authentication policy
member = placeholder.authentication.member

if len(data) < offset + 8:
raise DropPacket("Insufficient packet size")

global_time, = self._struct_Q.unpack_from(data, offset)
offset += 8

if not global_time < placeholder.distribution.global_time:
raise DropPacket("Invalid global time (trying to apply undo to the future)")

return offset, placeholder.meta.payload.Implementation(placeholder.meta.payload, member, global_time)

def _encode_cancel_other(self, message):
public_key = message.payload.member.public_key
assert message.payload.member.public_key
return self._struct_H.pack(len(public_key)), public_key, self._struct_Q.pack(message.payload.global_time)

def _decode_cancel_other(self, placeholder, offset, data):
if len(data) < offset + 2:
raise DropPacket("Insufficient packet size")

key_length, = self._struct_H.unpack_from(data, offset)
offset += 2

if len(data) < offset + key_length:
raise DropPacket("Insufficient packet size")

public_key = data[offset:offset + key_length]
try:
member = self._community.dispersy.get_member(public_key=public_key)
except:
raise DropPacket("Invalid cryptographic key (_decode_revoke)")
offset += key_length

if len(data) < offset + 8:
raise DropPacket("Insufficient packet size")

global_time, = self._struct_Q.unpack_from(data, offset)
offset += 8

if not global_time < placeholder.distribution.global_time:
raise DropPacket("Invalid global time (trying to apply undo to the future)")

return offset, placeholder.meta.payload.Implementation(placeholder.meta.payload, member, global_time)

def _encode_undo_own(self, message):
return (self._struct_Q.pack(message.payload.global_time),)

Expand Down Expand Up @@ -1322,6 +1377,8 @@ def define(value, name, encode, decode):
define(237, u"dispersy-undo-other", self._encode_undo_other, self._decode_undo_other)
define(236, u"dispersy-dynamic-settings", self._encode_dynamic_settings, self._decode_dynamic_settings)
# 235 for obsolete dispersy-missing-last-message
define(234, u"dispersy-cancel-own", self._encode_cancel_own, self._decode_cancel_own)
define(233, u"dispersy-cancel-other", self._encode_cancel_other, self._decode_cancel_other)

if __debug__:
if debug_non_available:
Expand Down
2 changes: 1 addition & 1 deletion dispersy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ def check_double_member_and_global_time(unique, times, message):
# refuse messages that have been pruned (or soon will be)
messages = [DropMessage(message, "message has been pruned") if isinstance(message, Message.Implementation) and not message.distribution.pruning.is_active() else message for message in messages]

# for meta data messages
# for metadata messages
if meta.distribution.custom_callback:
unique = set()
times = {}
Expand Down
14 changes: 13 additions & 1 deletion message.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ def handle_callback(self):
def undo_callback(self):
return self._meta._undo_callback

@property
def cancel_callback(self):
return self._meta._cancel_callback

@property
def priority(self):
return self._meta._priority
Expand Down Expand Up @@ -422,7 +426,8 @@ def regenerate_packet(self, packet=""):
def __str__(self):
return "<%s.%s %s>" % (self._meta.__class__.__name__, self.__class__.__name__, self._meta._name)

def __init__(self, community, name, authentication, resolution, distribution, destination, payload, check_callback, handle_callback, undo_callback=None, batch=None):
def __init__(self, community, name, authentication, resolution, distribution, destination, payload,
check_callback, handle_callback, undo_callback=None, cancel_callback=None, batch=None):
from .community import Community
assert isinstance(community, Community), "COMMUNITY has invalid type '%s'" % type(community)
assert isinstance(name, unicode), "NAME has invalid type '%s'" % type(name)
Expand All @@ -434,6 +439,8 @@ def __init__(self, community, name, authentication, resolution, distribution, de
assert callable(check_callback), type(check_callback)
assert callable(handle_callback), type(handle_callback)
assert undo_callback is None or callable(undo_callback), undo_callback
assert cancel_callback is None or callable(cancel_callback),\
"cancel_callback is not a callable: %s" % type(cancel_callback)
if isinstance(resolution, DynamicResolution):
assert callable(undo_callback), "UNDO_CALLBACK must be specified when using the DynamicResolution policy"
assert batch is None or isinstance(batch, BatchConfiguration), type(batch)
Expand All @@ -448,6 +455,7 @@ def __init__(self, community, name, authentication, resolution, distribution, de
self._check_callback = check_callback
self._handle_callback = handle_callback
self._undo_callback = undo_callback
self._cancel_callback = cancel_callback
self._batch = BatchConfiguration() if batch is None else batch
self._logger = logging.getLogger(self.__class__.__name__)

Expand Down Expand Up @@ -506,6 +514,10 @@ def handle_callback(self):
def undo_callback(self):
return self._undo_callback

@property
def cancel_callback(self):
return self._cancel_callback

@property
def batch(self):
return self._batch
Expand Down
48 changes: 46 additions & 2 deletions payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def __init__(self, meta, permission_triplets):
assert isinstance(triplet[1].resolution, (PublicResolution, LinearResolution, DynamicResolution)), triplet[1]
assert isinstance(triplet[1].authentication, (MemberAuthentication, DoubleMemberAuthentication)), triplet[1]
assert isinstance(triplet[2], unicode), triplet[2]
assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo"), triplet[2]
assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel"), triplet[2]
super(AuthorizePayload.Implementation, self).__init__(meta)
self._permission_triplets = permission_triplets

Expand Down Expand Up @@ -386,7 +386,7 @@ def __init__(self, meta, permission_triplets):
assert isinstance(triplet[1].resolution, (PublicResolution, LinearResolution, DynamicResolution)), triplet
assert isinstance(triplet[1].authentication, (MemberAuthentication, DoubleMemberAuthentication)), triplet
assert isinstance(triplet[2], unicode), triplet
assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo"), triplet
assert triplet[2] in (u"permit", u"authorize", u"revoke", u"undo", u"cancel"), triplet
super(RevokePayload.Implementation, self).__init__(meta)
self._permission_triplets = permission_triplets

Expand Down Expand Up @@ -439,6 +439,50 @@ def packet(self, packet):
self._packet = packet


class CancelPayload(Payload):

class Implementation(Payload.Implementation):

def __init__(self, meta, member, global_time, packet=None):
from .member import Member
from .message import Packet
assert isinstance(member, Member)
assert isinstance(global_time, (int, long))
assert packet is None or isinstance(packet, Packet)
assert global_time > 0
super(CancelPayload.Implementation, self).__init__(meta)
self._member = member
self._global_time = global_time
self._packet = packet
self._process_cancel = True

@property
def process_cancel(self):
return self._process_cancel

@process_cancel.setter
def process_cancel(self, enabled=True):
self._process_cancel = enabled

@property
def member(self):
return self._member

@property
def global_time(self):
return self._global_time

@property
def packet(self):
return self._packet

@packet.setter
def packet(self, packet):
from .message import Packet
assert isinstance(packet, Packet), type(packet)
self._packet = packet


class MissingSequencePayload(Payload):

class Implementation(Payload.Implementation):
Expand Down
2 changes: 1 addition & 1 deletion taskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def replace_task(self, name, task):

def register_task(self, name, task):
"""
Register a task so it can be canceled at shutdown time or by name.
Register a task so it can be cancelled at shutdown time or by name.
"""
assert not self.is_pending_task_active(name), name
assert isinstance(task, (Deferred, DelayedCall, LoopingCall)), task
Expand Down
23 changes: 18 additions & 5 deletions tests/debugcommunity/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def initiate_meta_messages(self):
TextPayload(),
self._generic_timeline_check,
self.on_text,
self.undo_text),
undo_callback=self.undo_text,
cancel_callback=self.cancel_text),
Message(self, u"bin-key-text",
MemberAuthentication(encoding="bin"),
PublicResolution(),
Expand All @@ -85,7 +86,8 @@ def initiate_meta_messages(self):
TextPayload(),
self._generic_timeline_check,
self.on_text,
self.undo_text),
undo_callback=self.undo_text,
cancel_callback=self.cancel_text),
Message(self, u"ASC-text",
MemberAuthentication(),
PublicResolution(),
Expand Down Expand Up @@ -119,7 +121,8 @@ def initiate_meta_messages(self):
TextPayload(),
self._generic_timeline_check,
self.on_text,
self.undo_text),
undo_callback=self.undo_text,
cancel_callback=self.cancel_text),
Message(self, u"sequence-text",
MemberAuthentication(),
PublicResolution(),
Expand All @@ -128,7 +131,8 @@ def initiate_meta_messages(self):
TextPayload(),
self._generic_timeline_check,
self.on_text,
self.undo_text),
undo_callback=self.undo_text,
cancel_callback=self.cancel_text),
Message(self, u"full-sync-global-time-pruning-text",
MemberAuthentication(),
PublicResolution(),
Expand All @@ -137,7 +141,8 @@ def initiate_meta_messages(self):
TextPayload(),
self._generic_timeline_check,
self.on_text,
self.undo_text),
undo_callback=self.undo_text,
cancel_callback=self.cancel_text),
Message(self, u"high-priority-text",
MemberAuthentication(),
PublicResolution(),
Expand Down Expand Up @@ -221,6 +226,14 @@ def undo_text(self, descriptors):
message = packet.load_message()
self._logger.debug("undo \"%s\" @%d", message.payload.text, global_time)

def cancel_text(self, descriptors):
"""
Received a cancel for a text message.
"""
for member, global_time, packet in descriptors:
message = packet.load_message()
self._logger.debug("cancel \"%s\" @%d", message.payload.text, global_time)

def dispersy_cleanup_community(self, message):
if message.payload.is_soft_kill:
raise NotImplementedError()
Expand Down
52 changes: 52 additions & 0 deletions tests/debugcommunity/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,28 @@ def assert_not_stored(self, message=None, messages=None):

assert_is_done = assert_is_stored

@blocking_call_on_reactor_thread
def assert_is_cancelled(self, message=None, messages=None, cancelled_by=None):
if messages is None:
messages = [message]

for message in messages:
try:
undone, = self._dispersy.database.execute(
u"SELECT undone FROM sync, member"
u" WHERE sync.member = member.id AND community = ? AND mid = ? AND global_time = ?",
(self._community.database_id, buffer(message.authentication.member.mid),
message.distribution.global_time)).next()
self._testclass.assertGreater(undone, 0, "Message is not cancelled")
if cancelled_by:
undone, = self._dispersy.database.execute(
u"SELECT packet FROM sync WHERE id = ? ",
(undone,)).next()
self._testclass.assertEqual(str(undone), cancelled_by.packet)

except StopIteration:
self._testclass.fail("Message is not stored")

@blocking_call_on_reactor_thread
def assert_is_undone(self, message=None, messages=None, undone_by=None):
if messages == None:
Expand Down Expand Up @@ -473,6 +495,36 @@ def create_identity(self, global_time=None):

return meta.impl(authentication=(self._my_member,), distribution=(global_time,))

@blocking_call_on_reactor_thread
def create_cancel_own(self, message, global_time=None):
"""
Returns a new dispersy-cancel-own message.
"""
assert message.authentication.member == self._my_member, "use create_dispersy_cancel_other"
meta = self._community.get_meta_message(u"dispersy-cancel-own")

if global_time is None:
global_time = self.claim_global_time()

return meta.impl(authentication=(self._my_member,),
distribution=(global_time,),
payload=(message.authentication.member, message.distribution.global_time, message))

@blocking_call_on_reactor_thread
def create_cancel_other(self, message, global_time=None):
"""
Returns a new dispersy-cancel-other message.
"""
meta = self._community.get_meta_message(u"dispersy-cancel-other")

if global_time is None:
global_time = self.claim_global_time()

return meta.impl(authentication=(self._my_member,),
distribution=(global_time,),
payload=(message.authentication.member, message.distribution.global_time, message))


@blocking_call_on_reactor_thread
def create_undo_own(self, message, global_time=None, sequence_number=None):
"""
Expand Down
Loading