Skip to content

Commit

Permalink
Made the ip connection cancelable during the connection attempt
Browse files Browse the repository at this point in the history
The connection parameters will now only be saved when a new connection is actually established
  • Loading branch information
PatrickBaus committed Jul 16, 2021
1 parent ec17452 commit 65807ab
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 31 deletions.
1 change: 1 addition & 0 deletions tinkerforge_async/_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "1.2.0"
105 changes: 74 additions & 31 deletions tinkerforge_async/ip_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def __init__(self, host=None, port=4223, authentication_secret=None):
self.__logger = logging.getLogger(__name__)

# These will be assigned during connect()
self.__connection_task = None
self.__reader, self.__writer = None, None
self.__main_task = None
self.__lock = None
Expand Down Expand Up @@ -270,7 +271,7 @@ async def __read_packet(self):

return header, payload
except asyncio.TimeoutError:
return None, None
return None, None # No new packets. Nothing to do here.
except ConnectionResetError as exc:
raise NotConnectedError('Tinkerforge IP Connection not connected.') from exc

Expand Down Expand Up @@ -380,7 +381,7 @@ async def __authenticate(self, authentication_secret):
mac.update(client_nonce)

digest = mac.digest()
del mac # remove it from memery
del mac # remove it from memory

await self.send_request(
device=self,
Expand All @@ -389,57 +390,99 @@ async def __authenticate(self, authentication_secret):
response_expected=False,
)

async def __connect(self, host=None, port=None, authentication_secret=None):
"""
The __connect() call should be wrapped in a task, so it can be canceled
by the disconnect() function call. It must be protected by
`self.__lock`.
"""
try:
self.__reader, self.__writer = await asyncio.wait_for(
asyncio.open_connection(self.__host, self.__port),
self.__timeout
)
except asyncio.TimeoutError:
# Catch and reraise the timeout, because we want to get rid of
# the CancelledError raised by asyncio.wait_for() and also add
# our own message.
raise asyncio.TimeoutError(
'Timeout during connection attempt to %s:%i',
self.__host, self.__port
) from None

# If we are connected, start the listening task
self.__main_task = asyncio.create_task(self.main_loop())
if authentication_secret is not None:
try:
authentication_secret = authentication_secret.encode('ascii')
except AttributeError:
pass # already a bytestring

await self.__authenticate(authentication_secret)

self.__logger.info('Tinkerforge IP connection connected.')

async def connect(self, host=None, port=None, authentication_secret=None):
"""
Connect to a Tinkerforge host/stack. The parameters host, port and
authentication_secret are optional and can already be set at object
creation time. If any of host, port, authentication_secret are set, they
will overwrite the ones set at creation time.
The connect() call does authentication in the background if
authentication_secret is set either at creation or runtime. There is no
The connect() call handles authentication transparently if
`authentication_secret` is set either at creation or runtime. There is no
further user intervention necessary.
"""
if host is not None:
self.__host = host
if port is not None:
self.__port = port
if self.__host is None:
raise TypeError('Invalid hostname')
if authentication_secret is None:
authentication_secret = self.__authentication_secret

if self.__lock is None:
self.__lock = asyncio.Lock()
async with self.__lock:
if not self.is_connected:
self.__enumeration_queue = asyncio.Queue(maxsize=20)
self.__sequence_number_queue = asyncio.Queue(maxsize=15)
for i in range(1, 16):
self.__sequence_number_queue.put_nowait(i)

self.__reader, self.__writer = await asyncio.open_connection(self.__host, self.__port)
self.__main_task = asyncio.create_task(self.main_loop())
if authentication_secret is not None:
try:
authentication_secret = authentication_secret.encode('ascii')
except AttributeError:
pass # already a bytestring

await self.__authenticate(authentication_secret)

self.__logger.info('Tinkerforge IP connection connected.')
try:
# Update all connection parameters before connecting
if host is not None:
self.__host = host
if port is not None:
self.__port = port
if self.__host is None or self.__port is None:
raise TypeError('Invalid hostname')
if authentication_secret is None:
authentication_secret = self.__authentication_secret

self.__enumeration_queue = asyncio.Queue(maxsize=20)
self.__sequence_number_queue = asyncio.Queue(maxsize=15)
for i in range(1, 16):
self.__sequence_number_queue.put_nowait(i)
# We need to wrap the connection attempt into a task,
# because we want to be able to cancel it any time using the
# disconnect() call
self.__connection_task = asyncio.create_task(
self.__connect(host, port, authentication_secret)
)
# Actually wait for the task to finish
await self.__connection_task
finally:
self.__connection_task = None

async def disconnect(self):
"""
Disconnect from a tinkerforge host and clean up.
"""
if self.__lock is None:
self.__lock = asyncio.Lock()
if self.__connection_task is not None:
self.__connection_task.cancel()
try:
await self.__connection_task
except asyncio.CancelledError:
pass

async with self.__lock:
try:
if self.__main_task is not None and not self.__main_task.done():
self.__main_task.cancel()
await self.__main_task
try:
await self.__main_task
except asyncio.CancelledError:
pass
finally:
self.__lock = None

Expand All @@ -462,4 +505,4 @@ async def __close_transport(self):
if not future.done():
future.set_exception(NotConnectedError('Tinkerforge IP Connection closed.'))
self.__pending_requests = {}
self.__logger.info('Tinkerforge IP connection closed.')
self.__logger.info('Tinkerforge IP connection (%s:%s) closed.', self.__host, self.__port)

0 comments on commit 65807ab

Please sign in to comment.