Fix process leak and start/stop 500 issue (#344)
* Fix process leak and start/stop 500 issue

* Fix test case fail
Li Wu authored Nov 8, 2019
1 parent 1ecce27 commit 34f06af
Showing 6 changed files with 34 additions and 46 deletions.
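
Taken together, the diffs below replace the core's mutable `self.stopping` boolean with a shared `threading.Event`, make the HTTP layer always force-stop and then rebuild the core object, and turn `kill_processes` into an explicit, logged reaper. Here is a minimal sketch of the Event-based stop pattern the commit adopts; the `Worker` class and `_loop` method are illustrative names, not eventgen code:

    import threading
    import time

    class Worker:
        def __init__(self):
            # A shared Event replaces the old mutable boolean stop flag.
            self.stop_request = threading.Event()
            self.thread = None

        def _loop(self):
            # Mirrors the commit's `while not self.stop_request.isSet()` loops.
            while not self.stop_request.is_set():
                time.sleep(0.1)  # stand-in for pulling work off a queue

        def start(self):
            self.stop_request.clear()
            self.thread = threading.Thread(target=self._loop)
            self.thread.start()

        def stop(self):
            self.stop_request.set()    # ask every loop to exit
            self.thread.join()         # wait for the loop to notice
            self.stop_request.clear()  # reset so start() can run again

An Event is safe to share across threads, and set/clear gives the object a clean restart cycle, which a bare boolean attribute made easy to get wrong.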
splunk_eventgen/eventgen_api_server/eventgen_core_object.py (2 additions, 2 deletions)

@@ -24,8 +24,7 @@ def check_and_configure_eventgen(self):
         self.logger.info("Configured Eventgen from {}".format(CUSTOM_CONFIG_PATH))
 
     def refresh_eventgen_core_object(self):
-        self.eventgen_core_object.kill_processes()
-        self.eventgen_core_object = eventgen_core.EventGenerator(self._create_args())
+        self.eventgen_core_object.stop(force_stop=True)
         self.configured = False
         self.configfile = None
         self.check_and_configure_eventgen()

@@ -37,6 +36,7 @@ def _create_args(self):
         args.version = False
         args.backfill = None
         args.count = None
+        args.end = None
         args.devnull = False
         args.disableOutputQueue = False
         args.generators = None
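
Two things change here: refresh no longer kills processes and constructs a brand-new `EventGenerator` (a forced `stop()` on the existing object now reaps workers itself, closing the process leak), and the synthetic argument object gains `args.end = None`. The second point matters because every attribute the core reads must exist on the args object up front. A hypothetical consumer illustrates the failure mode; `SimpleNamespace` stands in for whatever object `_create_args()` actually builds:

    from types import SimpleNamespace

    # Stand-in for the args object built by _create_args(), before the fix.
    args = SimpleNamespace(version=False, backfill=None, count=None)

    # A core that reads args.end directly would raise:
    # AttributeError: 'SimpleNamespace' object has no attribute 'end'
    end = getattr(args, "end", None)  # defensive read, shown only for contrast

    args.end = None                   # the commit's approach: define it up front
    assert args.end is None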
splunk_eventgen/eventgen_api_server/eventgen_server_api.py (1 addition, 6 deletions)

@@ -164,12 +164,7 @@ def http_post_start():
         @bp.route('/stop', methods=['POST'])
         def http_post_stop():
             try:
-                force_stop = False
-                try:
-                    force_stop = True
-                except:
-                    force_stop = False
-                response = self.stop(force_stop = force_stop)
+                response = self.stop(force_stop=True)
                 self.eventgen.refresh_eventgen_core_object()
                 return Response(json.dumps(response), mimetype='application/json', status=200)
             except Exception as e:
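
The old handler wrapped `force_stop` in a try/except that could only ever leave it `True`, and a stop that failed partway presumably left a half-torn-down core behind, so the next start or stop returned a 500. The new contract is simpler: always force-stop, then rebuild the core object. A minimal Flask sketch of that contract; `eventgen` is a stand-in for the server's core-object wrapper, and the error-response shape is assumed, since the except branch is collapsed in the diff above:

    import json
    from flask import Blueprint, Response

    bp = Blueprint("eventgen_api", __name__)
    eventgen = ...  # stand-in for the server's core-object wrapper

    @bp.route("/stop", methods=["POST"])
    def http_post_stop():
        try:
            response = eventgen.stop(force_stop=True)  # always a hard stop
            eventgen.refresh_eventgen_core_object()    # hand the next /start a clean core
            return Response(json.dumps(response), mimetype="application/json", status=200)
        except Exception as e:
            return Response(json.dumps(str(e)), mimetype="application/json", status=500)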
splunk_eventgen/eventgen_core.py (29 additions, 37 deletions)

@@ -8,7 +8,7 @@
 import time
 from queue import Empty, Queue
 import signal
-from threading import Thread
+from threading import Thread, Event
 import multiprocessing
 
 from splunk_eventgen.lib.eventgenconfig import Config

@@ -32,13 +32,14 @@ def __init__(self, args=None):
         localized .conf entries.
         :param args: __main__ parse_args() object.
         '''
-        self.stopping = False
+        self.stop_request = Event()
+        self.force_stop = False
         self.started = False
         self.completed = False
         self.config = None
         self.args = args
 
         self.workerPool = []
         self.manager = None
         self._setup_loggers(args=args)
         # attach to the logging queue
         self.logger.info("Logging Setup Complete.")

@@ -94,9 +95,6 @@ def _load_config(self, configfile, **kwargs):
         else:
             generator_worker_count = self.config.generatorWorkers
 
-        # TODO: Probably should destroy pools better so processes are cleaned.
-        if self.args.multiprocess:
-            self.kill_processes()
         self._setup_pools(generator_worker_count)
 
     def _reload_plugins(self):

@@ -192,7 +190,7 @@ def _create_generator_pool(self, workercount=20):
         '''
         if self.args.multiprocess:
             self.manager = multiprocessing.Manager()
-            if self.config.disableLoggingQueue:
+            if self.config and self.config.disableLoggingQueue:
                 self.loggingQueue = None
             else:
                 # TODO crash caused by logging Thread https://github.com/splunk/eventgen/issues/217

@@ -236,6 +234,7 @@ def _create_generator_workers(self, workercount=20):
                 ))
                 self.workerPool.append(process)
                 process.start()
+                self.logger.info("create process: {}".format(process.pid))
             else:
                 pass

@@ -252,7 +251,7 @@ def _setup_loggers(self, args=None):
             self.logger.setLevel(logging.ERROR)
 
     def _worker_do_work(self, work_queue, logging_queue):
-        while not self.stopping:
+        while not self.stop_request.isSet():
             try:
                 item = work_queue.get(timeout=10)
                 startTime = time.time()

@@ -271,7 +270,7 @@ def _worker_do_work(self, work_queue, logging_queue):
                 raise e
 
     def _generator_do_work(self, work_queue, logging_queue, output_counter=None):
-        while not self.stopping:
+        while not self.stop_request.isSet():
             try:
                 item = work_queue.get(timeout=10)
                 startTime = time.time()

@@ -326,7 +325,7 @@ def _proc_worker_do_work(work_queue, logging_queue, config, disable_logging):
         sys.exit(0)
 
     def logger_thread(self, loggingQueue):
-        while not self.stopping:
+        while not self.stop_request.isSet():
             try:
                 record = loggingQueue.get(timeout=10)
                 logger.handle(record)

@@ -420,7 +419,7 @@ def _initializePlugins(self, dirname, plugins, plugintype, name=None):
         return ret
 
     def start(self, join_after_start=True):
-        self.stopping = False
+        self.stop_request.clear()
         self.started = True
         self.config.stopping = False
         self.completed = False

@@ -460,23 +459,19 @@ def join_process(self):
             raise e
 
     def stop(self, force_stop=False):
-        # empty the sample queue:
-        self.config.stopping = True
-        self.stopping = True
+        if hasattr(self.config, "stopping"):
+            self.config.stopping = True
         self.force_stop = force_stop
+        # set the thread event to stop threads
+        self.stop_request.set()
 
-        self.logger.info("All timers exited, joining generation queue until it's empty.")
-        if force_stop:
-            self.logger.info("Forcibly stopping Eventgen: Deleting workerQueue.")
-            del self.workerQueue
-            self._create_generator_pool()
-        self.workerQueue.join()
         # if we're in multiprocess, make sure we don't add more generators after the timers stopped.
         if self.args.multiprocess:
             if force_stop:
                 self.kill_processes()
             else:
-                self.genconfig["stopping"] = True
+                if hasattr(self, "genconfig"):
+                    self.genconfig["stopping"] = True
                 for worker in self.workerPool:
                     count = 0
                     # We wait for a minute until terminating the worker

@@ -490,12 +485,9 @@ def stop(self, force_stop=False):
                         time.sleep(2)
                         count += 1
 
-        self.logger.info("All generators working/exited, joining output queue until it's empty.")
-        if not self.args.multiprocess and not force_stop:
-            self.outputQueue.join()
-        self.logger.info("All items fully processed. Cleaning up internal processes.")
         self.started = False
-        self.stopping = False
+        # clear the thread event
+        self.stop_request.clear()
 
     def reload_conf(self, configfile):
         '''
@@ -540,14 +532,14 @@ def check_done(self):
         return self.sampleQueue.empty() and self.sampleQueue.unfinished_tasks <= 0 and self.workerQueue.empty()
 
     def kill_processes(self):
-        try:
-            if self.args.multiprocess:
-                for worker in self.workerPool:
-                    try:
-                        os.kill(int(worker.pid), signal.SIGKILL)
-                    except:
-                        continue
-                del self.outputQueue
-                self.manager.shutdown()
-        except:
-            pass
+        self.logger.info("Kill worker processes")
+        for worker in self.workerPool:
+            try:
+                self.logger.info("Kill worker process: {}".format(worker.pid))
+                os.kill(int(worker.pid), signal.SIGKILL)
+            except Exception as e:
+                self.logger.error(str(e))
+                continue
+        self.workerPool = []
+        if self.manager:
+            self.manager.shutdown()
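
`kill_processes` loses its blanket try/except, which silently swallowed every failure, in favor of logging each kill, resetting the pool so a later start cannot touch dead workers, and shutting the manager down only when one exists. The reaping pattern in isolation, as a runnable POSIX-only sketch with illustrative names (`kill_workers` is not an eventgen function):

    import multiprocessing
    import os
    import signal
    import time

    def kill_workers(worker_pool, manager=None):
        for worker in worker_pool:
            try:
                os.kill(int(worker.pid), signal.SIGKILL)  # hard kill; no cleanup handlers run
            except Exception:
                continue  # the worker already exited
        worker_pool.clear()     # drop references so the pool can be rebuilt
        if manager:
            manager.shutdown()  # also stop the Manager's own server process

    if __name__ == "__main__":
        pool = [multiprocessing.Process(target=time.sleep, args=(60,)) for _ in range(2)]
        for p in pool:
            p.start()
        kill_workers(pool, manager=multiprocessing.Manager())

Clearing the pool list is the piece that closes the leak: stale `Process` handles no longer accumulate across restarts.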
splunk_eventgen/lib/eventgenoutput.py (0 additions, 1 deletion)

@@ -99,4 +99,3 @@ def flush(self, endOfInterval=False):
                 tmp = None
             outputer.run()
             q = None
-
splunk_eventgen/lib/eventgentimer.py (1 addition, 0 deletions)

@@ -108,6 +108,7 @@ def real_run(self):
             # referenced in the config object, while, self.stopping will only stop this one.
             if self.config.stopping or self.stopping:
                 end = True
+                continue
             count = self.rater.rate()
             # First run of the generator, see if we have any backfill work to do.
             if self.countdown <= 0:
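
Without the `continue`, a timer that notices the stop flag still falls through to `self.rater.rate()` and generates one more interval before exiting; with it, control jumps straight back to the loop test. The shape of the fix, reduced to a self-contained loop where `stopping()` and `generate_batch()` stand in for eventgen's config checks and generator work:

    import itertools

    counter = itertools.count()

    def stopping():
        # Stand-in for `self.config.stopping or self.stopping`: flips after 3 checks.
        return next(counter) >= 3

    def generate_batch():
        print("generated one interval")

    end = False
    while not end:
        if stopping():
            end = True
            continue   # jump back to `while not end` without generating a final batch
        generate_batch()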
splunk_eventgen/lib/plugins/output/syslogout.py (1 addition, 0 deletions)

@@ -17,6 +17,7 @@ def filter(self, record):
         record.host = self.host
         return True
 
+
 class SyslogOutOutputPlugin(OutputPlugin):
     useOutputQueue = True
     name = 'syslogout'
