diff --git a/.travis.yml b/.travis.yml index 1dd2c78d..2259dc06 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,9 @@ addons: - wget matrix: + allow_failures: + - python: 3.4 + env: TOX_ENV=docs include: - python: 2.7 env: TOX_ENV=py27 diff --git a/influxdb/client.py b/influxdb/client.py index ab9aa409..0698c871 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -293,13 +293,27 @@ def write(self, data, params=None, expected_response_code=204, ) return True + def _read_chunked_response(self, response, raise_errors=True): + result_set = {} + for line in response.iter_lines(): + if isinstance(line, bytes): + line = line.decode('utf-8') + data = json.loads(line) + for result in data.get('results', []): + for _key in result: + if type(result[_key]) == list: + result_set.setdefault(_key, []).extend(result[_key]) + return ResultSet(result_set, raise_errors=raise_errors) + def query(self, query, params=None, epoch=None, expected_response_code=200, database=None, - raise_errors=True): + raise_errors=True, + chunked=False, + chunk_size=0): """Send a query to InfluxDB. :param query: the actual query string @@ -319,6 +333,14 @@ def query(self, returns errors, defaults to True :type raise_errors: bool + :param chunked: Enable to use chunked responses from InfluxDB. + With ``chunked`` enabled, one ResultSet is returned per chunk + containing all results within that chunk + :type chunked: bool + + :param chunk_size: Size of each chunk to tell InfluxDB to use. + :type chunk_size: int + :returns: the queried data :rtype: :class:`~.ResultSet` """ @@ -331,6 +353,11 @@ def query(self, if epoch is not None: params['epoch'] = epoch + if chunked: + params['chunked'] = 'true' + if chunk_size > 0: + params['chunk_size'] = chunk_size + response = self.request( url="query", method='GET', @@ -339,6 +366,9 @@ def query(self, expected_response_code=expected_response_code ) + if chunked: + return self._read_chunked_response(response) + data = response.json() results = [ diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index f586df3f..0ba04f4a 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -32,6 +32,7 @@ import unittest from influxdb import InfluxDBClient +from influxdb.resultset import ResultSet def _build_response_object(status_code=200, content=""): @@ -792,6 +793,43 @@ def test_invalid_port_fails(self): with self.assertRaises(ValueError): InfluxDBClient('host', '80/redir', 'username', 'password') + def test_chunked_response(self): + example_response = \ + u'{"results":[{"statement_id":0,"series":' \ + '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \ + '[["value","integer"]]}],"partial":true}]}\n{"results":' \ + '[{"statement_id":0,"series":[{"name":"iops","columns":' \ + '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \ + '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \ + '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \ + '[["value","integer"]]}],"partial":true}]}\n{"results":' \ + '[{"statement_id":0,"series":[{"name":"memory","columns":' \ + '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n' + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=example_response + ) + response = self.cli.query('show series limit 4 offset 0', + chunked=True, chunk_size=4) + self.assertTrue(len(response) == 4) + self.assertEqual(response.__repr__(), ResultSet( + {'series': [{'values': [['value', 'integer']], + 'name': 'cpu', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'iops', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'load', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'memory', + 'columns': ['fieldKey', 'fieldType']}]} + ).__repr__()) + class FakeClient(InfluxDBClient):