Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #418 from pkittenis/chunked_query_responses
Browse files Browse the repository at this point in the history
[Ready for Review] Chunked query responses
  • Loading branch information
aviau authored Mar 1, 2017
2 parents c9fcede + 0c2e171 commit d1aa81a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ addons:
- wget

matrix:
allow_failures:
- python: 3.4
env: TOX_ENV=docs
include:
- python: 2.7
env: TOX_ENV=py27
Expand Down
32 changes: 31 additions & 1 deletion influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,27 @@ def write(self, data, params=None, expected_response_code=204,
)
return True

def _read_chunked_response(self, response, raise_errors=True):
result_set = {}
for line in response.iter_lines():
if isinstance(line, bytes):
line = line.decode('utf-8')
data = json.loads(line)
for result in data.get('results', []):
for _key in result:
if type(result[_key]) == list:
result_set.setdefault(_key, []).extend(result[_key])
return ResultSet(result_set, raise_errors=raise_errors)

def query(self,
query,
params=None,
epoch=None,
expected_response_code=200,
database=None,
raise_errors=True):
raise_errors=True,
chunked=False,
chunk_size=0):
"""Send a query to InfluxDB.
:param query: the actual query string
Expand All @@ -319,6 +333,14 @@ def query(self,
returns errors, defaults to True
:type raise_errors: bool
:param chunked: Enable to use chunked responses from InfluxDB.
With ``chunked`` enabled, one ResultSet is returned per chunk
containing all results within that chunk
:type chunked: bool
:param chunk_size: Size of each chunk to tell InfluxDB to use.
:type chunk_size: int
:returns: the queried data
:rtype: :class:`~.ResultSet`
"""
Expand All @@ -331,6 +353,11 @@ def query(self,
if epoch is not None:
params['epoch'] = epoch

if chunked:
params['chunked'] = 'true'
if chunk_size > 0:
params['chunk_size'] = chunk_size

response = self.request(
url="query",
method='GET',
Expand All @@ -339,6 +366,9 @@ def query(self,
expected_response_code=expected_response_code
)

if chunked:
return self._read_chunked_response(response)

data = response.json()

results = [
Expand Down
38 changes: 38 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import unittest

from influxdb import InfluxDBClient
from influxdb.resultset import ResultSet


def _build_response_object(status_code=200, content=""):
Expand Down Expand Up @@ -792,6 +793,43 @@ def test_invalid_port_fails(self):
with self.assertRaises(ValueError):
InfluxDBClient('host', '80/redir', 'username', 'password')

def test_chunked_response(self):
example_response = \
u'{"results":[{"statement_id":0,"series":' \
'[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
'[{"statement_id":0,"series":[{"name":"iops","columns":' \
'["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
'"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
'[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
'[{"statement_id":0,"series":[{"name":"memory","columns":' \
'["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
response = self.cli.query('show series limit 4 offset 0',
chunked=True, chunk_size=4)
self.assertTrue(len(response) == 4)
self.assertEqual(response.__repr__(), ResultSet(
{'series': [{'values': [['value', 'integer']],
'name': 'cpu',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'iops',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'load',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'memory',
'columns': ['fieldKey', 'fieldType']}]}
).__repr__())


class FakeClient(InfluxDBClient):

Expand Down

0 comments on commit d1aa81a

Please sign in to comment.