Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AioHttp server middleware #3

Merged
merged 1 commit into from
Nov 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions aws_xray_sdk/core/async_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio

from .context import Context as _Context


class AsyncContext(_Context):
"""
Async Context for storing segments.

Inherits nearly everything from the main Context class.
Replaces threading.local with a task based local storage class,
Also overrides clear_trace_entities
"""
def __init__(self, *args, loop=None, use_task_factory=True, **kwargs):
super(AsyncContext, self).__init__(*args, **kwargs)

self._loop = loop
if loop is None:
self._loop = asyncio.get_event_loop()

if use_task_factory:
self._loop.set_task_factory(task_factory)

self._local = TaskLocalStorage(loop=loop)

def clear_trace_entities(self):
"""
Clear all trace_entities stored in the task local context.
"""
if self._local is not None:
self._local.clear()


class TaskLocalStorage(object):
"""
Simple task local storage
"""
def __init__(self, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop

def __setattr__(self, name, value):
if name in ('_loop',):
# Set normal attributes
object.__setattr__(self, name, value)

else:
# Set task local attributes
task = asyncio.Task.current_task(loop=self._loop)
if task is None:
return None

if not hasattr(task, 'context'):
task.context = {}

task.context[name] = value

def __getattribute__(self, item):
if item in ('_loop', 'clear'):
# Return references to local objects
return object.__getattribute__(self, item)

task = asyncio.Task.current_task(loop=self._loop)
if task is None:
return None

if hasattr(task, 'context') and item in task.context:
return task.context[item]

raise AttributeError('Task context does not have attribute {0}'.format(item))

def clear(self):
# If were in a task, clear the context dictionary
task = asyncio.Task.current_task(loop=self._loop)
if task is not None and hasattr(task, 'context'):
task.context.clear()


def task_factory(loop, coro):
"""
Task factory function

Fuction closely mirrors the logic inside of
asyncio.BaseEventLoop.create_task. Then if there is a current
task and the current task has a context then share that context
with the new task
"""
task = asyncio.Task(coro, loop=loop)
if task._source_traceback: # flake8: noqa
del task._source_traceback[-1] # flake8: noqa

# Share context with new task if possible
current_task = asyncio.Task.current_task(loop=loop)
if current_task is not None and hasattr(current_task, 'context'):
setattr(task, 'context', current_task.context)

return task
11 changes: 7 additions & 4 deletions aws_xray_sdk/core/sampling/sampling_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def applies(self, service_name, method, path):
the incoming request based on some of the request's parameters.
Any None parameters provided will be considered an implicit match.
"""
return (not service_name or wildcard_match(self.service_name, service_name)) \
return (not service_name or wildcard_match(self.service_name, service_name)) \
and (not method or wildcard_match(self.service_name, method)) \
and (not path or wildcard_match(self.path, path))

Expand Down Expand Up @@ -89,11 +89,14 @@ def reservoir(self):

def _validate(self):
if self.fixed_target < 0 or self.rate < 0:
raise InvalidSamplingManifestError('All rules must have non-negative values for fixed_target and rate')
raise InvalidSamplingManifestError('All rules must have non-negative values for '
'fixed_target and rate')

if self._default:
if self.service_name or self.method or self.path:
raise InvalidSamplingManifestError('The default rule must not specify values for url_path, service_name, or http_method')
raise InvalidSamplingManifestError('The default rule must not specify values for '
'url_path, service_name, or http_method')
else:
if not self.service_name or not self.method or not self.path:
raise InvalidSamplingManifestError('All non-default rules must have values for url_path, service_name, and http_method')
raise InvalidSamplingManifestError('All non-default rules must have values for '
'url_path, service_name, and http_method')
4 changes: 2 additions & 2 deletions aws_xray_sdk/core/utils/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
PY2 = sys.version_info < (3,)

if PY2:
annotation_value_types = (int, long, float, bool, str)
string_types = basestring
annotation_value_types = (int, long, float, bool, str) # noqa: F821
string_types = basestring # noqa: F821
else:
annotation_value_types = (int, float, bool, str)
string_types = str
Empty file.
78 changes: 78 additions & 0 deletions aws_xray_sdk/ext/aiohttp/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
AioHttp Middleware
"""
import traceback

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.util import calculate_sampling_decision, calculate_segment_name, construct_xray_header


async def middleware(app, handler):
"""
AioHttp Middleware Factory
"""
async def _middleware(request):
"""
Main middleware function, deals with all the X-Ray segment logic
"""
# Create X-Ray headers
xray_header = construct_xray_header(request.headers)
# Get name of service or generate a dynamic one from host
name = calculate_segment_name(request.headers['host'].split(':', 1)[0], xray_recorder)

sampling_decision = calculate_sampling_decision(
trace_header=xray_header,
recorder=xray_recorder,
service_name=request.headers['host'],
method=request.method,
path=request.path,
)

# Start a segment
segment = xray_recorder.begin_segment(
name=name,
traceid=xray_header.root,
parent_id=xray_header.parent,
sampling=sampling_decision,
)

# Store request metadata in the current segment
segment.put_http_meta(http.URL, request.url)
segment.put_http_meta(http.METHOD, request.method)

if 'User-Agent' in request.headers:
segment.put_http_meta(http.USER_AGENT, request.headers['User-Agent'])

if 'X-Forwarded-For' in request.headers:
segment.put_http_meta(http.CLIENT_IP, request.headers['X-Forwarded-For'])
segment.put_http_meta(http.X_FORWARDED_FOR, True)
elif 'remote_addr' in request.headers:
segment.put_http_meta(http.CLIENT_IP, request.headers['remote_addr'])
else:
segment.put_http_meta(http.CLIENT_IP, request.remote)

try:
# Call next middleware or request handler
response = await handler(request)
except Exception as err:
# Store exception information including the stacktrace to the segment
segment = xray_recorder.current_segment()
segment.put_http_meta(http.STATUS, 500)
stack = traceback.extract_stack(limit=xray_recorder._max_trace_back)
segment.add_exception(err, stack)
xray_recorder.end_segment()
raise

# Store response metadata into the current segment
segment.put_http_meta(http.STATUS, response.status)

if 'Content-Length' in response.headers:
length = int(response.headers['Content-Length'])
segment.put_http_meta(http.CONTENT_LENGTH, length)

# Close segment so it can be dispatched off to the daemon
xray_recorder.end_segment()

return response
return _middleware
8 changes: 3 additions & 5 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.intersphinx',
'sphinx.ext.coverage']
'sphinx.ext.doctest',
'sphinx.ext.intersphinx',
'sphinx.ext.coverage']

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
Expand Down Expand Up @@ -171,7 +171,5 @@
]




# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'https://docs.python.org/': None}
34 changes: 33 additions & 1 deletion docs/frameworks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,36 @@ To generate segment based on incoming requests, you need to instantiate the X-Ra
XRayMiddleware(app, xray_recorder)

Flask built-in template rendering will be wrapped into subsegments.
You can configure the recorder, see :ref:`Configure Global Recorder <configurations>` for more details.
You can configure the recorder, see :ref:`Configure Global Recorder <configurations>` for more details.

aiohttp Server
==============

For X-Ray to create a segment based on an incoming request, you need register some middleware with aiohttp. As aiohttp
is an asyncronous framework, X-Ray will also need to be configured with an ``AsyncContext`` compared to the default threadded
version.::

import asyncio

from aiohttp import web

from aws_xray_sdk.ext.aiohttp.middleware import middleware
from aws_xray_sdk.core.async_context import AsyncContext
from aws_xray_sdk.core import xray_recorder
# Configure X-Ray to use AsyncContext
xray_recorder.configure(service='service_name', context=AsyncContext())


async def handler(request):
return web.Response(body='Hello World')

loop = asyncio.get_event_loop()
# Use X-Ray SDK middleware, its crucial the X-Ray middleware comes first
app = web.Application(middlewares=[middleware])
app.router.add_get("/", handler)

web.run_app(app)

There are two things to note from the example above. Firstly a middleware corountine from aws-xray-sdk is provided during the creation
of an aiohttp server app. Lastly the ``xray_recorder`` has also been configured with a name and an ``AsyncContext``. See
:ref:`Configure Global Recorder <configurations>` for more information about configuring the ``xray_recorder``.
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
name='aws-xray-sdk',
version='0.93',

description='The AWS X-Ray SDK for Python (the SDK) enables Python developers to record and emit information from within their applications to the AWS X-Ray service.',
description='The AWS X-Ray SDK for Python (the SDK) enables Python developers to record'
' and emit information from within their applications to the AWS X-Ray service.',
long_description=long_description,

url='https://github.com/aws/aws-xray-sdk-python',
Expand Down
Empty file added tests/ext/aiohttp/__init__.py
Empty file.
Loading