Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

EOS-25893: Implement init method for Event_message #670

Merged
merged 8 commits into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 22 additions & 49 deletions py-utils/src/utils/iem_framework/event_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@
import json
import time
import errno
from cortx.utils.common import CortxConf

from cortx.utils import errors
from cortx.template import Singleton
from cortx.utils.conf_store import Conf
from cortx.utils.iem_framework.error import EventMessageError
from cortx.utils.message_bus import MessageProducer, MessageConsumer
from cortx.utils.message_bus import MessageProducer, MessageConsumer, MessageBus
from cortx.utils.log import Log


class EventMessage(metaclass=Singleton):
""" Event Message framework to generate alerts """
_producer = None
_consumer = None
_message_server_endpoints = None

# VALID VALUES for IEC Components
_SEVERITY_LEVELS = {
Expand All @@ -52,56 +52,35 @@ class EventMessage(metaclass=Singleton):
'O': 'OS'
}

@staticmethod
def _initiate_logger():
"""Initialize logger if required."""
Conf.load('config_file', 'json:///etc/cortx/cortx.conf',
skip_reload=True)
# if Log.logger is already initialized by some parent process
# the same file will be used to log all the messagebus related
# logs, else standard iem.log will be used.
if not Log.logger:
LOG_DIR='/var/log'
iem_log_dir = os.path.join(LOG_DIR, 'cortx/utils/iem')
log_level = Conf.get('config_file', 'utils>log_level', 'INFO')
Log.init('iem', iem_log_dir, level=log_level, \
backup_count=5, file_size_in_mb=5)
@classmethod
def prep(cls, cluster_id: str, message_server_endpoints: str):
"""Prepare the Event Message with required."""
cls._cluster_id = cluster_id
cls._message_server_endpoints = message_server_endpoints
MessageBus.init(message_server_endpoints)

@classmethod
def init(cls, component: str, source: str,\
cluster_conf: str = 'yaml:///etc/cortx/cluster.conf'):
def init(cls, component: str, source: str):
"""
Set the Event Message context

Parameters:
component Component that generates the IEM. For e.g. 'S3', 'SSPL'
source Single character that indicates the type of component.
For e.g. H-Hardware, S-Software, F-Firmware, O-OS
cluster_conf ConfStore URL of cluster.conf file
"""
utils_index = 'utils_ind'
CortxConf.init(cluster_conf=cluster_conf)
local_storage = CortxConf.get_storage_path('local')
utils_conf = os.path.join(local_storage, 'utils/conf/utils.conf')
cls._conf_file = f'json://{utils_conf}'
if cls._message_server_endpoints is None:
Log.error("Call IEM.prep() method before calling IEM.init()")
raise EventMessageError(errors.ERR_SERVICE_NOT_INITIALIZED, \
"Call IEM prep() before calling init()")

cls._component = component
cls._source = source

cls._initiate_logger()

try:
Conf.load(utils_index, cls._conf_file, skip_reload=True)
machine_id = Conf.machine_id
ids = Conf.get(utils_index, f'node>{machine_id}')
cls._site_id = ids['site_id']
cls._rack_id = ids['rack_id']
cls._node_id = machine_id
cls._cluster_id = ids['cluster_id']
except Exception as e:
Log.error("Invalid config in %s." % cls._conf_file)
raise EventMessageError(errno.EINVAL, "Invalid config in %s. %s", \
cls._conf_file, e)
machine_id = Conf.machine_id
cls._site_id = 1
cls._rack_id = 1
cls._node_id = machine_id
cls._cluster_id = cls._cluster_id

if cls._component is None:
Log.error("Invalid component type: %s" % cls._component )
Expand All @@ -112,9 +91,8 @@ def init(cls, component: str, source: str,\
Log.error("Invalid source type: %s" % cls._source)
raise EventMessageError(errno.EINVAL, "Invalid source type: %s", \
cls._source)

cls._producer = MessageProducer(producer_id='event_producer', \
message_type='IEM', method='sync', cluster_conf=cluster_conf)
message_type='IEM', method='sync')
Log.info("IEM Producer initialized for component %s and source %s" % \
(cls._component, cls._source))

Expand Down Expand Up @@ -149,7 +127,6 @@ def send(cls, module: str, event_id: str, severity: str, message_blob: str,\
import socket
sender_host = socket.gethostname()

cls._initiate_logger()
if cls._producer is None:
Log.error("IEM Producer not initialized.")
raise EventMessageError(errors.ERR_SERVICE_NOT_INITIALIZED, \
Expand Down Expand Up @@ -215,30 +192,26 @@ def send(cls, module: str, event_id: str, severity: str, message_blob: str,\
Log.debug("alert sent: %s" % alert)

@classmethod
def subscribe(cls, component: str,\
cluster_conf: str = 'yaml:///etc/cortx/cluster.conf', **filters):
def subscribe(cls, component: str, **filters):
"""
Subscribe to IEM alerts

Parameters:
component Component that generates the IEM. For e.g. 'S3', 'SSPL'
cluster_conf ConfStore URL of cluster.conf file
"""
if component is None:
Log.error("Invalid component type: %s" % component)
raise EventMessageError(errno.EINVAL, "Invalid component type: %s", \
component)

cls._consumer = MessageConsumer(consumer_id='event_consumer', \
consumer_group=component, message_types=['IEM'], \
auto_ack=True, offset='earliest', cluster_conf=cluster_conf)
auto_ack=True, offset='earliest')
Log.info("IEM Consumer initialized for component: %s" % component)


@classmethod
def receive(cls):
""" Receive IEM alert message """
cls._initiate_logger()
if cls._consumer is None:
Log.error("IEM Consumer is not subscribed")
raise EventMessageError(errors.ERR_SERVICE_NOT_INITIALIZED, \
Expand Down
26 changes: 16 additions & 10 deletions py-utils/test/iem_framework/test_iem.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import unittest
from cortx.utils.iem_framework import EventMessage
from cortx.utils.iem_framework.error import EventMessageError
from cortx.utils.conf_store import Conf


class TestMessage(unittest.TestCase):
Expand All @@ -31,29 +32,34 @@ def setUpClass(cls, cluster_conf_path: str = 'yaml:///etc/cortx/cluster.conf'):
cls.cluster_conf_path = TestMessage._cluster_conf_path
else:
cls.cluster_conf_path = cluster_conf_path
Conf.load('config', cls.cluster_conf_path, skip_reload=True)
cls.cluster_id = Conf.get('config', 'cluster>id')
cls.message_server_endpoints = Conf.get('config',\
'cortx>external>kafka>endpoints')
EventMessage.prep(cls.cluster_id, cls.message_server_endpoints)

def test_alert_send(self):
""" Test send alerts """
EventMessage.init(component='cmp', source='H', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.init(component='cmp', source='H')
EventMessage.send(module='mod', event_id='500', severity='B', \
message_blob='This is message')

def test_alert_verify_receive(self):
""" Test receive alerts """
EventMessage.subscribe(component='cmp', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.subscribe(component='cmp')
alert = EventMessage.receive()
self.assertIs(type(alert), dict)

def test_bulk_alert_send(self):
""" Test bulk send alerts """
EventMessage.init(component='cmp', source='H', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.init(component='cmp', source='H')
for alert_count in range(0, 1000):
EventMessage.send(module='mod', event_id='500', severity='B', \
message_blob='This is message' + str(alert_count))

def test_bulk_verify_receive(self):
""" Test bulk receive alerts """
EventMessage.subscribe(component='cmp', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.subscribe(component='cmp')
count = 0
while True:
alert = EventMessage.receive()
Expand All @@ -76,15 +82,15 @@ def test_alert_fail_send(self):

def test_receive_without_send(self):
""" Receive message without send """
EventMessage.subscribe(component='cmp', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.subscribe(component='cmp')
alert = EventMessage.receive()
self.assertIsNone(alert)

def test_init_validation(self):
""" Validate init attributes """
with self.assertRaises(EventMessageError):
EventMessage.init(component=None, source='H', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.init(component='cmp', source='I', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.init(component=None, source='H')
EventMessage.init(component='cmp', source='I')

def test_send_validation(self):
""" Validate send attributes """
Expand All @@ -100,17 +106,17 @@ def test_send_validation(self):

def test_subscribe_validation(self):
with self.assertRaises(EventMessageError):
EventMessage.subscribe(component=None, cluster_conf=TestMessage.cluster_conf_path)
EventMessage.subscribe(component=None)

def test_json_alert_send(self):
""" Test send json as message description """
EventMessage.init(component='cmp', source='H', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.init(component='cmp', source='H')
EventMessage.send(module='mod', event_id='500', severity='B', \
message_blob={'input': 'This is message'})

def test_json_verify_receive(self):
""" Test receive json as message description """
EventMessage.subscribe(component='cmp', cluster_conf=TestMessage.cluster_conf_path)
EventMessage.subscribe(component='cmp')
alert = EventMessage.receive()
self.assertIs(type(alert), dict)

Expand Down