Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix verification tool for later Py/Dj versions and add tests #41

Merged
merged 19 commits into from
Feb 21, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 62 additions & 37 deletions hawkrest/management/commands/hawkrequest.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,69 @@
from optparse import make_option

from django.core.management.base import BaseCommand, CommandError

import logging
from mohawk import Sender

from hawkrest import lookup_credentials
from hawkrest import HawkAuthentication


DEFAULT_HTTP_METHOD = 'GET'

CMD_OPTIONS = {
'--url': {
'action': 'store',
'type': str,
'help': 'Absolute URL to request.'
},
'--creds': {
'action': 'store',
'type': str,
'help': 'ID for Hawk credentials.'
},
'-X': {
'action': 'store',
'type': str,
'help': 'Request method. Default: {}.'.format(DEFAULT_HTTP_METHOD),
'default': DEFAULT_HTTP_METHOD
},
'-d': {
'action': 'store',
'type': str,
'help': 'Query string parameters.'
}
}


def request(url, method, data, headers):
import requests
do_request = getattr(requests, method.lower())
res = do_request(url, data=data, headers=headers)
return res


def lookup_credentials(creds_key):
return HawkAuthentication().hawk_credentials_lookup(creds_key)


class Command(BaseCommand):
help = 'Make a Hawk authenticated request'
option_list = BaseCommand.option_list + (
make_option('--url', action='store', type=str,
help='Absolute URL to request.'),
make_option('--creds', action='store', type=str,
help='ID for Hawk credentials.'),
make_option('-X', action='store', type=str,
help='Request method. Default: %default.',
default='GET'),
make_option('-d', action='store', type=str,
help='Query string parameters'),
)

def handle(self, *args, **options):
hawk_log = logging.getLogger('mohawk')
hawk_log.setLevel(logging.DEBUG)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you removed this. Was the logging too noisy? I had found it helpful in the past because it shows exactly what was used to create MACs, etc. If you still want to remove it, maybe there can be an option to re-enable it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. It actually has more to do with me not fully grasping how getLogger works. At the time, I thought we were creating a new logger and that it wasn't being used, but now it's clear to me that we're listening in on the logger of the underlying mohawk lib.

I added it back in; however, I thought it more conventional to add this logging config to the top of the module beneath the imports.

hawk_log.addHandler(logging.StreamHandler())
def add_arguments(self, parser):
for opt, config in CMD_OPTIONS.items():
parser.add_argument(opt, **config)

try:
import requests
except ImportError:
raise CommandError('To use this command you first need to '
'install the requests module')
def handle(self, *args, **options):
url = options['url']
if not url:
raise CommandError('Specify a URL to load with --url')

creds_key = options['creds']
if not creds_key:
raise CommandError('Specify ID for Hawk credentials with --creds')

method = options['X']
qs = options['d'] or ''
request_content_type = ('application/x-www-form-urlencoded'
if qs else 'text/plain')
method = options['X']

credentials = lookup_credentials(options['creds'])
credentials = lookup_credentials(creds_key)

sender = Sender(credentials,
url, method.upper(),
Expand All @@ -51,22 +73,25 @@ def handle(self, *args, **options):
headers = {'Authorization': sender.request_header,
'Content-Type': request_content_type}

do_request = getattr(requests, method.lower())
res = do_request(url, data=qs, headers=headers)
try:
res = request(url, method.lower(), data=qs, headers=headers)
except ImportError:
raise CommandError('To use this command you first need to '
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see this try/except/raise block moved back to the exact import requests statement within request() because otherwise it could obscure other imports.

For testing purposes you could create a get_requests_module() that does the import so you can mock it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes perfect sense. Done.

'install the requests module')

print '{method} -d {qs} {url}'.format(method=method.upper(),
qs=qs or 'None',
url=url)
print res.text
self.stdout.write('{method} -d {qs} {url}'.format(method=method.upper(),
qs=qs or 'None',
url=url))
self.stdout.write(res.text)

# Verify we're talking to our trusted server.
print res.headers
self.stdout.write(str(res.headers))
auth_hdr = res.headers.get('Server-Authorization', None)
if auth_hdr:
sender.accept_response(auth_hdr,
content=res.text,
content_type=res.headers['Content-Type'])
print '<response was Hawk verified>'
self.stdout.write('<response was Hawk verified>')
else:
print '** NO Server-Authorization header **'
print '<response was NOT Hawk verified>'
self.stdout.write('** NO Server-Authorization header **')
self.stdout.write('<response was NOT Hawk verified>')
60 changes: 60 additions & 0 deletions tests/test_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import mock

from django.core.management.base import CommandError
from django.core.management import call_command

from tests.base import BaseTest


def exec_cmd(**kwargs):
call_command('hawkrequest', **kwargs)


class UnauthorizedResponse:
def __init__(self):
self.headers = {}
self.text = 'Unauthorized'


class AuthorizedResponse:
def __init__(self):
self.headers = {
'Server-Authorization': 'xyz',
'Content-Type': 'text/plain'
}
self.text = 'Authorized'
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it might be easy enough with a real Response. If it's not easy then faking it is in fine but this is worth a try:

response = Response()
response.headers['Server-Authorization'] = 'xyz'
response.headers['Content-Type'] = 'text/plain'
response._content = 'Authorized'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simpler, more realistic, and works great. Thank you.



cmd_request = 'hawkrest.management.commands.hawkrequest.request'


class TestManagementCommand(BaseTest):

@mock.patch(cmd_request, mock.Mock(side_effect=ImportError))
def test_error_raised_if_requests_not_imported(self):
with self.assertRaises(CommandError):
exec_cmd(url=self.url, creds=self.credentials_id)

def test_error_raised_if_url_not_specified(self):
with self.assertRaises(CommandError):
exec_cmd(creds=self.credentials_id)

def test_error_raised_if_creds_missing(self):
with self.assertRaises(CommandError):
exec_cmd(url=self.url)

def test_error_raises_if_creds_not_found(self):
with self.assertRaises(CommandError):
exec_cmd(creds='nonexistent')

@mock.patch(cmd_request, mock.Mock(return_value=UnauthorizedResponse()))
@mock.patch('mohawk.Sender.accept_response')
def test_response_unverified_without_auth_header(self, mock_mohawk):
exec_cmd(url=self.url, creds=self.credentials_id)
self.assertFalse(mock_mohawk.called)

@mock.patch(cmd_request, mock.Mock(return_value=AuthorizedResponse()))
@mock.patch('mohawk.Sender.accept_response')
def test_response_verified_with_auth_header(self, mock_mohawk):
exec_cmd(url=self.url, creds=self.credentials_id)
self.assertTrue(mock_mohawk.called)