Skip to content

Commit

Permalink
Merge pull request #3 from terrycain/aiohttp_middleware
Browse files Browse the repository at this point in the history
AioHttp server middleware
  • Loading branch information
haotianw465 authored Nov 9, 2017
2 parents a00b8b5 + 9672b13 commit 63b5ea2
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 14 deletions.
98 changes: 98 additions & 0 deletions aws_xray_sdk/core/async_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio

from .context import Context as _Context


class AsyncContext(_Context):
"""
Async Context for storing segments.
Inherits nearly everything from the main Context class.
Replaces threading.local with a task based local storage class,
Also overrides clear_trace_entities
"""
def __init__(self, *args, loop=None, use_task_factory=True, **kwargs):
super(AsyncContext, self).__init__(*args, **kwargs)

self._loop = loop
if loop is None:
self._loop = asyncio.get_event_loop()

if use_task_factory:
self._loop.set_task_factory(task_factory)

self._local = TaskLocalStorage(loop=loop)

def clear_trace_entities(self):
"""
Clear all trace_entities stored in the task local context.
"""
if self._local is not None:
self._local.clear()


class TaskLocalStorage(object):
"""
Simple task local storage
"""
def __init__(self, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop

def __setattr__(self, name, value):
if name in ('_loop',):
# Set normal attributes
object.__setattr__(self, name, value)

else:
# Set task local attributes
task = asyncio.Task.current_task(loop=self._loop)
if task is None:
return None

if not hasattr(task, 'context'):
task.context = {}

task.context[name] = value

def __getattribute__(self, item):
if item in ('_loop', 'clear'):
# Return references to local objects
return object.__getattribute__(self, item)

task = asyncio.Task.current_task(loop=self._loop)
if task is None:
return None

if hasattr(task, 'context') and item in task.context:
return task.context[item]

raise AttributeError('Task context does not have attribute {0}'.format(item))

def clear(self):
# If were in a task, clear the context dictionary
task = asyncio.Task.current_task(loop=self._loop)
if task is not None and hasattr(task, 'context'):
task.context.clear()


def task_factory(loop, coro):
"""
Task factory function
Fuction closely mirrors the logic inside of
asyncio.BaseEventLoop.create_task. Then if there is a current
task and the current task has a context then share that context
with the new task
"""
task = asyncio.Task(coro, loop=loop)
if task._source_traceback: # flake8: noqa
del task._source_traceback[-1] # flake8: noqa

# Share context with new task if possible
current_task = asyncio.Task.current_task(loop=loop)
if current_task is not None and hasattr(current_task, 'context'):
setattr(task, 'context', current_task.context)

return task
11 changes: 7 additions & 4 deletions aws_xray_sdk/core/sampling/sampling_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def applies(self, service_name, method, path):
the incoming request based on some of the request's parameters.
Any None parameters provided will be considered an implicit match.
"""
return (not service_name or wildcard_match(self.service_name, service_name)) \
return (not service_name or wildcard_match(self.service_name, service_name)) \
and (not method or wildcard_match(self.service_name, method)) \
and (not path or wildcard_match(self.path, path))

Expand Down Expand Up @@ -89,11 +89,14 @@ def reservoir(self):

def _validate(self):
if self.fixed_target < 0 or self.rate < 0:
raise InvalidSamplingManifestError('All rules must have non-negative values for fixed_target and rate')
raise InvalidSamplingManifestError('All rules must have non-negative values for '
'fixed_target and rate')

if self._default:
if self.service_name or self.method or self.path:
raise InvalidSamplingManifestError('The default rule must not specify values for url_path, service_name, or http_method')
raise InvalidSamplingManifestError('The default rule must not specify values for '
'url_path, service_name, or http_method')
else:
if not self.service_name or not self.method or not self.path:
raise InvalidSamplingManifestError('All non-default rules must have values for url_path, service_name, and http_method')
raise InvalidSamplingManifestError('All non-default rules must have values for '
'url_path, service_name, and http_method')
4 changes: 2 additions & 2 deletions aws_xray_sdk/core/utils/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
PY2 = sys.version_info < (3,)

if PY2:
annotation_value_types = (int, long, float, bool, str)
string_types = basestring
annotation_value_types = (int, long, float, bool, str) # noqa: F821
string_types = basestring # noqa: F821
else:
annotation_value_types = (int, float, bool, str)
string_types = str
Empty file.
78 changes: 78 additions & 0 deletions aws_xray_sdk/ext/aiohttp/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
AioHttp Middleware
"""
import traceback

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.util import calculate_sampling_decision, calculate_segment_name, construct_xray_header


async def middleware(app, handler):
"""
AioHttp Middleware Factory
"""
async def _middleware(request):
"""
Main middleware function, deals with all the X-Ray segment logic
"""
# Create X-Ray headers
xray_header = construct_xray_header(request.headers)
# Get name of service or generate a dynamic one from host
name = calculate_segment_name(request.headers['host'].split(':', 1)[0], xray_recorder)

sampling_decision = calculate_sampling_decision(
trace_header=xray_header,
recorder=xray_recorder,
service_name=request.headers['host'],
method=request.method,
path=request.path,
)

# Start a segment
segment = xray_recorder.begin_segment(
name=name,
traceid=xray_header.root,
parent_id=xray_header.parent,
sampling=sampling_decision,
)

# Store request metadata in the current segment
segment.put_http_meta(http.URL, request.url)
segment.put_http_meta(http.METHOD, request.method)

if 'User-Agent' in request.headers:
segment.put_http_meta(http.USER_AGENT, request.headers['User-Agent'])

if 'X-Forwarded-For' in request.headers:
segment.put_http_meta(http.CLIENT_IP, request.headers['X-Forwarded-For'])
segment.put_http_meta(http.X_FORWARDED_FOR, True)
elif 'remote_addr' in request.headers:
segment.put_http_meta(http.CLIENT_IP, request.headers['remote_addr'])
else:
segment.put_http_meta(http.CLIENT_IP, request.remote)

try:
# Call next middleware or request handler
response = await handler(request)
except Exception as err:
# Store exception information including the stacktrace to the segment
segment = xray_recorder.current_segment()
segment.put_http_meta(http.STATUS, 500)
stack = traceback.extract_stack(limit=xray_recorder._max_trace_back)
segment.add_exception(err, stack)
xray_recorder.end_segment()
raise

# Store response metadata into the current segment
segment.put_http_meta(http.STATUS, response.status)

if 'Content-Length' in response.headers:
length = int(response.headers['Content-Length'])
segment.put_http_meta(http.CONTENT_LENGTH, length)

# Close segment so it can be dispatched off to the daemon
xray_recorder.end_segment()

return response
return _middleware
8 changes: 3 additions & 5 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.intersphinx',
'sphinx.ext.coverage']
'sphinx.ext.doctest',
'sphinx.ext.intersphinx',
'sphinx.ext.coverage']

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
Expand Down Expand Up @@ -171,7 +171,5 @@
]




# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'https://docs.python.org/': None}
34 changes: 33 additions & 1 deletion docs/frameworks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,36 @@ To generate segment based on incoming requests, you need to instantiate the X-Ra
XRayMiddleware(app, xray_recorder)

Flask built-in template rendering will be wrapped into subsegments.
You can configure the recorder, see :ref:`Configure Global Recorder <configurations>` for more details.
You can configure the recorder, see :ref:`Configure Global Recorder <configurations>` for more details.

aiohttp Server
==============

For X-Ray to create a segment based on an incoming request, you need register some middleware with aiohttp. As aiohttp
is an asyncronous framework, X-Ray will also need to be configured with an ``AsyncContext`` compared to the default threadded
version.::

import asyncio

from aiohttp import web

from aws_xray_sdk.ext.aiohttp.middleware import middleware
from aws_xray_sdk.core.async_context import AsyncContext
from aws_xray_sdk.core import xray_recorder
# Configure X-Ray to use AsyncContext
xray_recorder.configure(service='service_name', context=AsyncContext())


async def handler(request):
return web.Response(body='Hello World')

loop = asyncio.get_event_loop()
# Use X-Ray SDK middleware, its crucial the X-Ray middleware comes first
app = web.Application(middlewares=[middleware])
app.router.add_get("/", handler)

web.run_app(app)

There are two things to note from the example above. Firstly a middleware corountine from aws-xray-sdk is provided during the creation
of an aiohttp server app. Lastly the ``xray_recorder`` has also been configured with a name and an ``AsyncContext``. See
:ref:`Configure Global Recorder <configurations>` for more information about configuring the ``xray_recorder``.
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
name='aws-xray-sdk',
version='0.93',

description='The AWS X-Ray SDK for Python (the SDK) enables Python developers to record and emit information from within their applications to the AWS X-Ray service.',
description='The AWS X-Ray SDK for Python (the SDK) enables Python developers to record'
' and emit information from within their applications to the AWS X-Ray service.',
long_description=long_description,

url='https://github.com/aws/aws-xray-sdk-python',
Expand Down
Empty file added tests/ext/aiohttp/__init__.py
Empty file.
Loading

0 comments on commit 63b5ea2

Please sign in to comment.