From 59e41ba45be60273c8085b31386eedb1de18c80f Mon Sep 17 00:00:00 2001 From: Rebecca Graber Date: Thu, 3 Aug 2023 13:29:46 -0400 Subject: [PATCH] feat: add json-formatted event data to error logs (#186) --- CHANGELOG.rst | 6 +++++ edx_event_bus_kafka/__init__.py | 2 +- edx_event_bus_kafka/internal/producer.py | 27 ++++++++++++++++--- .../internal/tests/test_producer.py | 4 +++ 4 files changed, 34 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bb79f04..29684e8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,12 @@ Change Log Unreleased ********** +[5.2.0] - 2023-08-03 +******************** +Changed +======= +* Added event_data_as_json to ProducingContext for easier replay from logs + [5.1.0] - 2023-05-17 ******************** Changed diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index c7cbafd..3a20118 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -9,4 +9,4 @@ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '5.1.0' +__version__ = '5.2.0' diff --git a/edx_event_bus_kafka/internal/producer.py b/edx_event_bus_kafka/internal/producer.py index 80e145a..7469f6c 100644 --- a/edx_event_bus_kafka/internal/producer.py +++ b/edx_event_bus_kafka/internal/producer.py @@ -136,12 +136,28 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: return json.dumps(subschema, sort_keys=True) +@lru_cache() +def get_signal_serializer(signal: OpenEdxPublicSignal): + """ + Get the AvroSignalSerializer for a signal + + This is cached in order to save work re-transforming classes into Avro schemas. + + Arguments: + signal: The OpenEdxPublicSignal to make a serializer for. 
+ + Returns: + An AvroSignalSerializer configured to serialize event data to dictionaries + """ + return AvroSignalSerializer(signal) + + @lru_cache def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str): """ - Get the key and value serializers for a signal and a key field path. + Get the full key and value serializers for a signal and a key field path. - This is cached in order to save work re-transforming classes into Avro schemas. + This is cached in order to save work re-transforming AvroSignalSerializers into AvroSerializers. Arguments: signal: The OpenEdxPublicSignal to make a serializer for. @@ -149,13 +165,13 @@ def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str): (period-delimited string naming the field names to descend). Returns: - 2-tuple of AvroSignalSerializers, for event key and value + 2-tuple of AvroSerializers, for event key and value """ client = get_schema_registry_client() if client is None: raise Exception('Cannot create Kafka serializers -- missing library or settings') - signal_serializer = AvroSignalSerializer(signal) + signal_serializer = get_signal_serializer(signal) def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument """Tells Avro how to turn objects into dictionaries.""" @@ -191,6 +207,7 @@ class ProducingContext: event_key_field = attr.ib(type=str, default=None) event_data = attr.ib(type=dict, default=None) event_metadata = attr.ib(type=EventsMetadata, default=None) + event_data_as_json = attr.ib(type=str, default=None) def __repr__(self): """Create a logging-friendly string""" @@ -266,6 +283,7 @@ def send( context = ProducingContext(signal=signal, initial_topic=topic, event_key_field=event_key_field, event_data=event_data, event_metadata=event_metadata) try: + context.event_data_as_json = json.dumps(get_signal_serializer(signal).to_dict(event_data)) full_topic = get_full_topic(topic) context.full_topic = full_topic @@ -361,3 +379,4 @@ def create_producer() -> 
Optional[KafkaEventProducer]: def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument """Reset caches when settings change during unit tests.""" get_serializers.cache_clear() + get_signal_serializer.cache_clear() diff --git a/edx_event_bus_kafka/internal/tests/test_producer.py b/edx_event_bus_kafka/internal/tests/test_producer.py index 05e42fb..d9f84d6 100644 --- a/edx_event_bus_kafka/internal/tests/test_producer.py +++ b/edx_event_bus_kafka/internal/tests/test_producer.py @@ -245,6 +245,8 @@ def test_full_event_data_present_in_key_extraction_error(self, mock_logger, *arg assert "source='openedx/test/web'" in error_string assert f"id=UUID('{metadata.id}')" in error_string assert f"sourcehost='{metadata.sourcehost}'" in error_string + assert "event_data_as_json='{\"test_data\": {\"course_id\": \"id\", \"sub_name\": \"name\"}}'"\ + in error_string @patch( 'edx_event_bus_kafka.internal.producer.get_serializers', autospec=True, @@ -283,6 +285,8 @@ def test_full_event_data_present_in_kafka_error(self, mock_logger, *args): # since we didn't fail until after key extraction we should have an event_key to report assert "event_key='ABCx'" in error_string assert "error=bad!" in error_string + assert "event_data_as_json='{\"test_data\": {\"course_id\": \"ABCx\", \"sub_name\": \"name\"}}'"\ + in error_string @override_settings(EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.05) def test_polling_loop_terminates(self):