Skip to content

Commit

Permalink
feat: additional exception shielding for asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
standy66 committed Jul 23, 2019
1 parent d99a2e3 commit 3cbd35c
Showing 1 changed file with 54 additions and 51 deletions.
105 changes: 54 additions & 51 deletions src/purerpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,62 +170,65 @@ def __init__(self, services: dict):
self.services = services

async def request_received(self, stream: GRPCProtoStream):
await stream.start_response()
event = await stream.receive_event()

if not isinstance(event, RequestReceived):
await stream.close(Status(StatusCode.INTERNAL, status_message="Expected headers"))
return

try:
service = self.services[event.service_name]
except KeyError:
await stream.close(Status(
StatusCode.UNIMPLEMENTED,
status_message="Service {service_name} is not implemented".format(service_name=event.service_name)
))
return
await stream.start_response()
event = await stream.receive_event()

try:
bound_rpc_method = service.methods[event.method_name]
except KeyError:
await stream.close(Status(
StatusCode.UNIMPLEMENTED,
status_message="Method {method_name} is not implemented in service {service_name}".format(
method_name=event.method_name,
service_name=event.service_name
)
))
return
if not isinstance(event, RequestReceived):
await stream.close(Status(StatusCode.INTERNAL, status_message="Expected headers"))
return

# TODO: Should at least pass through GeneratorExit
try:
method_fn = functools.partial(bound_rpc_method.method_fn, request=event)
cardinality = bound_rpc_method.signature.cardinality
stream.expect_message_type(bound_rpc_method.signature.request_type)
if cardinality == Cardinality.STREAM_STREAM:
await call_server_stream_stream(method_fn, stream)
elif cardinality == Cardinality.UNARY_STREAM:
await call_server_unary_stream(method_fn, stream)
elif cardinality == Cardinality.STREAM_UNARY:
await call_server_stream_unary(method_fn, stream)
else:
await call_server_unary_unary(method_fn, stream)
except RpcFailedError as error:
await stream.close(error.status)
try:
service = self.services[event.service_name]
except KeyError:
await stream.close(Status(
StatusCode.UNIMPLEMENTED,
status_message="Service {service_name} is not implemented".format(service_name=event.service_name)
))
return

try:
bound_rpc_method = service.methods[event.method_name]
except KeyError:
await stream.close(Status(
StatusCode.UNIMPLEMENTED,
status_message="Method {method_name} is not implemented in service {service_name}".format(
method_name=event.method_name,
service_name=event.service_name
)
))
return

# TODO: Should at least pass through GeneratorExit
try:
method_fn = functools.partial(bound_rpc_method.method_fn, request=event)
cardinality = bound_rpc_method.signature.cardinality
stream.expect_message_type(bound_rpc_method.signature.request_type)
if cardinality == Cardinality.STREAM_STREAM:
await call_server_stream_stream(method_fn, stream)
elif cardinality == Cardinality.UNARY_STREAM:
await call_server_unary_stream(method_fn, stream)
elif cardinality == Cardinality.STREAM_UNARY:
await call_server_stream_unary(method_fn, stream)
else:
await call_server_unary_unary(method_fn, stream)
except RpcFailedError as error:
await stream.close(error.status)
except:
logging.exception("Got exception while writing response stream")
await stream.close(Status(StatusCode.CANCELLED, status_message=repr(sys.exc_info())))
except:
logging.exception("Got exception while writing response stream")
await stream.close(Status(StatusCode.CANCELLED, status_message=repr(sys.exc_info())))
logging.exception("Got exception in request_received")

async def __call__(self, socket):
async with GRPCProtoSocket(self.config, socket) as self.grpc_socket:
# TODO: resource usage warning
# TODO: TaskGroup() uses a lot of memory if the connection is kept for a long time
# TODO: do we really need it here?
async with anyio.create_task_group() as task_group:
# TODO: Should at least pass through GeneratorExit
try:
# TODO: Should at least pass through GeneratorExit
try:
async with GRPCProtoSocket(self.config, socket) as self.grpc_socket:
# TODO: resource usage warning
# TODO: TaskGroup() uses a lot of memory if the connection is kept for a long time
# TODO: do we really need it here?
async with anyio.create_task_group() as task_group:
async for stream in self.grpc_socket.listen():
await task_group.spawn(self.request_received, stream)
except:
logging.exception("Got exception in main dispatch loop")
except:
logging.exception("Got exception in main dispatch loop")

0 comments on commit 3cbd35c

Please sign in to comment.