From b1b6c4a22903b3b5c197692ebb5cecda938454ea Mon Sep 17 00:00:00 2001 From: Li Wu Date: Thu, 1 Aug 2019 13:17:02 +0800 Subject: [PATCH 01/14] Fix structlog dependency for app (#280) --- splunk_eventgen/lib/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/splunk_eventgen/lib/requirements.txt b/splunk_eventgen/lib/requirements.txt index 4bfb4b0b..24b66fae 100644 --- a/splunk_eventgen/lib/requirements.txt +++ b/splunk_eventgen/lib/requirements.txt @@ -1,2 +1,3 @@ ujson==1.35 jinja2==2.10.1 +structlog==19.1.0 From 3e4cb1b4d77c6136446f48002bfd3531e25a5a94 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Tue, 20 Aug 2019 10:39:14 -0700 Subject: [PATCH 02/14] zipfile fix (#284) --- splunk_eventgen/eventgen_api_server/eventgen_server_api.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py index c355cdfa..1bc0ee1d 100644 --- a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py +++ b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py @@ -464,10 +464,7 @@ def unarchive_bundle(self, path): zipf = zipfile.ZipFile(path) for info in zipf.infolist(): old_file_name = info.filename - if info.filename.find('/') == len(info.filename) - 1: - info.filename = "eg-bundle/" - else: - info.filename = "eg-bundle/" + info.filename[info.filename.find('/') + 1:] + info.filename = "eg-bundle/" + info.filename zipf.extract(info, os.path.dirname(path)) output = os.path.join(os.path.dirname(path), 'eg-bundle') zipf.close() From 9bcfdfb003c85ea5d829e988bd18a22eb0c330c8 Mon Sep 17 00:00:00 2001 From: Li Wu Date: Thu, 22 Aug 2019 11:34:59 +0800 Subject: [PATCH 03/14] Fix bug 286 random token replacement (#287) * Fix bug 286 random token replacement * Change perdayvolume generator logic to get random token value replacement --- splunk_eventgen/lib/eventgentoken.py | 2 +- .../generator/perdayvolumegenerator.py | 22 +++++++++---------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/splunk_eventgen/lib/eventgentoken.py b/splunk_eventgen/lib/eventgentoken.py index 56b97ac8..e393e232 100644 --- a/splunk_eventgen/lib/eventgentoken.py +++ b/splunk_eventgen/lib/eventgentoken.py @@ -350,7 +350,7 @@ def _getReplacement(self, old=None, earliestTime=None, latestTime=None, s=None, except: logger.error("Could not parse json for '%s' in sample '%s'" % (listMatch.group(1), s.name)) return old - return random.choice(value) + return random.SystemRandom().choice(value) else: logger.error("Unknown replacement value '%s' for replacementType '%s'; will not replace" % diff --git a/splunk_eventgen/lib/plugins/generator/perdayvolumegenerator.py b/splunk_eventgen/lib/plugins/generator/perdayvolumegenerator.py index 9183208a..7b84845d 100644 --- a/splunk_eventgen/lib/plugins/generator/perdayvolumegenerator.py +++ b/splunk_eventgen/lib/plugins/generator/perdayvolumegenerator.py @@ -31,17 +31,15 @@ def gen(self, count, earliest, latest, samplename=None): # Create a counter for the current byte size of the read in samples currentSize = 0 - # Replace event tokens before calculating the size of the event - updated_sample_dict = GeneratorPlugin.replace_tokens(self, self._sample.sampleDict, earliest, latest) # If we're random, fill random events from sampleDict into eventsDict eventsDict = [] if self._sample.randomizeEvents: - sdlen = len(updated_sample_dict) + sdlen = len(self._sample.sampleDict) logger.debug("Random filling eventsDict for sample '%s' in app '%s' with %d 
bytes" % (self._sample.name, self._sample.app, size)) while currentSize < size: - currentevent = updated_sample_dict[random.randint(0, sdlen - 1)] + currentevent = self._sample.sampleDict[random.randint(0, sdlen - 1)] eventsDict.append(currentevent) currentSize += len(currentevent['_raw']) @@ -51,8 +49,8 @@ def gen(self, count, earliest, latest, samplename=None): "Bundlelines, filling eventsDict for sample '%s' in app '%s' with %d copies of sampleDict" % (self._sample.name, self._sample.app, size)) while currentSize <= size: - sizeofsample = sum(len(sample['_raw']) for sample in updated_sample_dict) - eventsDict.extend(updated_sample_dict) + sizeofsample = sum(len(sample['_raw']) for sample in self._sample.sampleDict) + eventsDict.extend(self._sample.sampleDict) currentSize += sizeofsample # Otherwise fill count events into eventsDict or keep making copies of events out of sampleDict until @@ -63,28 +61,28 @@ def gen(self, count, earliest, latest, samplename=None): # or i've read the entire file. linecount = 0 currentreadsize = 0 - linesinfile = len(updated_sample_dict) + linesinfile = len(self._sample.sampleDict) logger.debug("Lines in files: %s " % linesinfile) while currentreadsize <= size: targetline = linecount % linesinfile sizeremaining = size - currentreadsize - targetlinesize = len(updated_sample_dict[targetline]['_raw']) + targetlinesize = len(self._sample.sampleDict[targetline]['_raw']) if size < targetlinesize: logger.error( "Size is too small for sample {}. We need {} bytes but size of one event is {} bytes.".format( self._sample.name, size, targetlinesize)) break - if targetlinesize <= sizeremaining or targetlinesize * .9 <= sizeremaining: + if targetlinesize <= sizeremaining: currentreadsize += targetlinesize - eventsDict.append(updated_sample_dict[targetline]) + eventsDict.append(self._sample.sampleDict[targetline]) else: break linecount += 1 logger.debug("Events fill complete for sample '%s' in app '%s' length %d" % (self._sample.name, self._sample.app, len(eventsDict))) - # Ignore token replacement here because we completed it at the beginning of event generation - GeneratorPlugin.build_events(self, eventsDict, startTime, earliest, latest, ignore_tokens=True) + # build the events and replace tokens + GeneratorPlugin.build_events(self, eventsDict, startTime, earliest, latest) def load(): From fe50964980a2715a1855124e02a623130f850d01 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Mon, 26 Aug 2019 14:48:35 -0700 Subject: [PATCH 04/14] Versioning scheme (#278) --- docs/CONTRIBUTE_CODE.md | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/docs/CONTRIBUTE_CODE.md b/docs/CONTRIBUTE_CODE.md index 14c4453d..79aed24c 100644 --- a/docs/CONTRIBUTE_CODE.md +++ b/docs/CONTRIBUTE_CODE.md @@ -5,18 +5,26 @@ If you want to contribute code to eventgen, please read over the following guide ## Pull request guidelines - If you want to contribute to an eventgen repo, please use a GitHub pull request. This is the fastest way for us to evaluate your code and to merge it into the code base. Please don’t file an issue with snippets of code. Doing so means that we need to manually merge the changes in and update any appropriate tests. That decreases the likelihood that your code is going to get included in a timely manner. Please use pull requests. 
+
+## Release versioning guidelines
+
+Major Release — Increment the first digit by 1 if the new features break backwards compatibility or existing features.
+
+Minor Release — Increment the middle digit by 1 if the new features don’t break any existing features and are compatible with the app in its current state.
+
+Patch Release — Increment the last digit by 1 if you’re publishing bug/patch fixes to your app.
+
 ### Get started
 
 If you’d like to work on a pull request and you’ve never submitted code before, follow these steps:
 
 1. Fork eventgen to your GitHub workspace
 2. If you want to fix bugs or make enhancements, please make sure there is an issue in the eventgen project. Refer to [this guide](FILE_ISSUES.md) to create an issue.
 
- After that, you’re ready to start working on code.
+
 ### Working on the code
 
 The process of submitting a pull request is fairly straightforward and generally follows the same pattern each time:
 
@@ -75,6 +83,7 @@ The message summary should be a one-sentence description of the change, and it m
 
 **Note**: please squash your changes into one commit before opening the pull request. One commit in one PR keeps the git history clean.
 
+
 #### Step 3: Rebase onto upstream
 
 Before you send the pull request, be sure to rebase onto the upstream source. This ensures your code is running on the latest available code. We prefer rebase instead of merge when upstream changes. Rebase keeps the git history clearer.
 
@@ -83,6 +92,7 @@ git fetch upstream
 git rebase upstream/master
 ```
 
+
 #### Step 4: Run the tests
 
 This is a placeholder as well. We should write about
@@ -101,6 +111,7 @@ Next, push your changes to your clone:
 git push origin fix/issue123
 ```
 
+
 #### Step 6: Submit the pull request
 
 Before creating a pull request, here are some recommended **check points**.
@@ -118,7 +129,6 @@ Next, create a pull request from your branch to the eventgen develop branch.
 
 Mark @lephino , @arctan5x , @jmeixensperger , @li-wu , @GordonWang as the reviewers.
 
-
 ## Code style and formatting tools
 
 Since Eventgen is written in python, we apply a coding style based on [PEP8](https://www.python.org/dev/peps/pep-0008/).
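The three rules above map directly onto the `major.minor.patch` string stored in `splunk_eventgen/version.json`. As a quick illustration, here is a minimal, hypothetical helper (not part of this patch series) that applies them; it also resets the lower digits on a major or minor bump, the usual semantic-versioning convention that the guideline text leaves implicit:

```python
import json


def bump_version(version, kind):
    """Apply the release guidelines: 'major' breaks compatibility,
    'minor' adds backwards-compatible features, 'patch' fixes bugs."""
    major, minor, patch = (int(part) for part in version.split('.'))
    if kind == 'major':
        return '{0}.0.0'.format(major + 1)
    if kind == 'minor':
        return '{0}.{1}.0'.format(major, minor + 1)
    if kind == 'patch':
        return '{0}.{1}.{2}'.format(major, minor, patch + 1)
    raise ValueError('kind must be major, minor or patch')


# e.g. a bug-fix release over the version.json format used in this repo:
version = json.loads('{"version": "6.5.0"}')['version']
print(bump_version(version, 'patch'))  # -> 6.5.1
```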
From 6ef4255dcffcd24ee3c6e5c495a3864b64ba2dc5 Mon Sep 17 00:00:00 2001 From: Jack Meixensperger Date: Thu, 29 Aug 2019 17:56:41 -0700 Subject: [PATCH 05/14] [global] perDayVolume (#288) * exclude global from perDayVolume assignment * Address comment --- splunk_eventgen/eventgen_api_server/eventgen_server_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py index 1bc0ee1d..9880e409 100644 --- a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py +++ b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py @@ -371,7 +371,7 @@ def set_volume(self, target_volume): stanza_num -= 1 divided_volume = float(target_volume) / stanza_num for stanza, kv_pair in conf_dict.iteritems(): - if isinstance(kv_pair, dict) and stanza != '.*' not in stanza: + if isinstance(kv_pair, dict) and stanza != 'global' and '.*' not in stanza: conf_dict[stanza]["perDayVolume"] = divided_volume self.set_conf(conf_dict) From aa48a44e8786d195fff8404e8135774c539ad0cc Mon Sep 17 00:00:00 2001 From: Li Wu Date: Fri, 30 Aug 2019 10:00:51 +0800 Subject: [PATCH 06/14] Fix security vulnerability issue (#289) --- docs/Gemfile.lock | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock index 057c847b..5d1b4e46 100644 --- a/docs/Gemfile.lock +++ b/docs/Gemfile.lock @@ -1,12 +1,12 @@ GEM remote: https://rubygems.org/ specs: - activesupport (4.2.10) + activesupport (4.2.11.1) i18n (~> 0.7) minitest (~> 5.1) thread_safe (~> 0.3, >= 0.3.4) tzinfo (~> 1.1) - addressable (2.5.2) + addressable (2.6.0) public_suffix (>= 2.0.2, < 4.0) coffee-script (2.4.1) coffee-script-source @@ -16,7 +16,7 @@ GEM commonmarker (0.17.13) ruby-enum (~> 0.5) concurrent-ruby (1.1.5) - dnsruby (1.61.2) + dnsruby (1.61.3) addressable (~> 2.5) em-websocket (0.5.1) eventmachine (>= 0.12.9) @@ -27,13 +27,13 @@ GEM execjs (2.7.0) faraday (0.15.4) multipart-post (>= 1.2, < 3) - ffi (1.10.0) + ffi (1.11.1) forwardable-extended (2.6.0) - gemoji (3.0.0) - github-pages (197) - activesupport (= 4.2.10) + gemoji (3.0.1) + github-pages (198) + activesupport (= 4.2.11.1) github-pages-health-check (= 1.16.1) - jekyll (= 3.7.4) + jekyll (= 3.8.5) jekyll-avatar (= 0.6.0) jekyll-coffeescript (= 1.1.1) jekyll-commonmark-ghpages (= 0.1.5) @@ -81,13 +81,13 @@ GEM octokit (~> 4.0) public_suffix (~> 3.0) typhoeus (~> 1.3) - html-pipeline (2.10.0) + html-pipeline (2.12.0) activesupport (>= 2) nokogiri (>= 1.4) http_parser.rb (0.6.0) i18n (0.9.5) concurrent-ruby (~> 1.0) - jekyll (3.7.4) + jekyll (3.8.5) addressable (~> 2.4) colorator (~> 1.0) em-websocket (~> 0.5) @@ -204,14 +204,14 @@ GEM jekyll-feed (~> 0.9) jekyll-seo-tag (~> 2.1) minitest (5.11.3) - multipart-post (2.0.0) - nokogiri (1.10.2) + multipart-post (2.1.1) + nokogiri (1.10.4) mini_portile2 (~> 2.4.0) octokit (4.14.0) sawyer (~> 0.8.0, >= 0.5.3) pathutil (0.16.2) forwardable-extended (~> 2.6) - public_suffix (3.0.3) + public_suffix (3.1.1) rb-fsevent (0.10.3) rb-inotify (0.10.0) ffi (~> 1.0) @@ -219,16 +219,16 @@ GEM ruby-enum (0.7.2) i18n ruby_dep (1.5.0) - rubyzip (1.2.2) + rubyzip (1.2.3) safe_yaml (1.0.5) - sass (3.7.3) + sass (3.7.4) sass-listen (~> 4.0.0) sass-listen (4.0.0) rb-fsevent (~> 0.9, >= 0.9.4) rb-inotify (~> 0.9, >= 0.9.7) - sawyer (0.8.1) - addressable (>= 2.3.5, < 2.6) - faraday (~> 0.8, < 1.0) + sawyer (0.8.2) + addressable (>= 2.3.5) + faraday (> 0.8, < 2.0) 
terminal-table (1.8.0) unicode-display_width (~> 1.1, >= 1.1.1) thread_safe (0.3.6) @@ -236,7 +236,7 @@ GEM ethon (>= 0.9.0) tzinfo (1.2.5) thread_safe (~> 0.1) - unicode-display_width (1.5.0) + unicode-display_width (1.6.0) PLATFORMS ruby @@ -245,4 +245,4 @@ DEPENDENCIES github-pages BUNDLED WITH - 2.0.1 + 2.0.2 From 77cfabfdc61f8d349693ec9024917a631cae7318 Mon Sep 17 00:00:00 2001 From: Li Wu Date: Fri, 30 Aug 2019 10:22:21 +0800 Subject: [PATCH 07/14] Fix custom plugin stale docs (#290) --- docs/PLUGINS.md | 90 ++++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 42 deletions(-) diff --git a/docs/PLUGINS.md b/docs/PLUGINS.md index 1f42c973..5866133b 100644 --- a/docs/PLUGINS.md +++ b/docs/PLUGINS.md @@ -16,29 +16,37 @@ Plugins inherit from a base plugin class and are placed in their appropriate dir Let's take a look at the simplest plugin available to us, the Devnull output plugin: ```python -from __future__ import division from outputplugin import OutputPlugin -import sys +from logging_config import logger + class DevNullOutputPlugin(OutputPlugin): + name = 'devnull' MAXQUEUELENGTH = 1000 + useOutputQueue = True - def __init__(self, sample): - OutputPlugin.__init__(self, sample) + def __init__(self, sample, output_counter=None): + OutputPlugin.__init__(self, sample, output_counter) self.firsttime = True def flush(self, q): + logger.info('flush data to devnull') if self.firsttime: self.f = open('/dev/null', 'w') + self.firsttime = False buf = '\n'.join(x['_raw'].rstrip() for x in q) self.f.write(buf) + def load(): """Returns an instance of the plugin""" return DevNullOutputPlugin + ``` -First, we import the OutputPlugin superclass. For output plugins, they define a constant MAXQUEUELENGTH to determine the maximum amount of items in queue before forcing a queue flush. +First, we import the OutputPlugin superclass. For output plugins, they define a constant `MAXQUEUELENGTH` to determine the maximum amount of items in queue before forcing a queue flush. + +`useOutputQueue` is set to `True` here to use the output queue which functions as a reduce step when you need to maintain a single thread or a limited number of threads outputting data ``__init__()`` is very simple. It calls its superclass init and sets one variable, firsttime. ``flush()`` is also very simple. If it's the first time, open the file /dev/null, otherwise, output the queue by writing it to the already open file. @@ -56,26 +64,26 @@ class SplunkStreamOutputPlugin(OutputPlugin): intSettings = [ 'splunkPort' ] ``` -MAXQUEUELENGTH should look normal, but these other class variables need a little explanation. +`MAXQUEUELENGTH` should look normal, but these other class variables need a little explanation. ### Configuration Validation Config validation is a modular system in Eventgen, and plugins must be allowed to specify additional configuration parameters that the main Eventgen will consider valid and store. -*Note that eventgen.conf.spec generation is not yet automated, which means plugins must ship with the default distribution and eventgen.conf.spec must be maintained manually.* +> Note that `eventgen.conf.spec` generation is not yet automated, which means plugins must ship with the default distribution and eventgen.conf.spec must be maintained manually. Eventually spec file generation will be automated as well. The main configuration of Eventgen validates itself by a list of configuration parameters assigned by type, and each of the configuration parameters is validated by that type. 
The settings list is required: -* validSettings | Defines the list of valid settings for this plugin +* validSettings: Defines the list of valid settings for this plugin The following lists are optional and likely to be used by many plugins: -* intSettings | Will validate the settings as integers -* floatSettings | Will validate the settings as floating point numbers -* boolSettings | Will validate the settings as booleans -* jsonSettings | Will validate the settings as a JSON string -* defaultableSettings | Settings which can be specified in the [global] stanza and will pass down to individual stanzas -* complexSettings | A dictionary of lists or function callbacks, containing a setting name with list of valid options or a callback function to validate the setting. +* intSettings: Will validate the settings as integers +* floatSettings: Will validate the settings as floating point numbers +* boolSettings: Will validate the settings as booleans +* jsonSettings: Will validate the settings as a JSON string +* defaultableSettings: Settings which can be specified in the [global] stanza and will pass down to individual stanzas +* complexSettings: A dictionary of lists or function callbacks, containing a setting name with list of valid options or a callback function to validate the setting. ## Methods required per plugin type @@ -83,7 +91,7 @@ Each plugin type will define a different method required. **Plugin Type** | **Method** | **Returns** | **Notes** --- | --- | --- | --- -Rater | ``rate()`` | Integer count of events to generate | n/a +Rater | ``rate()`` | Integer count of events to generate | N/A Generator | ``gen(count, earliest, latest) `` | Success (0) | Events get put into an output queue by calling the Sample's ``send()`` or ``bulksend()`` methods in the output object. Output | ``flush(q)`` | Success (0) | Gets a deque list q to operate upon and output as configured. @@ -92,48 +100,46 @@ Output | ``flush(q)`` | Success (0) | Gets a deque list q to operate upon and ou We reviewed a simple Output Plugin earlier, let's look at a simple Generator Plugin: ```python -from __future__ import division +import datetime +from datetime import timedelta + from generatorplugin import GeneratorPlugin -import os -import logging -import datetime, time -import itertools -from collections import deque +from logging_config import logger + class WindbagGenerator(GeneratorPlugin): def __init__(self, sample): GeneratorPlugin.__init__(self, sample) - # Logger already setup by config, just get an instance - logger = logging.getLogger('eventgen') - globals()['logger'] = logger - - from eventgenconfig import Config - globals()['c'] = Config() - - def gen(self, count, earliest, latest): - l = [ {'_raw': '2014-01-05 23:07:08 WINDBAG Event 1 of 100000'} for i in xrange(count) ] - - self._out.bulksend(l) + def gen(self, count, earliest, latest, samplename=None): + if count < 0: + logger.warning('Sample size not found for count=-1 and generator=windbag, defaulting to count=60') + count = 60 + time_interval = timedelta.total_seconds((latest - earliest)) / count + for i in xrange(count): + current_time_object = earliest + datetime.timedelta(0, time_interval * (i + 1)) + msg = '{0} -0700 WINDBAG Event {1} of {2}'.format(current_time_object, (i + 1), count) + self._out.send(msg) return 0 + def load(): return WindbagGenerator + ``` -For this generator plugin, notice we inherit from GeneratorPlugin instead of OutputPlugin. This plugin is also quite simple. 
-In its ``__init__()`` method, it calls the superclass ``__init__()`` and it sets up two global variables, c, which holds the config
-(and is a Singleton pattern which can be instantiated many times) and a copy of the logger which we'll use for logging in most plugins.
+For this generator plugin, notice we inherit from `GeneratorPlugin` instead of `OutputPlugin`. This plugin is also quite simple.
 
-Secondly, it defines a gen() method, which generates ``count`` events between ``earliest`` and ``latest`` time. In this case, we ignore the timestamp and return just event text.
-Then we call bulksend. This plugin has several performance optimizations: using a list constructor instead of a loop and using bulksend instead of send.
+Secondly, it defines a `gen()` method, which generates ``count`` events spaced evenly between ``earliest`` and ``latest`` time, each carrying its own timestamp.
+Then we call `send()` once per event. An even faster variant would build the whole batch with a list constructor and pass it to `bulksend()` in a single call.
 
 Let's see the body of that ``gen()`` method on its own, in a slightly simplified form:
 
 ```python
-    def gen(self, count, earliest, latest):
-        for x in xrange(count):
-            self._sample.send({ '_raw': '2014-01-05 23:07:08 WINDBAG Event 1 of 100000' })
-
+    def gen(self, count, earliest, latest, samplename=None):
+        time_interval = timedelta.total_seconds((latest - earliest)) / count
+        for i in xrange(count):
+            current_time_object = earliest + datetime.timedelta(0, time_interval * (i + 1))
+            msg = '{0} -0700 WINDBAG Event {1} of {2}'.format(current_time_object, (i + 1), count)
+            self._out.send(msg)
         return 0
 ```
 
Here, we use ``send()`` and a simple loop to keep the logic easy to understand; batching the events and calling ``bulksend()`` once would be faster.
 
 # Shipping a Plugin
 
 When you've developed a plugin that you want to use in your app, shipping it with your app is easy.
-Place any Eventgen plugin in your Splunk app's ``bin/`` directory and we'll search for and find any plugins referenced by a ``outputMode``, ``generator`` or ``rater`` config statement.
\ No newline at end of file
+Place any Eventgen plugin in your Splunk app's ``bin/`` directory and we'll search for and find any plugins referenced by an ``outputMode``, ``generator`` or ``rater`` config statement.
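Putting the conventions from the updated PLUGINS.md together, a complete minimal output plugin would look roughly like the sketch below. The `filelineout` name and its `filelineDestination` setting are illustrative inventions, but the class attributes, the `flush(q)` contract, and the `load()` hook mirror the devnull and splunkstream examples above:

```python
from outputplugin import OutputPlugin
from logging_config import logger


class FileLineOutputPlugin(OutputPlugin):
    name = 'filelineout'           # referenced from a stanza as outputMode = filelineout
    MAXQUEUELENGTH = 100           # force a flush once 100 items are queued
    useOutputQueue = True          # funnel events through the single-threaded output queue

    # extra settings this plugin accepts (filelineDestination is a made-up example)
    validSettings = ['filelineDestination']
    defaultableSettings = ['filelineDestination']

    def __init__(self, sample, output_counter=None):
        OutputPlugin.__init__(self, sample, output_counter)
        # fall back to a fixed path when the stanza does not set filelineDestination
        self._path = getattr(sample, 'filelineDestination', None) or '/tmp/eventgen.out'

    def flush(self, q):
        # q is a deque of event dicts; '_raw' holds the rendered event text
        logger.info('flushing %d events to %s' % (len(q), self._path))
        with open(self._path, 'a') as f:
            f.write('\n'.join(x['_raw'].rstrip() for x in q) + '\n')


def load():
    # return the plugin class, mirroring the devnull example above
    return FileLineOutputPlugin
```

Dropped into a Splunk app's ``bin/`` directory, a stanza could then select it with ``outputMode = filelineout``.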
From f29c3320f3449cdc1ece1c6933152bbc8519fda1 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Tue, 3 Sep 2019 12:19:15 -0700 Subject: [PATCH 08/14] Server fix (#293) * Flag added * server fix for count and env clean --- .../eventgen_api_server/eventgen_controller_api.py | 2 +- .../eventgen_api_server/eventgen_server_api.py | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py b/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py index 55480909..cd0f2cd2 100644 --- a/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py +++ b/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py @@ -35,7 +35,7 @@ def gather_response(response_number_target=0): if not response_number_target: response_number_target = int(self.redis_connector.message_connection.pubsub_numsub(self.redis_connector.servers_channel)[0][1]) response_num = 0 - countdown = 1.5 / self.interval + countdown = 60 / self.interval for i in range(0, int(countdown)): if response_num == response_number_target: break diff --git a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py index 9880e409..02fb882f 100644 --- a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py +++ b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py @@ -444,6 +444,11 @@ def set_bundle(self, url): def download_bundle(self, url): bundle_path = os.path.join(DEFAULT_PATH, "eg-bundle.tgz") + try: + os.remove(bundle_path) + shutil.rmtree(os.path.join(os.path.dirname(bundle_path), 'eg-bundle')) + except: + pass r = requests.get(url, stream=True) with open(bundle_path, 'wb') as f: for chunk in r.iter_content(chunk_size=None): @@ -457,9 +462,17 @@ def unarchive_bundle(self, path): output = '' if tarfile.is_tarfile(path): tar = tarfile.open(path) + foldername = '' + for name in tar.getnames(): + if '/' not in name: + foldername = name + break output = os.path.join(os.path.dirname(path), os.path.commonprefix(tar.getnames())) tar.extractall(path=os.path.dirname(path)) tar.close() + if foldername: + os.rename(os.path.join(os.path.dirname(path), foldername), os.path.join(os.path.dirname(path), 'eg-bundle')) + output = os.path.join(os.path.dirname(path), 'eg-bundle') elif zipfile.is_zipfile(path): zipf = zipfile.ZipFile(path) for info in zipf.infolist(): From 1720fee954bb47b5d023fede85b8307268ddb517 Mon Sep 17 00:00:00 2001 From: Li Wu Date: Thu, 19 Sep 2019 06:17:58 +0800 Subject: [PATCH 09/14] Fix bug 285 (#297) --- splunk_eventgen/lib/eventgentimer.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/splunk_eventgen/lib/eventgentimer.py b/splunk_eventgen/lib/eventgentimer.py index d500c0ba..5e3c8405 100644 --- a/splunk_eventgen/lib/eventgentimer.py +++ b/splunk_eventgen/lib/eventgentimer.py @@ -1,4 +1,5 @@ import time +import copy from Queue import Full from timeparser import timeParserTimeMath @@ -132,7 +133,10 @@ def real_run(self): break et = backfillearliest lt = timeParserTimeMath(plusminus="+", num=self.interval, unit="s", ret=et) - genPlugin = self.generatorPlugin(sample=self.sample) + copy_sample = copy.copy(self.sample) + tokens = copy.deepcopy(self.sample.tokens) + copy_sample.tokens = tokens + genPlugin = self.generatorPlugin(sample=copy_sample) # need to make sure we set the queue right if we're using multiprocessing or thread modes genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue) genPlugin.updateCounts(count=count, 
start_time=et, end_time=lt) @@ -176,7 +180,10 @@ def real_run(self): logger.info("Starting '%d' generatorWorkers for sample '%s'" % (self.sample.config.generatorWorkers, self.sample.name)) for worker_id in range(self.config.generatorWorkers): - genPlugin = self.generatorPlugin(sample=self.sample) + copy_sample = copy.copy(self.sample) + tokens = copy.deepcopy(self.sample.tokens) + copy_sample.tokens = tokens + genPlugin = self.generatorPlugin(sample=copy_sample) # Adjust queue for threading mode genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue) genPlugin.updateCounts(count=count, start_time=et, end_time=lt) From 906e4d92f2cc434c58031f94bbadee363d6becd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patrik=20Nordl=C3=A9n?= Date: Tue, 24 Sep 2019 09:20:51 +0200 Subject: [PATCH 10/14] Add syslogAddHeader config directive (#296) * Add syslog header to event in syslog mode --- docs/CONFIGURE.md | 14 ++++++- docs/REFERENCE.md | 3 ++ splunk_eventgen/lib/eventgenconfig.py | 3 +- .../lib/plugins/output/syslogout.py | 20 +++++++++- .../splunk_app/README/eventgen.conf.spec | 15 ++++++- .../plugins/test_syslog_output_with_header.py | 40 +++++++++++++++++++ .../medium_test/eventgen.conf.syslogoutput | 2 +- .../eventgen.conf.syslogoutputwithheader | 11 +++++ 8 files changed, 100 insertions(+), 8 deletions(-) create mode 100644 tests/medium/plugins/test_syslog_output_with_header.py create mode 100644 tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutputwithheader diff --git a/docs/CONFIGURE.md b/docs/CONFIGURE.md index 14fe668e..78047bd5 100644 --- a/docs/CONFIGURE.md +++ b/docs/CONFIGURE.md @@ -382,8 +382,10 @@ specifically be supported by all plugins. Plugins that write to files like spool * Set event sourcetype in Splunk to Defaults to 'eventgen' if none specified. host = - * ONLY VALID WITH outputMode SPLUNKSTREAM - * Set event host in Splunk to . Defaults to 127.0.0.1 if none specified. + * When outputMode is splunkstream, set event host in Splunk to . + * When outputMode is syslogout and syslogAddHeader is set to true, add initial header with hostname , + see syslogAddHeader for details. + * Defaults to 127.0.0.1 if none specified. host.token = * PCRE expression used to identify the host name (or partial name) for replacement. @@ -420,6 +422,14 @@ specifically be supported by all plugins. Plugins that write to files like spool * Only supports UDP ports * Required + syslogAddHeader = true | false + * Controls whether syslog messages should be prefixed with an RFC3164 compliant header + including the host value defined for the sample. + * Useful in situations where you want to output generated events to syslog and make it + possible for the receiving syslog server to use the sample's defined host value instead of + the hostname of the host that eventgen is running on. 
+ * Defaults to false + ###### tcpout tcpDestinationHost = * Defaults to 127.0.0.1 diff --git a/docs/REFERENCE.md b/docs/REFERENCE.md index 4488c48d..2c9c8a76 100644 --- a/docs/REFERENCE.md +++ b/docs/REFERENCE.md @@ -139,6 +139,9 @@ syslogDestinationPort = * Defaults to port 1514 * Only supports UDP ports +syslogAddHeader = true | false + * Defaults to false + tcpDestinationHost = * Defaults to 127.0.0.1 diff --git a/splunk_eventgen/lib/eventgenconfig.py b/splunk_eventgen/lib/eventgenconfig.py index 37af3789..fe0d4eed 100644 --- a/splunk_eventgen/lib/eventgenconfig.py +++ b/splunk_eventgen/lib/eventgenconfig.py @@ -88,6 +88,7 @@ class Config(object): 'minuteOfHourRate', 'timezone', 'dayOfMonthRate', 'monthOfYearRate', 'perDayVolume', 'outputWorkers', 'generator', 'rater', 'generatorWorkers', 'timeField', 'sampleDir', 'threading', 'profiler', 'maxIntervalsBeforeFlush', 'maxQueueLength', 'splunkMethod', 'splunkPort', + 'syslogDestinationHost', 'syslogDestinationPort', 'syslogAddHeader', 'verbosity', 'useOutputQueue', 'seed','end', 'autotimestamps', 'autotimestamp', 'httpeventWaitResponse', 'outputCounter', 'sequentialTimestamp', 'extendIndexes', 'disableLoggingQueue'] _validTokenTypes = {'token': 0, 'replacementType': 1, 'replacement': 2} @@ -99,7 +100,7 @@ class Config(object): _floatSettings = ['randomizeCount', 'delay', 'timeMultiple'] _boolSettings = [ 'disabled', 'randomizeEvents', 'bundlelines', 'profiler', 'useOutputQueue', 'autotimestamp', - 'httpeventWaitResponse', 'outputCounter', 'sequentialTimestamp', 'disableLoggingQueue'] + 'httpeventWaitResponse', 'outputCounter', 'sequentialTimestamp', 'disableLoggingQueue', 'syslogAddHeader'] _jsonSettings = [ 'hourOfDayRate', 'dayOfWeekRate', 'minuteOfHourRate', 'dayOfMonthRate', 'monthOfYearRate', 'autotimestamps'] _defaultableSettings = [ diff --git a/splunk_eventgen/lib/plugins/output/syslogout.py b/splunk_eventgen/lib/plugins/output/syslogout.py index b1faad28..226f3bd9 100644 --- a/splunk_eventgen/lib/plugins/output/syslogout.py +++ b/splunk_eventgen/lib/plugins/output/syslogout.py @@ -9,15 +9,26 @@ loggerInitialized = {} +# This filter never returns False, because its purpose is just to add the host field so it's +# available to the logging formatter. 
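+# Logger-level filters run before any handler formats the record, so the "host"
+# attribute added below becomes available to the SysLogHandler formatter as %(host)s.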
+class HostFilter(logging.Filter): + def __init__(self, host): + self.host = host + + def filter(self, record): + record.host = self.host + return True + class SyslogOutOutputPlugin(OutputPlugin): useOutputQueue = True name = 'syslogout' MAXQUEUELENGTH = 10 - validSettings = ['syslogDestinationHost', 'syslogDestinationPort'] - defaultableSettings = ['syslogDestinationHost', 'syslogDestinationPort'] + validSettings = ['syslogDestinationHost', 'syslogDestinationPort', 'syslogAddHeader'] + defaultableSettings = ['syslogDestinationHost', 'syslogDestinationPort', 'syslogAddHeader'] intSettings = ['syslogDestinationPort'] def __init__(self, sample, output_counter=None): + syslogAddHeader = getattr(sample, 'syslogAddHeader', False) OutputPlugin.__init__(self, sample, output_counter) self._syslogDestinationHost = sample.syslogDestinationHost if hasattr( sample, 'syslogDestinationHost') and sample.syslogDestinationHost else '127.0.0.1' @@ -26,6 +37,8 @@ def __init__(self, sample, output_counter=None): loggerName = 'syslog' + sample.name self._l = logging.getLogger(loggerName) + if syslogAddHeader: + self._l.addFilter(HostFilter(host=sample.host)) self._l.setLevel(logging.INFO) global loggerInitialized @@ -34,6 +47,9 @@ def __init__(self, sample, output_counter=None): if loggerName not in loggerInitialized: syslogHandler = logging.handlers.SysLogHandler( address=(self._syslogDestinationHost, int(self._syslogDestinationPort))) + if syslogAddHeader: + formatter = logging.Formatter(fmt='%(asctime)s %(host)s %(message)s', datefmt='%b %d %H:%M:%S') + syslogHandler.setFormatter(formatter) self._l.addHandler(syslogHandler) loggerInitialized[loggerName] = True diff --git a/splunk_eventgen/splunk_app/README/eventgen.conf.spec b/splunk_eventgen/splunk_app/README/eventgen.conf.spec index a4b45cea..422cbd00 100644 --- a/splunk_eventgen/splunk_app/README/eventgen.conf.spec +++ b/splunk_eventgen/splunk_app/README/eventgen.conf.spec @@ -129,6 +129,14 @@ syslogDestinationPort = * Defaults to port 1514 * Only supports UDP ports +syslogAddHeader = true | false + * Controls whether syslog messages should be prefixed with an RFC3164 compliant header + including the host value defined for the sample. + * Useful in situations where you want to output generated events to syslog and make it + possible for the receiving syslog server to use the sample's defined host value instead of + the hostname of the host that eventgen is running on. + * Defaults to false + tcpDestinationHost = * Defaults to 127.0.0.1 @@ -218,8 +226,11 @@ sourcetype = * Set event sourcetype in Splunk to Defaults to 'eventgen' if none specified. host = - * ONLY VALID WITH outputMode SPLUNKSTREAM - * Set event host in Splunk to . Defaults to 127.0.0.1 if none specified. + * ONLY VALID WITH outputMode SPLUNKSTREAM and SYSLOGOUT + * When outputMode is splunkstream, set event host in Splunk to . + * When outputMode is syslogout and syslogAddHeader is set to true, add initial header with hostname , + see syslogAddHeader for details. + * Defaults to 127.0.0.1 if none specified. 
hostRegex = * ONLY VALID WITH outputMode SPLUNKSTREAM diff --git a/tests/medium/plugins/test_syslog_output_with_header.py b/tests/medium/plugins/test_syslog_output_with_header.py new file mode 100644 index 00000000..4bc69cc2 --- /dev/null +++ b/tests/medium/plugins/test_syslog_output_with_header.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# encoding: utf-8 + +import os +import sys + +from mock import MagicMock, patch + +from splunk_eventgen.__main__ import parse_args +from splunk_eventgen.eventgen_core import EventGenerator +from splunk_eventgen.lib.plugins.output.syslogout import SyslogOutOutputPlugin + +FILE_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class TestSyslogOutputWithHeaderPlugin(object): + def test_output_data_to_syslog_with_header(self): + configfile = "tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutputwithheader" + testargs = ["eventgen", "generate", configfile] + with patch.object(sys, 'argv', testargs): + with patch('logging.getLogger'): + pargs = parse_args() + assert pargs.subcommand == 'generate' + assert pargs.configfile == configfile + eventgen = EventGenerator(args=pargs) + + sample = MagicMock() + sample.name = 'test' + sample.syslogDestinationHost = '127.0.0.1' + sample.syslogDestinationPort = 9999 + syslogoutput = SyslogOutOutputPlugin(sample) + + eventgen.start() + for i in xrange(1, 6): + appearance = False + for logger_call in syslogoutput._l.info.call_args_list: + if "WINDBAG Event {} of 5".format(i) in str(logger_call): + appearance = True + if not appearance: + assert False diff --git a/tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutput b/tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutput index 000b2396..3c71311d 100644 --- a/tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutput +++ b/tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutput @@ -8,4 +8,4 @@ end = 1 outputMode = syslogout syslogDestinationHost = 127.0.0.1 syslogDestinationPort = 9999 - +syslogAddHeader = false diff --git a/tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutputwithheader b/tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutputwithheader new file mode 100644 index 00000000..8ce2910a --- /dev/null +++ b/tests/sample_eventgen_conf/medium_test/eventgen.conf.syslogoutputwithheader @@ -0,0 +1,11 @@ +[windbag] +generator = windbag +earliest = -3s +latest = now +interval = 3 +count = 5 +end = 1 +outputMode = syslogout +syslogDestinationHost = 127.0.0.1 +syslogDestinationPort = 9999 +syslogAddHeader = true From 901453c34233695fd8cc2858efd15f0220e21a97 Mon Sep 17 00:00:00 2001 From: Guodong Wang Date: Wed, 25 Sep 2019 10:47:59 +0800 Subject: [PATCH 11/14] timezone setting bugfix #249 --- splunk_eventgen/lib/plugins/generator/replay.py | 2 +- splunk_eventgen/lib/timeparser.py | 2 +- tests/large/conf/eventgen_replay_csv_with_tz.conf | 11 +++++++++++ tests/large/sample/timezone.csv | 5 +++++ tests/large/test_mode_replay.py | 14 +++++++++++++- 5 files changed, 31 insertions(+), 3 deletions(-) create mode 100755 tests/large/conf/eventgen_replay_csv_with_tz.conf create mode 100644 tests/large/sample/timezone.csv diff --git a/splunk_eventgen/lib/plugins/generator/replay.py b/splunk_eventgen/lib/plugins/generator/replay.py index 2de78619..d8c68a49 100644 --- a/splunk_eventgen/lib/plugins/generator/replay.py +++ b/splunk_eventgen/lib/plugins/generator/replay.py @@ -100,7 +100,7 @@ def gen(self, count, earliest, latest, samplename=None): current_event_timestamp = 
self._sample.getTSFromEvent(line[self._sample.timeField])
             except Exception:
                 try:
-                logger.debug("Sample timeField {} failed to locate. Trying to locate _time field.".format(
+                logger.error("Sample timeField {} failed to locate. Trying to locate _time field.".format(
                     self._sample.timeField))
                 current_event_timestamp = self._sample.getTSFromEvent(line["_time"])
             except Exception:
diff --git a/splunk_eventgen/lib/timeparser.py b/splunk_eventgen/lib/timeparser.py
index 812456b3..2298a6ee 100644
--- a/splunk_eventgen/lib/timeparser.py
+++ b/splunk_eventgen/lib/timeparser.py
@@ -28,7 +28,7 @@ def timeParser(ts='now', timezone=datetime.timedelta(days=1), now=None, utcnow=N
             return now()
         else:
             if utcnow is None:
-                return datetime.datetime.now()
+                return datetime.datetime.utcnow() + timezone
             else:
                 return utcnow() + timezone
     else:
diff --git a/tests/large/conf/eventgen_replay_csv_with_tz.conf b/tests/large/conf/eventgen_replay_csv_with_tz.conf
new file mode 100755
index 00000000..21ac7878
--- /dev/null
+++ b/tests/large/conf/eventgen_replay_csv_with_tz.conf
@@ -0,0 +1,11 @@
+[timezone]
+sampleDir = ../sample
+mode = replay
+sampletype = csv
+outputMode = stdout
+timezone = -0100
+timeField = _raw
+
+token.0.token = \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2},\d{3,6}
+token.0.replacementType = timestamp
+token.0.replacement = %Y-%m-%dT%H:%M:%S,%f
diff --git a/tests/large/sample/timezone.csv b/tests/large/sample/timezone.csv
new file mode 100644
index 00000000..7591d079
--- /dev/null
+++ b/tests/large/sample/timezone.csv
@@ -0,0 +1,5 @@
+_time,_raw,index,host,source,sourcetype
+"2015-08-18 16:28:54,569","2015-08-18T16:28:54,569 INFO streams_utils:24 - utils::readAsJson:: /usr/local/bamboo/itsi-demo/local/splunk/etc/apps/splunk_app_stream/local/apps",_internal,host5.foobar.com,/usr/local/bamboo/itsi-demo/local/splunk/var/log/splunk/splunk_app_stream.log,splunk_app_stream.log
+"2015-08-18 16:28:54,568","2015-08-18T16:28:54,568 INFO streams_utils:74 - create dir /usr/local/bamboo/itsi-demo/local/splunk/etc/apps/splunk_app_stream/local/",_internal,host5.foobar.com,/usr/local/bamboo/itsi-demo/local/splunk/var/log/splunk/splunk_app_stream.log,splunk_app_stream.log
+"2015-08-18 16:28:52,270","2015-08-18T16:28:52,270 ERROR pid=16324 tid=MainThread file=__init__.py:execute:957 | Execution failed: [HTTP 401] Client is not authenticated",_internal,host5.foobar.com,/usr/local/bamboo/itsi-demo/local/splunk/var/log/splunk/python_modular_input.log,python_modular_input
+"2015-08-18 16:28:52,247","2015-08-18T16:28:52,247 INFO pid=16324 tid=MainThread file=__init__.py:execute:906 | Execute called",_internal,host5.foobar.com,/usr/local/bamboo/itsi-demo/local/splunk/var/log/splunk/python_modular_input.log,python_modular_input
diff --git a/tests/large/test_mode_replay.py b/tests/large/test_mode_replay.py
index d964101d..99821d99 100644
--- a/tests/large/test_mode_replay.py
+++ b/tests/large/test_mode_replay.py
@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timedelta
 import re
 import time
@@ -77,3 +77,15 @@ def test_mode_replay_csv(eventgen_test_helper):
     events = eventgen_test_helper('eventgen_replay_csv.conf').get_events()
     # assert the number of events equals the number of rows in the sample csv file
     assert len(events) == 10
+
+
+def test_mode_replay_with_timezone(eventgen_test_helper):
+    """Test replay mode with sampletype = csv and timezone = -0100, checking that event timestamps are shifted from UTC accordingly"""
+    events = eventgen_test_helper('eventgen_replay_csv_with_tz.conf').get_events()
+    # assert the number of events equals the number of rows in the sample csv file
+    assert len(events) 
== 4 + now_ts = datetime.utcnow() + timedelta(hours=-1) + for event in events: + event_ts = datetime.strptime(event.split(' ')[0], '%Y-%m-%dT%H:%M:%S,%f') + d = now_ts - event_ts + assert d.seconds < 60, 'timestamp with timezone check fails.' From e36b1f883d6f7da4725f71e47172d27f6d254663 Mon Sep 17 00:00:00 2001 From: Li Wu Date: Tue, 8 Oct 2019 10:57:19 +0800 Subject: [PATCH 12/14] Using multiprocess pool to address the OOM issue (#301) * Using multiprocess pool to address the OOM issue * Fix test case fail --- .../eventgen_server_api.py | 2 +- splunk_eventgen/eventgen_core.py | 110 +++++------------- splunk_eventgen/lib/eventgentimer.py | 18 ++- 3 files changed, 48 insertions(+), 82 deletions(-) diff --git a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py index 02fb882f..58ae238b 100644 --- a/splunk_eventgen/eventgen_api_server/eventgen_server_api.py +++ b/splunk_eventgen/eventgen_api_server/eventgen_server_api.py @@ -392,7 +392,7 @@ def stop(self, force_stop=False): response = {} if self.eventgen.eventgen_core_object.check_running(): try: - self.eventgen.eventgen_core_object.stop(force_stop=force_stop) + self.eventgen.eventgen_core_object.stop() except: pass response['message'] = "Eventgen is stopped." diff --git a/splunk_eventgen/eventgen_core.py b/splunk_eventgen/eventgen_core.py index 70a93d0a..3db57660 100644 --- a/splunk_eventgen/eventgen_core.py +++ b/splunk_eventgen/eventgen_core.py @@ -6,9 +6,9 @@ import os import sys import time -import signal from Queue import Empty, Queue from threading import Thread +import multiprocessing from lib.eventgenconfig import Config from lib.eventgenexceptions import PluginNotLoaded @@ -140,7 +140,7 @@ def _load_custom_plugins(self, PluginNotLoadedException): # APPPERF-263: be greedy when scanning plugin dir (eat all the pys) self._initializePlugins(plugindir, pluginsdict, plugintype) - def _setup_pools(self, generator_worker_count): + def _setup_pools(self, generator_worker_count=20): ''' This method is an internal method called on init to generate pools needed for processing. @@ -150,7 +150,8 @@ def _setup_pools(self, generator_worker_count): self._create_generator_pool() self._create_timer_threadpool() self._create_output_threadpool() - self._create_generator_workers(generator_worker_count) + if self.args.multiprocess: + self.pool = multiprocessing.Pool(generator_worker_count, maxtasksperchild=1000000) def _create_timer_threadpool(self, threadcount=100): ''' @@ -162,11 +163,13 @@ def _create_timer_threadpool(self, threadcount=100): ''' self.sampleQueue = Queue(maxsize=0) num_threads = threadcount + self.timer_thread_pool = [] for i in range(num_threads): worker = Thread(target=self._worker_do_work, args=( self.sampleQueue, self.loggingQueue, ), name="TimeThread{0}".format(i)) + self.timer_thread_pool.append(worker) worker.setDaemon(True) worker.start() @@ -185,11 +188,13 @@ def _create_output_threadpool(self, threadcount=1): else: self.outputQueue = Queue(maxsize=500) num_threads = threadcount + self.output_thread_pool = [] for i in range(num_threads): worker = Thread(target=self._worker_do_work, args=( self.outputQueue, self.loggingQueue, ), name="OutputThread{0}".format(i)) + self.output_thread_pool.append(worker) worker.setDaemon(True) worker.start() @@ -202,8 +207,7 @@ def _create_generator_pool(self, workercount=20): has over 10 generators working, additional samples won't run until the first ones end. 
:return: ''' - if self.args.multiprocess: - import multiprocessing + if self.args.multiprocess: self.manager = multiprocessing.Manager() if self.config.disableLoggingQueue: self.loggingQueue = None @@ -234,22 +238,6 @@ def _create_generator_pool(self, workercount=20): worker.setDaemon(True) worker.start() - def _create_generator_workers(self, workercount=20): - if self.args.multiprocess: - import multiprocessing - self.workerPool = [] - for worker in xrange(workercount): - # builds a list of tuples to use the map function - process = multiprocessing.Process(target=self._proc_worker_do_work, args=( - self.workerQueue, - self.loggingQueue, - self.genconfig, - )) - self.workerPool.append(process) - process.start() - else: - pass - def _setup_loggers(self, args=None): self.logger = logger self.loggingQueue = None @@ -293,37 +281,6 @@ def _generator_do_work(self, work_queue, logging_queue, output_counter=None): self.logger.exception(str(e)) raise e - @staticmethod - def _proc_worker_do_work(work_queue, logging_queue, config): - genconfig = config - stopping = genconfig['stopping'] - root = logging.getLogger() - root.setLevel(logging.DEBUG) - if logging_queue is not None: - # TODO https://github.com/splunk/eventgen/issues/217 - qh = logutils.queue.QueueHandler(logging_queue) - root.addHandler(qh) - else: - root.addHandler(logging.StreamHandler()) - while not stopping: - try: - root.info("Checking for work") - item = work_queue.get(timeout=10) - item.logger = root - item._out.updateConfig(item.config) - item.run() - work_queue.task_done() - stopping = genconfig['stopping'] - item.logger.debug("Current Worker Stopping: {0}".format(stopping)) - except Empty: - stopping = genconfig['stopping'] - except Exception as e: - root.exception(e) - raise e - else: - root.info("Stopping Process") - sys.exit(0) - def logger_thread(self, loggingQueue): while not self.stopping: try: @@ -426,8 +383,12 @@ def start(self, join_after_start=True): self.logger.info("Creating timer object for sample '%s' in app '%s'" % (s.name, s.app)) # This is where the timer is finally sent to a queue to be processed. Needs to move to this object. 
try: - t = Timer(1.0, sample=s, config=self.config, genqueue=self.workerQueue, - outputqueue=self.outputQueue, loggingqueue=self.loggingQueue) + if self.args.multiprocess: + t = Timer(1.0, sample=s, config=self.config, genqueue=self.workerQueue, + outputqueue=self.outputQueue, loggingqueue=self.loggingQueue, pool=self.pool) + else: + t = Timer(1.0, sample=s, config=self.config, genqueue=self.workerQueue, + outputqueue=self.outputQueue, loggingqueue=self.loggingQueue) except PluginNotLoaded as pnl: self._load_custom_plugins(pnl) t = Timer(1.0, sample=s, config=self.config, genqueue=self.workerQueue, @@ -460,6 +421,12 @@ def stop(self, force_stop=False): self.stopping = True self.force_stop = force_stop + # join timer thread and output thread + for output_thread in self.output_thread_pool: + output_thread.join() + for timer_thread in self.timer_thread_pool: + timer_thread.join() + self.logger.info("All timers exited, joining generation queue until it's empty.") if force_stop: self.logger.info("Forcibly stopping Eventgen: Deleting workerQueue.") @@ -472,18 +439,9 @@ def stop(self, force_stop=False): self.kill_processes() else: self.genconfig["stopping"] = True - for worker in self.workerPool: - count = 0 - # We wait for a minute until terminating the worker - while worker.exitcode is None and count != 20: - if count == 30: - self.logger.info("Terminating worker {0}".format(worker._name)) - worker.terminate() - count = 0 - break - self.logger.info("Worker {0} still working, waiting for it to finish.".format(worker._name)) - time.sleep(2) - count += 1 + self.pool.close() + self.pool.join() + self.logger.info("All generators working/exited, joining output queue until it's empty.") if not self.args.multiprocess and not force_stop: self.outputQueue.join() @@ -531,17 +489,13 @@ def check_done(self): :return: if eventgen jobs are finished, return True else False ''' - return self.sampleQueue.empty() and self.sampleQueue.unfinished_tasks <= 0 and self.workerQueue.empty() and self.workerQueue.unfinished_tasks <= 0 + return self.sampleQueue.empty() and self.sampleQueue.unfinished_tasks <= 0 and \ + self.workerQueue.empty() and self.workerQueue.unfinished_tasks <= 0 def kill_processes(self): - try: - if self.args.multiprocess: - for worker in self.workerPool: - try: os.kill(int(worker.pid), signal.SIGKILL) - except: continue - del self.outputQueue - self.manager.shutdown() - except: - pass - - \ No newline at end of file + if self.args.multiprocess and hasattr(self, "pool"): + self.pool.close() + self.pool.terminate() + self.pool.join() + del self.outputQueue + self.manager.shutdown() diff --git a/splunk_eventgen/lib/eventgentimer.py b/splunk_eventgen/lib/eventgentimer.py index 5e3c8405..b802d9c9 100644 --- a/splunk_eventgen/lib/eventgentimer.py +++ b/splunk_eventgen/lib/eventgentimer.py @@ -26,7 +26,7 @@ class Timer(object): countdown = None # Added by CS 5/7/12 to emulate threading.Timer - def __init__(self, time, sample=None, config=None, genqueue=None, outputqueue=None, loggingqueue=None): + def __init__(self, time, sample=None, config=None, genqueue=None, outputqueue=None, loggingqueue=None, pool=None): # Logger already setup by config, just get an instance # setup default options self.profiler = config.profiler @@ -36,6 +36,7 @@ def __init__(self, time, sample=None, config=None, genqueue=None, outputqueue=No self.endts = getattr(self.sample, "endts", None) self.generatorQueue = genqueue self.outputQueue = outputqueue + self.pool = pool self.time = time self.stopping = False self.countdown = 0 @@ 
-141,7 +142,10 @@ def real_run(self): genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue) genPlugin.updateCounts(count=count, start_time=et, end_time=lt) try: - self.generatorQueue.put(genPlugin, True, 3) + if self.pool is not None: + self.pool.apply_async(run_task, args=(genPlugin,)) + else: + self.generatorQueue.put(genPlugin, True, 3) self.executions += 1 backfillearliest = lt except Full: @@ -189,7 +193,11 @@ def real_run(self): genPlugin.updateCounts(count=count, start_time=et, end_time=lt) try: - self.generatorQueue.put(genPlugin) + if self.pool is not None: + self.pool.apply_async(run_task, args=(genPlugin,)) + else: + self.generatorQueue.put(genPlugin) + logger.debug(("Worker# {0}: Put {1} MB of events in queue for sample '{2}'" + "with et '{3}' and lt '{4}'").format( worker_id, round((count / 1024.0 / 1024), 4), @@ -231,3 +239,7 @@ def real_run(self): else: time.sleep(self.time) self.countdown -= self.time + + +def run_task(generator_plugin): + generator_plugin.run() From 48c25c41d82467f987a2130047252ca55a8b217c Mon Sep 17 00:00:00 2001 From: Jack Meixensperger Date: Tue, 8 Oct 2019 00:12:21 -0700 Subject: [PATCH 13/14] Remove workerQueue unfinished tasks (#302) --- splunk_eventgen/eventgen_core.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/splunk_eventgen/eventgen_core.py b/splunk_eventgen/eventgen_core.py index 3db57660..b5c8cf6d 100644 --- a/splunk_eventgen/eventgen_core.py +++ b/splunk_eventgen/eventgen_core.py @@ -489,8 +489,7 @@ def check_done(self): :return: if eventgen jobs are finished, return True else False ''' - return self.sampleQueue.empty() and self.sampleQueue.unfinished_tasks <= 0 and \ - self.workerQueue.empty() and self.workerQueue.unfinished_tasks <= 0 + return self.sampleQueue.empty() and self.sampleQueue.unfinished_tasks <= 0 and self.workerQueue.empty() def kill_processes(self): if self.args.multiprocess and hasattr(self, "pool"): From 0b8e3f8c18850f19393aca02b57e0df827a6ecce Mon Sep 17 00:00:00 2001 From: Lynch Wu Date: Tue, 8 Oct 2019 16:04:30 +0800 Subject: [PATCH 14/14] Bumped version to 6.5.2 --- splunk_eventgen/version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splunk_eventgen/version.json b/splunk_eventgen/version.json index ac6c6837..d04847bd 100644 --- a/splunk_eventgen/version.json +++ b/splunk_eventgen/version.json @@ -1 +1 @@ -{"version": "6.5.0"} +{"version": "6.5.2"}
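A closing note on the most invasive change in this series, the multiprocess pool in [PATCH 12/14]: the task handed to `apply_async` must be a module-level function (instances of module-level classes pickle cleanly; nested or bound callables do not), and `maxtasksperchild` makes the pool recycle worker processes so memory accumulated across generator runs is reclaimed, which is the heart of the OOM fix. Below is a self-contained sketch of that pattern, with a hypothetical `FakeGenerator` standing in for a real generator plugin:

```python
import multiprocessing


class FakeGenerator(object):
    """Stands in for a generator plugin object with a run() method."""

    def __init__(self, sample_name):
        self.sample_name = sample_name

    def run(self):
        return 'generated events for %s' % self.sample_name


def run_task(generator_plugin):
    # Module-level, so multiprocessing can pickle a reference to it
    # and ship it to the worker process along with its argument.
    return generator_plugin.run()


if __name__ == '__main__':
    # maxtasksperchild recycles each worker after it has run that many tasks,
    # bounding how much leaked memory a long-lived child can accumulate
    # (the eventgen fix uses a much larger limit of 1000000).
    pool = multiprocessing.Pool(processes=4, maxtasksperchild=100)
    results = [pool.apply_async(run_task, args=(FakeGenerator('sample%d' % i),))
               for i in range(8)]
    pool.close()
    pool.join()
    print([r.get() for r in results])
```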