Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

[Ready for Review] Chunked query responses #418

Merged
merged 8 commits into from
Mar 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ addons:
- wget

matrix:
allow_failures:
- python: 3.4
env: TOX_ENV=docs
include:
- python: 2.7
env: TOX_ENV=py27
Expand Down
32 changes: 31 additions & 1 deletion influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,27 @@ def write(self, data, params=None, expected_response_code=204,
)
return True

def _read_chunked_response(self, response, raise_errors=True):
result_set = {}
for line in response.iter_lines():
if isinstance(line, bytes):
line = line.decode('utf-8')
data = json.loads(line)
for result in data.get('results', []):
for _key in result:
if type(result[_key]) == list:
result_set.setdefault(_key, []).extend(result[_key])
return ResultSet(result_set, raise_errors=raise_errors)

def query(self,
query,
params=None,
epoch=None,
expected_response_code=200,
database=None,
raise_errors=True):
raise_errors=True,
chunked=False,
chunk_size=0):
"""Send a query to InfluxDB.

:param query: the actual query string
Expand All @@ -319,6 +333,14 @@ def query(self,
returns errors, defaults to True
:type raise_errors: bool

:param chunked: Enable to use chunked responses from InfluxDB.
With ``chunked`` enabled, one ResultSet is returned per chunk
containing all results within that chunk
:type chunked: bool

:param chunk_size: Size of each chunk to tell InfluxDB to use.
:type chunk_size: int

:returns: the queried data
:rtype: :class:`~.ResultSet`
"""
Expand All @@ -331,6 +353,11 @@ def query(self,
if epoch is not None:
params['epoch'] = epoch

if chunked:
params['chunked'] = 'true'
if chunk_size > 0:
params['chunk_size'] = chunk_size

response = self.request(
url="query",
method='GET',
Expand All @@ -339,6 +366,9 @@ def query(self,
expected_response_code=expected_response_code
)

if chunked:
return self._read_chunked_response(response)

data = response.json()

results = [
Expand Down
38 changes: 38 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import unittest

from influxdb import InfluxDBClient
from influxdb.resultset import ResultSet


def _build_response_object(status_code=200, content=""):
Expand Down Expand Up @@ -792,6 +793,43 @@ def test_invalid_port_fails(self):
with self.assertRaises(ValueError):
InfluxDBClient('host', '80/redir', 'username', 'password')

def test_chunked_response(self):
example_response = \
u'{"results":[{"statement_id":0,"series":' \
'[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
'[{"statement_id":0,"series":[{"name":"iops","columns":' \
'["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
'"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
'[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
'[{"statement_id":0,"series":[{"name":"memory","columns":' \
'["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
response = self.cli.query('show series limit 4 offset 0',
chunked=True, chunk_size=4)
self.assertTrue(len(response) == 4)
self.assertEqual(response.__repr__(), ResultSet(
{'series': [{'values': [['value', 'integer']],
'name': 'cpu',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'iops',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'load',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'memory',
'columns': ['fieldKey', 'fieldType']}]}
).__repr__())


class FakeClient(InfluxDBClient):

Expand Down