From 1df262cd63204e881f00f942d6aab919e47275ff Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 16 Feb 2017 14:55:32 +0000 Subject: [PATCH 1/8] Added chunked query responses implementation and test. Added chunked parameter to client query function. --- influxdb/client.py | 16 +++++++++++++++- influxdb/tests/client_test.py | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index ab9aa409..c4f70272 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -293,13 +293,23 @@ def write(self, data, params=None, expected_response_code=204, ) return True + def _read_chunked_response(self, response, raise_errors=True): + for line in response.iter_lines(): + # import ipdb; ipdb.set_trace() + if isinstance(line, bytes): + line = line.decode('utf-8') + data = json.loads(line) + for result in data.get('results', []): + yield ResultSet(result, raise_errors=raise_errors) + def query(self, query, params=None, epoch=None, expected_response_code=200, database=None, - raise_errors=True): + raise_errors=True, + chunked=False): """Send a query to InfluxDB. :param query: the actual query string @@ -339,6 +349,10 @@ def query(self, expected_response_code=expected_response_code ) + if chunked or 'chunked' in params: + params['chunked'] = 'true' + return self._read_chunked_response(response) + data = response.json() results = [ diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index f586df3f..def6ccb2 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -32,7 +32,7 @@ import unittest from influxdb import InfluxDBClient - +from influxdb.resultset import ResultSet def _build_response_object(status_code=200, content=""): resp = requests.Response() @@ -792,6 +792,37 @@ def test_invalid_port_fails(self): with self.assertRaises(ValueError): InfluxDBClient('host', '80/redir', 'username', 'password') + def test_chunked_response(self): + example_response = u'{"results":[{"statement_id":0,"series": ' \ + '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \ + '[["value","integer"]]}],"partial":true}]}\n{"results":' \ + '[{"statement_id":0,"series":[{"name":"iops","columns":' \ + '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \ + '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \ + '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \ + '[["value","integer"]]}],"partial":true}]}\n{"results":' \ + '[{"statement_id":0,"series":[{"name":"memory","columns":' \ + '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n' + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=example_response + ) + response = list(self.cli.query('show series limit 4 offset 0', chunked=True)) + self.assertTrue(len(response) == 4) + self.assertEqual(response[0].raw, ResultSet( + {"statement_id":0, + "series": [{"name":"cpu","columns":["fieldKey","fieldType"], + "values": [["value","integer"]]}],"partial":True} + ).raw) + self.assertEqual(response[3].raw, ResultSet( + {"statement_id":0, + "series":[{"name":"memory","columns": + ["fieldKey","fieldType"], + "values":[["value","integer"]]}]} + ).raw) class FakeClient(InfluxDBClient): From a7c963c779c00b47fd8edf7846b5a2d5d763ecb2 Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 16 Feb 2017 15:26:37 +0000 Subject: [PATCH 2/8] Pep8 changes --- influxdb/tests/client_test.py | 46 ++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index def6ccb2..6886652c 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -34,6 +34,7 @@ from influxdb import InfluxDBClient from influxdb.resultset import ResultSet + def _build_response_object(status_code=200, content=""): resp = requests.Response() resp.status_code = status_code @@ -793,16 +794,17 @@ def test_invalid_port_fails(self): InfluxDBClient('host', '80/redir', 'username', 'password') def test_chunked_response(self): - example_response = u'{"results":[{"statement_id":0,"series": ' \ - '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"iops","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \ - '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \ - '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"memory","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n' + example_response = \ + u'{"results":[{"statement_id":0,"series":' \ + '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \ + '[["value","integer"]]}],"partial":true}]}\n{"results":' \ + '[{"statement_id":0,"series":[{"name":"iops","columns":' \ + '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \ + '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \ + '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \ + '[["value","integer"]]}],"partial":true}]}\n{"results":' \ + '[{"statement_id":0,"series":[{"name":"memory","columns":' \ + '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n' with requests_mock.Mocker() as m: m.register_uri( @@ -810,19 +812,23 @@ def test_chunked_response(self): "http://localhost:8086/query", text=example_response ) - response = list(self.cli.query('show series limit 4 offset 0', chunked=True)) + response = list(self.cli.query('show series limit 4 offset 0', + chunked=True)) self.assertTrue(len(response) == 4) self.assertEqual(response[0].raw, ResultSet( - {"statement_id":0, - "series": [{"name":"cpu","columns":["fieldKey","fieldType"], - "values": [["value","integer"]]}],"partial":True} - ).raw) + {"statement_id": 0, + "series": [{"name": "cpu", + "columns": ["fieldKey", "fieldType"], + "values": [["value", "integer"]]}], + "partial": True} + ).raw) self.assertEqual(response[3].raw, ResultSet( - {"statement_id":0, - "series":[{"name":"memory","columns": - ["fieldKey","fieldType"], - "values":[["value","integer"]]}]} - ).raw) + {"statement_id": 0, + "series": [{"name": "memory", + "columns": ["fieldKey", "fieldType"], + "values": [["value", "integer"]]}]} + ).raw) + class FakeClient(InfluxDBClient): From d08be8523ad9459d3692fe029f51a1abb24ebbae Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 16 Feb 2017 15:26:54 +0000 Subject: [PATCH 3/8] Ignore docs failures on travis --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index 1dd2c78d..2259dc06 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,9 @@ addons: - wget matrix: + allow_failures: + - python: 3.4 + env: TOX_ENV=docs include: - python: 2.7 env: TOX_ENV=py27 From 49698fc2572ed1c0e3b6bb196f66e90de9e683df Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 16 Feb 2017 15:30:12 +0000 Subject: [PATCH 4/8] Updated docstrings --- influxdb/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/influxdb/client.py b/influxdb/client.py index c4f70272..55656068 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -295,7 +295,6 @@ def write(self, data, params=None, expected_response_code=204, def _read_chunked_response(self, response, raise_errors=True): for line in response.iter_lines(): - # import ipdb; ipdb.set_trace() if isinstance(line, bytes): line = line.decode('utf-8') data = json.loads(line) @@ -329,6 +328,11 @@ def query(self, returns errors, defaults to True :type raise_errors: bool + :param chunked: Enable to use chunked responses from InfluxDB. + With ``chunked`` enabled, a _generator_ of ResultSet objects + is returned as opposed to a list. + :type chunked: bool + :returns: the queried data :rtype: :class:`~.ResultSet` """ From 7dddb0baed32bf41bf0518bcfc5415849e3033da Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 16 Feb 2017 15:40:51 +0000 Subject: [PATCH 5/8] Added chunk size parameter --- influxdb/client.py | 8 +++++++- influxdb/tests/client_test.py | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 55656068..bd85d96c 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -308,7 +308,8 @@ def query(self, expected_response_code=200, database=None, raise_errors=True, - chunked=False): + chunked=False, + chunk_size=0): """Send a query to InfluxDB. :param query: the actual query string @@ -333,6 +334,9 @@ def query(self, is returned as opposed to a list. :type chunked: bool + :param chunk_size: Size of each chunk to tell InfluxDB to use. + :type chunk_size: int + :returns: the queried data :rtype: :class:`~.ResultSet` """ @@ -355,6 +359,8 @@ def query(self, if chunked or 'chunked' in params: params['chunked'] = 'true' + if chunk_size > 0: + params['chunk_size'] = chunk_size return self._read_chunked_response(response) data = response.json() diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 6886652c..c5a87ae8 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -813,7 +813,7 @@ def test_chunked_response(self): text=example_response ) response = list(self.cli.query('show series limit 4 offset 0', - chunked=True)) + chunked=True, chunk_size=4)) self.assertTrue(len(response) == 4) self.assertEqual(response[0].raw, ResultSet( {"statement_id": 0, From ffd1af91025c18d162713b40be800b88609b16f9 Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 16 Feb 2017 16:14:04 +0000 Subject: [PATCH 6/8] Fix chunked and chunk size parameter set --- influxdb/client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index bd85d96c..28f6ae11 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -349,6 +349,11 @@ def query(self, if epoch is not None: params['epoch'] = epoch + if chunked: + params['chunked'] = 'true' + if chunk_size > 0: + params['chunk_size'] = chunk_size + response = self.request( url="query", method='GET', @@ -357,10 +362,7 @@ def query(self, expected_response_code=expected_response_code ) - if chunked or 'chunked' in params: - params['chunked'] = 'true' - if chunk_size > 0: - params['chunk_size'] = chunk_size + if chunked: return self._read_chunked_response(response) data = response.json() From 3625e4ce605ac2cbf049ae52602475de20b7e24b Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 21 Feb 2017 16:52:38 +0000 Subject: [PATCH 7/8] Make one result set per chunk for improved performance and API compatibility with non-chunked responses --- influxdb/client.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 28f6ae11..0698c871 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -294,12 +294,16 @@ def write(self, data, params=None, expected_response_code=204, return True def _read_chunked_response(self, response, raise_errors=True): + result_set = {} for line in response.iter_lines(): if isinstance(line, bytes): line = line.decode('utf-8') data = json.loads(line) for result in data.get('results', []): - yield ResultSet(result, raise_errors=raise_errors) + for _key in result: + if type(result[_key]) == list: + result_set.setdefault(_key, []).extend(result[_key]) + return ResultSet(result_set, raise_errors=raise_errors) def query(self, query, @@ -330,8 +334,8 @@ def query(self, :type raise_errors: bool :param chunked: Enable to use chunked responses from InfluxDB. - With ``chunked`` enabled, a _generator_ of ResultSet objects - is returned as opposed to a list. + With ``chunked`` enabled, one ResultSet is returned per chunk + containing all results within that chunk :type chunked: bool :param chunk_size: Size of each chunk to tell InfluxDB to use. From 0c2e1718c99f2246c5d49f10f28154f0bf61a485 Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 21 Feb 2017 17:16:35 +0000 Subject: [PATCH 8/8] Updated chunked responses test for API change --- influxdb/tests/client_test.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index c5a87ae8..0ba04f4a 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -812,22 +812,23 @@ def test_chunked_response(self): "http://localhost:8086/query", text=example_response ) - response = list(self.cli.query('show series limit 4 offset 0', - chunked=True, chunk_size=4)) + response = self.cli.query('show series limit 4 offset 0', + chunked=True, chunk_size=4) self.assertTrue(len(response) == 4) - self.assertEqual(response[0].raw, ResultSet( - {"statement_id": 0, - "series": [{"name": "cpu", - "columns": ["fieldKey", "fieldType"], - "values": [["value", "integer"]]}], - "partial": True} - ).raw) - self.assertEqual(response[3].raw, ResultSet( - {"statement_id": 0, - "series": [{"name": "memory", - "columns": ["fieldKey", "fieldType"], - "values": [["value", "integer"]]}]} - ).raw) + self.assertEqual(response.__repr__(), ResultSet( + {'series': [{'values': [['value', 'integer']], + 'name': 'cpu', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'iops', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'load', + 'columns': ['fieldKey', 'fieldType']}, + {'values': [['value', 'integer']], + 'name': 'memory', + 'columns': ['fieldKey', 'fieldType']}]} + ).__repr__()) class FakeClient(InfluxDBClient):