temp: sad path poc #43

Closed
wants to merge 2 commits
6 changes: 6 additions & 0 deletions edx_event_bus_kafka/internal/exceptions.py
@@ -0,0 +1,6 @@
class BadConfigurationException(Exception):
pass


class MissingKeyException(Exception):
pass
53 changes: 38 additions & 15 deletions edx_event_bus_kafka/internal/producer.py
@@ -19,6 +19,7 @@
from openedx_events.tooling import OpenEdxPublicSignal

from .config import get_schema_registry_client, load_common_settings
from .exceptions import BadConfigurationException, MissingKeyException

logger = logging.getLogger(__name__)

@@ -27,14 +28,16 @@
import confluent_kafka
from confluent_kafka import Producer
from confluent_kafka.schema_registry.avro import AvroSerializer
-from confluent_kafka.serialization import MessageField, SerializationContext
+from confluent_kafka.serialization import MessageField, SerializationContext, StringSerializer
except ImportError: # pragma: no cover
confluent_kafka = None

# CloudEvent standard name for the event type header, see
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example
EVENT_TYPE_HEADER_KEY = "ce_type"

LOG_FORMAT_STRING = "Error sending message. Message: {} Error: {}"


def extract_event_key(event_data: dict, event_key_field: str) -> Any:
"""
@@ -53,14 +56,14 @@ def extract_event_key(event_data: dict, event_key_field: str) -> Any:
for field_name in field_path:
if isinstance(current_data, dict):
if field_name not in current_data:
-raise Exception(
+raise MissingKeyException(
f"Could not extract key from event; lookup in {event_key_field} "
f"failed at {field_name!r} in dictionary"
)
current_data = current_data[field_name]
else:
if not hasattr(current_data, field_name):
-raise Exception(
+raise MissingKeyException(
f"Could not extract key from event; lookup in {event_key_field} "
f"failed at {field_name!r} in object"
)
@@ -134,7 +137,7 @@ def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str):
"""
client = get_schema_registry_client()
if client is None:
-raise Exception('Cannot create Kafka serializers -- missing library or settings')
+raise BadConfigurationException('Cannot create Kafka serializers -- missing library or settings')

signal_serializer = AvroSignalSerializer(signal)

@@ -191,16 +194,23 @@ def send(
string naming the dictionary keys to descend)
event_data: The event data (kwargs) sent to the signal
"""
-event_key = extract_event_key(event_data, event_key_field)
-headers = {EVENT_TYPE_HEADER_KEY: signal.event_type}
-
-key_serializer, value_serializer = get_serializers(signal, event_key_field)
-key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers))
-value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers))
-
-self.producer.produce(
-    topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
-)
+try:
+    event_key = extract_event_key(event_data, event_key_field)
+    headers = {EVENT_TYPE_HEADER_KEY: signal.event_type}
+
+    key_serializer, value_serializer = get_serializers(signal, event_key_field)
+    key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers))
+    value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers))
+
+    self.producer.produce(
+        topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
+    )
+except BadConfigurationException as bce:
+    logger.error(LOG_FORMAT_STRING.format(str(event_data), bce))
+except MissingKeyException as mke:
+    self._send_to_error_topic(topic=topic, event_data=event_data)
+except:
+    self._send_to_error_topic(topic=topic, event_data=event_data, event_key=extract_event_key(event_data, event_key_field))
Contributor

Questions:

  1. Will this be returning a KafkaError (https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafkaerror)? Might we need to investigate details of the error?
  2. Do we know what types of exceptions could happen here vs in on_event_deliver? Does it matter?
  3. Should we send exception information in error headers, like the DLQ error headers described at https://docs.confluent.io/cloud/current/connectors/dead-letter-queue.html#view-the-dlq? (See the sketch after this list.)
  4. Should we rename to _send_to_error_topic_or_log? I was confused at first about how we were going to get to Kafka if we just had problems getting to Kafka. I think the plan here is just to try, and if that doesn't work, we log?
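
To make question 3 concrete, here is a rough, hypothetical sketch of attaching exception details as headers on the error-topic message. The function name and header names are illustrative only (loosely modeled on the Connect DLQ error headers) and are not part of this PR:

```python
import json
import traceback

from confluent_kafka import Producer


def send_to_error_topic_with_headers(producer: Producer, topic: str, event_data: dict,
                                     exc: Exception, event_key=None):
    """Hypothetical helper: publish the failed event with exception info in the headers."""
    headers = {
        # Illustrative header names, loosely modeled on the Connect DLQ error headers.
        "error.class.name": type(exc).__name__,
        "error.message": str(exc),
        "error.stacktrace": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }
    producer.produce(
        f"{topic}-error",
        key=str(event_key if event_key is not None else "Missing Key"),
        value=json.dumps(event_data, skipkeys=True, default=str),
        headers=headers,
    )
```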

Contributor Author

For 1/2, so far the only remotely helpful information I can find is https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_KafkaError.py, but it's not as helpful as I would like it to be. It's clear that some things raise and some things just pass along the error, but I'm having trouble distinguishing them.
3. Yes, probably a good idea.
4. I put very little thought into method names; I figured they would be cleaned up in the real thing.

Contributor Author

Actually, these tests are slightly more useful, but only slightly: https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_Producer.py
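
For reference, a minimal sketch of where the two failure surfaces show up, assuming stock confluent_kafka behavior: produce() can raise BufferError or KafkaException synchronously, while broker-side failures arrive as the KafkaError passed to the delivery callback. Nothing below is part of this PR:

```python
import logging

from confluent_kafka import KafkaException, Producer

logger = logging.getLogger(__name__)


def produce_with_diagnostics(producer: Producer, topic: str, key: bytes, value: bytes):
    """Sketch: separate synchronous produce() errors from asynchronous delivery errors."""

    def on_delivery(err, msg):
        # err is a confluent_kafka.KafkaError when the message could not be delivered.
        if err is not None:
            logger.error("Delivery failed: code=%s message=%s", err.code(), err.str())

    try:
        producer.produce(topic, key=key, value=value, on_delivery=on_delivery)
    except BufferError:
        # Local producer queue is full; retriable after poll()/flush().
        logger.warning("Producer queue full; message not enqueued")
    except KafkaException as exc:
        # Raised synchronously, e.g. when the message exceeds the configured maximum size.
        logger.error("produce() raised: %s", exc)
    producer.poll(0)  # serve any pending delivery callbacks
```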


# Opportunistically ensure any pending callbacks from recent event-sends are triggered.
# This ensures that we're polling at least as often as we're producing, which is a
@@ -210,6 +220,16 @@ def send(
# poll(0) on a regular interval (see `poll_indefinitely`).
self.producer.poll(0)

def _send_to_error_topic(self, *, topic: str, event_data: dict, event_key = None):
try:
event_key_as_str = str(event_key if event_key else "Missing Key")
event_data_as_str = json.dumps(event_data, allow_nan=True, skipkeys=True, default=lambda x: str(x))
self.producer.produce(f"{topic}-error", key=event_key_as_str, value=event_data_as_str)
Contributor

I think we would need to encode to bytes here. (We don't actually have to worry about schemas for the error topic, right? This wouldn't be a SerializingProducer and wouldn't know about Avro.)

Contributor Author

We could go that way. I went with just serializing everything to a string first, but we could also use a non-serializing producer and try to instruct Kafka not to care about the schema (I'm not 100% positive how to do that, but I think it's possible). That would require setting up a separate producer, though.

Contributor

It's already a regular (non-serializing) Producer, though -- I completed the work to separate out the serialization step. So all we need to do is convert the strings to bytes.

Contributor Author

Hm, it's interesting that it worked without going down to bytes. It's still probably a better idea, but now we know we can send other things.

Contributor Author

Do you think there's a value to one over the other? I can't see a particular advantage, but at this point you know more about the guts of Kafka than I do.

Contributor

Probably doesn't matter -- some code in there is automatically encoding strings to bytes. Doing the encoding ourselves would be more explicit and so I'd favor it slightly, but I don't think it matters much either way.
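
For what it's worth, a sketch of the explicit-bytes variant of the helper. It assumes the module-level logger and LOG_FORMAT_STRING from this diff; the rest is illustrative throwaway code:

```python
import json


def _send_to_error_topic(self, *, topic: str, event_data: dict, event_key=None):
    """Sketch: encode key and value to UTF-8 bytes before handing them to the plain Producer."""
    try:
        key_bytes = str(event_key if event_key is not None else "Missing Key").encode("utf-8")
        value_bytes = json.dumps(event_data, skipkeys=True, default=str).encode("utf-8")
        self.producer.produce(f"{topic}-error", key=key_bytes, value=value_bytes)
    except Exception as exc:
        logger.error(LOG_FORMAT_STRING.format(str(event_data), exc))
```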

except Exception as e:
logger.error(LOG_FORMAT_STRING.format(str(event_data), e))



def prepare_for_shutdown(self):
"""
Prepare producer for a clean shutdown.
@@ -280,8 +300,9 @@ def get_producer() -> Optional[EventProducerKafka]:
producer_settings = load_common_settings()
if producer_settings is None:
return None
+thing = Producer(producer_settings)

-return EventProducerKafka(Producer(producer_settings))
+return EventProducerKafka(thing)


def on_event_deliver(err, evt):
@@ -297,6 +318,7 @@ def on_event_deliver(err, evt):
"""
if err is not None:
logger.warning(f"Event delivery failed: {err!r}")
logger.error(LOG_FORMAT_STRING.format(evt, err))
else:
# Don't log msg.value() because it may contain userids and/or emails
logger.info(f"Event delivered to topic {evt.topic()}; key={evt.key()}; "
@@ -306,5 +328,6 @@
@receiver(setting_changed)
def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches when settings change during unit tests."""
logger.info("Clearin'")
get_serializers.cache_clear()
get_producer.cache_clear()