Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
EOS-26876 : MessageBus, EventMessage code should not refer any config…
Browse files Browse the repository at this point in the history
… file (#687)

* Fix init phase of Miniprovisioning (#645)

* Fix init phase of Miniprovisioning

Signed-off-by: Selvakumar <[email protected]>

* Fix test cases related to message_bus

Signed-off-by: Selvakumar <[email protected]>

* Remove creation of config files

Signed-off-by: Selvakumar <[email protected]>

* Make use of components logger (#650)

* Make use of components logger

Signed-off-by: Selvakumar <[email protected]>

* Use utils logger if initialised

Signed-off-by: Selvakumar <[email protected]>

* Ignore logging in message_bus if logger is not initialised (#652)

* Ignore logging if logger is not initialised

Signed-off-by: Selvakumar <[email protected]>

* Ignore logging if logger is not initialised

Signed-off-by: Selvakumar <[email protected]>

* Updated message_bus tests to take cluster.conf path as input and get kafka endpoints from conf. (#658)

* PATCH: update PyYaml version

Signed-off-by: Rohit Dwivedi <[email protected]>

* EOS-25893: Implement init method for Event_message (#670)

* Iem prep and messagebus init changes

Signed-off-by: suryakumar.kumaravelan <[email protected]>

* Update event_message.py

* Update event_message.py

* Update event_message.py

* Update test_iem.py

* Update event_message.py

* Update test_iem.py

* Update event_message.py

Co-authored-by: Sachin Punadikar <[email protected]>
Co-authored-by: Selva Nambi <[email protected]>

* MessageBus library code should not refer any conf file (#644)

* EOS-25892: Implement init method for message_bus (#610)

* Read passed in config values

Signed-off-by: Selvakumar <[email protected]>

* modify the testcases

Signed-off-by: Selvakumar <[email protected]>

* Initialize logger

Signed-off-by: Selvakumar <[email protected]>

* change parameter name

Signed-off-by: Selvakumar <[email protected]>

* Read conf values in MiniProvisioning init phase

Signed-off-by: Selvakumar <[email protected]>

* dump dict to json string

Signed-off-by: Selvakumar <[email protected]>

* EOS-25892: Implement MessageBus.init() (#639)

* Read the passed in config

Signed-off-by: Selvakumar <[email protected]>

* Read kwargs

Signed-off-by: Selvakumar <[email protected]>

* change config params

Signed-off-by: Selvakumar <[email protected]>

* Remove Invalid line

Signed-off-by: Selvakumar <[email protected]>

* EOS-26899: Message server changes for init argument (#642)

* Message server changes for init argument fixes

Signed-off-by: suryakumar.kumaravelan <[email protected]>

* code optimize

Signed-off-by: suryakumar.kumaravelan <[email protected]>

* Add MessageBus.init() in testcase (#643)

Signed-off-by: Selvakumar <[email protected]>

* Update test_message_bus.py

Co-authored-by: Selva Nambi <[email protected]>
Co-authored-by: SURYA KUMAR K <[email protected]>

Signed-off-by: Sachin Punadikar <[email protected]>

Conflicts:
	py-utils/src/setup/utils.py
	py-utils/src/utils/message_bus/message_bus.py
	py-utils/src/utils/message_bus/message_bus_client.py

* Update utils.py

* Update test_message_bus.py

Co-authored-by: Selva Nambi <[email protected]>
Co-authored-by: veerendra-simha-garikipati <[email protected]>
Co-authored-by: Rohit Dwivedi <[email protected]>
Co-authored-by: SURYA KUMAR K <[email protected]>
  • Loading branch information
5 people authored Jan 7, 2022
1 parent 702f9e5 commit 55150d2
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 275 deletions.
125 changes: 29 additions & 96 deletions py-utils/src/setup/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,67 +92,15 @@ def _get_from_conf_file(key) -> str:
return val

@staticmethod
def _get_server_info(conf_url_index: str, machine_id: str) -> dict:
"""Reads the ConfStore and derives keys related to Event Message.
Args:
conf_url_index (str): Index for loaded conf_url
machine_id (str): Machine_id
Returns:
dict: Server Information
"""
key_list = [f'node>{machine_id}']
ConfKeysV().validate('exists', conf_url_index, key_list)
server_info = Conf.get(conf_url_index, key_list[0])
return server_info

@staticmethod
def _copy_cluster_map(conf_url_index: str):
Conf.load('cluster', 'yaml:///etc/cortx/cluster.conf', skip_reload=True)
cluster_data = Conf.get(conf_url_index, 'node')
def _copy_cluster_map(config_path: str):
Conf.load('cluster', config_path, skip_reload=True)
cluster_data = Conf.get('cluster', 'node')
for _, node_data in cluster_data.items():
hostname = node_data.get('hostname')
node_name = node_data.get('name')
Conf.set('cluster', f'cluster>{node_name}', hostname)
Conf.save('cluster')

@staticmethod
def _create_utils_config(server_info: dict, machine_id: str, message_server_list: list, port_list: list, config: dict):
"""
Create the utils config file required for message_bus and iem
"""
utils_index = 'utils_ind'
local_path = CortxConf.get_storage_path('local')
utils_conf = os.path.join(local_path, 'utils/conf/utils.conf')
utils_conf_sample = utils_conf + '.sample'
with open(utils_conf_sample, 'w+') as file:
json.dump({}, file, indent=2)
# IEM
for _id in ['site_id', 'rack_id']:
if _id not in server_info.keys():
server_info[_id] = 1
Conf.load(utils_index, f'json://{utils_conf_sample}', skip_reload=True)
Conf.set(utils_index, f'node>{machine_id}>cluster_id', server_info['cluster_id'])
Conf.set(utils_index, f'node>{machine_id}>site_id', server_info['site_id'])
Conf.set(utils_index, f'node>{machine_id}>rack_id', server_info['rack_id'])
Conf.set(utils_index, f'node>{machine_id}>node_id', machine_id)
# Message bus conf
Conf.set(utils_index, 'message_broker>type', 'kafka')
for i in range(len(message_server_list)):
Conf.set(utils_index, f'message_broker>cluster[{i}]>server', \
message_server_list[i])
Conf.set(utils_index, f'message_broker>cluster[{i}]>port', port_list[i])
Conf.set(utils_index, 'message_broker>message_bus', config)
Conf.save(utils_index)


# copy this sample conf file as utils.conf
try:
os.rename(utils_conf_sample, utils_conf)
except OSError as e:
raise SetupError(e.errno, "Failed to create %s. %s", utils_conf, e)

@staticmethod
def _configure_rsyslog():
"""
Expand All @@ -173,7 +121,7 @@ def validate(phase: str):
pass

@staticmethod
def post_install(post_install_template: str):
def post_install(config_path: str):
""" Performs post install operations """
# Check required python packages
install_path = Utils._get_from_conf_file('install_path')
Expand All @@ -197,66 +145,31 @@ def post_install(post_install_template: str):
os.makedirs(default_sb_path, exist_ok=True)

post_install_template_index = 'post_install_index'
Conf.load(post_install_template_index, post_install_template)
Conf.load(post_install_template_index, config_path)

machine_id = Conf.machine_id
key_list = [f'node>{machine_id}>hostname', f'node>{machine_id}>name']
ConfKeysV().validate('exists', post_install_template_index, key_list)

#set cluster nodename:hostname mapping to cluster.conf (needed for Support Bundle)
Utils._copy_cluster_map(post_install_template_index)

return 0

@staticmethod
def init(config_path: str):
""" Perform initialization """
# Create message_type for Event Message and audit log
from cortx.utils.message_bus import MessageBusAdmin
from cortx.utils.message_bus.error import MessageBusError
try:
admin = MessageBusAdmin(admin_id='register', cluster_conf=config_path)
admin.register_message_type(message_types=['IEM', 'audit_messages'], \
partitions=1)
except MessageBusError as e:
if 'TOPIC_ALREADY_EXISTS' not in e.desc:
raise SetupError(e.rc, "Unable to create message_type. %s", e)
Utils._copy_cluster_map(config_path)

return 0

@staticmethod
def config(config_template: str):
def config(config_path: str):
"""Performs configurations."""
# Load required files
config_template_index = 'config'
Conf.load(config_template_index, config_template)
Conf.load(config_template_index, config_path)
# Configure log_dir for utils
log_dir = CortxConf.get_storage_path('log')
if log_dir is not None:
CortxConf.set('log_dir', log_dir)
CortxConf.save()

# Add message_bus config to utils conf
try:
from cortx.utils.message_bus import MessageBrokerFactory
server_list, port_list, config = \
MessageBrokerFactory.get_server_list(config_template_index)
except SetupError:
Log.error(f"Could not find server information in {config_template}")
raise SetupError(errno.EINVAL, \
"Could not find server information in %s", config_template)

# Add iem config to utils conf
machine_id = Conf.machine_id
server_info = Utils._get_server_info(config_template_index, machine_id)
if server_info is None:
Log.error(f"Could not find server information in {config_template}")
raise SetupError(errno.EINVAL, "Could not find server " +\
"information in %s", config_template)
Utils._create_utils_config(server_info, machine_id, server_list, port_list, config)

# set cluster nodename:hostname mapping to cluster.conf
Utils._copy_cluster_map(config_template_index)
Utils._copy_cluster_map(config_path)
Utils._configure_rsyslog()

# get shared storage from cluster.conf and set it to cortx.conf
Expand All @@ -282,6 +195,26 @@ def config(config_template: str):
os.chmod(os.path.join(utils_log_dir, 'iem/iem.log'), 0o0666)
return 0

@staticmethod
def init(config_path: str):
""" Perform initialization """
# Create message_type for Event Message
from cortx.utils.message_bus import MessageBus, MessageBusAdmin
from cortx.utils.message_bus.error import MessageBusError
try:
# Read the config values
Conf.load('config', config_path, skip_reload=True)
message_server_endpoints = Conf.get('config', \
'cortx>external>kafka>endpoints')
MessageBus.init(message_server_endpoints)
admin = MessageBusAdmin(admin_id='register')
admin.register_message_type(message_types=['IEM', 'audit_messages'], partitions=1)
except MessageBusError as e:
if 'TOPIC_ALREADY_EXISTS' not in e.desc:
raise SetupError(e.rc, "Unable to create message_type. %s", e)

return 0

@staticmethod
def test(config_path: str, plan: str):
""" Perform configuration testing """
Expand Down
62 changes: 13 additions & 49 deletions py-utils/src/utils/iem_framework/event_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import json
import time
import errno
from cortx.utils.common import CortxConf
from cortx.utils import errors
from cortx.template import Singleton
from cortx.utils.conf_store import Conf
from cortx.utils.iem_framework.error import EventMessageError
from cortx.utils.message_bus import MessageProducer, MessageConsumer
from cortx.utils.message_bus import MessageProducer, MessageConsumer, MessageBus
from cortx.utils.log import Log


class EventMessage(metaclass=Singleton):
""" Event Message framework to generate alerts """
_producer = None
Expand All @@ -52,56 +50,25 @@ class EventMessage(metaclass=Singleton):
'O': 'OS'
}

@staticmethod
def _initiate_logger():
"""Initialize logger if required."""
Conf.load('config_file', 'json:///etc/cortx/cortx.conf',
skip_reload=True)
# if Log.logger is already initialized by some parent process
# the same file will be used to log all the messagebus related
# logs, else standard iem.log will be used.
if not Log.logger:
LOG_DIR='/var/log'
iem_log_dir = os.path.join(LOG_DIR, 'cortx/utils/iem')
log_level = Conf.get('config_file', 'utils>log_level', 'INFO')
Log.init('iem', iem_log_dir, level=log_level, \
backup_count=5, file_size_in_mb=5)

@classmethod
def init(cls, component: str, source: str,\
cluster_conf: str = 'yaml:///etc/cortx/cluster.conf'):
def init(cls, component: str, source: str, cluster_id: str, \
message_server_endpoints: str, **message_server_kwargs):
"""
Set the Event Message context
Parameters:
component Component that generates the IEM. For e.g. 'S3', 'SSPL'
source Single character that indicates the type of component.
For e.g. H-Hardware, S-Software, F-Firmware, O-OS
cluster_conf ConfStore URL of cluster.conf file
"""
utils_index = 'utils_ind'
CortxConf.init(cluster_conf=cluster_conf)
local_storage = CortxConf.get_storage_path('local')
utils_conf = os.path.join(local_storage, 'utils/conf/utils.conf')
cls._conf_file = f'json://{utils_conf}'

cls._component = component
cls._source = source

cls._initiate_logger()

try:
Conf.load(utils_index, cls._conf_file, skip_reload=True)
machine_id = Conf.machine_id
ids = Conf.get(utils_index, f'node>{machine_id}')
cls._site_id = ids['site_id']
cls._rack_id = ids['rack_id']
cls._node_id = machine_id
cls._cluster_id = ids['cluster_id']
except Exception as e:
Log.error("Invalid config in %s." % cls._conf_file)
raise EventMessageError(errno.EINVAL, "Invalid config in %s. %s", \
cls._conf_file, e)
cls._site_id = 1
cls._rack_id = 1
cls._node_id = Conf.machine_id
cls._cluster_id = cluster_id

if cls._component is None:
Log.error("Invalid component type: %s" % cls._component )
Expand All @@ -112,9 +79,9 @@ def init(cls, component: str, source: str,\
Log.error("Invalid source type: %s" % cls._source)
raise EventMessageError(errno.EINVAL, "Invalid source type: %s", \
cls._source)

MessageBus.init(message_server_endpoints, **message_server_kwargs)
cls._producer = MessageProducer(producer_id='event_producer', \
message_type='IEM', method='sync', cluster_conf=cluster_conf)
message_type='IEM', method='sync')
Log.info("IEM Producer initialized for component %s and source %s" % \
(cls._component, cls._source))

Expand Down Expand Up @@ -149,7 +116,6 @@ def send(cls, module: str, event_id: str, severity: str, message_blob: str,\
import socket
sender_host = socket.gethostname()

cls._initiate_logger()
if cls._producer is None:
Log.error("IEM Producer not initialized.")
raise EventMessageError(errors.ERR_SERVICE_NOT_INITIALIZED, \
Expand Down Expand Up @@ -215,30 +181,28 @@ def send(cls, module: str, event_id: str, severity: str, message_blob: str,\
Log.debug("alert sent: %s" % alert)

@classmethod
def subscribe(cls, component: str,\
cluster_conf: str = 'yaml:///etc/cortx/cluster.conf', **filters):
def subscribe(cls, component: str, message_server_endpoints: str, \
**message_server_kwargs):
"""
Subscribe to IEM alerts
Parameters:
component Component that generates the IEM. For e.g. 'S3', 'SSPL'
cluster_conf ConfStore URL of cluster.conf file
"""
if component is None:
Log.error("Invalid component type: %s" % component)
raise EventMessageError(errno.EINVAL, "Invalid component type: %s", \
component)

MessageBus.init(message_server_endpoints, **message_server_kwargs)
cls._consumer = MessageConsumer(consumer_id='event_consumer', \
consumer_group=component, message_types=['IEM'], \
auto_ack=True, offset='earliest', cluster_conf=cluster_conf)
auto_ack=True, offset='earliest')
Log.info("IEM Consumer initialized for component: %s" % component)


@classmethod
def receive(cls):
""" Receive IEM alert message """
cls._initiate_logger()
if cls._consumer is None:
Log.error("IEM Consumer is not subscribed")
raise EventMessageError(errors.ERR_SERVICE_NOT_INITIALIZED, \
Expand Down
Loading

0 comments on commit 55150d2

Please sign in to comment.