Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Re-add cluster-client #411

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
from __future__ import print_function
from __future__ import unicode_literals

from functools import wraps
import json
import socket
import time
import threading
import random
import requests
import requests.exceptions
from sys import version_info
Expand Down Expand Up @@ -110,6 +114,9 @@ def __init__(self,
'Accept': 'text/plain'
}

# _baseurl, _host and _port are properties to allow
# influxdb.InfluxDBClusterClient to override them with
# thread-local variables
@property
def _baseurl(self):
return self._get_baseurl()
Expand Down Expand Up @@ -783,6 +790,175 @@ def send_packet(self, packet, protocol='json'):
self.udp_socket.sendto(data, (self._host, self.udp_port))


class InfluxDBClusterClient(object):
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
to a cluster of InfluxDB servers. Each query hits different host from the
list of hosts.

WARNING: This only works if all node in the lists are equivalent. E.g.
writing or reading to one node or another give the same result.
This works with InfluxDB Enterprise cluster or old cluster using
InfluxDB 0.11.
It does NOT works with newer open-source cluster solution using
influxdb-relay.

:param hosts: all hosts to be included in the cluster, each of which
should be in the format (address, port),
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]. Defaults to
[('localhost', 8086)]
:type hosts: list of tuples
:param shuffle: whether the queries should hit servers evenly(randomly),
defaults to True
:type shuffle: bool
:param client_base_class: the base class for the cluster client.
This parameter is used to enable the support of different client
types. Defaults to :class:`~.InfluxDBClient`
:param healing_delay: the delay in seconds, counting from last failure of
a server, before re-adding server to the list of working servers.
Defaults to 15 minutes (900 seconds)
"""

def __init__(self,
hosts=[('localhost', 8086)],
username='root',
password='root',
database=None,
ssl=False,
verify_ssl=False,
timeout=None,
use_udp=False,
udp_port=4444,
shuffle=True,
client_base_class=InfluxDBClient,
healing_delay=900,
):
self.clients = [self] # Keep it backwards compatible
self.hosts = hosts
self.bad_hosts = [] # Corresponding server has failures in history
self.shuffle = shuffle
self.healing_delay = healing_delay
self._last_healing = time.time()
host, port = self.hosts[0]
self._hosts_lock = threading.Lock()
self._thread_local = threading.local()
self._client = client_base_class(host=host,
port=port,
username=username,
password=password,
database=database,
ssl=ssl,
verify_ssl=verify_ssl,
timeout=timeout,
use_udp=use_udp,
udp_port=udp_port)
for method in dir(client_base_class):
orig_attr = getattr(client_base_class, method, '')
if method.startswith('_') or not callable(orig_attr):
continue

setattr(self, method, self._make_func(orig_attr))

self._client._get_host = self._get_host
self._client._get_port = self._get_port
self._client._get_baseurl = self._get_baseurl
self._update_client_host(self.hosts[0])

@staticmethod
def from_DSN(dsn, client_base_class=InfluxDBClient,
shuffle=True, **kwargs):
"""Same as :meth:`~.InfluxDBClient.from_DSN`, but supports
multiple servers.

:param shuffle: whether the queries should hit servers
evenly(randomly), defaults to True
:type shuffle: bool
:param client_base_class: the base class for all clients in the
cluster. This parameter is used to enable the support of
different client types. Defaults to :class:`~.InfluxDBClient`

:Example:

::

>> cluster = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd\
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
>> type(cluster)
<class 'influxdb.client.InfluxDBClusterClient'>
>> cluster.hosts
[('host1', 8086), ('host2', 8086)]
>> cluster._client
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
"""
init_args = parse_dsn(dsn)
init_args.update(**kwargs)
init_args['shuffle'] = shuffle
init_args['client_base_class'] = client_base_class
cluster_client = InfluxDBClusterClient(**init_args)
return cluster_client

def _update_client_host(self, host):
self._thread_local.host, self._thread_local.port = host
self._thread_local.baseurl = "{0}://{1}:{2}".format(
self._client._scheme,
self._client._host,
self._client._port
)

def _get_baseurl(self):
return self._thread_local.baseurl

def _get_host(self):
return self._thread_local.host

def _get_port(self):
return self._thread_local.port

def _make_func(self, orig_func):

@wraps(orig_func)
def func(*args, **kwargs):
now = time.time()
with self._hosts_lock:
if (self.bad_hosts and
self._last_healing + self.healing_delay < now):
h = self.bad_hosts.pop(0)
self.hosts.append(h)
self._last_healing = now

if self.shuffle:
random.shuffle(self.hosts)

hosts = self.hosts + self.bad_hosts

for h in hosts:
bad_host = False
try:
self._update_client_host(h)
return orig_func(self._client, *args, **kwargs)
except InfluxDBClientError as e:
# Errors caused by user's requests, re-raise
raise e
except ValueError as e:
raise e
except Exception as e:
# Errors that might caused by server failure, try another
bad_host = True
with self._hosts_lock:
if h in self.hosts:
self.hosts.remove(h)
self.bad_hosts.append(h)
self._last_healing = now
finally:
with self._hosts_lock:
if not bad_host and h in self.bad_hosts:
self.bad_hosts.remove(h)
self.hosts.append(h)

raise InfluxDBServerError("InfluxDB: no viable server!")

return func


def parse_dsn(dsn):
conn_params = urlparse(dsn)
init_args = {}
Expand Down
124 changes: 124 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import requests
import requests.exceptions
import socket
import time
import requests_mock
import random
from nose.tools import raises
Expand All @@ -32,6 +33,7 @@
import unittest

from influxdb import InfluxDBClient
from influxdb.client import InfluxDBServerError, InfluxDBClusterClient


def _build_response_object(status_code=200, content=""):
Expand Down Expand Up @@ -811,3 +813,125 @@ def query(self,
raise Exception("Fail Twice")
else:
return "Success"


class TestInfluxDBClusterClient(unittest.TestCase):

def setUp(self):
# By default, raise exceptions on warnings
warnings.simplefilter('error', FutureWarning)

self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)]
self.dsn_string = 'influxdb://uSr:pWd@host1:8086,uSr:pWd@host2:8086/db'

def test_init(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
username='username',
password='password',
database='database',
shuffle=False,
client_base_class=FakeClient)
self.assertEqual(3, len(cluster.hosts))
self.assertEqual(0, len(cluster.bad_hosts))
self.assertIn((cluster._client._host,
cluster._client._port), cluster.hosts)

def test_one_server_fails(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=False,
client_base_class=FakeClient)
self.assertEqual('Success', cluster.query('Fail once'))
self.assertEqual(2, len(cluster.hosts))
self.assertEqual(1, len(cluster.bad_hosts))

def test_two_servers_fail(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=False,
client_base_class=FakeClient)
self.assertEqual('Success', cluster.query('Fail twice'))
self.assertEqual(1, len(cluster.hosts))
self.assertEqual(2, len(cluster.bad_hosts))

def test_all_fail(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
with self.assertRaises(InfluxDBServerError):
cluster.query('Fail')
self.assertEqual(0, len(cluster.hosts))
self.assertEqual(3, len(cluster.bad_hosts))

def test_all_good(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(3, len(cluster.hosts))
self.assertEqual(0, len(cluster.bad_hosts))

def test_recovery(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
with self.assertRaises(InfluxDBServerError):
cluster.query('Fail')
self.assertEqual('Success', cluster.query(''))
self.assertEqual(1, len(cluster.hosts))
self.assertEqual(2, len(cluster.bad_hosts))

def test_healing(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
healing_delay=1,
client_base_class=FakeClient)
with self.assertRaises(InfluxDBServerError):
cluster.query('Fail')
self.assertEqual('Success', cluster.query(''))
time.sleep(1.1)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(2, len(cluster.hosts))
self.assertEqual(1, len(cluster.bad_hosts))
time.sleep(1.1)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(3, len(cluster.hosts))
self.assertEqual(0, len(cluster.bad_hosts))

def test_dsn(self):
cli = InfluxDBClusterClient.from_DSN(self.dsn_string)
self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)
self.assertEqual('http://host1:8086', cli._client._baseurl)
self.assertEqual('uSr', cli._client._username)
self.assertEqual('pWd', cli._client._password)
self.assertEqual('db', cli._client._database)
self.assertFalse(cli._client.use_udp)

cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string)
self.assertTrue(cli._client.use_udp)

cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string)
self.assertEqual('https://host1:8086', cli._client._baseurl)

cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string,
**{'ssl': False})
self.assertEqual('http://host1:8086', cli._client._baseurl)

def test_dsn_password_caps(self):
cli = InfluxDBClusterClient.from_DSN(
'https+influxdb://usr:pWd@host:8086/db')
self.assertEqual('pWd', cli._client._password)

def test_dsn_mixed_scheme_case(self):
cli = InfluxDBClusterClient.from_DSN(
'hTTps+inFLUxdb://usr:pWd@host:8086/db')
self.assertEqual('pWd', cli._client._password)
self.assertEqual('https://host:8086', cli._client._baseurl)

cli = InfluxDBClusterClient.from_DSN(
'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db')
self.assertTrue(cli._client.use_udp)