diff --git a/.coveragerc b/.coveragerc deleted file mode 100644 index 3d5fc07..0000000 --- a/.coveragerc +++ /dev/null @@ -1,8 +0,0 @@ -# http://nedbatchelder.com/code/coverage/config.html#config - -[run] -branch = True -omit = */tests/* - -[report] -omit = */tests/* diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..6f0a026 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,51 @@ +name: fluent-logger +on: + pull_request: + push: + branches: + - master +jobs: + build-stable: + runs-on: ${{ matrix.os }} + continue-on-error: false + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + - windows-latest + - macos-latest + python-version: + - '3.10' + - '3.9' + - '3.8' + - '3.7' + - '3.6' + - 'pypy-3.7' + env: + DEPLOY_PYTHONS: "3.9" + DEPLOY_OSES: "Linux" + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + steps: + - shell: bash + if: | + github.event_name == 'push' && + contains(env.DEPLOY_OSES, runner.os) && + contains(env.DEPLOY_PYTHONS, matrix.python-version) + run: | + echo "PYB_EXTRA_ARGS=+upload" >> $GITHUB_ENV + - uses: pybuilder/build@master + with: + python-version: ${{ matrix.python-version }} + pyb-extra-args: ${{ env.PYB_EXTRA_ARGS }} + build-stable-summary: + if: success() || failure() + runs-on: ubuntu-latest + name: Build Stable Summary + needs: build-stable + steps: + - name: Check build matrix status + if: needs.build-stable.result != 'success' + run: exit 1 diff --git a/.gitignore b/.gitignore index fd4bc6c..80b2e65 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,7 @@ /.tox /build /dist -.idea/ +/.idea +/.pybuilder +__pycache__ +/target \ No newline at end of file diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 75861ca..0000000 --- a/.travis.yml +++ /dev/null @@ -1,35 +0,0 @@ -sudo: false -language: python -python: - - "2.7" - - "3.4" - - "3.5" - - "3.6" - - "3.7" - - "3.8" - - pypy - - pypy3 - - nightly -# command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors -install: - - "pip install -e ." - - "pip install 'coverage~=4.5.4' coveralls" -script: - - "PYTHONFAULTHANDLER=x timeout -sABRT 30s nosetests -vsd" -after_success: - - coveralls - -deploy: - provider: pypi - user: repeatedly - server: https://upload.pypi.org/legacy/ - password: - secure: CpNaj4F3TZvpP1aSJWidh/XexrWODV2sBdObrYU79Gyh9hFl6WLsA3JM9BfVsy9cGb/P/jP6ly4Z0/6qdIzZ5D6FPOB1B7rn5GZ2LAMOypRCA6W2uJbRjUU373Wut0p0OmQcMPto6XJsMlpvOEq+1uAq+LLAnAGEmmYTeskZebs= - on: - tags: true - condition: '"$TRAVIS_PYTHON_VERSION" = "3.8" || "$TRAVIS_PYTHON_VERSION" = "2.7"' - distributions: "sdist bdist_wheel" - -matrix: - allow_failures: - - python: nightly diff --git a/COPYING b/LICENSE similarity index 100% rename from COPYING rename to LICENSE diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index 3248bb2..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1,4 +0,0 @@ -include README.rst -include setup.py -include COPYING -include test/*py diff --git a/README.rst b/README.rst index 4abb6f6..7e67ce5 100644 --- a/README.rst +++ b/README.rst @@ -24,9 +24,10 @@ Python application.
Requirements ------------ -- Python 2.7 or 3.4+ -- ``msgpack-python`` +- Python 3.6+ +- ``msgpack`` - **IMPORTANT**: Version 0.8.0 is the last version supporting Python 2.6, 3.2 and 3.3 +- **IMPORTANT**: Version 0.9.6 is the last version supporting Python 2.7, 3.4 and 3.5 Installation ------------ diff --git a/build.py b/build.py new file mode 100644 index 0000000..1f52196 --- /dev/null +++ b/build.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +from pybuilder.core import use_plugin, init, Author + +use_plugin("python.core") +use_plugin("python.unittest") +use_plugin("python.flake8") +use_plugin("python.coverage") +use_plugin("python.coveralls") +use_plugin("python.distutils") +use_plugin("python.pycharm") +use_plugin("copy_resources") + + +name = "fluent-logger" +summary = "A Python logging handler for Fluentd event collector" + +authors = [Author("Kazuki Ohta", "kazuki.ohta@gmail.com")] +maintainers = [Author("Arcadiy Ivanov", "arcadiy@ivanov.biz")] + +url = "https://github.com/fluent/fluent-logger-python" +urls = {"Bug Tracker": "https://github.com/fluent/fluent-logger-python/issues", + "Source Code": "https://github.com/fluent/fluent-logger-python", + "Documentation": "https://github.com/fluent/fluent-logger-python" + } +license = "Apache License, Version 2.0" +version = "1.0.0.dev" + +requires_python = ">=3.6" + +default_task = ["analyze", "publish"] + + +@init +def set_properties(project): + project.build_depends_on("docker", ">=5.0") + project.build_depends_on("cryptography", ">=2.9.0") + + project.set_property("verbose", True) + + project.set_property("coverage_break_build", False) + project.get_property("coverage_exceptions").extend(["setup"]) + + project.set_property("flake8_break_build", True) + project.set_property("flake8_extend_ignore", "E303") + project.set_property("flake8_include_test_sources", True) + project.set_property("flake8_max_line_length", 130) + + project.set_property("frosted_include_test_sources", True) + project.set_property("frosted_include_scripts", True) + + project.set_property("copy_resources_target", "$dir_dist/fluent") + project.get_property("copy_resources_glob").append("LICENSE") + project.include_file("fluent", "LICENSE") + + # PyPy distutils needs os.environ['PATH'] no matter what + # Also Windows needs PATH for DLL loading in all Pythons + project.set_property("integrationtest_inherit_environment", True) + + project.set_property("distutils_readme_description", True) + project.set_property("distutils_description_overwrite", True) + project.set_property("distutils_readme_file", "README.rst") + project.set_property("distutils_upload_skip_existing", True) + project.set_property("distutils_setup_keywords", ["fluentd", "logging", "logger", "python"]) + + project.set_property("distutils_classifiers", [ + "Programming Language :: Python", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX :: Linux", + "Operating System :: Microsoft :: Windows", + "Operating System :: OS Independent", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: System :: Logging", + "Intended Audience :: Developers",
+ "Development Status :: 5 - Production/Stable", +]) + diff --git a/fluent/sender.py b/fluent/sender.py deleted file mode 100644 index 6762856..0000000 --- a/fluent/sender.py +++ /dev/null @@ -1,252 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function - -import errno -import socket -import struct -import threading -import time -import traceback - -import msgpack - -_global_sender = None - - -def _set_global_sender(sender): # pragma: no cover - """ [For testing] Function to set global sender directly - """ - global _global_sender - _global_sender = sender - - -def setup(tag, **kwargs): # pragma: no cover - global _global_sender - _global_sender = FluentSender(tag, **kwargs) - - -def get_global_sender(): # pragma: no cover - return _global_sender - - -def close(): # pragma: no cover - get_global_sender().close() - - -class EventTime(msgpack.ExtType): - def __new__(cls, timestamp): - seconds = int(timestamp) - nanoseconds = int(timestamp % 1 * 10 ** 9) - return super(EventTime, cls).__new__( - cls, - code=0, - data=struct.pack(">II", seconds, nanoseconds), - ) - - -class FluentSender(object): - def __init__(self, - tag, - host='localhost', - port=24224, - bufmax=1 * 1024 * 1024, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - nanosecond_precision=False, - msgpack_kwargs=None, - **kwargs): - """ - :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. - """ - self.tag = tag - self.host = host - self.port = port - self.bufmax = bufmax - self.timeout = timeout - self.verbose = verbose - self.buffer_overflow_handler = buffer_overflow_handler - self.nanosecond_precision = nanosecond_precision - self.msgpack_kwargs = {} if msgpack_kwargs is None else msgpack_kwargs - - self.socket = None - self.pendings = None - self.lock = threading.Lock() - self._closed = False - self._last_error_threadlocal = threading.local() - - def emit(self, label, data): - if self.nanosecond_precision: - cur_time = EventTime(time.time()) - else: - cur_time = int(time.time()) - return self.emit_with_time(label, cur_time, data) - - def emit_with_time(self, label, timestamp, data): - if self.nanosecond_precision and isinstance(timestamp, float): - timestamp = EventTime(timestamp) - try: - bytes_ = self._make_packet(label, timestamp, data) - except Exception as e: - self.last_error = e - bytes_ = self._make_packet(label, timestamp, - {"level": "CRITICAL", - "message": "Can't output to log", - "traceback": traceback.format_exc()}) - return self._send(bytes_) - - @property - def last_error(self): - return getattr(self._last_error_threadlocal, 'exception', None) - - @last_error.setter - def last_error(self, err): - self._last_error_threadlocal.exception = err - - def clear_last_error(self, _thread_id=None): - if hasattr(self._last_error_threadlocal, 'exception'): - delattr(self._last_error_threadlocal, 'exception') - - def close(self): - with self.lock: - if self._closed: - return - self._closed = True - if self.pendings: - try: - self._send_data(self.pendings) - except Exception: - self._call_buffer_overflow_handler(self.pendings) - - self._close() - self.pendings = None - - def _make_packet(self, label, timestamp, data): - if label: - tag = '.'.join((self.tag, label)) - else: - tag = self.tag - packet = (tag, timestamp, data) - if self.verbose: - print(packet) - return msgpack.packb(packet, **self.msgpack_kwargs) - - def _send(self, bytes_): - with self.lock: - if self._closed: - return False - return self._send_internal(bytes_) - - def 
_send_internal(self, bytes_): - # buffering - if self.pendings: - self.pendings += bytes_ - bytes_ = self.pendings - - try: - self._send_data(bytes_) - - # send finished - self.pendings = None - - return True - except socket.error as e: - self.last_error = e - - # close socket - self._close() - - # clear buffer if it exceeds max buffer size - if self.pendings and (len(self.pendings) > self.bufmax): - self._call_buffer_overflow_handler(self.pendings) - self.pendings = None - else: - self.pendings = bytes_ - - return False - - def _check_recv_side(self): - try: - self.socket.settimeout(0.0) - try: - recvd = self.socket.recv(4096) - except socket.error as recv_e: - if recv_e.errno != errno.EWOULDBLOCK: - raise - return - - if recvd == b'': - raise socket.error(errno.EPIPE, "Broken pipe") - finally: - self.socket.settimeout(self.timeout) - - def _send_data(self, bytes_): - # reconnect if possible - self._reconnect() - # send message - bytes_to_send = len(bytes_) - bytes_sent = 0 - self._check_recv_side() - while bytes_sent < bytes_to_send: - sent = self.socket.send(bytes_[bytes_sent:]) - if sent == 0: - raise socket.error(errno.EPIPE, "Broken pipe") - bytes_sent += sent - self._check_recv_side() - - def _reconnect(self): - if not self.socket: - try: - if self.host.startswith('unix://'): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.settimeout(self.timeout) - sock.connect(self.host[len('unix://'):]) - else: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(self.timeout) - # This might be controversial and may need to be removed - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - sock.connect((self.host, self.port)) - except Exception as e: - try: - sock.close() - except Exception: # pragma: no cover - pass - raise e - else: - self.socket = sock - - def _call_buffer_overflow_handler(self, pending_events): - try: - if self.buffer_overflow_handler: - self.buffer_overflow_handler(pending_events) - except Exception as e: - # User should care any exception in handler - pass - - def _close(self): - try: - sock = self.socket - if sock: - try: - try: - sock.shutdown(socket.SHUT_RDWR) - except socket.error: # pragma: no cover - pass - finally: - try: - sock.close() - except socket.error: # pragma: no cover - pass - finally: - self.socket = None - - def __enter__(self): - return self - - def __exit__(self, typ, value, traceback): - try: - self.close() - except Exception as e: # pragma: no cover - self.last_error = e diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..950f549 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["pybuilder>=0.12.0"] +build-backend = "pybuilder.pep517" diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index b8f2760..0000000 --- a/setup.cfg +++ /dev/null @@ -1,10 +0,0 @@ -[nosetests] -match = ^test_ -cover-package = fluent -with-coverage = 1 -cover-erase = 1 -cover-branches = 1 -cover-inclusive = 1 -cover-min-percentage = 70 -[bdist_wheel] -universal = 1 diff --git a/setup.py b/setup.py old mode 100755 new mode 100644 index 65035ca..aaff15c --- a/setup.py +++ b/setup.py @@ -1,42 +1,89 @@ -#!/usr/bin/python +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# This file is part of PyBuilder +# +# Copyright 2011-2020 PyBuilder Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -from os import path + +# +# This script allows installation via: +# pip install git+git://<project>@<branch> +# +# This script is designed to be used in combination with `pip install` ONLY +# +# DO NOT RUN MANUALLY +# + +import os +import subprocess +import sys +import glob +import shutil + +from sys import version_info +py3 = version_info[0] == 3 +py2 = not py3 +if py2: + FileNotFoundError = OSError + + +def install_pyb(): + try: + subprocess.check_call([sys.executable, "-m", "pip", "install", "pybuilder"]) + except subprocess.CalledProcessError as e: + sys.exit(e.returncode) + + +script_dir = os.path.dirname(os.path.realpath(__file__)) +exit_code = 0 + +try: + subprocess.check_call(["pyb", "--version"]) +except FileNotFoundError as e: + if py3 or (py2 and e.errno == 2): + install_pyb() + else: + raise +except subprocess.CalledProcessError as e: + if e.returncode == 127: + install_pyb() + else: + sys.exit(e.returncode) try: - from setuptools import setup -except ImportError: - from distutils.core import setup - -README = path.abspath(path.join(path.dirname(__file__), 'README.rst')) -desc = 'A Python logging handler for Fluentd event collector' - -setup( - name='fluent-logger', - version='0.9.6', - description=desc, - long_description=open(README).read(), - package_dir={'fluent': 'fluent'}, - packages=['fluent'], - install_requires=['msgpack<1.0.0'], - author='Kazuki Ohta', - author_email='kazuki.ohta@gmail.com', - url='https://github.com/fluent/fluent-logger-python', - download_url='http://pypi.python.org/pypi/fluent-logger/', - license='Apache License, Version 2.0', - classifiers=[ - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: Implementation :: CPython', - 'Programming Language :: Python :: Implementation :: PyPy', - 'Development Status :: 5 - Production/Stable', - 'Topic :: System :: Logging', - 'Intended Audience :: Developers', - ], - python_requires=">=2.7,!=3.0,!=3.1,!=3.2,!=3.3", - test_suite='tests' -) + from pybuilder.cli import main + # verbose, debug, skip all optional...
+ if main("-v", "-X", "-o", "--reset-plugins", "clean", "package"): + raise RuntimeError("PyBuilder build failed") + + from pybuilder.reactor import Reactor + reactor = Reactor.current_instance() + project = reactor.project + dist_dir = project.expand_path("$dir_dist") + + for src_file in glob.glob(os.path.join(dist_dir, "*")): + file_name = os.path.basename(src_file) + target_file_name = os.path.join(script_dir, file_name) + if os.path.exists(target_file_name): + if os.path.isdir(target_file_name): + shutil.rmtree(target_file_name) + else: + os.remove(target_file_name) + shutil.move(src_file, script_dir) + setup_args = sys.argv[1:] + subprocess.check_call([sys.executable, "setup.py"] + setup_args, cwd=script_dir) +except subprocess.CalledProcessError as e: + exit_code = e.returncode +sys.exit(exit_code) diff --git a/fluent/__init__.py b/src/main/python/fluent/__init__.py similarity index 100% rename from fluent/__init__.py rename to src/main/python/fluent/__init__.py diff --git a/fluent/asynchandler.py b/src/main/python/fluent/asynchandler.py similarity index 90% rename from fluent/asynchandler.py rename to src/main/python/fluent/asynchandler.py index bbba4c4..7bf6373 100644 --- a/fluent/asynchandler.py +++ b/src/main/python/fluent/asynchandler.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- -from fluent import asyncsender -from fluent import handler +from fluent import handler, asyncsender class FluentHandler(handler.FluentHandler): diff --git a/fluent/asyncsender.py b/src/main/python/fluent/asyncsender.py similarity index 84% rename from fluent/asyncsender.py rename to src/main/python/fluent/asyncsender.py index 7f8dc02..5e841cd 100644 --- a/fluent/asyncsender.py +++ b/src/main/python/fluent/asyncsender.py @@ -1,18 +1,11 @@ # -*- coding: utf-8 -*- -from __future__ import print_function - import threading - -try: - from queue import Queue, Full, Empty -except ImportError: - from Queue import Queue, Full, Empty +from queue import Queue, Full, Empty from fluent import sender -from fluent.sender import EventTime -__all__ = ["EventTime", "FluentSender"] +__all__ = ["FluentSender"] DEFAULT_QUEUE_MAXSIZE = 100 DEFAULT_QUEUE_CIRCULAR = False @@ -55,6 +48,7 @@ def __init__(self, msgpack_kwargs=None, queue_maxsize=DEFAULT_QUEUE_MAXSIZE, queue_circular=DEFAULT_QUEUE_CIRCULAR, + queue_overflow_handler=None, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. 
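The optional queue_overflow_handler added in the hunk above fires when queue_circular=True and the bounded queue discards its oldest chunk; the callback receives the discarded raw msgpack bytes. A minimal usage sketch, assuming a fluentd reachable on localhost:24224 — the tag, logger and callback names are illustrative, mirroring test_asynchandler.py at the end of this patch:

    import logging
    import fluent.handler
    import fluent.asynchandler

    def on_overflow(discarded_bytes):
        # receives the raw msgpack chunk dropped from the full circular queue
        print("dropped %d bytes of buffered logs" % len(discarded_bytes))

    handler = fluent.asynchandler.FluentHandler(
        'app.follow', host='localhost', port=24224,
        queue_maxsize=100, queue_circular=True,
        queue_overflow_handler=on_overflow)
    handler.setFormatter(fluent.handler.FluentRecordFormatter())
    log = logging.getLogger('fluent.test')
    log.addHandler(handler)
    log.info({'from': 'userA', 'to': 'userB'})
    handler.close()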
@@ -66,6 +60,10 @@ def __init__(self, **kwargs) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular + if queue_circular and queue_overflow_handler: + self._queue_overflow_handler = queue_overflow_handler + else: + self._queue_overflow_handler = self._queue_overflow_handler_default self._thread_guard = threading.Event() # This ensures visibility across all variables self._closed = False @@ -109,13 +107,15 @@ def _send(self, bytes_): if self._queue_circular and self._queue.full(): # discard oldest try: - self._queue.get(block=False) + discarded_bytes = self._queue.get(block=False) except Empty: # pragma: no cover pass + else: + self._queue_overflow_handler(discarded_bytes) try: self._queue.put(bytes_, block=(not self._queue_circular)) - except Full: # pragma: no cover - return False # this actually can't happen + except Full: # pragma: no cover + return False # this actually can't happen return True @@ -132,5 +132,8 @@ def _send_loop(self): finally: self._close() + def _queue_overflow_handler_default(self, discarded_bytes): + pass + def __exit__(self, exc_type, exc_val, exc_tb): self.close() diff --git a/fluent/event.py b/src/main/python/fluent/event.py similarity index 100% rename from fluent/event.py rename to src/main/python/fluent/event.py diff --git a/fluent/handler.py b/src/main/python/fluent/handler.py similarity index 97% rename from fluent/handler.py rename to src/main/python/fluent/handler.py index 9297550..7aefd8f 100644 --- a/fluent/handler.py +++ b/src/main/python/fluent/handler.py @@ -9,11 +9,6 @@ except ImportError: # pragma: no cover import json -try: - basestring -except NameError: # pragma: no cover - basestring = (str, bytes) - from fluent import sender @@ -120,7 +115,7 @@ def _structuring(self, data, record): if isinstance(msg, dict): self._add_dic(data, msg) - elif isinstance(msg, basestring): + elif isinstance(msg, str): self._add_dic(data, self._format_msg(record, msg)) else: self._add_dic(data, {'message': msg}) @@ -171,8 +166,8 @@ def _format_by_dict_uses_time(self): @staticmethod def _add_dic(data, dic): for key, value in dic.items(): - if isinstance(key, basestring): - data[str(key)] = value + if isinstance(key, str): + data[key] = value class FluentHandler(logging.Handler): diff --git a/src/main/python/fluent/sender.py b/src/main/python/fluent/sender.py new file mode 100644 index 0000000..0b59cec --- /dev/null +++ b/src/main/python/fluent/sender.py @@ -0,0 +1,974 @@ +# -*- coding: utf-8 -*- +import logging +import os +import socket +from base64 import b64encode +from collections import deque, OrderedDict as odict +from errno import EWOULDBLOCK, EINPROGRESS +from hashlib import sha512 +from os.path import abspath +from queue import Queue, Full, Empty +from select import select +from subprocess import Popen +from threading import Thread, Condition, RLock, Semaphore, BoundedSemaphore +from time import sleep +from time import time +from urllib.parse import urlparse +from uuid import uuid1 + +from msgpack import Packer +from msgpack.fallback import Unpacker + +DEFAULT_SCHEME = "tcp" + +CLOSED = b"" +EOF = CLOSED +NEW_OP = b"0" + +OP_READ = 1 +OP_WRITE = 2 +OP_CLOSE = 3 + +EPC_READY = 0 +EPC_HELO = 1 +EPC_PONG = 2 + +TOMBSTONE = object() + +_endpoint_registry = {} + + +def to_bytes(s): + if isinstance(s, str): + return s.encode("utf-8") + return s + + +def _register_endpoint(schemes, endpoint, force_overwrite=False): + if isinstance(schemes, str): + schemes = (schemes,) + + for scheme in schemes: + if scheme in _endpoint_registry and not 
force_overwrite: + raise RuntimeError("endpoint %s is already registered with %r" % (scheme, endpoint)) + _endpoint_registry[scheme] = endpoint + + +def _find_endpoint(scheme): + """ + ``scheme`` - ``Endpoint`` only handles ``scheme`` + ``scheme``+``subscheme`` - ``Endpoint`` only handles that specific chain of schemes overwriting the wildcard + ``scheme``+ - ``Endpoint`` handles all schemes that start with ``scheme`` + :param scheme: + :return: + """ + endpoint = _endpoint_registry.get(scheme) + if not endpoint: + for r_scheme in _endpoint_registry: + if r_scheme[-1] == "+" and scheme == r_scheme[:-1] or scheme.startswith(r_scheme): + endpoint = _endpoint_registry[r_scheme] + break + + return endpoint + + +def endpoint(url, **kwargs): + p_url = urlparse(url, scheme=DEFAULT_SCHEME) + endpoint = _find_endpoint(p_url.scheme) + if not endpoint: + raise ValueError("No endpoint found for %s" % url) + + return endpoint(p_url, **kwargs) + + +class QueueDict(odict): + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._init(maxsize) + + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the three conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = RLock() + + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = Condition(self.mutex) + + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = Condition(self.mutex) + + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + '''Indicate that a formerly enqueued task is complete. + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + Raises a ValueError if called more times than there were items + placed in the queue. + ''' + with self.all_tasks_done: + unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError('task_done() called too many times') + self.all_tasks_done.notify_all() + self.unfinished_tasks = unfinished + + def join(self): + '''Blocks until all items in the Queue have been gotten and processed. + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + When the count of unfinished tasks drops to zero, join() unblocks. + ''' + with self.all_tasks_done: + while self.unfinished_tasks: + self.all_tasks_done.wait() + + def qsize(self): + '''Return the approximate size of the queue (not reliable!).''' + with self.mutex: + return self._qsize() + + def empty(self): + '''Return True if the queue is empty, False otherwise (not reliable!). + This method is likely to be removed at some point. Use qsize() == 0 + as a direct substitute, but be aware that either approach risks a race + condition where a queue can grow before the result of empty() or + qsize() can be used.
+ To create code that needs to wait for all queued tasks to be + completed, the preferred technique is to use the join() method. + ''' + with self.mutex: + return not self._qsize() + + def full(self): + '''Return True if the queue is full, False otherwise (not reliable!). + This method is likely to be removed at some point. Use qsize() >= n + as a direct substitute, but be aware that either approach risks a race + condition where a queue can shrink before the result of full() or + qsize() can be used. + ''' + with self.mutex: + return 0 < self.maxsize <= self._qsize() + + def put(self, key, block=True, timeout=None, value=None): + '''Put an item into the queue. + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + ''' + with self.not_full: + if self.maxsize > 0: + if not block: + if self._qsize() >= self.maxsize: + raise Full + elif timeout is None: + while self._qsize() >= self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while self._qsize() >= self.maxsize: + remaining = endtime - time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) + self._put(key, value) + self.unfinished_tasks += 1 + self.not_empty.notify() + + def get(self, block=True, timeout=None): + '''Remove and return an item from the queue. + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + ''' + with self.not_empty: + if not block: + if not self._qsize(): + raise Empty + elif timeout is None: + while not self._qsize(): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while not self._qsize(): + remaining = endtime - time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + item = self._get() + self.not_full.notify() + return item + + def pop(self, key, block=True, timeout=None): + '''Pop and return a specific item from the queue. + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). 
+ ''' + with self.not_empty: + if not block: + item = self._pop(key) + elif timeout is None: + while True: + item = self._pop(key) + if item: + break + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while True: + item = self._pop(key) + if item: + break + remaining = endtime - time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + + self.not_full.notify() + return item + + def put_nowait(self, item): + '''Put an item into the queue without blocking. + Only enqueue the item if a free slot is immediately available. + Otherwise raise the Full exception. + ''' + return self.put(item, block=False) + + def get_nowait(self): + '''Remove and return an item from the queue without blocking. + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + ''' + return self.get(block=False) + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.queue = odict() + + def _qsize(self): + return len(self.queue) + + # Put a new item in the queue + def _put(self, key, value): + self.queue[key] = value + + # Get an item from the queue + def _get(self): + return self.queue.popitem(False) + + def _pop(self, key): + return self.queue.pop(key, None) + + +qdict = QueueDict
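QueueDict above mirrors queue.Queue's blocking behavior but keys its entries and adds a targeted pop(). A quick sketch of the intended semantics (illustrative keys and values only, using the QueueDict defined above):

    q = QueueDict(maxsize=2)
    q.put("chunk-1", value="payload-1")   # keyed insertion, like a dict
    q.put("chunk-2", value="payload-2")
    key, val = q.get(block=False)         # FIFO: returns ("chunk-1", "payload-1")
    val2 = q.pop("chunk-2", block=False)  # targeted removal by key -> "payload-2"
    q.task_done()
    q.task_done()

+class Endpoint: + """An ``Endpoint`` represents a single FluentD server or a server cluster that operates cohesively as one unit. + Endpoint may have multiple ``EndpointConnection``s that may come and go as cluster nodes are spun up and die. + """ + + self_fqdn = to_bytes(socket.getfqdn()) + + def __init__(self, _url, shared_key=None, username=None, password=None): + self.url = _url + self.shared_key = shared_key + self.username = username + self.password = password + + self.connections = odict() + self.sender_c = None + + def attach(self, sender_c): + self.sender_c = sender_c + + def addrs(self): + """Returns all socket addresses """ + raise NotImplementedError + + def connection(self): + raise NotImplementedError + + def refresh_connections(self): + """Called by SenderConnection when it's time to refresh the connections""" + + s_addrs = set(self.addrs()) + + removed_addrs = self.connections.keys() - s_addrs + new_addrs = s_addrs - self.connections.keys() + + for addr in removed_addrs: + self.connections.pop(addr).close() + + for new_addr in new_addrs: + conn = self.connection()(new_addr, self) + self.connections[new_addr] = conn + conn.connect() + + return removed_addrs, new_addrs + + +class InetEndpoint(Endpoint): + default_port = 24224 + + def __init__(self, *args, prefer_ipv6=False, **kwargs): + super(InetEndpoint, self).__init__(*args, **kwargs) + netloc_hosts = self.url.netloc.split(",") + self.netloc_addrs = [addr if len(addr) > 1 else (addr[0], self.default_port) for addr in + (hp.split(":") for hp in netloc_hosts)] + self.prefer_ipv6 = prefer_ipv6 + self.addr_family_kind_proto = {} + + def addrs(self): + results = [] + for addr in self.netloc_addrs: + host_addrs = socket.getaddrinfo(host=addr[0], port=addr[1], **self.addr_family_kind_proto) + if self.prefer_ipv6: + ipv6_host_addrs = [host_addr for host_addr in host_addrs if host_addr[0] == socket.AF_INET6] + if ipv6_host_addrs: + host_addrs = ipv6_host_addrs + else: + ipv4_host_addrs = [host_addr for host_addr in host_addrs if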
host_addr[0] == socket.AF_INET] + if ipv4_host_addrs: + host_addrs = ipv4_host_addrs + + results.extend(host_addrs) + + return results + + +class TcpEndpoint(InetEndpoint): + def __init__(self, *args, **kwargs): + super(TcpEndpoint, self).__init__(*args, **kwargs) + self.addr_family_kind_proto = {"type": socket.SOCK_STREAM, + "proto": socket.IPPROTO_TCP} + + def connection(self): + return TcpConnection + + +_register_endpoint("tcp+", TcpEndpoint) + + +class EndpointConnection(Thread): + """One of the connections established for a specific ``Endpoint``. """ + + def __init__(self, addr, endpoint): + super(EndpointConnection, self).__init__(name="EPC %r" % (addr,), daemon=True) + + self.addr = addr + self.endpoint = endpoint + self.logger = endpoint.sender_c.logger + self.sender_c = endpoint.sender_c + self.sock = self._socket(addr) # type: socket.socket + self._fileno = self.sock.fileno() + self._unpacker = None + self._eventq = Queue() # queue of messages to be processed by the connection, in order + self._writeq = deque() # data to be written into a socket, in order + + self._shared_key_salt = None + self._nonce = None + self._keep_alive = False + + if endpoint.shared_key or endpoint.username: + self.state = EPC_HELO + else: + self.state = EPC_READY + + def connect(self): + self.sock.setblocking(False) + addr = self._connect_addr() + self.logger.debug("Establishing connection to %s", addr) + self._connect(addr) + self._fileno = self.sock.fileno() + self.start() + + def fileno(self): + return self._fileno + + def on_read(self): + try: + data = self._recv() + except socket.error as e: + if e.errno == EWOULDBLOCK or e.errno == EINPROGRESS: + return True + raise + + if data == b"\x00": + # This is just HEARTBEAT, skip + self.logger.debug("Received HEARTBEAT from %s", self._connect_addr()) + return True + + unpacker = self._unpacker + if not unpacker: + unpacker = self._unpacker = Unpacker() + + unpacker.feed(data) + obj = None + for obj in unpacker: + self.logger.debug("On %s received: %s", self, obj) + self._eventq.put(obj) + + self._unpacker = None + + if obj is None and data == EOF: + if self._keep_alive: + log = self.logger.warning + else: + log = self.logger.debug + log("Connection %s remote closed while reading", self) + + self.schedule_close() + return False + + return True + + def on_write(self): + try: + data = self._writeq.popleft() + except IndexError: + return False + + bytes_left = len(data) + bytes_sent = -1 + while bytes_left and bytes_sent: + try: + bytes_sent = self._send(data) + if not bytes_sent: + self.logger.warning("Connection %s remote closed unexpectedly while writing", self) + self.schedule_close() + return False + bytes_left -= bytes_sent + if bytes_left: + data = data[bytes_sent:] + except socket.error as e: + if e.errno == EWOULDBLOCK: + break + raise + + if bytes_left: # We tried to write everything but couldn't and received a 0-byte send + self._writeq.appendleft(data) + + return True + + def schedule_close(self): + self.logger.debug("Scheduling close on %s", self) + self.sender_c.schedule_op(OP_CLOSE, self) + + def close(self): + if self.sock.fileno() < 0: + return + + self.logger.debug("Closing %s", self) + self.sender_c.schedule_op(OP_READ, self, False) + self.sender_c.schedule_op(OP_WRITE, self, False) + try: + try: + try: + self.sock.shutdown(socket.SHUT_RDWR) + except socket.error: # pragma: no cover + pass + finally: + try: + self.sock.close() + except socket.error: # pragma: no cover + pass + finally: + self._eventq.put(TOMBSTONE) + + def
send(self, data): + self._writeq.append(data) + self.sender_c.schedule_op(OP_WRITE, self) + + def ping_from_helo(self, obj): + shared_key_salt = None + shared_key_hexdigest = None + password_digest = "" + + self._keep_alive = obj[1].get("keepalive", False) + + if self.endpoint.shared_key: + self._shared_key_salt = shared_key_salt = os.urandom(16) + self._nonce = nonce = obj[1]["nonce"] + digest = sha512() + digest.update(shared_key_salt) + digest.update(self.endpoint.self_fqdn) + digest.update(nonce) + digest.update(to_bytes(self.endpoint.shared_key)) + shared_key_hexdigest = digest.hexdigest() + + if self.endpoint.username: + digest = sha512() + digest.update(obj[1]["auth"]) + digest.update(to_bytes(self.endpoint.username)) + digest.update(to_bytes(self.endpoint.password)) + password_digest = digest.hexdigest() + + data = ["PING", self.endpoint.self_fqdn, shared_key_salt, shared_key_hexdigest, + self.endpoint.username or "", password_digest] + msg = Packer(use_bin_type=True).pack(data) + return msg + + def verify_pong(self, obj): + try: + if not obj[1]: + self.logger.warning("Authentication failed for %s: %s", self, obj[2]) + return False + else: + # Authenticate server + digest = sha512() + digest.update(self._shared_key_salt) + digest.update(to_bytes(obj[3])) + digest.update(self._nonce) + digest.update(to_bytes(self.endpoint.shared_key)) + my_shared_key_hexdigest = digest.hexdigest() + if my_shared_key_hexdigest != obj[4]: + self.logger.warning("Server hash didn't match: %r vs %r", my_shared_key_hexdigest, obj[4]) + return False + return True + except Exception as e: + self.logger.error("Unknown error while validating PONG", exc_info=e) + return False + + def send_msg(self, tag, time, record, ack=False): + options = {"size": 1} + if ack: + options["chunk"] = b64encode(uuid1().bytes) + data = [tag, int(time), record, options] + self.logger.debug("Sending %r", data) + msg = Packer(use_bin_type=True).pack(data) + self.send(msg) + + def send_msgs(self, tag, entries, ack=False): + options = {"size": len(entries)} + if ack: + options["chunk"] = b64encode(uuid1().bytes) + data = [tag, entries, options] + self.logger.debug("Sending %r", data) + msg = Packer(use_bin_type=True).pack(data) + self.send(msg) + + def run(self): + eventq = self._eventq + while True: + obj = eventq.get(block=True) + if obj is TOMBSTONE: + return + if not obj: + self.logger.warning("Unexpected empty packet received from %s: %s", self.sock.getpeername(), obj) + self.close() + return + if isinstance(obj, (list, tuple)): # Array + msg_type = obj[0] + if msg_type == "HELO": + if self.state != EPC_HELO: + self.logger.warning("Unexpected HELO received from %s: %s", self.sock.getpeername(), obj) + self.close() + return + self.send(self.ping_from_helo(obj)) + self.state = EPC_PONG + elif msg_type == "PONG": + if self.state != EPC_PONG: + self.logger.warning("Unexpected PONG received from %s: %s", self.sock.getpeername(), obj) + self.close() + return + if not self.verify_pong(obj): + self.close() + return + self.state = EPC_READY + self.logger.info("Ready!") + else: # Dict + chunk_id = obj.get("ack", None) + if not chunk_id: + self.logger.warning("Unexpected response received from %s: %s", self.sock.getpeername(), obj) + self.close() + return + self.sender_c.ack_chunk(chunk_id) + + def _socket(self, addr): + raise NotImplementedError + + def _connect_addr(self): + raise NotImplementedError + + def _connect(self, addr): + raise NotImplementedError + + def _recv(self): + raise NotImplementedError + + def _send(self, data): + raise
NotImplementedError + + +class StreamConnection(EndpointConnection): + def __init__(self, addr, endpoint, bufsize=4096): + if addr[1] != socket.SOCK_STREAM: + raise ValueError("Socket type %s cannot be used with %s" % (addr[1], self.__class__.__name__)) + super(StreamConnection, self).__init__(addr, endpoint) + self.bufsize = bufsize + + def _socket(self, addr): + return socket.socket(addr[0], addr[1], addr[2]) + + def _connect_addr(self): + return self.addr[4] + + def _connect(self, addr): + try: + self.sock.connect(addr) + except socket.error as e: + if not (e.errno == EWOULDBLOCK or e.errno == EINPROGRESS): + raise + + def _recv(self): + return self.sock.recv(self.bufsize) + + def _send(self, data): + return self.sock.send(data) + + +class TcpConnection(StreamConnection): + def __init__(self, addr, endpoint, bufsize=4096): + if addr[0] not in (socket.AF_INET, socket.AF_INET6): + raise ValueError("Address family %s cannot be used with %s" % (addr[0], self.__class__.__name__)) + super(TcpConnection, self).__init__(addr, endpoint, bufsize) + + +if False: # pragma: no branch + class UdpConnection(EndpointConnection): + def __init__(self, addr, endpoint, maxsize, bind_to): + if addr[1] != socket.SOCK_DGRAM: + raise ValueError("Socket type %s cannot be used with %s" % (addr[1], self.__class__.__name__)) + super(UdpConnection, self).__init__(addr, endpoint) + self.maxsize = maxsize + self.bind_to = bind_to + self.remote_addr = self.addr[4] + + def _socket(self, addr): + return socket.socket(addr[0], addr[1], addr[2]) + + def _connect_addr(self): + return self.remote_addr + + def _connect(self, addr): + self.sock.bind((self.bind_to, 0)) + self.sock.connect(self.remote_addr) + + def _recv(self): + data, _ = self.sock.recvfrom(self.maxsize) + return data + + def _send(self, data): + return self.sock.sendto(data, self.remote_addr) + + +class UnixConnection(StreamConnection): + def __init__(self, addr, endpoint, bufsize=4096): + if addr[0] != socket.AF_UNIX: + raise ValueError("Address family %s cannot be used with %s" % (addr[0], self.__class__.__name__)) + super(UnixConnection, self).__init__(addr, endpoint, bufsize) + + def _socket(self, addr): + return socket.socket(addr[0], addr[1]) + + +class MsgMeta: + __slots__ = ["deadline", "retries"] + + def __init__(self, deadline): + self.deadline = deadline + self.retries = 0 + + +class AsyncLogStore: + def __init__(self, queue_maxsize, queue_circular, send_timeout): + self.send_timeout = send_timeout + self._queue_circular = queue_circular + self._queue = qdict(maxsize=queue_maxsize) + self._acks = set() + self.mutex = self._queue.mutex + self.ack_received = Condition(self.mutex) + + def post(self, msg): + remaining = self.send_timeout + t1 = time() + msg_meta = MsgMeta(t1 + remaining) + + if self.mutex.acquire(timeout=remaining): + try: + if self._queue_circular and self._queue.full(): + # discard oldest + try: + self._queue.get(block=False) + except Empty: # pragma: no cover + pass + t2 = time() + remaining -= t2 - t1 + t1 = t2 + try: + self._queue.put(msg, + block=not self._queue_circular, + timeout=remaining, + value=msg_meta) + except Full: + return False + return True + finally: + self.mutex.release() + else: + return False + + def ack(self, msg): + with self.mutex: + self._acks.add(msg) + self.ack_received.notify_all() + + +class SyncLogStore: + def __init__(self, send_timeout, at_least_once): + self.send_timeout = send_timeout + self.at_least_once = at_least_once + self.send_sem = BoundedSemaphore() + self.pending_sem = Semaphore(0) +
self.mutex = RLock() + self.delivered = Condition(lock=self.mutex) + self.available = Condition(lock=self.mutex) + self.msg = None + + self.logger = None + + def post(self, tag, ts, payload): + msg = (tag, ts, payload) + + remaining = self.send_timeout + t1 = time() + deadline = t1 + remaining + if self.send_sem.acquire(timeout=remaining): + try: + with self.mutex: + remaining = deadline - time() + self.msg = msg + self.available.notify() + self.pending_sem.release() + if self.at_least_once: + return self.delivered.wait(remaining) + finally: + self.send_sem.release() + + def ack(self, msg): + with self.mutex: + if msg is self.msg: + self.delivered.notify() + else: + self.logger.warning("Message ACK delivered after timeout: %r", msg) + + def next(self, timeout=None): + remaining = timeout + + with self.mutex: + while True: + if self.msg is not None: + return self.msg + if timeout and remaining > 0: + self.available.wait(timeout=remaining) + else: + self.available.wait() + + +class SenderConnection(Thread): + def __init__(self, + endpoints, + ha_strategy, + log_store, + refresh_period, + logger): + """ + Internal Sender connection that maintains an aggregate connection for the Sender. + The Sender may be connected to various servers and clusters via different protocols over multiple endpoints. + How endpoints are treated depends on a strategy specified, which is transparent to the Sender. + :param endpoints: iterable of Endpoint + :param ha_strategy: logic underlying selection of the endpoint for sending + :param log_store: a store for log messages allowing to customize store behavior + :param refresh_period: how often to refresh the endpoints + :param logger: internal Fluent logger + """ + super(SenderConnection, self).__init__(name=self.__class__.__name__, daemon=True) + + self.endpoints = endpoints + self.ha_strategy = ha_strategy + self.log_store = log_store + self.refresh_period = refresh_period + self.logger = logger + + self.endpoint_connections = {} + + self._close_pending = deque() + self._open_pending = deque() + + self.mutex = RLock() + + self.wakeup_sock_r, self.wakeup_sock_w = socket.socketpair() + self.wakeup_sock_r.setblocking(False) + + self.op_queue = deque() + + def refresh_endpoints(self): + for endpoint in self.endpoints: + endpoint.refresh_connections() + + def schedule_op(self, op, conn, enable=True): + self.op_queue.append((enable, op, conn)) + self.wakeup_sock_w.send(NEW_OP) + + def close(self, timeout=None): + self.wakeup_sock_w.close() + self.join(timeout) + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def ack_chunk(self, chunk_id): + self.logger.debug("Acknowledging chunk %r", chunk_id) + + def send(self, tag, ts, payload): + return self.log_store.post(tag, ts, payload) + + def run(self): + logger = self.logger + r_int = set() + w_int = set() + + refresh_period = self.refresh_period + + wakeup_sock_r = self.wakeup_sock_r + r_int.add(wakeup_sock_r) + + op_queue = self.op_queue + # last_ts = time.time() + + with wakeup_sock_r, self.wakeup_sock_w: + while r_int or w_int: + r_ready, w_ready, _ = select(r_int, w_int, (), refresh_period) + for r in r_ready: + if r is wakeup_sock_r: + while True: + try: + cmds = r.recv(2048) + except socket.error as e: + if e.errno == EWOULDBLOCK: + break + if cmds == b"": + r_int.remove(r) + break + # Handle exception here + for cmd in cmds: + cmd = bytes((cmd,)) + if cmd == CLOSED: + r_int.remove(r) + break + elif cmd == NEW_OP: + enable, op, conn = op_queue.popleft() + if op
== OP_READ: + if enable: + r_int.add(conn) + else: + r_int.discard(conn) + elif op == OP_WRITE: + if enable: + w_int.add(conn) + else: + w_int.discard(conn) + elif op == OP_CLOSE: + conn.close() + else: + keep = False + try: + keep = r.on_read() + except Exception as e: + logger.warning("Read error on %s", r, exc_info=e) + + if not keep: + r_int.remove(r) + + for w in w_ready: + keep = False + try: + keep = w.on_write() + except Exception as e: + logger.warning("Write error on %s", w, exc_info=e) + + if not keep: + w_int.remove(w) + + r_ready.clear() + w_ready.clear() + + +if __name__ == '__main__': + logger = logging.getLogger("fluent") + logger.propagate = False + logger.addHandler(logging.StreamHandler()) + logger.setLevel(1) + + with Popen(["docker", "run", "-i", + "-p", "24224:24224", "-p", "24224:24224/udp", + "-p", "24225:24225", "-p", "24225:24225/udp", + "-p", "24226:24226", "-p", "24226:24226/udp", + "-v", "%s:/fluentd/log" % abspath("../tests"), + "-v", "%s:/fluentd/etc/fluent.conf" % abspath("../tests/fluent.conf"), + "-v", "%s:/var/run/fluent" % abspath("../tests/fluent_sock"), + "fluent/fluentd:v1.1.0"]) as docker: + sleep(5) + log_store = SyncLogStore(send_timeout=3.0, at_least_once=True) + with SenderConnection([endpoint("tcp://localhost:24224")], None, log_store, 5.0, logger) as conn: + conn.send("tag-name", time(), {"value-x": "a", "value-y": 1}) + conn.send("tag-name", int(time()), {"value-x": "a", "value-y": 1}) + conn.send("tag-name", int(time()), {"value-x": "m", "value-b": 200}) + sleep(3) + + docker.terminate() diff --git a/tests/__init__.py b/src/unittest/python/__init__.py similarity index 100% rename from tests/__init__.py rename to src/unittest/python/__init__.py diff --git a/src/unittest/python/fluentd_tester.py b/src/unittest/python/fluentd_tester.py new file mode 100644 index 0000000..5a29e3b --- /dev/null +++ b/src/unittest/python/fluentd_tester.py @@ -0,0 +1,68 @@ +import os +import tempfile +from collections import namedtuple + +import docker + +""" + with Popen(["docker", "run", "-i", + "-p", "24224:24224", "-p", "24224:24224/udp", + "-p", "24225:24225", "-p", "24225:24225/udp", + "-p", "24226:24226", "-p", "24226:24226/udp", + "-v", "%s:/fluentd/log" % abspath("../tests"), + "-v", "%s:/fluentd/etc/fluent.conf" % abspath("../tests/fluent.conf"), + "-v", "%s:/var/run/fluent" % abspath("../tests/fluent_sock"), + "fluent/fluentd:v1.1.0"]) as docker: +""" +PROTOS_SUPPORTED = ("tcp", "tcp+tls", "udp", "unix", "unix+tls") +FluentConfig = namedtuple("FluentConfig", ["proto", "proto_confs", "port"], defaults=[{}, None]) + + +class FluentInstance: + def __init__(self, config, *, port_generator, docker, data_dir): + if config.proto not in PROTOS_SUPPORTED: + raise ValueError("proto must be one of %r" % (PROTOS_SUPPORTED,)) + + self.config = config + if config.proto.startswith("unix"): + self.port = tempfile.mktemp() + else: + self.port = config.port or port_generator() + self.config_dir = tempfile.TemporaryDirectory() + self.docker = docker + self.data_dir = data_dir + + def start(self): + pass + + def cleanup(self): + self.config_dir.cleanup() + if self.config.proto.startswith("unix"): + os.unlink(self.port) + + +class FluentDockerTester: + def __init__(self, fluent_image, configs, *, base_port=24224): + def port_generator(): + nonlocal base_port + try: + return base_port + finally: + base_port += 1 + + self.docker = docker.from_env() + self.data_dir = tempfile.TemporaryDirectory() + self.fluent_image = fluent_image + self.instances = {} + for
config in configs: + self.instances[config] = FluentInstance(config, + port_generator=port_generator, + docker=self.docker, + data_dir=self.data_dir) + + def setUp(self): + pass + + def tearDown(self): + for instance in self.instances.values(): + instance.cleanup() diff --git a/tests/mockserver.py b/src/unittest/python/mockserver.py similarity index 93% rename from tests/mockserver.py rename to src/unittest/python/mockserver.py index 426d139..77ecdd3 100644 --- a/tests/mockserver.py +++ b/src/unittest/python/mockserver.py @@ -66,9 +66,7 @@ def run(self): def get_received(self): self.join() self._buf.seek(0) - # TODO: have to process string encoding properly. currently we assume - # that all encoding is utf-8. - return list(Unpacker(self._buf)) + return list(Unpacker(self._buf)) def close(self): diff --git a/tests/test_asynchandler.py b/src/unittest/python/test_asynchandler.py similarity index 81% rename from tests/test_asynchandler.py rename to src/unittest/python/test_asynchandler.py index 52d9182..1df77b3 100644 --- a/tests/test_asynchandler.py +++ b/src/unittest/python/test_asynchandler.py @@ -4,9 +4,20 @@ import sys import unittest +try: + from unittest import mock +except ImportError: + import mock +try: + from unittest.mock import patch +except ImportError: + from mock import patch + + + import fluent.asynchandler import fluent.handler -from tests import mockserver +from src.unittest.python import mockserver class TestHandler(unittest.TestCase): @@ -309,3 +320,68 @@ def test_simple(self): eq('userB', el[2]['to']) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) + + +class QueueOverflowException(BaseException): + pass + + +def queue_overflow_handler(discarded_bytes): + raise QueueOverflowException(discarded_bytes) + + +class TestHandlerWithCircularQueueHandler(unittest.TestCase): + Q_SIZE = 1 + + def setUp(self): + super(TestHandlerWithCircularQueueHandler, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._port = self._server.port + + def tearDown(self): + self._server.close() + + def get_handler_class(self): + # return fluent.handler.FluentHandler + return fluent.asynchandler.FluentHandler + + def test_simple(self): + handler = self.get_handler_class()('app.follow', port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + queue_overflow_handler=queue_overflow_handler) + with handler: + def custom_full_queue(): + handler.sender._queue.put(b'Mock', block=True) + return True + + with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)): + self.assertEqual(handler.sender.queue_circular, True) + self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) + + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + exc_counter = 0 + + try: + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + try: + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + try: + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + # due to multithreading we can't be sure an exception is raised on every emit, + # so we only assert a conservative condition here + print('Exception raised: {} (expected 3)'.format(exc_counter)) + assert exc_counter >= 0 diff --git a/tests/test_asyncsender.py
b/src/unittest/python/test_asyncsender.py similarity index 99% rename from tests/test_asyncsender.py rename to src/unittest/python/test_asyncsender.py index eb36f96..ae11ca0 100644 --- a/tests/test_asyncsender.py +++ b/src/unittest/python/test_asyncsender.py @@ -8,7 +8,7 @@ import msgpack import fluent.asyncsender -from tests import mockserver +from src.unittest.python import mockserver class TestSetup(unittest.TestCase): diff --git a/tests/test_event.py b/src/unittest/python/test_event.py similarity index 92% rename from tests/test_event.py rename to src/unittest/python/test_event.py index d341616..2d2f8ab 100644 --- a/tests/test_event.py +++ b/src/unittest/python/test_event.py @@ -2,11 +2,12 @@ import unittest -from fluent import event, sender -from tests import mockserver +from fluent import sender, event +from src.unittest.python import mockserver -class TestException(BaseException): pass +class TestException(BaseException): + pass class TestEvent(unittest.TestCase): @@ -15,7 +16,7 @@ def setUp(self): sender.setup('app', port=self._server.port) def tearDown(self): - from fluent.sender import _set_global_sender + from fluent import _set_global_sender sender.close() _set_global_sender(None) diff --git a/tests/test_handler.py b/src/unittest/python/test_handler.py similarity index 99% rename from tests/test_handler.py rename to src/unittest/python/test_handler.py index 45fea86..b39932e 100644 --- a/tests/test_handler.py +++ b/src/unittest/python/test_handler.py @@ -5,7 +5,7 @@ import unittest import fluent.handler -from tests import mockserver +from src.unittest.python import mockserver class TestHandler(unittest.TestCase): diff --git a/tests/test_sender.py b/src/unittest/python/test_sender.py similarity index 99% rename from tests/test_sender.py rename to src/unittest/python/test_sender.py index 1c0fbe9..6eac57f 100644 --- a/tests/test_sender.py +++ b/src/unittest/python/test_sender.py @@ -12,12 +12,12 @@ import msgpack import fluent.sender -from tests import mockserver +from src.unittest.python import mockserver class TestSetup(unittest.TestCase): def tearDown(self): - from fluent.sender import _set_global_sender + from fluent import _set_global_sender _set_global_sender(None) def test_no_kwargs(self): diff --git a/src/unittest/python/test_with_fluentd.py b/src/unittest/python/test_with_fluentd.py new file mode 100644 index 0000000..e69de29 diff --git a/tox.ini b/tox.ini deleted file mode 100644 index 6c3f032..0000000 --- a/tox.ini +++ /dev/null @@ -1,9 +0,0 @@ -[tox] -minversion = 1.7.2 -envlist = py27, py32, py33, py34, py35, py36, py37, py38 -skip_missing_interpreters = True - -[testenv] -deps = nose - coverage~=4.5.4 -commands = python setup.py nosetests
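The shared-key handshake added in src/main/python/fluent/sender.py (ping_from_helo()/verify_pong()) reduces to the sha512 digest below; a distilled sketch with illustrative hostnames, nonce and key:

    import os
    from hashlib import sha512

    def shared_key_hexdigest(salt, fqdn, nonce, shared_key):
        # field order matches ping_from_helo: salt, FQDN,
        # server-supplied nonce, then the shared key itself
        d = sha512()
        d.update(salt)
        d.update(fqdn)
        d.update(nonce)
        d.update(shared_key)
        return d.hexdigest()

    salt = os.urandom(16)                  # the client picks the salt
    nonce = b"nonce-from-server-HELO"      # the server picks the nonce
    client_hex = shared_key_hexdigest(salt, b"client.example.com", nonce, b"secret")
    # verify_pong() recomputes the digest over the server's FQDN (obj[3]) with the
    # same salt and nonce and rejects the connection when the hexdigests differ.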