Skip to content

Commit

Permalink
feat: add json-formatted event data to error logs (#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rebecca Graber authored Aug 3, 2023
1 parent ca9ee23 commit 59e41ba
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[5.2.0] - 2023-08-03
********************
Changed
=======
* Added ``event_data_as_json`` to ProducingContext for easier replay from logs

[5.1.0] - 2023-05-17
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '5.1.0'
__version__ = '5.2.0'
27 changes: 23 additions & 4 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,42 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field:
return json.dumps(subschema, sort_keys=True)


@lru_cache()
def get_signal_serializer(signal: OpenEdxPublicSignal):
    """
    Return the AvroSignalSerializer for the given signal.

    Results are cached so the attrs-class-to-Avro-schema transformation
    only happens once per signal.

    Arguments:
        signal: The OpenEdxPublicSignal to make a serializer for.

    Returns:
        An AvroSignalSerializer configured to serialize event data to dictionaries
    """
    serializer = AvroSignalSerializer(signal)
    return serializer


@lru_cache
def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str):
"""
Get the key and value serializers for a signal and a key field path.
Get the full key and value serializers for a signal and a key field path.
This is cached in order to save work re-transforming classes into Avro schemas.
This is cached in order to save work re-transforming AvroSignalSerializers into AvroSerializers.
Arguments:
signal: The OpenEdxPublicSignal to make a serializer for.
event_key_field: Path to descend in the signal schema to find the subschema for the key
(period-delimited string naming the field names to descend).
Returns:
2-tuple of AvroSignalSerializers, for event key and value
2-tuple of AvroSerializers, for event key and value
"""
client = get_schema_registry_client()
if client is None:
raise Exception('Cannot create Kafka serializers -- missing library or settings')

signal_serializer = AvroSignalSerializer(signal)
signal_serializer = get_signal_serializer(signal)

def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
"""Tells Avro how to turn objects into dictionaries."""
Expand Down Expand Up @@ -191,6 +207,7 @@ class ProducingContext:
event_key_field = attr.ib(type=str, default=None)
event_data = attr.ib(type=dict, default=None)
event_metadata = attr.ib(type=EventsMetadata, default=None)
event_data_as_json = attr.ib(type=str, default=None)

def __repr__(self):
"""Create a logging-friendly string"""
Expand Down Expand Up @@ -266,6 +283,7 @@ def send(
context = ProducingContext(signal=signal, initial_topic=topic, event_key_field=event_key_field,
event_data=event_data, event_metadata=event_metadata)
try:
context.event_data_as_json = json.dumps(get_signal_serializer(signal).to_dict(event_data))
full_topic = get_full_topic(topic)
context.full_topic = full_topic

Expand Down Expand Up @@ -361,3 +379,4 @@ def create_producer() -> Optional[KafkaEventProducer]:
def _reset_caches(sender, **kwargs):  # pylint: disable=unused-argument
    """Reset caches when settings change during unit tests."""
    # Clear every lru_cache-backed serializer factory so new settings take effect.
    for cached_factory in (get_serializers, get_signal_serializer):
        cached_factory.cache_clear()
4 changes: 4 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ def test_full_event_data_present_in_key_extraction_error(self, mock_logger, *arg
assert "source='openedx/test/web'" in error_string
assert f"id=UUID('{metadata.id}')" in error_string
assert f"sourcehost='{metadata.sourcehost}'" in error_string
assert "event_data_as_json='{\"test_data\": {\"course_id\": \"id\", \"sub_name\": \"name\"}}'"\
in error_string

@patch(
'edx_event_bus_kafka.internal.producer.get_serializers', autospec=True,
Expand Down Expand Up @@ -283,6 +285,8 @@ def test_full_event_data_present_in_kafka_error(self, mock_logger, *args):
# since we didn't fail until after key extraction we should have an event_key to report
assert "event_key='ABCx'" in error_string
assert "error=bad!" in error_string
assert "event_data_as_json='{\"test_data\": {\"course_id\": \"ABCx\", \"sub_name\": \"name\"}}'"\
in error_string

@override_settings(EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.05)
def test_polling_loop_terminates(self):
Expand Down

0 comments on commit 59e41ba

Please sign in to comment.