Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix process leak and start/stop 500 issue #344

Merged
merged 2 commits into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions splunk_eventgen/eventgen_api_server/eventgen_core_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ def check_and_configure_eventgen(self):
self.logger.info("Configured Eventgen from {}".format(CUSTOM_CONFIG_PATH))

def refresh_eventgen_core_object(self):
self.eventgen_core_object.kill_processes()
self.eventgen_core_object = eventgen_core.EventGenerator(self._create_args())
self.eventgen_core_object.stop(force_stop=True)
self.configured = False
self.configfile = None
self.check_and_configure_eventgen()
Expand All @@ -37,6 +36,7 @@ def _create_args(self):
args.version = False
args.backfill = None
args.count = None
args.end = None
args.devnull = False
args.disableOutputQueue = False
args.generators = None
Expand Down
7 changes: 1 addition & 6 deletions splunk_eventgen/eventgen_api_server/eventgen_server_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,7 @@ def http_post_start():
@bp.route('/stop', methods=['POST'])
def http_post_stop():
try:
force_stop = False
try:
force_stop = True
except:
force_stop = False
response = self.stop(force_stop = force_stop)
response = self.stop(force_stop=True)
self.eventgen.refresh_eventgen_core_object()
return Response(json.dumps(response), mimetype='application/json', status=200)
except Exception as e:
Expand Down
66 changes: 29 additions & 37 deletions splunk_eventgen/eventgen_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
from queue import Empty, Queue
import signal
from threading import Thread
from threading import Thread, Event
import multiprocessing

from splunk_eventgen.lib.eventgenconfig import Config
Expand All @@ -32,13 +32,14 @@ def __init__(self, args=None):
localized .conf entries.
:param args: __main__ parse_args() object.
'''
self.stopping = False
self.stop_request = Event()
self.force_stop = False
self.started = False
self.completed = False
self.config = None
self.args = args

self.workerPool = []
self.manager = None
self._setup_loggers(args=args)
# attach to the logging queue
self.logger.info("Logging Setup Complete.")
Expand Down Expand Up @@ -94,9 +95,6 @@ def _load_config(self, configfile, **kwargs):
else:
generator_worker_count = self.config.generatorWorkers

# TODO: Probably should destroy pools better so processes are cleaned.
if self.args.multiprocess:
self.kill_processes()
self._setup_pools(generator_worker_count)

def _reload_plugins(self):
Expand Down Expand Up @@ -192,7 +190,7 @@ def _create_generator_pool(self, workercount=20):
'''
if self.args.multiprocess:
self.manager = multiprocessing.Manager()
if self.config.disableLoggingQueue:
if self.config and self.config.disableLoggingQueue:
self.loggingQueue = None
else:
# TODO crash caused by logging Thread https://github.com/splunk/eventgen/issues/217
Expand Down Expand Up @@ -236,6 +234,7 @@ def _create_generator_workers(self, workercount=20):
))
self.workerPool.append(process)
process.start()
self.logger.info("create process: {}".format(process.pid))
else:
pass

Expand All @@ -252,7 +251,7 @@ def _setup_loggers(self, args=None):
self.logger.setLevel(logging.ERROR)

def _worker_do_work(self, work_queue, logging_queue):
while not self.stopping:
while not self.stop_request.isSet():
try:
item = work_queue.get(timeout=10)
startTime = time.time()
Expand All @@ -271,7 +270,7 @@ def _worker_do_work(self, work_queue, logging_queue):
raise e

def _generator_do_work(self, work_queue, logging_queue, output_counter=None):
while not self.stopping:
while not self.stop_request.isSet():
try:
item = work_queue.get(timeout=10)
startTime = time.time()
Expand Down Expand Up @@ -326,7 +325,7 @@ def _proc_worker_do_work(work_queue, logging_queue, config, disable_logging):
sys.exit(0)

def logger_thread(self, loggingQueue):
while not self.stopping:
while not self.stop_request.isSet():
try:
record = loggingQueue.get(timeout=10)
logger.handle(record)
Expand Down Expand Up @@ -420,7 +419,7 @@ def _initializePlugins(self, dirname, plugins, plugintype, name=None):
return ret

def start(self, join_after_start=True):
self.stopping = False
self.stop_request.clear()
self.started = True
self.config.stopping = False
self.completed = False
Expand Down Expand Up @@ -460,23 +459,19 @@ def join_process(self):
raise e

def stop(self, force_stop=False):
# empty the sample queue:
self.config.stopping = True
self.stopping = True
if hasattr(self.config, "stopping"):
self.config.stopping = True
self.force_stop = force_stop
# set the thread event to stop threads
self.stop_request.set()

self.logger.info("All timers exited, joining generation queue until it's empty.")
if force_stop:
self.logger.info("Forcibly stopping Eventgen: Deleting workerQueue.")
del self.workerQueue
self._create_generator_pool()
self.workerQueue.join()
# if we're in multiprocess, make sure we don't add more generators after the timers stopped.
if self.args.multiprocess:
if force_stop:
self.kill_processes()
else:
self.genconfig["stopping"] = True
if hasattr(self, "genconfig"):
self.genconfig["stopping"] = True
for worker in self.workerPool:
count = 0
# We wait for a minute until terminating the worker
Expand All @@ -490,12 +485,9 @@ def stop(self, force_stop=False):
time.sleep(2)
count += 1

self.logger.info("All generators working/exited, joining output queue until it's empty.")
if not self.args.multiprocess and not force_stop:
self.outputQueue.join()
self.logger.info("All items fully processed. Cleaning up internal processes.")
self.started = False
self.stopping = False
# clear the thread event
self.stop_request.clear()

def reload_conf(self, configfile):
'''
Expand Down Expand Up @@ -540,14 +532,14 @@ def check_done(self):
return self.sampleQueue.empty() and self.sampleQueue.unfinished_tasks <= 0 and self.workerQueue.empty()

def kill_processes(self):
    """Forcibly terminate all generator worker processes.

    Sends SIGKILL to every process in ``self.workerPool``, clears the
    pool list so dead workers are not re-killed later, and shuts down
    the multiprocessing ``Manager`` if one was created. A failure to
    kill an individual worker (e.g. it already exited) is logged and
    skipped so cleanup continues for the remaining workers.
    """
    self.logger.info("Kill worker processes")
    for worker in self.workerPool:
        try:
            self.logger.info("Kill worker process: {}".format(worker.pid))
            os.kill(int(worker.pid), signal.SIGKILL)
        except Exception as e:
            # BUG FIX: Logger has no ERROR() method — the original
            # `self.logger.ERROR(...)` raised AttributeError inside the
            # except handler and masked the real failure. Use error().
            self.logger.error(str(e))
            continue
    # Reset the pool so stale (now dead) process handles are dropped.
    self.workerPool = []
    # self.manager is None unless multiprocess mode created one.
    if self.manager:
        self.manager.shutdown()
1 change: 0 additions & 1 deletion splunk_eventgen/lib/eventgenoutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,3 @@ def flush(self, endOfInterval=False):
tmp = None
outputer.run()
q = None

1 change: 1 addition & 0 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def real_run(self):
# referenced in the config object, while, self.stopping will only stop this one.
if self.config.stopping or self.stopping:
end = True
continue
count = self.rater.rate()
# First run of the generator, see if we have any backfill work to do.
if self.countdown <= 0:
Expand Down
1 change: 1 addition & 0 deletions splunk_eventgen/lib/plugins/output/syslogout.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def filter(self, record):
record.host = self.host
return True


class SyslogOutOutputPlugin(OutputPlugin):
useOutputQueue = True
name = 'syslogout'
Expand Down