temp: sad path poc #43
Conversation
```python
try:
    event_key_as_str = str(event_key if event_key else "Missing Key")
    event_data_as_str = json.dumps(event_data, allow_nan=True, skipkeys=True, default=lambda x: str(x))
    self.producer.produce(f"{topic}-error", key=event_key_as_str, value=event_data_as_str)
```
I think we would need to encode to bytes here. (We don't actually have to worry about schemas for the error topic, right? This wouldn't be a SerializingProducer and wouldn't know about Avro.)
We could go that way. I went with just serializing everything to a string first, but we could also use a non-serializing producer and try to instruct Kafka not to care about the schema (I'm not 100% positive how to do that, but I think it's possible). That would require setting up a separate producer, though.
It's already a regular (non-serializing) Producer, though -- I completed the work to separate out the serialization step. So all we need to do is convert the strings to bytes.
Hm, it's interesting that it worked without going down to bytes. It's still probably a better idea, but now we know we can send other things.
Ah, here we go -- looks like it accepts either string or bytes: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Producer.produce
Do you think there's value to one over the other? I can't see a particular advantage, but at this point you know more about the guts of Kafka than I do.
Probably doesn't matter -- some code in there is automatically encoding strings to bytes. Doing the encoding ourselves would be more explicit and so I'd favor it slightly, but I don't think it matters much either way.
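A minimal sketch of the explicit-encoding approach discussed above. The key/value handling mirrors the diff; `to_error_payload` is a hypothetical helper, not part of the actual change:

```python
import json

def to_error_payload(event_key, event_data):
    """Coerce key and value to bytes for the error topic (hypothetical helper).

    Rather than relying on confluent-kafka's implicit str -> bytes
    conversion, encode explicitly so the plain (non-serializing)
    Producer always receives bytes.
    """
    key_str = str(event_key if event_key else "Missing Key")
    value_str = json.dumps(event_data, allow_nan=True, skipkeys=True, default=str)
    return key_str.encode("utf-8"), value_str.encode("utf-8")

# Usage (assuming `producer` is a plain confluent_kafka.Producer):
# key, value = to_error_payload(event_key, event_data)
# producer.produce(f"{topic}-error", key=key, value=value)
```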
```python
except MissingKeyException as mke:
    self._send_to_error_topic(topic=topic, event_data=event_data)
except:
    self._send_to_error_topic(topic=topic, event_data=event_data, event_key=extract_event_key(event_data, event_key_field))
```
Questions:
1. Will this be returning https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafkaerror? Might we need to investigate details of the error?
2. Do we know what types of exceptions could happen here vs. in `on_event_deliver`? Does it matter?
3. Should we send exception information like the error headers in https://docs.confluent.io/cloud/current/connectors/dead-letter-queue.html#view-the-dlq?
4. Should we rename to `_send_to_error_topic_or_log`? I was confused at first how we were going to get to Kafka if we just had problems getting to Kafka. I think the plan here is just to try, and if it doesn't work, then we log?
For 1/2, so far the only remotely helpful information I can find is https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_KafkaError.py, but it's not actually as helpful as I would like it to be. It's clear some things raise and some things just pass along the error, but I'm having trouble distinguishing them.
3. Yes, probably a good idea.
4. I put very little thought into method names; I figured they would be cleaned up in the real thing.
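A rough sketch of what attaching exception information as message headers (point 3 above) could look like. The header names here are illustrative assumptions, loosely modeled on the DLQ convention linked above, not a fixed standard:

```python
def build_error_headers(exc):
    """Build Kafka message headers describing a caught exception.

    Header names are hypothetical; confluent-kafka accepts a dict of
    str -> bytes for the `headers` argument to produce().
    """
    return {
        "__error_class_name": type(exc).__name__.encode("utf-8"),
        "__error_message": str(exc).encode("utf-8"),
    }

# Usage (assuming a confluent_kafka.Producer named `producer`):
# producer.produce(f"{topic}-error", key=key, value=value,
#                  headers=build_error_headers(exc))
```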
Actually, these tests are slightly more useful, but only slightly: https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_Producer.py
@rgraber: Should this be closed?
This is not for actual merging, just for review of the concept of using error topics and standardized logs on the producer side.
The idea is to first try sending events to an error topic. This is for cases where there may be a serialization problem or a missing primary key field or something. If we cannot send to the error topic either, we log in a standard format. The error topic has a very open schema so we should be able to send events that are malformed in multiple ways to the same error topic.
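The "error topic first, standardized log as last resort" flow described above could be sketched roughly as follows. Names (`handle_failed_event`, the `EVENT_BUS_ERROR` log prefix) and the injected `produce_to_error_topic` callable are assumptions for illustration; the real PoC wires this into the producer class:

```python
import logging

logger = logging.getLogger(__name__)

def handle_failed_event(topic, event_data, produce_to_error_topic):
    """Try the error topic; if that also fails, emit a standardized log line.

    `produce_to_error_topic` stands in for the real Kafka send, so the
    serialization details discussed above are elided here.
    """
    try:
        produce_to_error_topic(f"{topic}-error", event_data)
        return "sent"
    except Exception:
        # Last resort: a standard-format log line that downstream
        # tooling can search for (prefix is hypothetical).
        logger.exception("EVENT_BUS_ERROR: could not send to %s-error; event=%r",
                         topic, event_data)
        return "logged"
```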