diff --git a/.circleci/config.yml b/.circleci/config.yml
index 662b0312..df86c3ec 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -16,6 +16,7 @@ jobs:
- run:
name: Run Tests
command: |
+ set -e
make test
no_output_timeout: 30m
- store_test_results:
diff --git a/.gitignore b/.gitignore
index e6dbee38..2140fda5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,6 +27,7 @@ dist
_book
*.result
venv/*
+eventgenEnv/*
*.log.*
splunk_eventgen-*/
.env
diff --git a/LICENSE b/LICENSE
index 7678be52..75e7576d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -220,7 +220,6 @@ The following components are provided under the Apache License 2.0. See project
(Apache License 2.0) pyOpenSSL (https://github.com/pyca/pyopenssl/blob/master/LICENSE)
(Apache License 2.0) docker (https://github.com/docker/docker-py/blob/master/LICENSE)
(Apache License 2.0) requests-futures (https://github.com/ross/requests-futures/blob/master/LICENSE)
- (Apache License 2.0) nameko (https://github.com/nameko/nameko/blob/master/LICENSE.txt)
========================================================================
MIT licenses
@@ -244,7 +243,6 @@ BSD-style licenses
The following components are provided under a BSD-style license. See project link for details.
(BSD 2-Clause "Simplified" License) mock (https://github.com/testing-cabal/mock/blob/master/LICENSE.txt)
- (BSD 3-Clause) pyrabbit (https://github.com/bkjones/pyrabbit/blob/master/LICENSE)
(BSD 3-Clause) logutils (https://opensource.org/licenses/BSD-3-Clause)
(BSD 3-Clause) jinja2 (https://github.com/pallets/jinja/blob/master/LICENSE)
(BSD 3-Clause) ujson(https://github.com/esnme/ultrajson/blob/master/LICENSE.txt)
diff --git a/Makefile b/Makefile
index c95d0718..f4e6a854 100644
--- a/Makefile
+++ b/Makefile
@@ -107,12 +107,20 @@ eg_network:
run_server: eg_network
docker kill eg_server || true
docker rm eg_server || true
- docker run --network eg_network --name eg_server -e EVENTGEN_AMQP_HOST="eg_controller" -d -p 9501:9500 eventgen:latest server
+ docker run --network eg_network --name eg_server -e REDIS_HOST=eg_controller -d -p 9501:9500 eventgen:latest server
run_controller: eg_network
docker kill eg_controller || true
docker rm eg_controller || true
- docker run --network eg_network --name eg_controller -d -p 5672:5672 -p 15672:15672 -p 9500:9500 eventgen:latest controller
+ docker run --network eg_network --name eg_controller -d -p 6379:6379 -p 9500:9500 eventgen:latest controller
+
+run_standalone:
+ docker kill eg_standalone || true
+ docker rm eg_standalone || true
+ docker run --name eg_standalone -d -p 9500:9500 eventgen:latest standalone
+
+run_local_standalone:
+ python -m splunk_eventgen service -r standalone
docs:
cd docs/; bundle install; bundle exec jekyll serve
diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile
index 90b8c8eb..19628ab5 100644
--- a/dockerfiles/Dockerfile
+++ b/dockerfiles/Dockerfile
@@ -1,4 +1,4 @@
-FROM rabbitmq:3.7.15-management-alpine
+FROM redis:5.0.5-alpine
RUN apk --no-cache upgrade && \
apk add --no-cache --update \
@@ -10,17 +10,31 @@ RUN apk --no-cache upgrade && \
openssl-dev \
libxml2-dev \
libxslt-dev \
+ bash \
+ sudo \
+ openssh \
+ tar \
+ acl \
+ g++ \
+ git \
curl && \
pip install --upgrade pip && \
rm -rf /tmp/* && \
- rm -rf /var/cache/apk/*
+ rm -rf /var/cache/apk/* && \
+ ssh-keygen -f /etc/ssh/ssh_host_rsa_key -N '' -t rsa && \
+ mkdir -p /var/run/sshd && \
+ mkdir -p /root/.ssh && \
+ chmod 0700 /root/.ssh && \
+ passwd -u root && \
+ pip install git+git://github.com/esnme/ultrajson.git
+COPY dockerfiles/sshd_config /etc/ssh/sshd_config
COPY dockerfiles/entrypoint.sh /sbin/entrypoint.sh
COPY dist/* /root/splunk_eventgen.tgz
RUN pip install /root/splunk_eventgen.tgz && \
- rm /root/splunk_eventgen.tgz && \
- echo "[{rabbit, [{loopback_users, []}]}]." >> /etc/rabbitmq/rabbitmq.config
+ rm /root/splunk_eventgen.tgz
-EXPOSE 5672 15672 9500
-WORKDIR /usr/lib/python2.7/site-packages/splunk_eventgen
+EXPOSE 2222 6379 9500
+RUN chmod a+x /sbin/entrypoint.sh
+WORKDIR /usr/lib/python2.7/site-packages/splunk_eventgen
ENTRYPOINT ["/sbin/entrypoint.sh"]
diff --git a/dockerfiles/entrypoint.sh b/dockerfiles/entrypoint.sh
index e899fd9d..165e5e7d 100755
--- a/dockerfiles/entrypoint.sh
+++ b/dockerfiles/entrypoint.sh
@@ -1,11 +1,12 @@
#!/bin/bash
set -e
+/usr/sbin/sshd
if [ "$#" = 0 ]; then
tail -F -n0 /etc/hosts && wait
elif [ "$1" = "controller" ]; then
- rabbitmq-server &
+ redis-server &
splunk_eventgen service --role controller &
tail -F -n0 /etc/hosts && wait
elif [ "$1" = "server" ]; then
@@ -16,4 +17,4 @@ elif [ "$1" = "standalone" ]; then
tail -F -n0 /etc/hosts && wait
else
"$@"
-fi
+fi
\ No newline at end of file
diff --git a/dockerfiles/sshd_config b/dockerfiles/sshd_config
new file mode 100644
index 00000000..09b817d7
--- /dev/null
+++ b/dockerfiles/sshd_config
@@ -0,0 +1,35 @@
+Port 2222
+AcceptEnv LANG LANGUAGE XMODIFIERS LC_* RD_*
+AddressFamily any
+AllowAgentForwarding yes
+AllowTcpForwarding yes
+AuthorizedKeysFile %h/.ssh/authorized_keys
+ChallengeResponseAuthentication no
+Ciphers chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr
+ClientAliveInterval 180
+Compression delayed
+HostBasedAuthentication no
+HostKey /etc/ssh/ssh_host_rsa_key
+IgnoreRhosts yes
+IgnoreUserKnownHosts yes
+KexAlgorithms curve25519-sha256@libssh.org,ecdh-sha2-nistp521,ecdh-sha2-nistp384,ecdh-sha2-nistp256,diffie-hellman-group-exchange-sha256
+LogLevel INFO
+LoginGraceTime 30s
+MACs hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,umac-128@openssh.com
+MaxAuthTries 5
+MaxSessions 128
+PasswordAuthentication no
+PermitEmptyPasswords no
+PermitRootLogin yes
+PermitTunnel yes
+PermitUserEnvironment no
+PidFile /var/run/sshd.pid
+PrintMotd no
+Protocol 2
+PubKeyAuthentication yes
+StrictModes no
+Subsystem sftp /usr/lib/ssh/sftp-server
+SyslogFacility AUTH
+TcpKeepalive yes
+UseDns no
+X11Forwarding yes
diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md
index bcb926cb..870fe7af 100644
--- a/docs/ARCHITECTURE.md
+++ b/docs/ARCHITECTURE.md
@@ -46,11 +46,16 @@ Given the complexity and the reimplementation of a number of features during ref
python -m splunk_eventgen generate tests/
/.conf
-# Server-Controller Architecture
+# Controller-Server Architecture
-This is a new feature included in version >= 6.0 Traditionally, it has been difficult to configure multiple Eventgen instances at the same time. The performance of a single Eventgen instance is often limited in its architecture and compute power of the host machine.
-Therefore, it is inevitable that we will need to use more than one Eventgen instance for larger data generation. We introduce server-controller architecture to do this in a more user friendly way.
+This is a new feature included in version >= 6.0 Traditionally, it has been difficult to configure multiple Eventgen instances at the same time. The performance of a single Eventgen instance is often limited in its architecture and compute power of the host machine. We introduce Controller-Server architecture to support scalable Eventgen deployment.
-In Server-Controller Architecture, we may have one or multiple servers (Eventgen instances), one controller and one instance of RabbitMQ. It is ok to run RabbitMQ locally with the controller.
-The Servers and controller are communicating via RabbitMQ, where the controller has a capability to broadcast incoming requests to the servers and aggregate the results to output to the users.
+In this Architecture, we may have one or multiple servers (Eventgen instances), one controller and one instance of Redis (which lives inside of the same container as the controller). You don't have to worry about setting up Redis yourself. Redis is used for communication between Controller and Servers.
+
+In order to get setup, I would recommend building a Docker image locally using ``make image`` command.
+Then, start a single instance of Controller (using ``controller`` as the container argument). You may start any number of servers (using ``server`` as the container argument) with env var ``REDIS_HOST`` and ``REDIS_PORT`` that points to the controller's host address and Redis port (by default ``6379``).
+
+# Standalone Architecture
+
+Users can also set up Standalone Eventgen servers without a controller (using ``standalone`` as the container argument). Standalone mode allows users to use the same backend API calls.
diff --git a/docs/REFERENCE.md b/docs/REFERENCE.md
index 6a821cbc..4488c48d 100644
--- a/docs/REFERENCE.md
+++ b/docs/REFERENCE.md
@@ -605,8 +605,12 @@ Note, "TARGET_NAME" is a variable that should be replaced by the hostname of Eve
* Starts target Eventgen instance's data generation
* ```POST /stop```
* Stops all Eventgen instances' data generation
+ * body is optional; default is false. Setting force to true will destroy current Queues and trying to hard stop a running Eventgen object by causing errors.
+ * Format: ```{"force": true}``` or ```{"force": false}```
* ```POST /stop/```
* Stops target Eventgen instance's data generation
+ * body is optional; default is false. Setting force to true will destroy current Queues and trying to hard stop a running Eventgen object by causing errors.
+ * Format: ```{"force": true}``` or ```{"force": false}```
* ```POST /restart```
* Restarts all Eventgen instances' data generation
* ```POST /restart/```
@@ -651,6 +655,7 @@ Note, "TARGET_NAME" is a variable that should be replaced by the hostname of Eve
* Default values
* mode: "roundrobin"
* hostname_template: "idx{0}"
+ * hosts: [] # list of host addresses
* protocol: "https"
* key: "00000000-0000-0000-0000-000000000000"
* key_name: "eventgen"
@@ -664,6 +669,7 @@ Note, "TARGET_NAME" is a variable that should be replaced by the hostname of Eve
* Default values
* mode: "roundrobin"
* hostname_template: "idx{0}"
+ * hosts: [] # list of host addresses
* protocol: "https"
* key: "00000000-0000-0000-0000-000000000000"
* key_name: "eventgen"
@@ -703,3 +709,17 @@ Note, "TARGET_NAME" is a variable that should be replaced by the hostname of Eve
```
$ curl http://localhost:9500/volume/egx1 -X POST -d '{"perDayVolume": 200}'
```
+* ```POST /reset```
+ * Stops a running Eventgen run, reset the Eventgen Core Object, and reconfigure the server.
+ * Example:
+ ```
+ $ curl http://localhost:9500/reset -X POST
+ ```
+
+* ```POST /reset/```
+ * Stops a running Eventgen run, reset the Eventgen Core Object, and reconfigure the server.
+ * Example:
+ ```
+ $ curl http://localhost:9500/reset/egx1 -X POST
+ ```
+
diff --git a/docs/SETUP.md b/docs/SETUP.md
index 6c50433c..ebb1f996 100644
--- a/docs/SETUP.md
+++ b/docs/SETUP.md
@@ -101,18 +101,15 @@ $ python -m splunk_eventgen -v generate tests/sample_eventgen_conf/replay/eventg
$ splunk_eventgen -v generate path/to/eventgen.conf
```
-##### Controller/Server Cluster ###
-
-A quick preface on this mode of operation: due to its complexity, this is only recommended if you're developing or comfortable with technical setups. Having said that, you can follow these instructions:
-
-1. Install and run [RabbitMQ](https://www.rabbitmq.com/download.html) locally
-2. Install [Eventgen PyPI module](SETUP.md#pypi-setup)
-3. To set up a controller, run `splunk_eventgen service --role controller`
-4. To set up a server, run `splunk_eventgen service --role server`
-5. By default, the controller and server will try to locate RabbitMQ on pyamqp://localhost:5672 using credentials guest/guest and RabbitMQ's web UI at http://localhost:15672. If you're running another rabbitMQ server, you may error out.
-6. You can change any of those parameters using the CLI - for instance, if your RabbitMQ is accessible on rabbit-mq.company.com with credentials admin/changeme you should run `splunk_eventgen service --role controller --amqp-host rabbit-mq.company.com --amqp-user admin --amqp-pass changeme`
-7. Please see `splunk_eventgen service --help` for additional CLI options
-8. **NOTE:** Running the controller and server on the same machine will cause port collisions for Eventgen web server. To mitigate this, you can tell the server to run on a separate port using `splunk_eventgen service --web-server-address 0.0.0.0:9501`
+##### Controller-Server Cluster ###
+
+Please follow these instructions to run an Eventgen cluster on your Docker environment:
+
+1. `make image`
+2. Create a Docker network: `docker network create --attchable --driver bridge eg_network`
+3. To set up a controller, run `docker run --network eg_network --name eg_controller -d -p 6379:6379 -p 9500:9500 eventgen:latest controller`
+4. To set up a server, run `docker run --network eg_network --name eg_server -e REDIS_HOST=eg_controller -d -p 9501:9500 eventgen:latest server`
+* Note that REDIS_HOST needs to be a resolvable host address to the controller. Also, --name should be used to differientiate a server from another.
---
diff --git a/requirements.txt b/requirements.txt
index a7852b74..e64a108c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,6 @@ pytest-xdist
mock
pytest-cov
docker==2.7.0
-nameko
pyOpenSSL
lxml==4.3.4
pytest-mock>=1.10.4
@@ -16,10 +15,11 @@ ujson>=1.35
pyyaml
httplib2
jinja2
-pyrabbit==1.1.0
urllib3==1.24.2
pyOpenSSL
flake8>=3.7.7
yapf>=0.26.0
isort>=4.3.15
-structlog==19.1.0
+Flask>=1.0.3
+redis==3.2.1
+structlog==19.1.0
\ No newline at end of file
diff --git a/splunk_eventgen/__main__.py b/splunk_eventgen/__main__.py
index 9a64204c..9340e2f2 100644
--- a/splunk_eventgen/__main__.py
+++ b/splunk_eventgen/__main__.py
@@ -10,9 +10,6 @@
import os
import shutil
import sys
-import time
-
-import requests
FILE_LOCATION = os.path.dirname(os.path.abspath(__file__))
path_prepend = os.path.join(FILE_LOCATION, 'lib')
@@ -63,46 +60,16 @@ def parse_args():
build_subparser.add_argument("--destination", help="Specify where to store the output of the build command.")
build_subparser.add_argument("--remove", default=True,
help="Remove the build directory after completion. Defaults to True")
- # WSGI subparser
- wsgi_subparser = subparsers.add_parser('wsgi', help="start a wsgi server to interact with eventgen.")
- wsgi_subparser.add_argument(
- "--daemon", action="store_true",
- help="Daemon will tell the wsgi server to start in a daemon mode and will release the cli.")
# Service subparser
service_subparser = subparsers.add_parser(
'service',
- help=("Run Eventgen as a Nameko service. Parameters for starting this service can be defined as either env"
+ help=("Run Eventgen as an api server. Parameters for starting this service can be defined as either env"
"variables or CLI arguments, where env variables takes precedence. See help for more info."))
service_subparser.add_argument("--role", "-r", type=str, default=None, required=True, choices=[
- "controller", "server"], help="Define the role for this Eventgen node. Options: controller, server")
- service_subparser.add_argument(
- "--amqp-uri", type=str, default=None,
- help=("Full URI to AMQP endpoint in the format pyamqp://:@:."
- "This can also be set using the environment variable EVENTGEN_AMQP_URI"))
- service_subparser.add_argument(
- "--amqp-host", type=str, default=None,
- help=("Specify AMQP hostname. This can also be set using the environment variable EVENTGEN_AMQP_HOST." +
- "Default is localhost"))
- service_subparser.add_argument(
- "--amqp-port", type=int, default=None,
- help=("Specify AMQP port. This can also be set using the environment variable EVENTGEN_AMQP_PORT." +
- "Default is 5672"))
- service_subparser.add_argument(
- "--amqp-webport", type=int, default=None,
- help=("Specify AMQP web port. This can also be set using the environment variable EVENTGEN_AMQP_WEBPORT." +
- "Default is 15672"))
- service_subparser.add_argument(
- "--amqp-user", type=str, default=None,
- help=("Specify AMQP user. This can also be set using the environment variable EVENTGEN_AMQP_USER." +
- "Default is 'guest'"))
- service_subparser.add_argument(
- "--amqp-pass", type=str, default=None,
- help=("Specify AMQP password. This can also be set using the environment variable EVENTGEN_AMQP_PASS." +
- "Default is 'guest'"))
- service_subparser.add_argument(
- "--web-server-address", type=str, default=None,
- help=("Specify nameko webserver address. This can also be set using the environment variable" +
- "EVENTGEN_WEB_SERVER_ADDR. Default is 0.0.0.0:9500"))
+ "controller", "server", "standalone"], help="Define the role for this Eventgen node. Options: controller, server, standalone")
+ service_subparser.add_argument("--redis-host", type=str, default='127.0.0.1', help="Redis Host")
+ service_subparser.add_argument("--redis-port", type=str, default='6379', help="Redis Port")
+ service_subparser.add_argument("--web-server-port", type=str, default='9500', help="Port you want to run a web server on")
# Help subparser
# NOTE: Keep this at the end so we can use the subparser_dict.keys() to display valid commands
help_subparser = subparsers.add_parser('help', help="Display usage on a subcommand")
@@ -111,7 +78,6 @@ def parse_args():
# add subparsers to the subparser dict, this will be used later for usage / help statements.
subparser_dict['generate'] = generate_subparser
subparser_dict['build'] = build_subparser
- subparser_dict['wsgi'] = wsgi_subparser
subparser_dict['help'] = help_subparser
if len(sys.argv) == 1:
@@ -157,118 +123,6 @@ def parse_args():
return args
-def wait_for_response(address, webport, timeout=300):
- '''
- Extracts the hostname off the given address in the form ://:@: and
- builds a URL in the form http://:. Using this URL, it tries to verify the endpoint is reachable.
-
- Retry will occur for ~300s
- '''
- protocol, url = address.split("://")
- creds, addr = url.split("@")
- host, port = addr.split(":")
- userid, password = creds.split(":")
- start = time.time()
- end = start
- while end - start < timeout:
- try:
- r = requests.get("http://{}:{}".format(host, webport))
- r.raise_for_status()
- return
- except requests.exceptions.ConnectionError:
- time.sleep(1)
- finally:
- end = time.time()
- msg = "Unable to contact broker URL."
- logger.exception(msg)
- raise Exception(msg)
-
-
-def parse_cli_vars(config, args):
- config["AMQP_URI"] = args.amqp_uri if args.amqp_uri else config["AMQP_URI"]
- config["AMQP_HOST"] = args.amqp_host if args.amqp_host else config["AMQP_HOST"]
- config["AMQP_PORT"] = args.amqp_port if args.amqp_port else config["AMQP_PORT"]
- config["AMQP_WEBPORT"] = args.amqp_webport if args.amqp_webport else config["AMQP_WEBPORT"]
- config["AMQP_USER"] = args.amqp_user if args.amqp_user else config["AMQP_USER"]
- config["AMQP_PASS"] = args.amqp_pass if args.amqp_pass else config["AMQP_PASS"]
- config["WEB_SERVER_ADDRESS"] = args.web_server_address if args.web_server_address else config["WEB_SERVER_ADDRESS"]
- return config
-
-
-def parse_env_vars():
- osvars, config = dict(os.environ), {}
- config["AMQP_URI"] = osvars.get("EVENTGEN_AMQP_URI", None)
- config["AMQP_HOST"] = osvars.get("EVENTGEN_AMQP_HOST", "localhost")
- config["AMQP_PORT"] = osvars.get("EVENTGEN_AMQP_PORT", 5672)
- config["AMQP_WEBPORT"] = osvars.get("EVENTGEN_AMQP_WEBPORT", 15672)
- config["AMQP_USER"] = osvars.get("EVENTGEN_AMQP_URI", "guest")
- config["AMQP_PASS"] = osvars.get("EVENTGEN_AMQP_PASS", "guest")
- config["WEB_SERVER_ADDRESS"] = osvars.get("EVENTGEN_WEB_SERVER_ADDR", "0.0.0.0:9500")
- return config
-
-
-def rectify_config(config):
- # For nameko purposes, all we need to pass into the config is AMQP_URI and WEB_SERVER_ADDRESS.
- new = {}
- new["WEB_SERVER_ADDRESS"] = config.get("WEB_SERVER_ADDRESS", "0.0.0.0:9500")
- new["AMQP_WEBPORT"] = config.get("AMQP_WEBPORT", 15672)
- if "AMQP_URI" in config and config["AMQP_URI"]:
- new["AMQP_URI"] = config["AMQP_URI"]
- else:
- if all([config["AMQP_HOST"], config["AMQP_PORT"], config["AMQP_USER"], config["AMQP_PASS"]]):
- new["AMQP_URI"] = "pyamqp://{user}:{pw}@{host}:{port}".format(
- user=config["AMQP_USER"], pw=config["AMQP_PASS"], host=config["AMQP_HOST"], port=config["AMQP_PORT"])
- else:
- msg = "AMQP_URI is not defined and cannot be constructed. Check environment variables/CLI arguments."
- logger.exception(msg)
- raise Exception(msg)
- return new
-
-
-def run_nameko(args):
- # Running nameko imports here so that Eventgen as a module does not require nameko to run.
- import eventlet
- eventlet.monkey_patch()
- from nameko.runners import ServiceRunner
- # In order to make this run locally as well as within a container-ized environment, we're to pull variables
- # from both environment variables and CLI arguments, where CLI will take precendence.
- config = parse_env_vars()
- config = parse_cli_vars(config, args)
- config = rectify_config(config)
- print "Config used: {}".format(config)
- # Wait up to 30s for RMQ service to be up
- wait_for_response(config["AMQP_URI"], config["AMQP_WEBPORT"])
- # Start Nameko service
- runner = ServiceRunner(config=config)
- if args.role == "controller":
- from eventgen_nameko_controller import EventgenController
- runner.add_service(EventgenController)
- else:
- from eventgen_nameko_server import EventgenServer
- runner.add_service(EventgenServer)
- runner.start()
- runnlet = eventlet.spawn(runner.wait)
- while True:
- try:
- runnlet.wait()
- except OSError as exc:
- if exc.errno == errno.EINTR:
- # this is the OSError(4) caused by the signalhandler.
- # ignore and go back to waiting on the runner
- continue
- raise
- except KeyboardInterrupt:
- print() # looks nicer with the ^C e.g. bash prints in the terminal
- try:
- runner.stop()
- except KeyboardInterrupt:
- print() # as above
- runner.kill()
- else:
- # runner.wait completed
- break
-
-
def exclude_function(filename):
# removing any hidden . files.
last_index = filename.rfind('/')
@@ -339,6 +193,14 @@ def convert_verbosity_count_to_logging_level(verbosity):
return logging.ERROR
+def gather_env_vars(args):
+ os_vars, env_vars = dict(os.environ), {}
+ env_vars["REDIS_HOST"] = os.environ.get("REDIS_HOST", args.redis_host)
+ env_vars["REDIS_PORT"] = os.environ.get("REDIS_PORT", args.redis_port)
+ env_vars["WEB_SERVER_PORT"] = os.environ.get("WEB_SERVER_PORT", args.web_server_port)
+ return env_vars
+
+
def main():
cwd = os.getcwd()
args = parse_args()
@@ -347,7 +209,16 @@ def main():
eventgen = eventgen_core.EventGenerator(args=args)
eventgen.start()
elif args.subcommand == "service":
- run_nameko(args)
+ env_vars = gather_env_vars(args)
+ if args.role == "controller":
+ from eventgen_api_server.eventgen_controller import EventgenController
+ EventgenController(env_vars=env_vars).app_run()
+ elif args.role == "server":
+ from eventgen_api_server.eventgen_server import EventgenServer
+ EventgenServer(env_vars=env_vars, mode="cluster").app_run()
+ elif args.role == "standalone":
+ from eventgen_api_server.eventgen_server import EventgenServer
+ EventgenServer(env_vars=env_vars, mode="standalone").app_run()
elif args.subcommand == "build":
if not args.destination:
args.destination = cwd
diff --git a/splunk_eventgen/controller_conf.yml b/splunk_eventgen/controller_conf.yml
deleted file mode 100644
index 1f476d3f..00000000
--- a/splunk_eventgen/controller_conf.yml
+++ /dev/null
@@ -1,2 +0,0 @@
-AMQP_URI: 'pyamqp://guest:guest@localhost:5672'
-WEB_SERVER_ADDRESS: '0.0.0.0:9500'
diff --git a/splunk_eventgen/eventgen_api_server/__init__.py b/splunk_eventgen/eventgen_api_server/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/splunk_eventgen/eventgen_api_server/eventgen_controller.py b/splunk_eventgen/eventgen_api_server/eventgen_controller.py
new file mode 100644
index 00000000..57e769d7
--- /dev/null
+++ b/splunk_eventgen/eventgen_api_server/eventgen_controller.py
@@ -0,0 +1,70 @@
+from flask import Flask
+import socket
+import os
+import logging
+
+from eventgen_controller_api import EventgenControllerAPI
+from redis_connector import RedisConnector
+
+FILE_PATH = os.path.dirname(os.path.realpath(__file__))
+LOG_PATH = os.path.join(FILE_PATH, '..', 'logs')
+
+class EventgenController():
+
+ def __init__(self, *args, **kwargs):
+ self.env_vars = kwargs.get('env_vars')
+
+ self.role = 'controller'
+ self.host = socket.gethostname() + self.role
+
+ self.redis_connector = RedisConnector(host=self.env_vars.get('REDIS_HOST'), port=self.env_vars.get('REDIS_PORT'))
+ self.redis_connector.register_myself(hostname=self.host, role=self.role)
+
+ self._setup_loggers()
+ self.logger = logging.getLogger('eventgen_server')
+ self.logger.info('Initialized Eventgen Controller: hostname [{}]'.format(self.host))
+
+ self.app = self._create_app()
+
+ def app_run(self):
+ self.app.run(host="0.0.0.0", port=int(self.env_vars.get('WEB_SERVER_PORT')), threaded=True)
+
+ def _create_app(self):
+ app = Flask(__name__)
+ app.config['SECRET_KEY'] = 'does-not-exist'
+ app.register_blueprint(EventgenControllerAPI(redis_connector=self.redis_connector, host=self.host).get_blueprint())
+
+ @app.route('/')
+ def index():
+ return "running_eventgen_controller"
+
+ return app
+
+ def _setup_loggers(self):
+ log_path = os.path.join(FILE_PATH, 'logs')
+ eventgen_controller_logger_path = os.path.join(LOG_PATH, 'eventgen-controller.log')
+ eventgen_error_logger_path = os.path.join(LOG_PATH, 'eventgen-error.log')
+
+ log_format = '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
+ date_format = '%Y-%m-%d %H:%M:%S'
+ detailed_formatter = logging.Formatter(log_format, datefmt=date_format)
+
+ console_handler = logging.StreamHandler()
+ console_handler.setFormatter(detailed_formatter)
+ console_handler.setLevel(logging.DEBUG)
+
+ eventgen_controller_file_handler = logging.handlers.RotatingFileHandler(eventgen_controller_logger_path, maxBytes=2500000, backupCount=20)
+ eventgen_controller_file_handler.setFormatter(detailed_formatter)
+ eventgen_controller_file_handler.setLevel(logging.DEBUG)
+
+ error_file_handler = logging.handlers.RotatingFileHandler(eventgen_error_logger_path, maxBytes=2500000, backupCount=20)
+ error_file_handler.setFormatter(detailed_formatter)
+ error_file_handler.setLevel(logging.ERROR)
+
+ logger = logging.getLogger('eventgen_controller')
+ logger.setLevel(logging.INFO)
+ logger.propagate = False
+ logger.handlers = []
+ logger.addHandler(eventgen_controller_file_handler)
+ logger.addHandler(console_handler)
+ logger.addHandler(error_file_handler)
diff --git a/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py b/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py
new file mode 100644
index 00000000..55480909
--- /dev/null
+++ b/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py
@@ -0,0 +1,229 @@
+import atexit
+from flask import Blueprint, Response, request
+import os
+import socket
+import time
+import json
+import requests
+import logging
+
+INTERNAL_ERROR_RESPONSE = json.dumps({"message": "Internal Error Occurred"})
+
+class EventgenControllerAPI():
+
+ def __init__(self, redis_connector, host):
+ self.bp = self.__create_blueprint()
+ self.redis_connector = redis_connector
+ self.host = host
+
+ self.logger = logging.getLogger("eventgen_controller")
+ self.logger.info("Initialized the EventgenControllerAPI Blueprint")
+
+ self.interval = 0.001
+
+ def get_blueprint(self):
+ return self.bp
+
+ def __create_blueprint(self):
+ bp = Blueprint('api', __name__)
+
+ def format_message(job, request_method, body=None, target='all'):
+ return json.dumps({'job': job, 'target': target, 'body': body, 'request_method': request_method})
+
+ def gather_response(response_number_target=0):
+ response = {}
+ if not response_number_target:
+ response_number_target = int(self.redis_connector.message_connection.pubsub_numsub(self.redis_connector.servers_channel)[0][1])
+ response_num = 0
+ countdown = 1.5 / self.interval
+ for i in range(0, int(countdown)):
+ if response_num == response_number_target:
+ break
+ else:
+ time.sleep(self.interval)
+ message = self.redis_connector.pubsub.get_message()
+ if message and type(message.get('data')) == str:
+ status_response = json.loads(message.get('data'))
+ response[status_response['host']] = status_response['response']
+ response_num += 1
+ return response
+
+ @bp.route('/index', methods=['GET'])
+ def index():
+ home_page = '''*** Eventgen Controller ***
+Host: {0}
+Connected Servers: {1}
+You are running Eventgen Controller.\n'''
+ host = self.host
+ return home_page.format(host, self.redis_connector.get_registered_servers())
+
+ @bp.route('/status', methods=['GET'])
+ def http_all_status():
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('status', request.method, target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/status/', methods=['GET'])
+ def http_target_status(target):
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('status', request.method, target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/conf', methods=['GET', 'POST', 'PUT'])
+ def http_all_conf():
+ try:
+ body = None if request.method == 'GET' else request.get_json(force=True)
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('conf', request.method, body=body, target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/conf/', methods=['GET', 'POST', 'PUT'])
+ def http_target_conf(target):
+ try:
+ body = None if request.method == 'GET' else request.get_json(force=True)
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('conf', request.method, body=body, target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/bundle', methods=['POST'])
+ def http_all_bundle():
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('bundle', request.method, body=request.get_json(force=True), target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/bundle/', methods=['POST'])
+ def http_target_bundle(target):
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('bundle', request.method, body=request.get_json(force=True), target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/setup', methods=['POST'])
+ def http_all_setup():
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('setup', request.method, body=request.get_json(force=True), target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/setup/', methods=['POST'])
+ def http_target_setup(target):
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('setup', request.method, body=request.get_json(force=True), target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/volume', methods=['GET', 'POST'])
+ def http_all_volume():
+ try:
+ body = None if request.method == 'GET' else request.get_json(force=True)
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('volume', request.method, body=body, target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/volume/', methods=['GET', 'POST'])
+ def http_target_volume(target):
+ try:
+ body = None if request.method == 'GET' else request.get_json(force=True)
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('volume', request.method, body=body, target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/start', methods=['POST'])
+ def http_all_start():
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('start', request.method, target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/start/', methods=['POST'])
+ def http_target_start(target):
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('start', request.method, target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/stop', methods=['POST'])
+ def http_all_stop():
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('stop', request.method, target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/stop/', methods=['POST'])
+ def http_target_stop(target):
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('stop', request.method, target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/restart', methods=['POST'])
+ def http_all_restart():
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('restart', request.method, target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/restart/', methods=['POST'])
+ def http_target_restart(target):
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('restart', request.method, target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/reset', methods=['POST'])
+ def http_all_reset():
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('reset', request.method, target='all'))
+ return Response(json.dumps(gather_response()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/reset/', methods=['POST'])
+ def http_target_reset(target):
+ try:
+ self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, format_message('reset', request.method, target=target))
+ return Response(json.dumps(gather_response(response_number_target=1)), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ return bp
+
+ def __make_error_response(self, status, message):
+ return Response(json.dumps({'message': message}), status=status)
diff --git a/splunk_eventgen/eventgen_api_server/eventgen_core_object.py b/splunk_eventgen/eventgen_api_server/eventgen_core_object.py
new file mode 100644
index 00000000..b02ba219
--- /dev/null
+++ b/splunk_eventgen/eventgen_api_server/eventgen_core_object.py
@@ -0,0 +1,57 @@
+import argparse
+import logging
+import os
+
+import splunk_eventgen.eventgen_core as eventgen_core
+
+FILE_PATH = os.path.dirname(os.path.realpath(__file__))
+CUSTOM_CONFIG_PATH = os.path.realpath(os.path.join(FILE_PATH, "..", "default", "eventgen_wsgi.conf"))
+
+class EventgenCoreObject():
+ def __init__(self):
+ self.logger = logging.getLogger('eventgen_server')
+ self.eventgen_core_object = eventgen_core.EventGenerator(self._create_args())
+ self.configured = False
+ self.configfile = None
+ self.check_and_configure_eventgen()
+
+ def check_and_configure_eventgen(self):
+ if os.path.isfile(CUSTOM_CONFIG_PATH):
+ self.configured = True
+ self.configfile = CUSTOM_CONFIG_PATH
+ self.eventgen_core_object.reload_conf(CUSTOM_CONFIG_PATH)
+ self.logger.info("Configured Eventgen from {}".format(CUSTOM_CONFIG_PATH))
+
+ def refresh_eventgen_core_object(self):
+ self.eventgen_core_object.kill_processes()
+ self.eventgen_core_object = eventgen_core.EventGenerator(self._create_args())
+ self.configured = False
+ self.configfile = None
+ self.check_and_configure_eventgen()
+ self.logger.info("Refreshed the eventgen core object")
+
+ def _create_args(self):
+ args = argparse.Namespace()
+ args.daemon = False
+ args.verbosity = None
+ args.version = False
+ args.backfill = None
+ args.count = None
+ args.devnull = False
+ args.disableOutputQueue = False
+ args.end = None
+ args.generators = None
+ args.interval = None
+ args.keepoutput = False
+ args.modinput = False
+ args.multiprocess = True
+ args.outputters = None
+ args.profiler = False
+ args.sample = None
+ args.version = False
+ args.subcommand = 'generate'
+ args.verbosity = 20
+ args.wsgi = True
+ args.modinput_mode = False
+ args.generator_queue_size = 1500
+ return args
diff --git a/splunk_eventgen/eventgen_api_server/eventgen_server.py b/splunk_eventgen/eventgen_api_server/eventgen_server.py
new file mode 100644
index 00000000..3edeeeb2
--- /dev/null
+++ b/splunk_eventgen/eventgen_api_server/eventgen_server.py
@@ -0,0 +1,42 @@
+from flask import Flask
+
+import socket
+import logging
+
+from eventgen_server_api import EventgenServerAPI
+import eventgen_core_object
+
+class EventgenServer():
+
+ def __init__(self, *args, **kwargs):
+ self.eventgen = eventgen_core_object.EventgenCoreObject()
+ self.mode = kwargs.get('mode', 'standalone')
+ self.env_vars = kwargs.get('env_vars')
+ self.host = socket.gethostname()
+ self.role = 'server'
+
+ self.logger = logging.getLogger('eventgen_server')
+ self.logger.info('Initialized Eventgen Controller: hostname [{}]'.format(self.host))
+
+ if self.mode != 'standalone':
+ from redis_connector import RedisConnector
+ self.redis_connector = RedisConnector(host=self.env_vars.get('REDIS_HOST'), port=self.env_vars.get('REDIS_PORT'))
+ self.redis_connector.register_myself(hostname=self.host, role=self.role)
+ self.app = self._create_app()
+
+ def app_run(self):
+ self.app.run(host="0.0.0.0", port=int(self.env_vars.get('WEB_SERVER_PORT')), threaded=True)
+
+ def _create_app(self):
+ app = Flask(__name__)
+ app.config['SECRET_KEY'] = 'does-not-exist'
+ if self.mode == 'standalone':
+ app.register_blueprint(EventgenServerAPI(eventgen=self.eventgen, redis_connector=None, host=self.host).get_blueprint())
+ else:
+ app.register_blueprint(EventgenServerAPI(eventgen=self.eventgen, redis_connector=self.redis_connector, host=self.host, mode=self.mode).get_blueprint())
+
+ @app.route('/')
+ def index():
+ return "running_eventgen_server"
+
+ return app
diff --git a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py
new file mode 100644
index 00000000..c355cdfa
--- /dev/null
+++ b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py
@@ -0,0 +1,588 @@
+import flask
+from flask import Response, request
+import socket
+import json
+import ConfigParser
+import os
+import time
+import requests
+import zipfile
+import tarfile
+import glob
+import shutil
+import collections
+import logging
+import requests
+from requests.packages.urllib3.util.retry import Retry
+import threading
+
+import eventgen_core_object
+
+INTERNAL_ERROR_RESPONSE = json.dumps({"message": "Internal Error Occurred"})
+
+FILE_PATH = os.path.dirname(os.path.realpath(__file__))
+DEFAULT_PATH = os.path.realpath(os.path.join(FILE_PATH, "..", "default"))
+SAMPLE_DIR_PATH = os.path.realpath(os.path.join(FILE_PATH, "..", "samples"))
+
+class EventgenServerAPI():
+ def __init__(self, eventgen, redis_connector, host, mode='standalone'):
+ self.bp = self._create_blueprint()
+ self.eventgen = eventgen
+
+ self.logger = logging.getLogger('eventgen_server')
+ self.logger.info("Initialized the EventgenServerAPI Blueprint")
+
+ self.total_volume = 0.0
+ self.host = host
+
+ self.interval = 0.01
+ if mode != 'standalone':
+ self.redis_connector = redis_connector
+ self._channel_listener()
+ self.logger.info("Initialized the channel listener. Cluster mode ready.")
+
+ def get_blueprint(self):
+ return self.bp
+
+ def _channel_listener(self):
+ def start_listening(self):
+ while True:
+ message = self.redis_connector.pubsub.get_message()
+ if message and type(message.get('data')) == str:
+ data = json.loads(message.get('data'))
+ self.logger.info("Message Recieved {}".format(message['data']))
+ if data['target'] == 'all' or data['target'] == self.host:
+ thread = threading.Thread(target=self._delegate_jobs, args=(data.get('job'), data.get('request_method'), data.get('body')))
+ thread.daemon = True
+ thread.start()
+ time.sleep(self.interval)
+ thread = threading.Thread(target=start_listening, args=(self,))
+ thread.daemon = True
+ thread.start()
+
+ def format_message(self, job, request_method, response):
+ return json.dumps({'job': job, 'request_method': request_method, 'response': response, 'host': self.host})
+
+ def _delegate_jobs(self, job, request_method, body):
+ if not job: return
+ else:
+ if job == 'status':
+ response = self.get_status()
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('status', request_method, response=response))
+ elif job == 'conf':
+ if request_method == 'POST':
+ self.set_conf(body)
+ elif request_method == 'PUT':
+ self.edit_conf(body)
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('conf', request_method, response=self.get_conf()))
+ elif job == 'bundle':
+ self.set_bundle(body.get("url", ''))
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('bundle', request_method, response=self.get_conf()))
+ elif job == 'setup':
+ self.clean_bundle_conf()
+ self.setup_http(body)
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('setup', request_method, response=self.get_conf()))
+ elif job == 'volume':
+ if request_method == 'POST':
+ self.set_volume(body.get("perDayVolume", 0.0))
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('volume', request_method, response=self.get_volume()))
+ elif job == 'start':
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('start', request_method, response=self.start()))
+ elif job == 'stop':
+ message = {'message': 'Eventgen is stopping. Might take some time to terminate all processes.'}
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('stop', request_method, response=message))
+ self.stop(force_stop=True)
+ elif job == 'restart':
+ message = {'message': 'Eventgen is restarting. Might take some time to restart.'}
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('restart', request_method, response=message))
+ self.restart()
+ elif job == 'reset':
+ message = {'message': 'Eventgen is resetting. Might take some time to reset.'}
+ self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('reset', request_method, response=message))
+ self.reset()
+
+
+ def _create_blueprint(self):
+ bp = flask.Blueprint('server_api', __name__)
+
+ @bp.route('/index', methods=['GET'])
+ def http_get_index():
+ return self.get_index()
+
+ @bp.route('/status', methods=['GET'])
+ def http_get_status():
+ try:
+ response = self.get_status()
+ return Response(json.dumps(response), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/conf', methods=['GET', 'POST', 'PUT'])
+ def http_conf():
+ try:
+ if request.method == 'POST':
+ self.set_conf(request.get_json(force=True))
+ elif request.method == 'PUT':
+ self.edit_conf(request.get_json(force=True))
+ return Response(json.dumps(self.get_conf()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/volume', methods=['GET'])
+ def http_get_volume():
+ try:
+ response = self.get_volume()
+ return Response(json.dumps(response), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/volume', methods=['POST'])
+ def http_post_volume():
+ try:
+ self.set_volume(request.get_json(force=True).get("perDayVolume", 0.0))
+ return Response(json.dumps(self.get_volume()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/start', methods=['POST'])
+ def http_post_start():
+ try:
+ response = self.start()
+ return Response(json.dumps(response), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/stop', methods=['POST'])
+ def http_post_stop():
+ try:
+ force_stop = False
+ try:
+ force_stop = True
+ except:
+ force_stop = False
+ response = self.stop(force_stop = force_stop)
+ self.eventgen.refresh_eventgen_core_object()
+ return Response(json.dumps(response), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/restart', methods=['POST'])
+ def http_post_restart():
+ try:
+ response = self.restart()
+ return Response(json.dumps(response), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/reset', methods=['POST'])
+ def http_post_reset():
+ try:
+ response = self.reset()
+ return Response(json.dumps(response), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/bundle', methods=['POST'])
+ def http_post_bundle():
+ try:
+ self.set_bundle(request.get_json(force=True).get("url", ''))
+ self.clean_bundle_conf()
+ return Response(json.dumps(self.get_conf()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ @bp.route('/setup', methods=['POST'])
+ def http_post_setup():
+ try:
+ self.stop(force_stop=True)
+ self.clean_bundle_conf()
+ self.setup_http(request.get_json(force=True))
+ return Response(json.dumps(self.get_conf()), mimetype='application/json', status=200)
+ except Exception as e:
+ self.logger.error(e)
+ return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)
+
+ return bp
+
+ def get_index(self):
+ home_page = '''*** Eventgen WSGI ***\nHost: {0}\nEventgen Status: {1}\nEventgen Config file exists: {2}\nEventgen Config file path: {3}\nTotal volume: {4}\nWorker Queue Status: {5}\nSample Queue Status: {6}\nOutput Queue Status: {7}\n'''
+ status = self.get_status()
+ eventgen_status = "running" if status["EVENTGEN_STATUS"] else "stopped"
+ host = status["EVENTGEN_HOST"]
+ configured = status["CONFIGURED"]
+ config_file = status["CONFIG_FILE"]
+ total_volume = status["TOTAL_VOLUME"]
+ worker_queue_status = status["QUEUE_STATUS"]["WORKER_QUEUE"]
+ sample_queue_status = status["QUEUE_STATUS"]["SAMPLE_QUEUE"]
+ output_queue_status = status["QUEUE_STATUS"]["OUTPUT_QUEUE"]
+ return home_page.format(host, eventgen_status, configured, config_file, total_volume, worker_queue_status,
+ sample_queue_status, output_queue_status)
+
+ def get_conf(self):
+ response = collections.OrderedDict()
+ if self.eventgen.configured:
+ config = ConfigParser.ConfigParser()
+ config.optionxform = str
+ config_path = self.eventgen.configfile
+ if os.path.isfile(config_path):
+ config.read(config_path)
+ for section in config.sections():
+ response[section] = collections.OrderedDict()
+ for k, v in config.items(section):
+ response[section][k] = v
+ return response
+
+ def set_conf(self, request_body):
+ config = ConfigParser.ConfigParser({}, collections.OrderedDict)
+ config.optionxform = str
+
+ for sample in request_body.iteritems():
+ config.add_section(sample[0])
+ for pair in sample[1].iteritems():
+ value = pair[1]
+ if type(value) == dict:
+ value = json.dumps(value)
+ config.set(sample[0], pair[0], value)
+
+ with open(eventgen_core_object.CUSTOM_CONFIG_PATH, 'w+') as conf_content:
+ config.write(conf_content)
+
+ self.eventgen.refresh_eventgen_core_object()
+
+ def edit_conf(self, request_body):
+ conf_dict = self.get_conf()
+
+ for stanza, kv_pairs in request_body.iteritems():
+ for key, value in kv_pairs.iteritems():
+ if stanza not in conf_dict.keys():
+ conf_dict[stanza] = {}
+ if stanza == "global" and key == "index":
+ for stanza, kv_pairs in conf_dict.iteritems():
+ conf_dict[stanza]["index"] = value
+ conf_dict[stanza][key] = value
+
+ self.set_conf(conf_dict)
+
+ def get_status(self):
+ response = dict()
+ if self.eventgen.eventgen_core_object.check_running():
+ status = 1 if not self.eventgen.eventgen_core_object.check_done() else 2 # 1 is running and 2 is done
+ else:
+ status = 0 # not start yet
+ response["EVENTGEN_STATUS"] = status
+ response["EVENTGEN_HOST"] = self.host
+ response["CONFIGURED"] = self.eventgen.configured
+ response["CONFIG_FILE"] = self.eventgen.configfile
+ response["TOTAL_VOLUME"] = self.total_volume
+ response["QUEUE_STATUS"] = {
+ 'SAMPLE_QUEUE': {
+ 'UNFINISHED_TASK': 'N/A',
+ 'QUEUE_LENGTH': 'N/A'},
+ 'OUTPUT_QUEUE': {
+ 'UNFINISHED_TASK': 'N/A',
+ 'QUEUE_LENGTH': 'N/A'},
+ 'WORKER_QUEUE': {
+ 'UNFINISHED_TASK': 'N/A',
+ 'QUEUE_LENGTH': 'N/A'}
+ }
+ response['THROUGHPUT_STATUS'] = self.get_throughput()
+ if hasattr(self.eventgen.eventgen_core_object, "sampleQueue"):
+ response["QUEUE_STATUS"]['SAMPLE_QUEUE']['UNFINISHED_TASK'] = self.eventgen.eventgen_core_object.sampleQueue.unfinished_tasks
+ response["QUEUE_STATUS"]['SAMPLE_QUEUE']['QUEUE_LENGTH'] = self.eventgen.eventgen_core_object.sampleQueue.qsize()
+ if hasattr(self.eventgen.eventgen_core_object, "outputQueue"):
+ try:
+ response["QUEUE_STATUS"]['OUTPUT_QUEUE']['UNFINISHED_TASK'] = self.eventgen.eventgen_core_object.outputQueue.unfinished_tasks
+ except:
+ response["QUEUE_STATUS"]['OUTPUT_QUEUE']['UNFINISHED_TASK'] = "N/A"
+ try:
+ response["QUEUE_STATUS"]['OUTPUT_QUEUE']['QUEUE_LENGTH'] = self.eventgen.eventgen_core_object.outputQueue.qsize()
+ except:
+ response["QUEUE_STATUS"]['OUTPUT_QUEUE']['QUEUE_LENGTH'] = "N/A"
+ if hasattr(self.eventgen.eventgen_core_object, "workerQueue"):
+ try:
+ response["QUEUE_STATUS"]['WORKER_QUEUE']['UNFINISHED_TASK'] = self.eventgen.eventgen_core_object.workerQueue.unfinished_tasks
+ except:
+ response["QUEUE_STATUS"]['WORKER_QUEUE']['UNFINISHED_TASK'] = "N/A"
+ try:
+ response["QUEUE_STATUS"]['WORKER_QUEUE']['QUEUE_LENGTH'] = self.eventgen.eventgen_core_object.workerQueue.qsize()
+ except:
+ response["QUEUE_STATUS"]['WORKER_QUEUE']['QUEUE_LENGTH'] = "N/A"
+ return response
+
+ def get_throughput(self):
+ empty_throughput = {'TOTAL_VOLUME_MB': 0, 'TOTAL_COUNT': 0, 'THROUGHPUT_VOLUME_KB': 0, 'THROUGHPUT_COUNT': 0}
+ if hasattr(self.eventgen.eventgen_core_object, 'output_counters'):
+ total_volume = 0
+ total_count = 0
+ throughput_volume = 0
+ throughput_count = 0
+ for output_counter in self.eventgen.eventgen_core_object.output_counters:
+ total_volume += output_counter.total_output_volume
+ total_count += output_counter.total_output_count
+ throughput_volume += output_counter.throughput_volume
+ throughput_count += output_counter.throughput_count
+ return {
+ 'TOTAL_VOLUME_MB': total_volume / (1024 * 1024),
+ 'TOTAL_COUNT': total_count,
+ 'THROUGHPUT_VOLUME_KB': throughput_volume / (1024),
+ 'THROUGHPUT_COUNT': throughput_count}
+ else:
+ return empty_throughput
+
+ def get_volume(self):
+ response = dict()
+ config = self.get_conf()
+ total_volume = 0.0
+ volume_distribution = {}
+ for stanza in config.keys():
+ if isinstance(config[stanza], dict) and "perDayVolume" in config[stanza].keys():
+ total_volume += float(config[stanza]["perDayVolume"])
+ volume_distribution[stanza] = float(config[stanza]["perDayVolume"])
+
+ if total_volume:
+ self.total_volume = total_volume
+ response['perDayVolume'] = self.total_volume
+ response['volume_distribution'] = volume_distribution
+ return response
+
+ def set_volume(self, target_volume):
+ conf_dict = self.get_conf()
+ if self.get_volume()['perDayVolume'] != 0:
+ ratio = float(target_volume) / float(self.total_volume)
+ for stanza, kv_pair in conf_dict.iteritems():
+ if isinstance(kv_pair, dict):
+ if '.*' not in stanza and "perDayVolume" in kv_pair.keys():
+ conf_dict[stanza]["perDayVolume"] = round(float(conf_dict[stanza]["perDayVolume"]) * ratio, 2)
+ else:
+ # If there is no total_volume existing, divide the volume equally into stanzas
+ stanza_num = len(conf_dict.keys())
+ if '.*' in conf_dict.keys():
+ stanza_num -= 1
+ if 'global' in conf_dict.keys():
+ stanza_num -= 1
+ divided_volume = float(target_volume) / stanza_num
+ for stanza, kv_pair in conf_dict.iteritems():
+ if isinstance(kv_pair, dict) and stanza != '.*' not in stanza:
+ conf_dict[stanza]["perDayVolume"] = divided_volume
+
+ self.set_conf(conf_dict)
+ self.total_volume = round(float(target_volume), 2)
+
+ def start(self):
+ response = {}
+ if not self.eventgen.configured:
+ response['message'] = "Eventgen is not configured."
+ elif self.eventgen.eventgen_core_object.check_running():
+ response['message'] = "Eventgen already started."
+ else:
+ self.eventgen.eventgen_core_object.start(join_after_start=False)
+ response['message'] = "Eventgen has successfully started."
+ return response
+
+ def stop(self, force_stop=False):
+ response = {}
+ if self.eventgen.eventgen_core_object.check_running():
+ try:
+ self.eventgen.eventgen_core_object.stop(force_stop=force_stop)
+ except:
+ pass
+ response['message'] = "Eventgen is stopped."
+ else:
+ response['message'] = "There is no Eventgen process running."
+ return response
+
+ def restart(self):
+ response = {}
+ if self.eventgen.eventgen_core_object.check_running():
+ self.reset()
+ self.start()
+ response['message'] = "Eventgen has successfully restarted."
+ else:
+ self.start()
+ response['message'] = "Eventgen was not running. Starting Eventgen."
+ return response
+
+ def reset(self):
+ response = {}
+ self.stop(force_stop=True)
+ time.sleep(0.1)
+ self.eventgen.refresh_eventgen_core_object()
+ self.get_volume()
+ response['message'] = "Eventgen has been reset."
+ return response
+
+ def set_bundle(self, url):
+ if not url:
+ return
+
+ bundle_dir = self.unarchive_bundle(self.download_bundle(url))
+
+ if os.path.isdir(os.path.join(bundle_dir, "samples")):
+ for file in glob.glob(os.path.join(bundle_dir, "samples", "*")):
+ shutil.copy(file, SAMPLE_DIR_PATH)
+ self.logger.info("Copied all samples to the sample directory.")
+
+ if os.path.isfile(os.path.join(bundle_dir, "default", "eventgen.conf")):
+ self.eventgen.configured = False
+ config = ConfigParser.ConfigParser()
+ config.optionxform = str
+ config.read(os.path.join(bundle_dir, "default", "eventgen.conf"))
+ config_dict = {s: collections.OrderedDict(config.items(s)) for s in config.sections()}
+ self.set_conf(config_dict)
+ self.eventgen.configured = True
+ self.logger.info("Configured Eventgen with the downloaded bundle.")
+
+
+ def download_bundle(self, url):
+ bundle_path = os.path.join(DEFAULT_PATH, "eg-bundle.tgz")
+ r = requests.get(url, stream=True)
+ with open(bundle_path, 'wb') as f:
+ for chunk in r.iter_content(chunk_size=None):
+ if chunk:
+ f.write(chunk)
+ r.close()
+ self.logger.info("Downloaded bundle to the path {}".format(bundle_path))
+ return bundle_path
+
+ def unarchive_bundle(self, path):
+ output = ''
+ if tarfile.is_tarfile(path):
+ tar = tarfile.open(path)
+ output = os.path.join(os.path.dirname(path), os.path.commonprefix(tar.getnames()))
+ tar.extractall(path=os.path.dirname(path))
+ tar.close()
+ elif zipfile.is_zipfile(path):
+ zipf = zipfile.ZipFile(path)
+ for info in zipf.infolist():
+ old_file_name = info.filename
+ if info.filename.find('/') == len(info.filename) - 1:
+ info.filename = "eg-bundle/"
+ else:
+ info.filename = "eg-bundle/" + info.filename[info.filename.find('/') + 1:]
+ zipf.extract(info, os.path.dirname(path))
+ output = os.path.join(os.path.dirname(path), 'eg-bundle')
+ zipf.close()
+ else:
+ msg = "Unknown archive format!"
+ raise Exception(msg)
+ self.logger.info("Unarchived bundle to the path {}".format(path))
+ return output
+
+ def clean_bundle_conf(self):
+ conf_dict = self.get_conf()
+
+ if ".*" not in conf_dict.keys():
+ conf_dict['.*'] = {}
+
+ # 1. Remove sampleDir from individual stanza and set a global sampleDir
+ # 2. Change token sample path to a local sample path
+ for stanza, kv_pair in conf_dict.iteritems():
+ if stanza != ".*":
+ if 'sampleDir' in kv_pair:
+ del kv_pair['sampleDir']
+
+ for key, value in kv_pair.iteritems():
+ if 'replacementType' in key and value in ['file', 'mvfile', 'seqfile']:
+ token_num = key[key.find('.')+1:key.rfind('.')]
+ if not token_num: continue
+ else:
+ existing_path = kv_pair['token.{}.replacement'.format(token_num)]
+ kv_pair['token.{}.replacement'.format(token_num)] = os.path.join(SAMPLE_DIR_PATH, existing_path[existing_path.rfind('/')+1:])
+
+ conf_dict['.*']['sampleDir'] = SAMPLE_DIR_PATH
+ self.set_conf(conf_dict)
+
+ def setup_http(self, data):
+ if data.get("servers"):
+ conf_dict = self.get_conf()
+ if 'global' not in conf_dict.keys():
+ conf_dict['global'] = {}
+ for stanza, kv_pair in conf_dict.iteritems():
+ if 'outputMode' in kv_pair:
+ del kv_pair['outputMode']
+ if 'httpeventServers' in kv_pair:
+ del kv_pair['httpeventServers']
+ conf_dict['global']['threading'] = 'process'
+ conf_dict['global']['httpeventMaxPayloadSize'] = '256000'
+ conf_dict['global']['outputMode'] = 'httpevent'
+ conf_dict['global']['httpeventServers'] = {"servers": data.get("servers")}
+ self.set_conf(conf_dict)
+ else:
+ # If hec_servers information doesn't exist, do service discovery
+ mode = data.get("mode", "roundrobin")
+ hostname_template = data.get("hostname_template", "idx{0}")
+ hosts = data.get("other_hosts", [])
+ protocol = data.get("protocol", "https")
+ key = data.get("key", "00000000-0000-0000-0000-000000000000")
+ key_name = data.get("key_name", "eventgen") + '_' + self.host
+ password = data.get("password", "Chang3d!")
+ hec_port = int(data.get("hec_port", 8088))
+ mgmt_port = int(data.get("mgmt_port", 8089))
+ new_key = bool(data.get("new_key", True))
+
+ def create_new_hec_key(hostname):
+ requests.post(
+ "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http/http".format(
+ hostname, mgmt_port), auth=("admin", password), data={"disabled": "0"}, verify=False)
+ requests.delete(
+ "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http/{2}".format(
+ hostname, mgmt_port, key_name), verify=False, auth=("admin", password))
+ requests.post(
+ "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http?output_mode=json".format(
+ hostname, mgmt_port), verify=False, auth=("admin", password), data={"name": key_name})
+ r = requests.post(
+ "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http/{2}?output_mode=json".format(
+ hostname, mgmt_port, key_name), verify=False, auth=("admin", password))
+ return str(json.loads(r.text)["entry"][0]["content"]["token"])
+
+ self.discovered_servers = []
+ for host in hosts:
+ try:
+ formatted_hostname = socket.gethostbyname(host)
+ if new_key:
+ key = create_new_hec_key(formatted_hostname)
+ except (socket.gaierror, requests.ConnectionError):
+ self.logger.warning('failed to reach %s, skip...' % host)
+ continue
+ except (ValueError, KeyError):
+ self.logger.warning('failed to setup hec token for %s, skip...' % host)
+ continue
+
+ self.discovered_servers.append({"protocol": str(protocol), "address": str(formatted_hostname), "port": str(hec_port), "key": str(key)})
+
+ counter = 1
+ while True:
+ try:
+ formatted_hostname = socket.gethostbyname(hostname_template.format(counter))
+ if new_key:
+ key = create_new_hec_key(formatted_hostname)
+
+ self.discovered_servers.append({
+ "protocol": str(protocol), "address": str(formatted_hostname), "port": str(hec_port), "key":
+ str(key)})
+ counter += 1
+ except socket.gaierror:
+ break
+
+ conf_dict = self.get_conf()
+ if 'global' not in conf_dict.keys():
+ conf_dict['global'] = {}
+ for stanza, kv_pair in conf_dict.iteritems():
+ if 'outputMode' in kv_pair:
+ del kv_pair['outputMode']
+ if 'httpeventServers' in kv_pair:
+ del kv_pair['httpeventServers']
+ conf_dict['global']['threading'] = 'process'
+ conf_dict['global']['httpeventMaxPayloadSize'] = '256000'
+ conf_dict['global']['outputMode'] = 'httpevent'
+ conf_dict['global']['httpeventServers'] = {"servers": self.discovered_servers}
+ self.set_conf(conf_dict)
diff --git a/splunk_eventgen/eventgen_api_server/redis_connector.py b/splunk_eventgen/eventgen_api_server/redis_connector.py
new file mode 100644
index 00000000..660add01
--- /dev/null
+++ b/splunk_eventgen/eventgen_api_server/redis_connector.py
@@ -0,0 +1,53 @@
+import redis
+import logging
+import time
+
+class RedisConnector():
+
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+ self.logger = logging.getLogger('eventgen_server')
+ self.members_connection = redis.Redis(host=self.host, port=int(self.port), db=0)
+ self.message_connection = redis.Redis(host=self.host, port=int(self.port), db=1)
+ self.pubsub = self.message_connection.pubsub()
+ self.logger.info("Initialized RedisConnector")
+ self.servers_channel = 'servers_channel'
+ self.controller_channel = 'controller_channel'
+ self.retry_time_list = [5, 10, 20, 30, 60, 0]
+
+ def register_myself(self, hostname, role="server"):
+ for retry_time in self.retry_time_list:
+ try:
+ if role == "server":
+ self.members_connection.sadd("servers", hostname)
+ self.pubsub.subscribe(self.servers_channel)
+ else:
+ self.members_connection.set("controller", hostname)
+ self.pubsub.subscribe(self.controller_channel)
+ self.logger.info("Registered as {} and subscribed the channel.".format(role))
+ return
+ except:
+ self.logger.warning("Could not connect to Redis at {}:{}. Retrying in {} seconds.".format(self.host, self.port, retry_time))
+ if not retry_time:
+ raise Exception("Failed to connect to Redis.")
+ time.sleep(retry_time)
+ continue
+
+ def get_registered_servers(self):
+ for retry_time in self.retry_time_list:
+ try:
+ servers = list(self.members_connection.smembers("servers"))
+ self.logger.info("Registered Servers: {}".format(servers))
+ return servers
+ except:
+ self.logger.warning("Could not connect to Redis at {}:{}. Retrying in {} seconds.".format(self.host, self.port, retry_time))
+ if not retry_time:
+ raise Exception("Failed to connect to Redis.")
+ time.sleep(retry_time)
+ continue
+
+
+
+
+
diff --git a/splunk_eventgen/eventgen_core.py b/splunk_eventgen/eventgen_core.py
index 4088d2b2..70a93d0a 100644
--- a/splunk_eventgen/eventgen_core.py
+++ b/splunk_eventgen/eventgen_core.py
@@ -6,6 +6,7 @@
import os
import sys
import time
+import signal
from Queue import Empty, Queue
from threading import Thread
@@ -44,6 +45,7 @@ def __init__(self, args=None):
:param args: __main__ parse_args() object.
'''
self.stopping = False
+ self.force_stop = False
self.started = False
self.completed = False
self.config = None
@@ -97,6 +99,7 @@ def _load_config(self, configfile, **kwargs):
new_args["verbosity"] = args.verbosity
self.config = Config(configfile, **new_args)
self.config.parse()
+ self.args.multiprocess = True if self.config.threading == "process" else self.args.multiprocess
self._reload_plugins()
if "args" in kwargs and getattr(kwargs["args"], "generators"):
generator_worker_count = kwargs["args"].generators
@@ -104,6 +107,8 @@ def _load_config(self, configfile, **kwargs):
generator_worker_count = self.config.generatorWorkers
# TODO: Probably should destroy pools better so processes are cleaned.
+ if self.args.multiprocess:
+ self.kill_processes()
self._setup_pools(generator_worker_count)
def _reload_plugins(self):
@@ -197,7 +202,7 @@ def _create_generator_pool(self, workercount=20):
has over 10 generators working, additional samples won't run until the first ones end.
:return:
'''
- if self.args.multiprocess:
+ if self.args.multiprocess:
import multiprocessing
self.manager = multiprocessing.Manager()
if self.config.disableLoggingQueue:
@@ -283,6 +288,8 @@ def _generator_do_work(self, work_queue, logging_queue, output_counter=None):
except Empty:
pass
except Exception as e:
+ if self.force_stop:
+ break
self.logger.exception(str(e))
raise e
@@ -327,6 +334,8 @@ def logger_thread(self, loggingQueue):
except Empty:
pass
except Exception as e:
+ if self.force_stop:
+ break
self.logger.exception(str(e))
raise e
@@ -445,30 +454,39 @@ def join_process(self):
self.logger.exception(str(e))
raise e
- def stop(self):
+ def stop(self, force_stop=False):
# empty the sample queue:
self.config.stopping = True
self.stopping = True
+ self.force_stop = force_stop
self.logger.info("All timers exited, joining generation queue until it's empty.")
+ if force_stop:
+ self.logger.info("Forcibly stopping Eventgen: Deleting workerQueue.")
+ del self.workerQueue
+ self._create_generator_pool()
self.workerQueue.join()
# if we're in multiprocess, make sure we don't add more generators after the timers stopped.
if self.args.multiprocess:
- self.genconfig["stopping"] = True
- for worker in self.workerPool:
- count = 0
- # We wait for a minute until terminating the worker
- while worker.exitcode is None and count != 20:
- if count == 30:
- self.logger.info("Terminating worker {0}".format(worker._name))
- worker.terminate()
- count = 0
- break
- self.logger.info("Worker {0} still working, waiting for it to finish.".format(worker._name))
- time.sleep(2)
- count += 1
+ if force_stop:
+ self.kill_processes()
+ else:
+ self.genconfig["stopping"] = True
+ for worker in self.workerPool:
+ count = 0
+ # We wait for a minute until terminating the worker
+ while worker.exitcode is None and count != 20:
+ if count == 30:
+ self.logger.info("Terminating worker {0}".format(worker._name))
+ worker.terminate()
+ count = 0
+ break
+ self.logger.info("Worker {0} still working, waiting for it to finish.".format(worker._name))
+ time.sleep(2)
+ count += 1
self.logger.info("All generators working/exited, joining output queue until it's empty.")
- self.outputQueue.join()
+ if not self.args.multiprocess and not force_stop:
+ self.outputQueue.join()
self.logger.info("All items fully processed. Cleaning up internal processes.")
self.started = False
self.stopping = False
@@ -484,21 +502,28 @@ def reload_conf(self, configfile):
def check_running(self):
'''
-
:return: if eventgen is running, return True else False
'''
if hasattr(self, "outputQueue") and hasattr(self, "sampleQueue") and hasattr(self, "workerQueue"):
# If all queues are not empty, eventgen is running.
# If all queues are empty and all tasks are finished, eventgen is not running.
# If all queues are empty and there is an unfinished task, eventgen is running.
- if self.outputQueue.empty() and self.sampleQueue.empty() and self.workerQueue.empty() \
- and self.sampleQueue.unfinished_tasks <= 0 \
- and self.outputQueue.unfinished_tasks <= 0 \
- and self.workerQueue.unfinished_tasks <= 0:
- self.logger.info("Queues are all empty and there are no pending tasks")
- return self.started
+ if not self.args.multiprocess:
+ if self.outputQueue.empty() and self.sampleQueue.empty() and self.workerQueue.empty() \
+ and self.sampleQueue.unfinished_tasks <= 0 \
+ and self.outputQueue.unfinished_tasks <= 0 \
+ and self.workerQueue.unfinished_tasks <= 0:
+ self.logger.info("Queues are all empty and there are no pending tasks")
+ return self.started
+ else:
+ return True
else:
- return True
+ if self.outputQueue.empty() and self.sampleQueue.empty() and self.workerQueue.empty() \
+ and self.sampleQueue.unfinished_tasks <= 0:
+ self.logger.info("Queues are all empty and there are no pending tasks")
+ return self.started
+ else:
+ return True
return False
def check_done(self):
@@ -507,3 +532,16 @@ def check_done(self):
:return: if eventgen jobs are finished, return True else False
'''
return self.sampleQueue.empty() and self.sampleQueue.unfinished_tasks <= 0 and self.workerQueue.empty() and self.workerQueue.unfinished_tasks <= 0
+
+ def kill_processes(self):
+ try:
+ if self.args.multiprocess:
+ for worker in self.workerPool:
+ try: os.kill(int(worker.pid), signal.SIGKILL)
+ except: continue
+ del self.outputQueue
+ self.manager.shutdown()
+ except:
+ pass
+
+
\ No newline at end of file
diff --git a/splunk_eventgen/eventgen_nameko_controller.py b/splunk_eventgen/eventgen_nameko_controller.py
deleted file mode 100644
index 6ea271ee..00000000
--- a/splunk_eventgen/eventgen_nameko_controller.py
+++ /dev/null
@@ -1,525 +0,0 @@
-import atexit
-import json
-import logging
-import os
-import socket
-import time
-
-from pyrabbit.api import Client
-
-from logger.logger_config import controller_logger_config
-from nameko.events import BROADCAST, EventDispatcher, event_handler
-from nameko.rpc import rpc
-from nameko.web.handlers import http
-
-FILE_PATH = os.path.dirname(os.path.realpath(__file__))
-EVENTGEN_ENGINE_CONF_PATH = os.path.abspath(os.path.join(FILE_PATH, "default", "eventgen_engine.conf"))
-
-
-def exit_handler(client, hostname, logger):
- client.delete_vhost(hostname)
- logger.info("Deleted vhost {}. Shutting down.".format(hostname))
-
-
-class EventgenController(object):
- name = "eventgen_controller"
-
- dispatch = EventDispatcher()
- PAYLOAD = 'Payload'
- logging.config.dictConfig(controller_logger_config)
- log = logging.getLogger(name)
- log.info("Logger set as eventgen_controller")
- host = socket.gethostname() + '_controller'
-
- server_status = {}
- server_confs = {}
- server_volumes = {}
-
- osvars, config = dict(os.environ), {}
- config["AMQP_HOST"] = osvars.get("EVENTGEN_AMQP_HOST", "localhost")
- config["AMQP_WEBPORT"] = osvars.get("EVENTGEN_AMQP_WEBPORT", 15672)
- config["AMQP_USER"] = osvars.get("EVENTGEN_AMQP_URI", "guest")
- config["AMQP_PASS"] = osvars.get("EVENTGEN_AMQP_PASS", "guest")
-
- pyrabbit_cl = Client('{0}:{1}'.format(config['AMQP_HOST'], config['AMQP_WEBPORT']),
- '{0}'.format(config['AMQP_USER']), '{0}'.format(config['AMQP_PASS']))
- pyrabbit_cl.create_vhost(host)
- log.info("Vhost set to {}".format(host))
- log.info("Current Vhosts are {}".format(pyrabbit_cl.get_vhost_names()))
-
- atexit.register(exit_handler, client=pyrabbit_cl, hostname=host, logger=log)
-
- # RPC Methods
-
- @event_handler("eventgen_server", "server_status", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_server_status(self, payload):
- return self.receive_status(payload)
-
- @event_handler("eventgen_server", "server_conf", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_server_conf(self, payload):
- return self.receive_conf(payload)
-
- @event_handler("eventgen_server", "server_volume", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_get_volume(self, payload):
- return self.receive_volume(payload)
-
- @rpc
- def index(self, target):
- try:
- if target == "all":
- self.dispatch("all_index", self.PAYLOAD)
- else:
- self.dispatch("{}_index".format(target), self.PAYLOAD)
- msg = "Index event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def status(self, target):
- try:
- if target == "all":
- self.dispatch("all_status", self.PAYLOAD)
- else:
- self.dispatch("{}_status".format(target), self.PAYLOAD)
- msg = "Status event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def start(self, target):
- try:
- if target == "all":
- self.dispatch("all_start", self.PAYLOAD)
- else:
- self.dispatch("{}_start".format(target), self.PAYLOAD)
- msg = "Start event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def stop(self, target):
- try:
- if target == "all":
- self.dispatch("all_stop", self.PAYLOAD)
- else:
- self.dispatch("{}_stop".format(target), self.PAYLOAD)
- msg = "Stop event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def restart(self, target):
- try:
- if target == "all":
- self.dispatch("all_restart", self.PAYLOAD)
- else:
- self.dispatch("{}_restart".format(target), self.PAYLOAD)
- msg = "Restart event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def get_conf(self, target):
- try:
- if target == "all":
- self.dispatch("all_get_conf", self.PAYLOAD)
- else:
- self.dispatch("{}_get_conf".format(target), self.PAYLOAD)
- msg = "Get_conf event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def set_conf(self, target, data):
- try:
- payload = data
- if target == "all":
- self.dispatch("all_set_conf", payload)
- else:
- self.dispatch("{}_set_conf".format(target), payload)
- msg = "Set_conf event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def edit_conf(self, target, data):
- try:
- payload = data
- if target == "all":
- self.dispatch("all_edit_conf", payload)
- else:
- self.dispatch("{}_edit_conf".format(target), payload)
- msg = "Edit_conf event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def bundle(self, target, data):
- try:
- data = json.loads(data)
- url = data["url"]
- self.dispatch("{}_bundle".format(target), {"url": url})
- msg = "Bundle event dispatched to {} with url {}".format(target, url)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return "500", "Exception: {}".format(e.message)
-
- @rpc
- def setup(self, target, data):
- try:
- self.dispatch("{}_setup".format(target), data)
- msg = "Setup event dispatched to {}.".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return "500", "Exception: {}".format(e.message)
-
- @rpc
- def get_volume(self, target):
- try:
- self.dispatch("{}_get_volume".format(target), self.PAYLOAD)
- msg = "get_volume event dispatched to {}.".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return "500", "Exception: {}".format(e.message)
-
- @rpc
- def set_volume(self, target, data):
- try:
- data = json.loads(data)
- volume = data["perDayVolume"]
- self.dispatch("{}_set_volume".format(target), {"perDayVolume": volume})
- msg = "set_volume event dispatched to {}.".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return "500", "Exception: {}".format(e.message)
-
- @rpc
- def reset(self, target):
- try:
- if target == "all":
- self.dispatch("all_reset", self.PAYLOAD)
- else:
- self.dispatch("{}_reset".format(target), self.PAYLOAD)
- msg = "Reset event dispatched to {}".format(target)
- self.log.info(msg)
- return msg
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- # HTTP Methods
-
- @http('GET', '/')
- def root_page(self, request):
- self.log.info("index method called")
- home_page = '''*** Eventgen Controller ***
-Host: {0}
-Connected Servers: {1}
-You are running Eventgen Controller.\n'''
- host = socket.gethostname()
- return home_page.format(host, self.get_current_server_vhosts())
-
- @http('GET', '/index')
- def http_index(self, request):
- self.index(target="all")
- return self.root_page(request)
-
- @http('GET', '/status')
- def http_status(self, request):
- current_time = time.time()
- self.log.info("call http_status, current time:{}".format(current_time))
- self.status("all")
- return json.dumps(self.process_server_status(current_time), indent=4)
-
- @http('GET', '/status/')
- def http_status_target(self, request, target="all"):
- if self.check_vhost(target):
- self.status(target=target)
- return json.dumps(self.process_server_status()[target], indent=4)
- else:
- return 404, json.dumps("Target not available.", indent=4)
-
- @http('POST', '/start')
- def http_start(self, request):
- return self.start(target="all")
-
- @http('POST', '/start/')
- def http_start_target(self, request, target="all"):
- if self.check_vhost(target):
- return self.start(target=target)
- else:
- return 404, json.dumps("Target not available.", indent=4)
-
- @http('POST', '/stop')
- def http_stop(self, request):
- return self.stop(target="all")
-
- @http('POST', '/stop/')
- def http_stop_target(self, request, target="all"):
- if self.check_vhost(target):
- return self.stop(target=target)
- else:
- return 404, json.dumps("Target not available.", indent=4)
-
- @http('POST', '/restart')
- def http_restart(self, request):
- return self.restart(target="all")
-
- @http('POST', '/restart/')
- def http_restart_target(self, request, target="all"):
- if self.check_vhost(target):
- return self.restart(target=target)
- else:
- return 404, json.dumps("Target not available.", indent=4)
-
- @http('GET', '/conf')
- def http_get_conf(self, request):
- self.get_conf("all")
- return json.dumps(self.process_server_confs(), indent=4)
-
- @http('GET', '/conf/')
- def http_get_conf_target(self, request, target="all"):
- if self.check_vhost(target):
- self.get_conf(target=target)
- processed_server_confs = self.process_server_confs()
- try:
- return json.dumps(processed_server_confs[target], indent=4)
- except:
- return json.dumps({}, indent=4)
- else:
- return 404, json.dumps("Target not available.", indent=4)
-
- @http('POST', '/conf')
- def http_set_conf(self, request):
- data = request.get_data()
- if data:
- self.set_conf(target="all", data=data)
- return self.http_get_conf(request)
- else:
- return 400, 'Please pass valid config data.'
-
- @http('POST', '/conf/')
- def http_set_conf_target(self, request, target):
- data = request.get_data()
- if data:
- if self.check_vhost(target):
- self.set_conf(target=target, data=data)
- return self.http_get_conf_target(request, target)
- else:
- return 404, json.dumps("Target not available.", indent=4)
- else:
- return 400, 'Please pass valid config data.'
-
- @http('PUT', '/conf')
- def http_edit_conf(self, request):
- data = request.get_data()
- if data:
- self.edit_conf(target="all", data=data)
- return self.http_get_conf(request)
- else:
- return 400, 'Please pass valid config data.'
-
- @http('PUT', '/conf/')
- def http_edit_conf_target(self, request, target):
- data = request.get_data()
- if data:
- if self.check_vhost(target):
- self.edit_conf(target=target, data=data)
- return self.http_get_conf_target(request, target)
- else:
- return 404, json.dumps("Target not available.", indent=4)
- else:
- return 400, 'Please pass valid config data.'
-
- @http('POST', '/bundle')
- def http_bundle(self, request):
- data = request.get_data(as_text=True)
- if data:
- return self.bundle(target="all", data=data)
- else:
- return 400, "Please pass in a valid object with bundle URL."
-
- @http('POST', '/bundle/')
- def http_bundle_target(self, request, target):
- data = request.get_data(as_text=True)
- if data:
- if self.check_vhost(target):
- return self.bundle(target=target, data=data)
- else:
- return 404, json.dumps("Target not available.", indent=4)
- else:
- return 400, "Please pass in a valid object with bundle URL."
-
- @http('POST', '/setup')
- def http_setup(self, request):
- data = request.get_data(as_text=True)
- self.setup(target="all", data=data)
- return self.http_get_conf(request)
-
- @http('POST', '/setup/')
- def http_setup_target(self, request, target):
- data = request.get_data(as_text=True)
- if self.check_vhost(target):
- self.setup(target=target, data=data)
- return self.http_get_conf_target(request, target)
- else:
- return 404, json.dumps("Target not available.", indent=4)
-
- @http('GET', '/volume')
- def http_get_volume(self, request):
- self.get_volume(target="all")
- return json.dumps(self.process_server_volumes(), indent=4)
-
- @http('GET', '/volume/')
- def http_get_volume_target(self, request, target="all"):
- if self.check_vhost(target):
- self.get_volume(target=target)
- processed_server_confs = self.process_server_volumes()
- try:
- return json.dumps(processed_server_confs[target], indent=4)
- except:
- return json.dumps({}, indent=4)
- else:
- return 404, json.dumps("Target not available.", indent=4)
-
- @http('POST', '/volume')
- def http_set_volume(self, request):
- data = request.get_data(as_text=True)
- if data:
- return self.set_volume(target="all", data=data)
- else:
- return 400, "Please pass in a valid object with volume."
-
- @http('POST', '/volume/')
- def http_set_volume_target(self, request, target="all"):
- data = request.get_data(as_text=True)
- if data:
- if self.check_vhost(target):
- return self.set_volume(target=target, data=data)
- else:
- return 404, json.dumps("Target not available.", indent=4)
- else:
- return 400, "Please pass in a valid object with volume."
-
- @http('POST', '/reset')
- def http_reset(self, request):
- return self.reset(target="all")
-
- # Helper Methods
-
- def receive_status(self, data):
- if data['server_name'] and data['server_status']:
- self.server_status[data['server_name']] = data['server_status']
- rec_time = time.time()
- self.log.info("receive {}'s status, update the status at time:{}".format(data['server_name'], rec_time))
- self.server_status['time'] = rec_time
-
- def receive_conf(self, data):
- if data['server_name']:
- self.server_confs[data['server_name']] = data['server_conf']
-
- def receive_volume(self, data):
- if data['server_name'] and "total_volume" in data:
- self.server_volumes[data['server_name']] = data['total_volume']
-
- def process_server_status(self, current_time, num_retries=15, delay=0.3):
- current_server_vhosts = self.get_current_server_vhosts()
- server_time = self.server_status['time'] if 'time' in self.server_status else 0
- server_vhost_len = len(self.server_status) if 'time' not in self.server_status else len(self.server_status) - 1
- if current_server_vhosts:
- for i in range(num_retries):
- if server_vhost_len != len(current_server_vhosts) or server_time < current_time:
- time.sleep(delay)
- current_server_vhosts = self.get_current_server_vhosts()
- server_time = self.server_status['time'] if 'time' in self.server_status else 0
- server_vhost_len = len(
- self.server_status) if 'time' not in self.server_status else len(self.server_status) - 1
- else:
- break
- dump_value = self.calculate_throughput(self.server_status)
- else:
- dump_value = {}
- self.server_status = {}
- return dump_value
-
- def process_server_confs(self, num_retries=15, delay=0.3):
- current_server_vhosts = self.get_current_server_vhosts()
- if current_server_vhosts:
- for i in range(num_retries):
- if len(self.server_confs) != len(current_server_vhosts):
- time.sleep(delay)
- current_server_vhosts = self.get_current_server_vhosts()
- dump_value = self.server_confs
- else:
- dump_value = {}
- self.server_confs = {}
- return dump_value
-
- def process_server_volumes(self, num_retries=15, delay=0.3):
- current_server_vhosts = self.get_current_server_vhosts()
- if current_server_vhosts:
- for i in range(num_retries):
- if len(self.server_volumes) != len(current_server_vhosts):
- time.sleep(delay)
- current_server_vhosts = self.get_current_server_vhosts()
- dump_value = self.server_volumes
- else:
- dump_value = {}
- self.server_volumes = {}
- return dump_value
-
- def get_current_server_vhosts(self):
- current_vhosts = self.pyrabbit_cl.get_vhost_names()
- return [name for name in current_vhosts if name != '/' and name != self.host]
-
- def check_vhost(self, vhost_name):
- current_server_vhosts = self.get_current_server_vhosts()
- if vhost_name in current_server_vhosts:
- return True
- else:
- return False
-
- def calculate_throughput(self, data):
- throughput_summary = {'TOTAL_VOLUME_MB': 0, 'TOTAL_COUNT': 0, 'THROUGHPUT_VOLUME_KB': 0, 'THROUGHPUT_COUNT': 0}
- for server_name, server_status in data.items():
- if server_name != 'time' and 'THROUGHPUT_STATUS' in server_status:
- server_throughput = server_status['THROUGHPUT_STATUS']
- throughput_summary['TOTAL_VOLUME_MB'] += server_throughput['TOTAL_VOLUME_MB']
- throughput_summary['TOTAL_COUNT'] += server_throughput['TOTAL_COUNT']
- throughput_summary['THROUGHPUT_VOLUME_KB'] += server_throughput['THROUGHPUT_VOLUME_KB']
- throughput_summary['THROUGHPUT_COUNT'] += server_throughput['THROUGHPUT_COUNT']
- data['THROUGHTPUT_SUMMARY'] = throughput_summary
- self.log.debug("throughput summary: {}".format(throughput_summary))
- return data
diff --git a/splunk_eventgen/eventgen_nameko_dependency.py b/splunk_eventgen/eventgen_nameko_dependency.py
deleted file mode 100644
index cfb2c65c..00000000
--- a/splunk_eventgen/eventgen_nameko_dependency.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import argparse
-import logging
-import os
-import sys
-
-import eventgen_core
-from nameko.extensions import DependencyProvider
-
-FILE_PATH = os.path.dirname(os.path.realpath(__file__))
-CUSTOM_CONFIG_PATH = os.path.realpath(os.path.join(FILE_PATH, "default", "eventgen_wsgi.conf"))
-
-# For some reason, the args from __main__ get passed to eventgen_nameko_dependency and causes this error:
-# usage: eventgen_nameko_dependency [-h]
-# eventgen_nameko_dependency: error: unrecognized arguments: --role master --config server_conf.yml
-sys.argv = [sys.argv.pop(0)]
-
-
-def create_args():
- parser = argparse.ArgumentParser(prog="eventgen_nameko_dependency")
- args = parser.parse_args()
- args.daemon = False
- args.version = False
- args.backfill = None
- args.count = None
- args.devnull = False
- args.disableOutputQueue = False
- args.end = None
- args.generators = None
- args.interval = None
- args.keepoutput = False
- args.modinput = False
- args.multiprocess = False
- args.outputters = None
- args.profiler = False
- args.sample = None
- args.version = False
- args.subcommand = 'generate'
- args.verbosity = 20
- args.wsgi = True
- args.modinput_mode = False
- return args
-
-
-class EventgenDependency(DependencyProvider):
-
- arguments = create_args()
- eventgen = eventgen_core.EventGenerator(arguments)
- log = logging.getLogger('eventgen_dependency')
- log.info("EventgenDependency Init. Memory reference to eventgen object: {}".format(eventgen))
-
- configured = False
- configfile = 'N/A'
-
- if os.path.isfile(CUSTOM_CONFIG_PATH):
- configured = True
- configfile = CUSTOM_CONFIG_PATH
- eventgen.reload_conf(CUSTOM_CONFIG_PATH)
-
- def get_dependency(self, worker_ctx):
- return self
-
- def refresh_eventgen(self):
- self.eventgen = eventgen_core.EventGenerator(self.arguments)
- self.configured = False
- self.configfile = 'N/A'
- self.log.info("Refreshed Eventgen Object: {}".format(self.eventgen))
diff --git a/splunk_eventgen/eventgen_nameko_server.py b/splunk_eventgen/eventgen_nameko_server.py
deleted file mode 100644
index dbcf422a..00000000
--- a/splunk_eventgen/eventgen_nameko_server.py
+++ /dev/null
@@ -1,759 +0,0 @@
-import atexit
-import ConfigParser
-import glob
-import json
-import logging
-import os
-import shutil
-import socket
-import tarfile
-import time
-import zipfile
-
-import requests
-import yaml
-from pyrabbit.api import Client
-
-import eventgen_nameko_dependency
-from nameko.events import BROADCAST, EventDispatcher, event_handler
-from nameko.rpc import rpc
-from nameko.web.handlers import http
-
-FILE_PATH = os.path.dirname(os.path.realpath(__file__))
-EVENTGEN_DIR = os.path.realpath(os.path.join(FILE_PATH, ".."))
-CUSTOM_CONFIG_PATH = os.path.realpath(os.path.join(FILE_PATH, "default", "eventgen_wsgi.conf"))
-EVENTGEN_DEFAULT_CONF_PATH = os.path.abspath(os.path.join(FILE_PATH, "default", "eventgen.conf"))
-
-
-def get_eventgen_name_from_conf():
- with open(os.path.abspath(os.path.join(FILE_PATH, "server_conf.yml"))) as config_yml:
- loaded_yml = yaml.load(config_yml)
- return loaded_yml['EVENTGEN_NAME'] if 'EVENTGEN_NAME' in loaded_yml else socket.gethostname()
-
-
-def exit_handler(client, hostname, logger):
- client.delete_vhost(hostname)
- logger.info("Deleted vhost {}. Shutting down.".format(hostname))
-
-
-class EventgenServer(object):
- name = "eventgen_server"
-
- dispatch = EventDispatcher()
-
- eventgen_dependency = eventgen_nameko_dependency.EventgenDependency()
- eventgen_name = get_eventgen_name_from_conf()
- host = socket.gethostname()
- log = logging.getLogger(name)
- log.info("Eventgen name is set to [{}] at host [{}]".format(eventgen_name, host))
-
- osvars, config = dict(os.environ), {}
- config["AMQP_HOST"] = osvars.get("EVENTGEN_AMQP_HOST", "localhost")
- config["AMQP_WEBPORT"] = osvars.get("EVENTGEN_AMQP_WEBPORT", 15672)
- config["AMQP_USER"] = osvars.get("EVENTGEN_AMQP_URI", "guest")
- config["AMQP_PASS"] = osvars.get("EVENTGEN_AMQP_PASS", "guest")
-
- pyrabbit_cl = Client('{0}:{1}'.format(config['AMQP_HOST'], config['AMQP_WEBPORT']),
- '{0}'.format(config['AMQP_USER']), '{0}'.format(config['AMQP_PASS']))
- pyrabbit_cl.create_vhost(host)
- log.info("Vhost set to {}".format(host))
-
- atexit.register(exit_handler, client=pyrabbit_cl, hostname=host, logger=log)
- total_volume = 0.0
-
- def get_status(self):
- '''
- Get status of eventgen
-
- return value structure
- {
- "EVENTGEN_STATUS" :
- "EVENTGEN_HOST" :
- "CONFIGURED" :
- "CONFIG_FILE" :
- "TOTAL_VOLUME" :
- "QUEUE_STATUS" : { "SAMPLE_QUEUE": {'UNFISHED_TASK': , 'QUEUE_LENGTH': },
- "OUTPUT_QUEUE": {'UNFISHED_TASK': , 'QUEUE_LENGTH': },
- "WORKER_QUEUE": {'UNFISHED_TASK': , 'QUEUE_LENGTH': }}
- "THROUGHPUT_STATUS": { "TOTAL_VOLUME_MB": '',
- "TOTAL_COUNT": '',
- "THROUGHPUT_VOLUME_KB": '',
- "THROUGHPUT_COUNT": ''}
- }
- '''
- res = dict()
- if self.eventgen_dependency.eventgen.check_running():
- if self.eventgen_dependency.eventgen.check_done():
- # all jobs completed
- status = 2
- else:
- # still running
- status = 1
- else:
- # not start yet
- status = 0
- res["EVENTGEN_STATUS"] = status
- res["EVENTGEN_HOST"] = self.host
- res["CONFIGURED"] = self.eventgen_dependency.configured
- res["CONFIG_FILE"] = self.eventgen_dependency.configfile
- res["TOTAL_VOLUME"] = self.total_volume
- res["QUEUE_STATUS"] = {
- 'SAMPLE_QUEUE': {'UNFINISHED_TASK': 'N/A', 'QUEUE_LENGTH': 'N/A'}, 'OUTPUT_QUEUE': {
- 'UNFINISHED_TASK': 'N/A', 'QUEUE_LENGTH': 'N/A'}, 'WORKER_QUEUE': {
- 'UNFINISHED_TASK': 'N/A', 'QUEUE_LENGTH': 'N/A'}}
- res['THROUGHPUT_STATUS'] = self.get_throughput()
- if hasattr(self.eventgen_dependency.eventgen, "sampleQueue"):
- res["QUEUE_STATUS"]['SAMPLE_QUEUE'][
- 'UNFINISHED_TASK'] = self.eventgen_dependency.eventgen.sampleQueue.unfinished_tasks
- res["QUEUE_STATUS"]['SAMPLE_QUEUE']['QUEUE_LENGTH'] = self.eventgen_dependency.eventgen.sampleQueue.qsize()
- if hasattr(self.eventgen_dependency.eventgen, "outputQueue"):
- res["QUEUE_STATUS"]['OUTPUT_QUEUE'][
- 'UNFINISHED_TASK'] = self.eventgen_dependency.eventgen.outputQueue.unfinished_tasks
- res["QUEUE_STATUS"]['OUTPUT_QUEUE']['QUEUE_LENGTH'] = self.eventgen_dependency.eventgen.outputQueue.qsize()
- if hasattr(self.eventgen_dependency.eventgen, "workerQueue"):
- res["QUEUE_STATUS"]['WORKER_QUEUE'][
- 'UNFINISHED_TASK'] = self.eventgen_dependency.eventgen.workerQueue.unfinished_tasks
- res["QUEUE_STATUS"]['WORKER_QUEUE']['QUEUE_LENGTH'] = self.eventgen_dependency.eventgen.workerQueue.qsize()
- return res
-
- # Real Methods
-
- def index(self):
- self.log.info("index method called")
- home_page = '''*** Eventgen WSGI ***
-Host: {0}
-Eventgen Status: {1}
-Eventgen Config file exists: {2}
-Eventgen Config file path: {3}
-Total volume: {4}
-Worker Queue Status: {5}
-Sample Queue Status: {6}
-Output Queue Status: {7}\n'''
- status = self.get_status()
- eventgen_status = "running" if status["EVENTGEN_STATUS"] else "stopped"
- host = status["EVENTGEN_HOST"]
- configured = status["CONFIGURED"]
- config_file = status["CONFIG_FILE"]
- total_volume = status["TOTAL_VOLUME"]
- worker_queue_status = status["QUEUE_STATUS"]["WORKER_QUEUE"]
- sample_queue_status = status["QUEUE_STATUS"]["SAMPLE_QUEUE"]
- output_queue_status = status["QUEUE_STATUS"]["OUTPUT_QUEUE"]
-
- return home_page.format(host, eventgen_status, configured, config_file, total_volume, worker_queue_status,
- sample_queue_status, output_queue_status)
-
- def status(self):
- self.log.info('Status method called.')
- status = self.get_status()
- self.log.info(status)
- self.send_status_to_controller(server_status=status)
- return json.dumps(status, indent=4)
-
- @rpc
- def send_status_to_controller(self, server_status):
- data = {}
- data['server_name'] = self.eventgen_name
- data['server_status'] = server_status
- self.dispatch("server_status", data)
-
- def start(self):
- self.log.info("start method called. Config is {}".format(self.eventgen_dependency.configfile))
- try:
- if not self.eventgen_dependency.configured:
- return "There is not config file known to eventgen. Pass in the config file to /conf before you start."
- if self.eventgen_dependency.eventgen.check_running():
- return "Eventgen already started."
- self.eventgen_dependency.eventgen.start(join_after_start=False)
- return "Eventgen has successfully started."
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def stop(self):
- self.log.info("stop method called")
- try:
- if self.eventgen_dependency.eventgen.check_running():
- self.eventgen_dependency.eventgen.stop()
- return "Eventgen is stopped."
- return "There is no eventgen process running."
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def restart(self):
- try:
- self.log.info("restart method called.")
- self.stop()
- time.sleep(2)
- self.start()
- return "Eventgen restarted."
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def get_conf(self):
- self.log.info("get_conf method called.")
- try:
- if self.eventgen_dependency.configured:
- config = ConfigParser.ConfigParser()
- config.optionxform = str
- config_path = CUSTOM_CONFIG_PATH
-
- if os.path.isfile(config_path):
- config.read(config_path)
- out_json = dict()
- for section in config.sections():
- out_json[section] = dict()
- for k, v in config.items(section):
- out_json[section][k] = v
- # self.log.info(out_json)
- self.send_conf_to_controller(server_conf=out_json)
- return json.dumps(out_json, indent=4)
- else:
- self.send_conf_to_controller(server_conf={})
- return json.dumps({}, indent=4)
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def send_conf_to_controller(self, server_conf):
- data = {}
- data['server_name'] = self.eventgen_name
- data['server_conf'] = server_conf
- self.dispatch("server_conf", data)
-
- def set_conf(self, conf):
- '''
-
- customconfig data format
- {sample: {key: value}, sample2: {key: value}}
- '''
- self.log.info("set_conf method called with {}".format(json.loads(conf)))
- try:
- config = ConfigParser.ConfigParser()
- config.optionxform = str
- conf_content = json.loads(conf)
-
- for sample in conf_content.iteritems():
- sample_name = sample[0]
- sample_key_value_pairs = sample[1]
- config.add_section(sample_name)
- for pair in sample_key_value_pairs.iteritems():
- value = pair[1]
- if type(value) == dict:
- value = json.dumps(value)
- config.set(sample_name, pair[0], value)
-
- with open(CUSTOM_CONFIG_PATH, 'wb') as conf_content:
- config.write(conf_content)
-
- self.eventgen_dependency.configured = True
- self.eventgen_dependency.configfile = CUSTOM_CONFIG_PATH
- self.eventgen_dependency.eventgen.reload_conf(CUSTOM_CONFIG_PATH)
- self.log.info("custom_config_json is {}".format(conf_content))
- return self.get_conf()
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def edit_conf(self, conf):
- self.log.info("edit_conf method called with {}".format(json.loads(conf)))
- try:
- config = ConfigParser.ConfigParser()
- config.optionxform = str
- conf_content = json.loads(conf)
- config.read(CUSTOM_CONFIG_PATH)
-
- for stanza, kv_pairs in conf_content.iteritems():
- for k, v in kv_pairs.iteritems():
- try:
- config.get(stanza, k)
- config.set(stanza, k, v)
- except Exception as e:
- if type(e) == ConfigParser.NoSectionError:
- config.add_section(stanza)
- config.set(stanza, k, v)
-
- with open(CUSTOM_CONFIG_PATH, 'wb') as conf_content:
- config.write(conf_content)
-
- self.eventgen_dependency.configured = True
- self.eventgen_dependency.configfile = CUSTOM_CONFIG_PATH
- self.eventgen_dependency.eventgen.reload_conf(CUSTOM_CONFIG_PATH)
- if self.eventgen_dependency.eventgen.check_running():
- self.restart()
- return self.get_conf()
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def bundle(self, url):
- # Set these parameters to notify that eventgen is in the process of configuration
- self.eventgen_dependency.configured = False
- try:
- # Download the bundle
- bundle_path = self.download_bundle(url)
- # Extract bundle
- bundle_dir = self.unarchive_bundle(bundle_path)
- # Move sample files
- self.log.info("Detecting sample files...")
- if os.path.isdir(os.path.join(bundle_dir, "samples")):
- self.log.info("Moving sample files...")
- for file in glob.glob(os.path.join(bundle_dir, "samples", "*")):
- shutil.copy(file, os.path.join(FILE_PATH, "samples"))
- self.log.info("Sample files moved!")
- # Read eventgen.conf
- self.log.info("Detecting eventgen.conf...")
- if os.path.isfile(os.path.join(bundle_dir, "default", "eventgen.conf")):
- self.log.info("Reading eventgen.conf...")
- config_dict = self.parse_eventgen_conf(os.path.join(bundle_dir, "default", "eventgen.conf"))
- # If an eventgen.conf exists, enable the configured flag
- self.eventgen_dependency.configured = True
- return self.set_conf(json.dumps(config_dict))
- else:
- return self.get_conf()
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def setup(self, data):
- self.log.info("setup method called with {}".format(json.loads(data)))
- if not data:
- data = {}
- if type(data) != dict:
- data = json.loads(data)
-
- try:
- # set default values that follow default ORCA setting
- mode = data.get("mode", "roundrobin")
- hostname_template = data.get("hostname_template", "idx{0}")
- hosts = data.get("other_hosts", [])
- protocol = data.get("protocol", "https")
- key = data.get("key", "00000000-0000-0000-0000-000000000000")
- key_name = data.get("key_name", "eventgen") + '_' + self.host
- password = data.get("password", "Chang3d!")
- hec_port = int(data.get("hec_port", 8088))
- mgmt_port = int(data.get("mgmt_port", 8089))
- new_key = bool(data.get("new_key", True))
-
- def create_new_hec_key(hostname):
- requests.post(
- "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http/http".format(
- hostname, mgmt_port), auth=("admin", password), data={"disabled": "0"}, verify=False)
- requests.delete(
- "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http/{2}".format(
- hostname, mgmt_port, key_name), verify=False, auth=("admin", password))
- requests.post(
- "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http?output_mode=json".format(
- hostname, mgmt_port), verify=False, auth=("admin", password), data={"name": key_name})
- r = requests.post(
- "https://{0}:{1}/servicesNS/admin/splunk_httpinput/data/inputs/http/{2}?output_mode=json".format(
- hostname, mgmt_port, key_name), verify=False, auth=("admin", password))
- return str(json.loads(r.text)["entry"][0]["content"]["token"])
-
- self.discovered_servers = []
- for host in hosts:
- try:
- formatted_hostname = socket.gethostbyname(host)
- if new_key:
- key = create_new_hec_key(formatted_hostname)
- except (socket.gaierror, requests.ConnectionError):
- self.log.warning('failed to reach %s, skip...' % host)
- continue
- except (ValueError, KeyError):
- self.log.warning('failed to setup hec token for %s, skip...' % host)
- continue
-
- self.discovered_servers.append({"protocol": str(protocol),
- "address": str(formatted_hostname),
- "port": str(hec_port),
- "key": str(key)})
-
- counter = 1
- while True:
- try:
- formatted_hostname = socket.gethostbyname(hostname_template.format(counter))
- if new_key:
- key = create_new_hec_key(formatted_hostname)
-
- self.discovered_servers.append({
- "protocol": str(protocol), "address": str(formatted_hostname), "port": str(hec_port), "key":
- str(key)})
- counter += 1
- except socket.gaierror:
- break
-
- config = ConfigParser.ConfigParser()
- config.optionxform = str
- config.read(EVENTGEN_DEFAULT_CONF_PATH)
- try:
- config.get("global", "httpeventServers")
- except Exception as e:
- if type(e) == ConfigParser.NoSectionError:
- config.add_section("global")
- config.set("global", "httpeventServers", json.dumps({"servers": self.discovered_servers}))
- config.set("global", "httpeventOutputMode", mode)
- config.set("global", "outputMode", "httpevent")
-
- with open(EVENTGEN_DEFAULT_CONF_PATH, 'wb') as conf_content:
- config.write(conf_content)
-
- return self.get_conf()
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def get_volume(self):
- self.log.info("get_volume method called")
- try:
- config = json.loads(self.get_conf())
- self.log.info(config)
- fetched_volume = float(self.get_data_volumes(config))
- if fetched_volume:
- self.total_volume = fetched_volume
- self.send_volume_to_controller(total_volume=self.total_volume)
- return str(self.total_volume)
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- @rpc
- def send_volume_to_controller(self, total_volume):
- data = {}
- data['server_name'] = self.eventgen_name
- data['total_volume'] = total_volume
- self.dispatch("server_volume", data)
-
- def set_volume(self, volume):
- self.log.info("set_volume method called")
- try:
- config = json.loads(self.get_conf())
- # Initial total volume check
- self.get_volume()
- num_stanzas = 0
- if not self.total_volume:
- self.log.warn("There is no stanza found with perDayVolume")
- num_stanzas = len(config.keys())
- self.total_volume = volume
- ratio = float(volume) / float(self.total_volume)
- update_json = {}
- for stanza in config.keys():
- if isinstance(config[stanza], dict):
- if "perDayVolume" in config[stanza].keys():
- divided_value = float(config[stanza]["perDayVolume"]) * ratio
- else:
- if not num_stanzas:
- num_stanzas = 1
- divided_value = float(volume) / float(num_stanzas)
- update_json[stanza] = {"perDayVolume": divided_value}
- output = self.edit_conf(json.dumps(update_json))
- self.get_volume()
- return output
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- def reset(self):
- self.log.info("reset method called")
- try:
- self.stop()
- self.eventgen_dependency.refresh_eventgen()
- return "Eventgen Refreshed"
- except Exception as e:
- self.log.exception(str(e))
- return '500', "Exception: {}".format(e.message)
-
- # Event Handler Methods
-
- @event_handler("eventgen_controller", "all_index", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_index(self, payload):
- return self.index()
-
- @event_handler("eventgen_controller", "all_status", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_status(self, payload):
- return self.status()
-
- @event_handler("eventgen_controller", "all_start", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_start(self, payload):
- return self.start()
-
- @event_handler("eventgen_controller", "all_stop", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_stop(self, payload):
- return self.stop()
-
- @event_handler("eventgen_controller", "all_restart", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_restart(self, payload):
- return self.restart()
-
- @event_handler("eventgen_controller", "all_get_conf", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_get_conf(self, payload):
- return self.get_conf()
-
- @event_handler("eventgen_controller", "all_set_conf", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_set_conf(self, payload):
- return self.set_conf(conf=payload)
-
- @event_handler("eventgen_controller", "all_edit_conf", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_edit_conf(self, payload):
- return self.edit_conf(conf=payload)
-
- @event_handler("eventgen_controller", "all_bundle", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_bundle(self, payload):
- if payload['url']:
- return self.bundle(payload['url'])
-
- @event_handler("eventgen_controller", "all_setup", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_setup(self, payload):
- return self.setup(data=payload)
-
- @event_handler("eventgen_controller", "all_get_volume", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_get_volume(self, payload):
- return self.get_volume()
-
- @event_handler("eventgen_controller", "all_set_volume", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_set_volume(self, payload):
- if payload['perDayVolume']:
- return self.set_volume(payload['perDayVolume'])
-
- @event_handler("eventgen_controller", "all_reset", handler_type=BROADCAST, reliable_delivery=False)
- def event_handler_all_reset(self, payload):
- return self.reset()
-
- @event_handler("eventgen_controller", "{}_index".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_index(self, payload):
- return self.index()
-
- @event_handler("eventgen_controller", "{}_status".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_status(self, payload):
- return self.status()
-
- @event_handler("eventgen_controller", "{}_start".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_start(self, payload):
- return self.start()
-
- @event_handler("eventgen_controller", "{}_stop".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_stop(self, payload):
- return self.stop()
-
- @event_handler("eventgen_controller", "{}_restart".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_restart(self, payload):
- return self.restart()
-
- @event_handler("eventgen_controller", "{}_get_conf".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_get_conf(self, payload):
- return self.get_conf()
-
- @event_handler("eventgen_controller", "{}_set_conf".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_set_conf(self, payload):
- return self.set_conf(conf=payload)
-
- @event_handler("eventgen_controller", "{}_edit_conf".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_edit_conf(self, payload):
- return self.edit_conf(conf=payload)
-
- @event_handler("eventgen_controller", "{}_bundle".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_bundle(self, payload):
- if payload['url']:
- return self.bundle(payload['url'])
-
- @event_handler("eventgen_controller", "{}_setup".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_setup(self, payload):
- return self.setup(data=payload)
-
- @event_handler("eventgen_controller", "{}_get_volume".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_get_volume(self):
- return self.get_volume()
-
- @event_handler("eventgen_controller", "{}_set_volume".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_set_volume(self, payload):
- if payload['perDayVolume']:
- return self.set_volume(payload['perDayVolume'])
-
- @event_handler("eventgen_controller", "{}_reset".format(eventgen_name), handler_type=BROADCAST,
- reliable_delivery=False)
- def event_handler_reset(self, payload):
- return self.reset()
-
- # HTTP Methods
-
- @http('GET', '/')
- def http_root(self, request):
- return self.index()
-
- @http('GET', '/index')
- def http_index(self, request):
- return self.index()
-
- @http('GET', '/status')
- def http_status(self, request):
- return self.status()
-
- @http('POST', '/start')
- def http_start(self, request):
- return json.dumps(self.start())
-
- @http('POST', '/stop')
- def http_stop(self, request):
- return json.dumps(self.stop())
-
- @http('POST', '/restart')
- def http_restart(self, request):
- return json.dumps(self.restart())
-
- @http('GET', '/conf')
- def http_get_conf(self, request):
- output = self.get_conf()
- if type(output) == str:
- return output
- else:
- return json.dumps(output)
-
- @http('POST', '/conf')
- def http_set_conf(self, request):
- data = request.get_data()
- if data:
- return self.set_conf(data)
- else:
- return '400', 'Please pass the valid parameters.'
-
- @http('PUT', '/conf')
- def http_edit_conf(self, request):
- data = request.get_data()
- if data:
- return self.edit_conf(data)
- else:
- return '400', 'Please pass valid config data.'
-
- @http('POST', '/bundle')
- def http_bundle(self, request):
- data = request.get_data(as_text=True)
- try:
- data = json.loads(data)
- url = data["url"]
- return self.bundle(url)
- except ValueError as e:
- self.log.exception(str(e))
- return '400', "Please pass in a valid object with bundle URL"
- except Exception as e:
- self.log.exception(str(e))
- return '400', "Exception: {}".format(e.message)
-
- @http('POST', '/setup')
- def http_setup(self, request):
- data = request.get_data(as_text=True)
- try:
- return self.setup(data)
- except Exception as e:
- self.log.exception(str(e))
- return '400', "Exception: {}".format(e.message)
-
- @http('GET', '/volume')
- def http_get_volume(self, request):
- return self.get_volume()
-
- @http('POST', '/volume')
- def http_set_volume(self, request):
- data = request.get_data(as_text=True)
- try:
- data = json.loads(data)
- volume = data["perDayVolume"]
- return self.set_volume(volume)
- except Exception as e:
- self.log.exception(str(e))
- return '400', "Exception: {}".format(e.message)
-
- @http('POST', '/reset')
- def http_reset(self, request):
- return json.dumps(self.reset())
-
- # Helper Methods
-
- def parse_eventgen_conf(self, path):
- config = ConfigParser.ConfigParser()
- config.optionxform = str
- config.read(path)
- config_dict = {s: dict(config.items(s)) for s in config.sections()}
- return config_dict
-
- def download_bundle(self, url):
- self.log.info("Downloading bundle at {}...".format(url))
- # Use SPLUNK_HOME if defined
- if "SPLUNK_HOME" in os.environ:
- bundle_path = os.path.join(os.environ["SPLUNK_HOME"], "etc", "apps", "eg-bundle.tgz")
- else:
- bundle_path = os.path.join(os.getcwd(), "eg-bundle.tgz")
- r = requests.get(url, stream=True)
- with open(bundle_path, 'wb') as f:
- for chunk in r.iter_content(chunk_size=None):
- if chunk:
- f.write(chunk)
- r.close()
- self.log.info("Download complete!")
- return bundle_path
-
- def unarchive_bundle(self, path):
- self.log.info("Extracting bundle {}...".format(path))
- output = None
- # Use tarfile or zipfile, depending on the bundle
- if tarfile.is_tarfile(path):
- tar = tarfile.open(path)
- output = os.path.join(os.path.dirname(path), os.path.commonprefix(tar.getnames()))
- tar.extractall(path=os.path.dirname(path))
- tar.close()
- elif zipfile.is_zipfile(path):
- zipf = zipfile.ZipFile(path)
- output = os.path.join(os.path.dirname(path), "eg-bundle")
- zipf.extractall(path=output)
- zipf.close()
- else:
- msg = "Unknown archive format!"
- self.log.exception(msg)
- raise Exception(msg)
- self.log.info("Extraction complete!")
- return output
-
- def get_data_volumes(self, config):
- '''
- This method updates the total volume from the eventgen.conf
-
- :param config: (dict) object representing the current state of the server's eventgen.conf
- '''
- total_volume = 0
- for stanza in config.keys():
- if isinstance(config[stanza], dict) and "perDayVolume" in config[stanza].keys():
- total_volume += float(config[stanza]["perDayVolume"])
- self.log.info("Total volume is currently {}".format(total_volume))
- return total_volume
-
- def get_throughput(self):
- self.log.debug("Getting throughput ...")
- empty_throughput = {'TOTAL_VOLUME_MB': 0, 'TOTAL_COUNT': 0, 'THROUGHPUT_VOLUME_KB': 0, 'THROUGHPUT_COUNT': 0}
- if hasattr(self.eventgen_dependency.eventgen, 'output_counters'):
- total_volume = 0
- total_count = 0
- throughput_volume = 0
- throughput_count = 0
- for output_counter in self.eventgen_dependency.eventgen.output_counters:
- total_volume += output_counter.total_output_volume
- total_count += output_counter.total_output_count
- throughput_volume += output_counter.throughput_volume
- throughput_count += output_counter.throughput_count
- return {
- 'TOTAL_VOLUME_MB': total_volume / (1024 * 1024), 'TOTAL_COUNT': total_count, 'THROUGHPUT_VOLUME_KB':
- throughput_volume / (1024), 'THROUGHPUT_COUNT': throughput_count}
- else:
- self.log.debug("return empty throughput because of output_counters not found.")
- return empty_throughput
diff --git a/splunk_eventgen/lib/eventgenconfig.py b/splunk_eventgen/lib/eventgenconfig.py
index 38980401..37af3789 100644
--- a/splunk_eventgen/lib/eventgenconfig.py
+++ b/splunk_eventgen/lib/eventgenconfig.py
@@ -813,6 +813,8 @@ def _validateSetting(self, stanza, key, value):
logger.debug("Calling function for setting '%s' with value '%s'" % (key, value))
value = complexSetting(value)
elif isinstance(complexSetting, list):
+ if key == 'threading' and self.threading == 'process':
+ value = self.threading
if value not in complexSetting:
logger.error(
"Setting '%s' is invalid for value '%s' in stanza '%s'" % (key, value, stanza))
diff --git a/splunk_eventgen/lib/eventgenoutput.py b/splunk_eventgen/lib/eventgenoutput.py
index 4c25f4d8..502d2968 100644
--- a/splunk_eventgen/lib/eventgenoutput.py
+++ b/splunk_eventgen/lib/eventgenoutput.py
@@ -5,7 +5,7 @@
import logging.handlers
import time
from Queue import Full
-from logging_config import logger
+from logging_config import logger, metrics_logger
# TODO: Figure out why we load plugins from here instead of the base plugin class.
@@ -75,7 +75,6 @@ def flush(self, endOfInterval=False):
Flushes output buffer, unless endOfInterval called, and then only flush if we've been called
more than maxIntervalsBeforeFlush tunable.
"""
- flushing = False
# TODO: Fix interval flushing somehow with a queue, not sure I even want to support this feature anymore.
'''if endOfInterval:
logger.debugv("Sample calling flush, checking increment against maxIntervalsBeforeFlush")
@@ -113,8 +112,7 @@ def flush(self, endOfInterval=False):
if self.config.splunkEmbedded:
tmp = [len(s['_raw']) for s in q]
if len(tmp) > 0:
- metrics = logging.getLogger('eventgen_metrics')
- metrics.info({
+ metrics_logger.info({
'timestamp': datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'), 'sample':
self._sample.name, 'events': len(tmp), 'bytes': sum(tmp)})
tmp = None
diff --git a/splunk_eventgen/lib/eventgensamples.py b/splunk_eventgen/lib/eventgensamples.py
index d05796c4..9c687d91 100644
--- a/splunk_eventgen/lib/eventgensamples.py
+++ b/splunk_eventgen/lib/eventgensamples.py
@@ -359,6 +359,8 @@ def loadSample(self):
self._closeSampleFile()
self.sampleDict = []
for line in self.sampleLines:
+ if line == '\n':
+ continue
if line and line[-1] != '\n':
line = line + '\n'
self.sampleDict.append({
diff --git a/splunk_eventgen/lib/plugins/output/httpevent.py b/splunk_eventgen/lib/plugins/output/httpevent.py
index 90d87e01..699b5f76 100644
--- a/splunk_eventgen/lib/plugins/output/httpevent.py
+++ b/splunk_eventgen/lib/plugins/output/httpevent.py
@@ -37,7 +37,7 @@ class HTTPEventOutputPlugin(HTTPCoreOutputPlugin):
name = 'httpevent'
def __init__(self, sample, output_counter=None):
- super(HTTPEventOutputPlugin,self).__init__(sample,output_counter)
+ super(HTTPEventOutputPlugin, self).__init__(sample,output_counter)
def flush(self, q):
logger.debug("Flush called on httpevent plugin")
diff --git a/splunk_eventgen/lib/plugins/output/httpevent_core.py b/splunk_eventgen/lib/plugins/output/httpevent_core.py
index 72fd1f44..ba2a0b92 100644
--- a/splunk_eventgen/lib/plugins/output/httpevent_core.py
+++ b/splunk_eventgen/lib/plugins/output/httpevent_core.py
@@ -42,7 +42,7 @@ def __init__(self, sample, output_counter=None):
OutputPlugin.__init__(self, sample, output_counter)
# TODO: make workers a param that can be set in eventgen.conf
- def _setup_REST_workers(self, session=None, workers=10):
+ def _setup_REST_workers(self, session=None, workers=20):
# disable any "requests" warnings
requests.packages.urllib3.disable_warnings()
# Bind passed in samples to the outputter.
diff --git a/splunk_eventgen/logger/logger_config.py b/splunk_eventgen/logger/logger_config.py
index 444ef5f9..8c1850bc 100644
--- a/splunk_eventgen/logger/logger_config.py
+++ b/splunk_eventgen/logger/logger_config.py
@@ -8,7 +8,8 @@
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
- 'formatter': 'detailed', }, 'main': {
+ 'formatter': 'detailed', },
+ 'main': {
'class': 'logging.FileHandler',
'filename': 'eventgen-controller-main.log',
'mode': 'w',
diff --git a/splunk_eventgen/server_conf.yml b/splunk_eventgen/server_conf.yml
deleted file mode 100644
index 1c30fd8d..00000000
--- a/splunk_eventgen/server_conf.yml
+++ /dev/null
@@ -1,2 +0,0 @@
-AMQP_URI: 'pyamqp://guest:guest@localhost:5672'
-WEB_SERVER_ADDRESS: '0.0.0.0:9501'
diff --git a/tests/.gitignore b/tests/.gitignore
index 0dde16b9..82f107e3 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -1,2 +1,3 @@
*.pyc
results/
+.coverage
diff --git a/tests/large/test_eventgen_orchestration.py b/tests/large/test_eventgen_orchestration.py
index 19d35da3..f5d8e0da 100644
--- a/tests/large/test_eventgen_orchestration.py
+++ b/tests/large/test_eventgen_orchestration.py
@@ -51,11 +51,10 @@ class TestEventgenOrchestration(object):
def setup_class(cls):
# Build the image from scratch
cls.client = APIClient(base_url="unix://var/run/docker.sock")
- response = cls.client.build(path=REPO_DIR, dockerfile=os.path.join("dockerfiles", "Dockerfile"), tag=IMAGE_NAME,
- rm=True, nocache=True, pull=True, stream=False)
+ response = cls.client.build(path=REPO_DIR, dockerfile=os.path.join("dockerfiles", "Dockerfile"), tag=IMAGE_NAME, rm=True, nocache=True, pull=True, stream=False)
for line in response:
print line,
- # Create a network for both the controller + server to run in
+ # Create a network for both the controller and server to run in
cls.client.create_network(NETWORK_NAME, driver="bridge", attachable=True)
networking_config = cls.client.create_networking_config({NETWORK_NAME: cls.client.create_endpoint_config()})
# Start the controller
@@ -69,29 +68,42 @@ def setup_class(cls):
cls.controller_container = cls.client.inspect_container(container["Id"])
cls.controller_eventgen_webport = cls.controller_container["NetworkSettings"]["Ports"]["9500/tcp"][0][
"HostPort"]
- cls.controller_rabbitmq_webport = cls.controller_container["NetworkSettings"]["Ports"]["15672/tcp"][0][
- "HostPort"]
# Start the server
print 'creating server'
+ redis_host = container["Id"][:12]
container = cls.client.create_container(
- image=IMAGE_NAME, command="server", environment=[
- "EVENTGEN_AMQP_HOST={}".format(cls.controller_container["Id"][:12])], host_config=host_config,
+ image=IMAGE_NAME, command="server", environment=["REDIS_HOST={}".format(redis_host)],
+ host_config=host_config,
networking_config=networking_config)
cls.client.start(container["Id"])
TestEventgenOrchestration.server_id = container["Id"]
print container["Id"]
cls.server_container = cls.client.inspect_container(container["Id"])
cls.server_eventgen_webport = cls.server_container["NetworkSettings"]["Ports"]["9500/tcp"][0]["HostPort"]
- cls.server_rabbitmq_webport = cls.server_container["NetworkSettings"]["Ports"]["15672/tcp"][0]["HostPort"]
+
# Wait for the controller to be available
print "Waiting for Eventgen Controller to become available."
wait_for_response("http://127.0.0.1:{}".format(cls.controller_eventgen_webport))
print "Eventgen Controller has become available."
+
# Wait for the server to be available
print "Waiting for Eventgen Server to become available."
wait_for_response("http://127.0.0.1:{}".format(cls.server_eventgen_webport))
print "Eventgen Server has become available."
- time.sleep(60)
+ time.sleep(30)
+
+ cls.test_json = {
+ "windbag": {
+ "generator": "windbag",
+ "earliest": "-3s",
+ "latest": "now",
+ "interval": "5",
+ "count": "5",
+ "outputMode": "stdout",
+ "end": "15",
+ "threading": "process"
+ }
+ }
@classmethod
def teardown_class(cls):
@@ -101,18 +113,10 @@ def teardown_class(cls):
cls.client.remove_network(NETWORK_NAME)
# Controller tests #
-
- def test_controller_rabbitmq(self):
- r = requests.get("http://127.0.0.1:{}".format(self.controller_rabbitmq_webport))
- assert r.status_code == 200
- assert "RabbitMQ" in r.content
-
def test_controller_root(self):
- r = requests.get("http://127.0.0.1:{}".format(self.controller_eventgen_webport))
+ r = requests.get("http://127.0.0.1:{}/".format(self.controller_eventgen_webport))
assert r.status_code == 200
- assert "Eventgen Controller" in r.content
- assert "Host: " in r.content
- assert "You are running Eventgen Controller" in r.content
+ assert "running_eventgen_controller" in r.content
def test_controller_index(self):
r = requests.get("http://127.0.0.1:{}/index".format(self.controller_eventgen_webport))
@@ -132,50 +136,43 @@ def test_controller_status(self):
current_retry += 1
time.sleep(10)
assert output
+
+ def test_controller_conf(self):
+ r = requests.post("http://127.0.0.1:{}/conf".format(self.controller_eventgen_webport), json=self.test_json)
+ assert r.status_code == 200
+ assert "windbag" in r.content
def test_controller_start(self):
r = requests.post("http://127.0.0.1:{}/start".format(self.controller_eventgen_webport))
assert r.status_code == 200
- assert "Start event dispatched to all" in r.content
+ assert "Eventgen has successfully started" in r.content
def test_controller_start_with_target(self):
r = requests.post("http://127.0.0.1:{}/start/{}".format(self.controller_eventgen_webport,
TestEventgenOrchestration.server_id[:12]))
assert r.status_code == 200
- assert "Start event dispatched to {}".format(TestEventgenOrchestration.server_id[:12]) in r.content
-
- def test_controller_stop(self):
- r = requests.post("http://127.0.0.1:{}/stop".format(self.controller_eventgen_webport))
- assert r.status_code == 200
- assert "Stop event dispatched to all" in r.content
-
- def test_controller_stop_with_target(self):
- r = requests.post("http://127.0.0.1:{}/stop/{}".format(self.controller_eventgen_webport,
- TestEventgenOrchestration.server_id[:12]))
- assert r.status_code == 200
- assert "Stop event dispatched to {}".format(TestEventgenOrchestration.server_id[:12]) in r.content
+ assert "Eventgen already started" in r.content
def test_controller_restart(self):
- r = requests.post("http://127.0.0.1:{}/stop".format(self.controller_eventgen_webport))
+ r = requests.post("http://127.0.0.1:{}/restart".format(self.controller_eventgen_webport))
assert r.status_code == 200
- assert "Stop event dispatched to all" in r.content
+ assert "Eventgen is restarting" in r.content
def test_controller_restart_with_target(self):
- r = requests.post("http://127.0.0.1:{}/stop/{}".format(self.controller_eventgen_webport,
+ r = requests.post("http://127.0.0.1:{}/restart/{}".format(self.controller_eventgen_webport,
TestEventgenOrchestration.server_id[:12]))
assert r.status_code == 200
- assert "Stop event dispatched to {}".format(TestEventgenOrchestration.server_id[:12]) in r.content
+ assert "Eventgen is restarting" in r.content
def test_controller_bundle_invalid_request(self):
r = requests.post("http://127.0.0.1:{}/bundle".format(self.controller_eventgen_webport))
- assert r.status_code == 400
- assert "Please pass in a valid object with bundle URL" in r.content
+ assert r.status_code == 500
+ assert "Internal Error Occurred" in r.content
def test_controller_bundle_with_url(self):
r = requests.post("http://127.0.0.1:{}/bundle".format(self.controller_eventgen_webport), json={
"url": "http://server.com/bundle.tgz"})
assert r.status_code == 200
- assert "Bundle event dispatched to all with url http://server.com/bundle.tgz" in r.content
def test_controller_bundle_with_url_and_target(self):
r = requests.post(
@@ -183,10 +180,7 @@ def test_controller_bundle_with_url_and_target(self):
TestEventgenOrchestration.server_id[:12]), json={
"url": "http://server.com/bundle.tgz"})
assert r.status_code == 200
- assert "Bundle event dispatched to {} with url http://server.com/bundle.tgz".format(
- TestEventgenOrchestration.server_id[:12]) in r.content
- @pytest.mark.skip(reason="Change in implementation")
def test_controller_get_volume(self):
max_retry = 5
current_retry = 1
@@ -197,44 +191,57 @@ def test_controller_get_volume(self):
output = json.loads(response.content)
current_retry += 1
time.sleep(10)
- assert output[TestEventgenOrchestration.server_id[:12]] == 0.0
+ assert output[TestEventgenOrchestration.server_id[:12]]["perDayVolume"] == 0.0
def test_controller_set_volume_invalid_request(self):
r = requests.post("http://127.0.0.1:{}/volume".format(self.controller_eventgen_webport))
- assert r.status_code == 400
- assert "Please pass in a valid object with volume" in r.content
+ assert r.status_code == 500
+ assert "Internal Error Occurred" in r.content
def test_controller_set_volume_with_volume(self):
r = requests.post("http://127.0.0.1:{}/volume".format(self.controller_eventgen_webport), json={
"perDayVolume": 10})
assert r.status_code == 200
- assert "set_volume event dispatched to all" in r.content
+ output = json.loads(r.content)
+ assert output[TestEventgenOrchestration.server_id[:12]]["perDayVolume"] == 10
def test_controller_set_volume_with_volume_and_target(self):
r = requests.post(
"http://127.0.0.1:{}/volume/{}".format(self.controller_eventgen_webport,
- TestEventgenOrchestration.server_id[:12]), json={"perDayVolume": 10})
+ TestEventgenOrchestration.server_id[:12]), json={"perDayVolume": 20})
+ assert r.status_code == 200
+ output = json.loads(r.content)
+ assert output[TestEventgenOrchestration.server_id[:12]]["perDayVolume"] == 20
+
+ def test_controller_stop(self):
+ r = requests.post("http://127.0.0.1:{}/stop".format(self.controller_eventgen_webport))
+ assert r.status_code == 200
+ assert r.status_code == 200
+ assert "Eventgen is stopping" in r.content
+
+ def test_controller_stop_with_target(self):
+ r = requests.post("http://127.0.0.1:{}/stop/{}".format(self.controller_eventgen_webport,
+ TestEventgenOrchestration.server_id[:12]))
assert r.status_code == 200
- assert "set_volume event dispatched to {}".format(TestEventgenOrchestration.server_id[:12]) in r.content
+ assert "Eventgen is stopping" in r.content
# Server tests #
+ def test_server_reset(self):
+ r = requests.post("http://127.0.0.1:{}/reset".format(self.server_eventgen_webport))
+ assert r.status_code == 200
+
def test_server_root(self):
r = requests.get("http://127.0.0.1:{}".format(self.server_eventgen_webport))
assert r.status_code == 200
- assert "Host: " in r.content
- assert "Eventgen Status" in r.content
- assert "Eventgen Config file path" in r.content
- assert "Total volume:" in r.content
- assert "Worker Queue Status" in r.content
- assert "Sample Queue Status" in r.content
- assert "Output Queue Status" in r.content
+ assert "running_eventgen_server" in r.content
def test_server_index(self):
r = requests.get("http://127.0.0.1:{}/index".format(self.server_eventgen_webport))
assert r.status_code == 200
assert "Host: " in r.content
assert "Eventgen Status" in r.content
+ assert "Eventgen Config file exists" in r.content
assert "Eventgen Config file path" in r.content
assert "Total volume:" in r.content
assert "Worker Queue Status" in r.content
@@ -247,12 +254,17 @@ def test_server_status(self):
output = json.loads(r.content)
assert output
assert output['EVENTGEN_STATUS'] == 0
+ assert output['TOTAL_VOLUME'] == 20
def test_server_get_and_set_conf(self):
r = requests.get("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport))
assert r.status_code == 200
- assert json.loads(r.content) == {}
- config_json = {"windbag": {"outputMode": "stdout"}}
+ assert json.loads(r.content)
+ config_json = {
+ "windbag": {
+ "end": "10"
+ }
+ }
r = requests.post("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport), json=config_json)
assert r.status_code == 200
assert json.loads(r.content) == config_json
@@ -260,22 +272,22 @@ def test_server_get_and_set_conf(self):
def test_server_start(self):
r = requests.post("http://127.0.0.1:{}/start".format(self.server_eventgen_webport), timeout=5)
assert r.status_code == 200
- assert json.loads(r.content) == "Eventgen has successfully started."
+ assert "Eventgen has successfully started" in r.content
def test_server_restart(self):
r = requests.post("http://127.0.0.1:{}/restart".format(self.server_eventgen_webport))
assert r.status_code == 200
- assert json.loads(r.content) == "Eventgen restarted."
+ assert "Eventgen has successfully restarted" in r.content
def test_server_stop(self):
r = requests.post("http://127.0.0.1:{}/stop".format(self.server_eventgen_webport))
assert r.status_code == 200
- assert json.loads(r.content) == "Eventgen is stopped."
+ assert "Eventgen is stopped" in r.content
def test_server_bundle(self):
r = requests.post("http://127.0.0.1:{}/bundle".format(self.server_eventgen_webport))
- assert r.status_code == 400
- assert "Please pass in a valid object with bundle URL" in r.content
+ assert r.status_code == 500
+ assert "Internal Error Occurred" in r.content
def test_server_get_and_set_volume(self):
# Must initialize a stanza with the perDayVolume setting before hitting the /volume endpoint
@@ -288,11 +300,11 @@ def test_server_get_and_set_volume(self):
r = requests.get("http://127.0.0.1:{}/volume".format(self.server_eventgen_webport))
assert r.status_code == 200
output = json.loads(r.content)
- assert output == 10.0
+ assert output["perDayVolume"] == 10.0
r = requests.post("http://127.0.0.1:{}/volume".format(self.server_eventgen_webport), json={"perDayVolume": 150})
assert r.status_code == 200
assert json.loads(r.content)
r = requests.get("http://127.0.0.1:{}/volume".format(self.server_eventgen_webport))
assert r.status_code == 200
output = json.loads(r.content)
- assert output == 150.0
+ assert output["perDayVolume"] == 150.0
diff --git a/tests/small/test_main.py b/tests/small/test_main.py
index 3f68254d..d0575333 100644
--- a/tests/small/test_main.py
+++ b/tests/small/test_main.py
@@ -6,83 +6,17 @@
import pytest
from mock import MagicMock, patch
-from splunk_eventgen.__main__ import parse_cli_vars, parse_env_vars
+from splunk_eventgen.__main__ import gather_env_vars
FILE_DIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(FILE_DIR, "..", "..", ".."))
sys.path.insert(0, os.path.join(FILE_DIR, "..", "..", "..", "splunk_eventgen"))
-@pytest.mark.parametrize(
- ('config'),
- [
- # Empty config
- ({}),
- # Some elements already defined - function should override
- ({"AMQP_HOST": "guest", "AMQP_PASS": "guest"})])
-def test_parse_cli_vars(config):
+def test_gather_env_vars():
args = MagicMock()
- args.amqp_uri = "pyamqp://user:pass@host:port"
- args.amqp_host = "hostname"
- args.amqp_port = 8001
- args.amqp_webport = 8000
- args.amqp_user = "hello"
- args.amqp_pass = "world"
- args.web_server_address = "0.0.0.:1111"
- obj = parse_cli_vars(config, args)
- assert obj == {
- "AMQP_URI": "pyamqp://user:pass@host:port", "AMQP_HOST": "hostname", "AMQP_PORT": 8001, "AMQP_WEBPORT": 8000,
- "AMQP_USER": "hello", "AMQP_PASS": "world", "WEB_SERVER_ADDRESS": "0.0.0.:1111"}
-
-
-@pytest.mark.parametrize(
- ('env_vars'),
- [
- # No environment vars defined
- ({}),
- # All environemnt vars defined
- ({
- "EVENTGEN_AMQP_URI": "test", "EVENTGEN_AMQP_HOST": "host", "EVENTGEN_AMQP_PORT": 8000,
- "EVENTGEN_AMQP_WEBPORT": 8001, "EVENTGEN_AMQP_USER": "hello", "EVENTGEN_AMQP_PASS": "world",
- "EVENTGEN_WEB_SERVER_ADDR": "0.0.0.0:1111"})])
-def test_parse_env_vars(env_vars):
- with patch("splunk_eventgen.__main__.os") as mock_os:
- mock_os.environ = env_vars
- obj = parse_env_vars()
- assert obj.keys() == [
- 'AMQP_WEBPORT', 'AMQP_USER', 'AMQP_PASS', 'AMQP_PORT', 'AMQP_URI', 'WEB_SERVER_ADDRESS', 'AMQP_HOST']
- if env_vars:
- # If enviroment vars are defined, let's make sure they are set instead of default values
- assert obj["WEB_SERVER_ADDRESS"] == "0.0.0.0:1111"
- assert obj["AMQP_HOST"] == "host"
- assert obj["AMQP_PORT"] == 8000
-
-
-def test_parse_env_vars_and_parse_cli_vars():
- '''
- This test checks the layering effect of both parsing CLI and env vars.
- Arguments passed via CLI should take precedence over those defined in environment.
- '''
- with patch("splunk_eventgen.__main__.os") as mock_os:
- mock_os.environ = {}
- obj = parse_env_vars()
- assert obj["AMQP_WEBPORT"] == 15672
- assert obj["AMQP_USER"] == "guest"
- assert obj["AMQP_PORT"] == 5672
- assert obj["AMQP_PASS"] == "guest"
- assert obj["AMQP_USER"] == "guest"
- assert obj["AMQP_URI"] is None
- assert obj["WEB_SERVER_ADDRESS"] == "0.0.0.0:9500"
- args = MagicMock()
- args.amqp_uri = "pyamqp://user:pass@host:port"
- args.amqp_host = "hostname"
- args.amqp_port = 8001
- args.amqp_webport = 8000
- args.web_server_address = "0.0.0.:1111"
- # Purposely defining None vars here for these CLI args - in this case, environment vars will be used
- args.amqp_user = None
- args.amqp_pass = None
- parse_cli_vars(obj, args)
- assert obj == {
- "AMQP_URI": "pyamqp://user:pass@host:port", "AMQP_HOST": "hostname", "AMQP_PORT": 8001, "AMQP_WEBPORT":
- 8000, "AMQP_USER": "guest", "AMQP_PASS": "guest", "WEB_SERVER_ADDRESS": "0.0.0.:1111"}
+ args.redis_host = "127.0.0.1"
+ args.redis_port = "6379"
+ args.web_server_port = '9500'
+ obj = gather_env_vars(args)
+ assert obj == {"REDIS_HOST": "127.0.0.1", "REDIS_PORT": "6379", "WEB_SERVER_PORT": "9500"}