From 548f6640e83a2b5328c03de2c41fb267456a7742 Mon Sep 17 00:00:00 2001 From: yuxuan-ms <43603130+yuxuan-ms@users.noreply.github.com> Date: Thu, 13 Aug 2020 16:36:28 +0800 Subject: [PATCH] Add Kafka driver --- .travis.yml | 4 + doc/en/download/Transports.rst | 21 +++ .../Kafka/server_template.properties | 127 ++++++++++++++++++ examples/Transports/Kafka/test_plan.py | 102 ++++++++++++++ examples/Transports/Kafka/zoo_template.cfg | 24 ++++ requirements.txt | 1 + testplan/testing/multitest/driver/kafka.py | 93 +++++++++++++ .../testing/multitest/driver/zookeeper.py | 4 + .../driver/mykafka/server_template.properties | 127 ++++++++++++++++++ .../multitest/driver/mykafka/test_kafka.py | 72 ++++++++++ 10 files changed, 575 insertions(+) create mode 100644 examples/Transports/Kafka/server_template.properties create mode 100644 examples/Transports/Kafka/test_plan.py create mode 100644 examples/Transports/Kafka/zoo_template.cfg create mode 100644 testplan/testing/multitest/driver/kafka.py create mode 100644 tests/unit/testplan/testing/multitest/driver/mykafka/server_template.properties create mode 100644 tests/unit/testplan/testing/multitest/driver/mykafka/test_kafka.py diff --git a/.travis.yml b/.travis.yml index c83806b6a..bc9bd2e12 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,10 @@ python: before_install: - sudo apt-get -y install zookeeper zookeeper-bin zookeeperd + - wget https://downloads.apache.org/kafka/2.6.0/kafka_2.12-2.6.0.tgz -O kafka.tgz + - sudo mkdir /opt/kafka + - sudo chown -R $USER:$USER /opt/kafka + - tar zxf kafka.tgz -C /opt/kafka --strip-components 1 install: - pip install -r requirements.txt -U diff --git a/doc/en/download/Transports.rst b/doc/en/download/Transports.rst index eaaed82c4..ba16ec1be 100644 --- a/doc/en/download/Transports.rst +++ b/doc/en/download/Transports.rst @@ -105,3 +105,24 @@ custom_http_request_handler.py test.txt ++++++++ .. literalinclude:: ../../../examples/Transports/HTTP/test.txt + +.. _example_kafka: + +Kafka +----- +Required files: + - :download:`test_plan.py <../../../examples/Transports/Kafka/test_plan.py>` + - :download:`server_template.properties <../../../examples/Transports/Kafka/server_template.properties>` + - :download:`zoo_template.cfg <../../../examples/Transports/Kafka/zoo_template.cfg>` + +test_plan.py +++++++++++++ +.. literalinclude:: ../../../examples/Transports/Kafka/test_plan.py + +server_template.properties +++++++++++++++++++++++++++ +.. literalinclude:: ../../../examples/Transports/Kafka/server_template.properties + +zoo_template.cfg +++++++++++++++++ +.. literalinclude:: ../../../examples/Transports/Kafka/zoo_template.cfg diff --git a/examples/Transports/Kafka/server_template.properties b/examples/Transports/Kafka/server_template.properties new file mode 100644 index 000000000..a091e09ee --- /dev/null +++ b/examples/Transports/Kafka/server_template.properties @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +# Switch to enable topic deletion or not, default value is false +delete.topic.enable=true + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. It will get the value returned from +# java.net.InetAddress.getCanonicalHostName() if not configured. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 + +listeners=PLAINTEXT://localhost:{{port}} + +# Hostname and port the broker will advertise to producers and consumers. If not set, +# it uses the value for "listeners" if configured. Otherwise, it will use the value +# returned from java.net.InetAddress.getCanonicalHostName(). +#advertised.listeners=PLAINTEXT://your.host.name:9092 + +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +#log.dirs=/tmp/jiaweil/kafka-vista-9098 +log.dirs={{log_path}} + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=3 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={{context['zk'].connection_str}}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000
+
+offsets.topic.replication.factor=1
+
diff --git a/examples/Transports/Kafka/test_plan.py b/examples/Transports/Kafka/test_plan.py
new file mode 100644
index 000000000..094a8a2a7
--- /dev/null
+++ b/examples/Transports/Kafka/test_plan.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+"""
+Demonstrates Kafka driver usage from within the testcases.
+"""
+
+import os
+import sys
+import uuid
+
+from testplan import test_plan
+from testplan.testing.multitest import MultiTest
+
+try:
+    from confluent_kafka import Producer, Consumer
+except ImportError:
+    print("Cannot import confluent_kafka!")
+    exit()
+
+from testplan.testing.multitest.driver.zookeeper import (
+    ZookeeperStandalone,
+    ZK_SERVER,
+)
+from testplan.testing.multitest.driver.kafka import (
+    KafkaStandalone,
+    KAFKA_START,
+)
+from testplan.testing.multitest import testsuite, testcase
+from testplan.report.testing.styles import Style, StyleEnum
+
+
+OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
+
+
+@testsuite
+class KafkaTest(object):
+    """Suite that contains testcases that perform Kafka operations."""
+
+    @testcase
+    def test_send_receive(self, env, result):
+        producer = Producer(
+            {
+                "bootstrap.servers": "localhost:{}".format(env.kafka.port),
+                "max.in.flight": 1,
+            }
+        )
+        consumer = Consumer(
+            {
+                "bootstrap.servers": "localhost:{}".format(env.kafka.port),
+                "group.id": uuid.uuid4(),
+                "default.topic.config": {"auto.offset.reset": "smallest"},
+                "enable.auto.commit": True,
+            }
+        )
+
+        topic = "testplan"
+        message = str(uuid.uuid4()).encode("utf-8")
+        producer.produce(topic=topic, value=message)
+        consumer.subscribe([topic])
+        msg = consumer.poll(10)
+        result.equal(message, msg.value(), "Test producer and consumer")
+
+
+# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
+# downloadable example gives meaningful and presentable output.
+# NOTE: this programmatic arguments passing approach will cause Testplan
+# to ignore any command line arguments related to that functionality.
+@test_plan(
+    name="KafkaExample",
+    stdout_style=OUTPUT_STYLE,
+    pdf_style=OUTPUT_STYLE,
+    pdf_path="report.pdf",
+)
+def main(plan):
+    """
+    Testplan decorated main function to add and execute MultiTests.
+
+    :return: Testplan result object.
+    :rtype: ``testplan.base.TestplanResult``
+    """
+    current_path = os.path.dirname(os.path.abspath(__file__))
+    zookeeper_template = os.path.join(current_path, "zoo_template.cfg")
+    kafka_template = os.path.join(current_path, "server_template.properties")
+
+    plan.add(
+        MultiTest(
+            name="KafkaTest",
+            suites=[KafkaTest()],
+            environment=[
+                ZookeeperStandalone(
+                    name="zk", cfg_template=zookeeper_template
+                ),
+                KafkaStandalone(name="kafka", cfg_template=kafka_template),
+            ],
+        )
+    )
+
+
+if __name__ == "__main__":
+    if os.path.exists(ZK_SERVER) and os.path.exists(KAFKA_START):
+        sys.exit(not main())
+    else:
+        print("Zookeeper or Kafka doesn't exist on this server.")
diff --git a/examples/Transports/Kafka/zoo_template.cfg b/examples/Transports/Kafka/zoo_template.cfg
new file mode 100644
index 000000000..c287fb728
--- /dev/null
+++ b/examples/Transports/Kafka/zoo_template.cfg
@@ -0,0 +1,24 @@
+# limits the number of active connections from a host,
+# specified by IP address, to a single ZooKeeper server.
+maxClientCnxns=100
+
+# The basic time unit in milliseconds used by ZooKeeper.
+# It is used to do heartbeats and the minimum session timeout will be twice the tickTime.
+tickTime=2000
+
+# Timeouts ZooKeeper uses to limit the length of time the ZooKeeper
+# servers in quorum have to connect to a leader.
+initLimit=10
+
+# Limits how far out of date a server can be from a leader.
+syncLimit=5
+
+# Enable admin server.
+admin.enableServer=false
+
+# The location to store the in-memory database snapshots and, unless specified otherwise,
+# the transaction log of updates to the database.
+dataDir={{zkdata_path}}
+
+# The port to listen for client connections.
+clientPort={{port}}
diff --git a/requirements.txt b/requirements.txt
index e42c32909..e90dd43fb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@ pylint
 scikit-learn < 0.21.0
 black==19.10b0; python_version >= '3.0'
 kazoo
+confluent-kafka
diff --git a/testplan/testing/multitest/driver/kafka.py b/testplan/testing/multitest/driver/kafka.py
new file mode 100644
index 000000000..ff86ef1c3
--- /dev/null
+++ b/testplan/testing/multitest/driver/kafka.py
@@ -0,0 +1,93 @@
+"""
+Driver for Kafka server
+"""
+import os
+import re
+
+from schema import Or
+from testplan.common.config import ConfigOption
+from testplan.common.utils.path import (
+    makeemptydirs,
+    makedirs,
+    instantiate,
+)
+from testplan.testing.multitest.driver import app
+
+KAFKA_START = "/opt/kafka/bin/kafka-server-start.sh"
+
+
+class KafkaStandaloneConfig(app.AppConfig):
+    """
+    Configuration object for
+    :py:class:`~testplan.testing.multitest.driver.kafka.KafkaStandalone` resource.
+    """
+
+    @classmethod
+    def get_options(cls):
+        return {
+            ConfigOption("cfg_template"): str,
+            ConfigOption("port"): int,
+        }
+
+
+class KafkaStandalone(app.App):
+    """
+    Driver for starting a Kafka instance in standalone mode.
+
+    :param cfg_template: Kafka config file template.
+    :type cfg_template: ``str``
+    :param binary: kafka-server-start.sh file path.
+    :type binary: ``str``
+    :param port: Kafka listen port.
+    :type port: ``int``
+    :param env: Environmental variables to be made available to the Kafka process.
+    :type env: ``dict``
+    """
+
+    CONFIG = KafkaStandaloneConfig
+
+    def __init__(
+        self, name, cfg_template, binary=KAFKA_START, port=0, **options
+    ):
+        log_regexps = [
+            re.compile(
+                r".*Awaiting socket connections on\s*(?P<host>[^:]+):(?P<port>[0-9]+).*"
+            ),
+            re.compile(".*started.*"),
+        ]
+        super(KafkaStandalone, self).__init__(
+            name=name,
+            cfg_template=cfg_template,
+            binary=binary,
+            port=port,
+            log_regexps=log_regexps,
+            **options
+        )
+
+        self.zookeeper_connect = None
+        self.log_path = None
+        self.etc_path = None
+        self.config = None
+        self.port = port
+
+    def pre_start(self):
+        super(KafkaStandalone, self).pre_start()
+        self.log_path = os.path.join(self.runpath, "log")
+        self.etc_path = os.path.join(self.runpath, "etc")
+        for directory in (self.log_path, self.etc_path):
+            if self.cfg.path_cleanup is False:
+                makedirs(directory)
+            else:
+                makeemptydirs(directory)
+        self.config = os.path.join(self.runpath, "etc", "server.properties")
+        instantiate(self.cfg.cfg_template, self.context_input(), self.config)
+
+    @property
+    def cmd(self):
+        return [self.cfg.binary, self.config]
+
+    def started_check(self, timeout=None):
+        """Driver started status condition check."""
+
+        super(KafkaStandalone, self).started_check(timeout)
+        self.port = int(self.extracts["port"])
diff --git a/testplan/testing/multitest/driver/zookeeper.py b/testplan/testing/multitest/driver/zookeeper.py
index b7bfd0d03..5023a2fa4 100644
--- a/testplan/testing/multitest/driver/zookeeper.py
+++ b/testplan/testing/multitest/driver/zookeeper.py
@@ -76,6 +76,10 @@ def __init__(
         self.pid_file = None
         self.std = None
 
+    @property
+    def connection_str(self):
+        return "{}:{}".format("localhost", self.port)
+
     def pre_start(self):
         """
         Create mandatory directories and install files from given templates
diff --git a/tests/unit/testplan/testing/multitest/driver/mykafka/server_template.properties
b/tests/unit/testplan/testing/multitest/driver/mykafka/server_template.properties new file mode 100644 index 000000000..a091e09ee --- /dev/null +++ b/tests/unit/testplan/testing/multitest/driver/mykafka/server_template.properties @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +# Switch to enable topic deletion or not, default value is false +delete.topic.enable=true + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. It will get the value returned from +# java.net.InetAddress.getCanonicalHostName() if not configured. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 + +listeners=PLAINTEXT://localhost:{{port}} + +# Hostname and port the broker will advertise to producers and consumers. If not set, +# it uses the value for "listeners" if configured. Otherwise, it will use the value +# returned from java.net.InetAddress.getCanonicalHostName(). +#advertised.listeners=PLAINTEXT://your.host.name:9092 + +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +#log.dirs=/tmp/jiaweil/kafka-vista-9098 +log.dirs={{log_path}} + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=3 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. 
+num.recovery.threads.per.data.dir=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zookeeper.connect={{context['zk'].connection_str}}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000
+
+offsets.topic.replication.factor=1
+
diff --git a/tests/unit/testplan/testing/multitest/driver/mykafka/test_kafka.py b/tests/unit/testplan/testing/multitest/driver/mykafka/test_kafka.py
new file mode 100644
index 000000000..e496b421d
--- /dev/null
+++ b/tests/unit/testplan/testing/multitest/driver/mykafka/test_kafka.py
@@ -0,0 +1,72 @@
+"""Unit tests for the Kafka driver."""
+
+import os
+import uuid
+import pytest
+
+from testplan.common import entity
+from testplan.base import TestplanMock
+from testplan.testing.multitest.driver import zookeeper
+from testplan.testing.multitest.driver import kafka
+
+pytest.importorskip("confluent_kafka")
+from confluent_kafka import Producer, Consumer
+
+pytestmark = pytest.mark.skipif(
+    not os.path.exists(kafka.KAFKA_START),
+    reason="Kafka doesn't exist on this server.",
+)
+
+zk_cfg_template = os.path.join(
+    os.path.dirname(os.path.abspath(__file__)),
+    os.pardir,
+    "myzookeeper",
+    "zoo_template.cfg",
+)
+
+kafka_cfg_template = os.path.join(
+    os.path.dirname(os.path.abspath(__file__)), "server_template.properties"
+)
+
+
+@pytest.fixture(scope="module")
+def zookeeper_server():
+    server = zookeeper.ZookeeperStandalone("zk", cfg_template=zk_cfg_template)
+    with server:
+        yield server
+
+
+@pytest.fixture(scope="module")
+def kafka_server(zookeeper_server):
+    server = kafka.KafkaStandalone("kafka", cfg_template=kafka_cfg_template)
+
+    testplan = TestplanMock("KafkaTest", parse_cmdline=False)
+    env = entity.Environment(parent=testplan)
+    env.add(zookeeper_server)
+    env.add(server)
+    with server:
+        yield server
+
+
+def test_kafka(kafka_server):
+    producer = Producer(
+        {
+            "bootstrap.servers": "localhost:{}".format(kafka_server.port),
+            "max.in.flight": 1,
+        }
+    )
+    consumer = Consumer(
+        {
+            "bootstrap.servers": "localhost:{}".format(kafka_server.port),
+            "group.id": uuid.uuid4(),
+            "default.topic.config": {"auto.offset.reset": "smallest"},
+            "enable.auto.commit": True,
+        }
+    )
+
+    topic = "testplan"
+    message = str(uuid.uuid4()).encode("utf-8")
+    producer.produce(topic=topic, value=message)
+    consumer.subscribe([topic])
+    msg = consumer.poll(10)
+    assert message == msg.value()
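
A note on how the new driver picks up the broker port: KafkaStandalone.started_check reads the named "port" group captured by the first log_regexps entry (via self.extracts) and overwrites self.port, so env.kafka.port reflects the port the broker actually bound. The snippet below is a minimal sketch of that extraction; the sample log line is an assumption about Kafka's usual startup output, not something taken from this patch.

import re

# Same pattern as in KafkaStandalone.__init__, with the named groups it relies on.
LOG_REGEX = re.compile(
    r".*Awaiting socket connections on\s*(?P<host>[^:]+):(?P<port>[0-9]+).*"
)

# Illustrative broker log line; the real one is read from the Kafka process output.
sample_line = (
    "[2020-08-13 16:36:28,000] INFO Awaiting socket connections on "
    "0.0.0.0:9092. (kafka.network.Acceptor)"
)

match = LOG_REGEX.match(sample_line)
if match:
    # Mirrors `self.port = int(self.extracts["port"])` in started_check().
    port = int(match.group("port"))
    print("Broker is listening on port", port)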
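
One behaviour of confluent_kafka worth keeping in mind when reading the examples: consumer.poll(10) returns None if nothing arrives within the timeout, and a returned message can carry error() instead of a payload, so dereferencing msg.value() directly can fail on a slow broker. A defensive polling helper might look like the sketch below; the helper name and timeout values are illustrative, not part of the patch.

import time


def consume_one(consumer, timeout=30):
    """Poll until a real message arrives or the deadline passes."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        msg = consumer.poll(1)
        if msg is None:
            continue  # nothing fetched yet, keep polling
        if msg.error():
            raise RuntimeError(msg.error())  # broker/consumer-side error
        return msg
    raise TimeoutError("no Kafka message received within %s seconds" % timeout)

Calling producer.flush() after produce() also ensures the message has left the client's internal queue before the consumer starts polling.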