Skip to content

Commit

Permalink
Updating Master (#310)
Browse files Browse the repository at this point in the history
* Fix structlog dependency for app (#280)

* zipfile fix (#284)

* Fix bug 286 random token replacement (#287)

* Fix bug 286 random token replacement

* Change perdayvolume generator logic to get random token value replacement

* Versioning scheme (#278)

* [global] perDayVolume (#288)

* exclude global from perDayVolume assignment

* Address comment

* Fix security vulnerability issue (#289)

* Fix custom plugin stale docs (#290)

* Server fix (#293)

* Flag added

* server fix for count and env clean

* Fix bug 285 (#297)

* Add syslogAddHeader config directive (#296)

* Add syslog header to event in syslog mode

* timezone setting bugfix #249

* Using multiprocess pool to address the OOM issue (#301)

* Using multiprocess pool to address the OOM issue

* Fix test case fail

* Remove workerQueue unfinished tasks (#302)

* Bumped version to 6.5.2

* controller fix (#304)

* controller fix

* variable assignment fix (#306)

* add healthcheck endpoint and ping it every half an hour (#308)
  • Loading branch information
Tony Lee authored Oct 14, 2019
1 parent 0924152 commit 502b7f6
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
13 changes: 13 additions & 0 deletions splunk_eventgen/eventgen_api_server/eventgen_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import socket
import os
import logging
import requests
import time
import threading

from eventgen_controller_api import EventgenControllerAPI
from redis_connector import RedisConnector
Expand All @@ -21,6 +24,7 @@ def __init__(self, *args, **kwargs):
self.redis_connector.register_myself(hostname=self.host, role=self.role)

self._setup_loggers()
self.connections_healthcheck()
self.logger = logging.getLogger('eventgen_server')
self.logger.info('Initialized Eventgen Controller: hostname [{}]'.format(self.host))

Expand All @@ -40,6 +44,15 @@ def index():

return app

def connections_healthcheck(self):
def start_checking():
while True:
time.sleep(60 * 30)
requests.get("http://{}:{}/healthcheck".format("0.0.0.0", int(self.env_vars.get('WEB_SERVER_PORT'))))
thread = threading.Thread(target=start_checking)
thread.daemon = True
thread.start()

def _setup_loggers(self):
log_path = os.path.join(FILE_PATH, 'logs')
eventgen_controller_logger_path = os.path.join(LOG_PATH, 'eventgen-controller.log')
Expand Down
12 changes: 11 additions & 1 deletion splunk_eventgen/eventgen_api_server/eventgen_controller_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,17 @@ def http_reset(target):
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)


@bp.route('/healthcheck', methods=['GET'], defaults={'target': 'all'})
@bp.route('/healthcheck/<string:target>', methods=['GET'])
def http_healthcheck(target):
try:
message_uuid = publish_message('healthcheck', request.method, target=target)
return Response(json.dumps(gather_response('healthcheck', message_uuid=message_uuid, response_number_target=0 if target == 'all' else 1)), mimetype='application/json', status=200)
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

return bp

def __make_error_response(self, status, message):
Expand Down
25 changes: 24 additions & 1 deletion splunk_eventgen/eventgen_api_server/eventgen_server_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def start_listening(self):
thread = threading.Thread(target=start_listening, args=(self,))
thread.daemon = True
thread.start()

def format_message(self, job, request_method, response, message_uuid):
return json.dumps({'job': job, 'request_method': request_method, 'response': response, 'host': self.host, 'message_uuid': message_uuid})

Expand Down Expand Up @@ -102,6 +102,10 @@ def _delegate_jobs(self, job, request_method, body, message_uuid):
message = {'message': 'Eventgen is resetting. Might take some time to reset.'}
self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, self.format_message('reset', request_method, response=message, message_uuid=message_uuid))
self.reset()
elif job == 'healthcheck':
response = self.healthcheck()
message = self.format_message('healthcheck', request_method, response=response, message_uuid=message_uuid)
self.redis_connector.message_connection.publish(self.redis_connector.controller_channel, message)


def _create_blueprint(self):
Expand Down Expand Up @@ -213,6 +217,14 @@ def http_post_setup():
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

@bp.route('/healthcheck', methods=['GET'])
def redis_connection_health():
try:
return Response(json.dumps(self.healthcheck()), mimetype='application/json', status=200)
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

return bp

def get_index(self):
Expand Down Expand Up @@ -422,6 +434,17 @@ def reset(self):
response['message'] = "Eventgen has been reset."
return response

def healthcheck(self):
response = {}
try:
self.redis_connector.pubsub.check_health()
response['message'] = "Connections are healthy"
except Exception as e:
self.logger.error("Connection to Redis failed: {}, re-registering".format(str(e)))
self.redis_connector.register_myself(hostname=self.host, role="server")
response['message'] = "Connections unhealthy - re-established connections"
return response

def set_bundle(self, url):
if not url:
return
Expand Down

0 comments on commit 502b7f6

Please sign in to comment.