[Producer] Make sure we don't lose events on producer shutdown #11
Purpose:
- Revisit #16 since I finally figured out a clean way to have a single producer.
- Reduce the burden on future code that will need to adjust how polling is done (#31) and maybe handle shutdown (#11)
- Prepare for configurable implementation loading, which will need a singleton and getter: openedx/openedx-events#87
- Get rid of the `sync` argument (which didn't fit the abstraction) and move it to a dedicated method.

Relying code should now call `get_producer().send(...)` rather than `send_to_event_bus(...)`. The return value is an object that wraps a `Producer` instance (not a `SerializingProducer`) and that handles the serialization itself.

Serialization logic is moved to a cached `get_serializers(...)` that expands upon the previous `get_serializer` function; it now returns a pair of key and value serializers. This also acts as a patch point for mocking. I'd like to test the serializers themselves, but they want to talk to a server.

`send_to_event_bus` gets a shorter name (now it's just a `send` method) and loses the `sync` keyword argument; there is instead now a `prepare_for_shutdown` method.

Other refactoring:
- Cache `create_schema_registry_client` and rename to `get_...`
- Lift producer test data to be instance variables
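A rough sketch of the shape described above: a cached singleton getter, a wrapper that does its own serialization around a plain producer, and a cached `get_serializers` returning a (key, value) pair. It assumes only that the wrapped client exposes `produce`, `poll`, and `flush`, as confluent_kafka's `Producer` does. `EventProducer`, `_InMemoryClient`, the `send` keyword arguments, and the JSON serializers are hypothetical placeholders; the real serializers are built against a schema registry.

```python
import json
from functools import lru_cache


class _InMemoryClient:
    """Hypothetical stand-in for confluent_kafka.Producer so the sketch runs offline."""

    def __init__(self):
        self.queue = []
        self.delivered = []

    def produce(self, topic, key=None, value=None, on_delivery=None):
        self.queue.append((topic, key, value))

    def poll(self, timeout=0):
        # Pretend everything queued so far has been delivered.
        count = len(self.queue)
        self.delivered.extend(self.queue)
        self.queue.clear()
        return count

    def flush(self, timeout=-1):
        self.poll(0)
        return len(self.queue)  # number of messages still undelivered


@lru_cache
def get_serializers(signal_name):
    """Return a (key_serializer, value_serializer) pair, cached per signal."""
    # Placeholder JSON serializers; the real ones talk to a schema registry.
    def key_serializer(key):
        return json.dumps(key).encode("utf-8")

    def value_serializer(value):
        return json.dumps(value, sort_keys=True).encode("utf-8")

    return key_serializer, value_serializer


class EventProducer:
    """Wraps a plain (non-serializing) producer and serializes events itself."""

    def __init__(self, client):
        self.client = client

    def send(self, *, signal_name, topic, event_key, event_data):
        key_ser, value_ser = get_serializers(signal_name)
        self.client.produce(topic, key=key_ser(event_key), value=value_ser(event_data))
        self.client.poll(0)  # serve delivery callbacks without blocking

    def prepare_for_shutdown(self):
        # Block until queued messages are delivered; returns how many remain.
        return self.client.flush()


@lru_cache
def get_producer():
    """Return the process-wide singleton producer."""
    return EventProducer(_InMemoryClient())
```

Because both getters are cached, tests can patch `get_serializers` (or call `get_producer.cache_clear()`) without needing a running schema registry.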
I also spent a while looking into ways to perform an action on shutdown and didn't find anything very promising. We could ask gunicorn to make a call, or we might be able to listen for a certain signal, but there's nothing that's obviously portable. Given that, I think we can skip calling flush.
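For illustration, the "listen for a signal" option might look roughly like this. It is a sketch that assumes the server process receives SIGTERM on graceful shutdown, which is exactly the non-portable part. `install_flush_on_sigterm` is a hypothetical helper, and `producer` is anything with a `flush(timeout)` method, such as confluent_kafka's `Producer`. `signal.signal` can only be called from the main thread.

```python
import logging
import os
import signal

logger = logging.getLogger(__name__)


def install_flush_on_sigterm(producer, timeout=5.0):
    """Hypothetical helper: on SIGTERM, flush `producer`, then defer to the
    handler that was installed before. Must be called from the main thread."""
    previous = signal.getsignal(signal.SIGTERM)

    def handler(signum, frame):
        # confluent_kafka's Producer.flush(timeout) returns the number of
        # messages still queued when it gives up.
        remaining = producer.flush(timeout)
        if remaining:
            logger.warning("%d event(s) may be lost on shutdown", remaining)
        if callable(previous):
            previous(signum, frame)
        elif previous == signal.SIG_DFL:
            # Restore the default action and re-raise the signal to terminate.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
        # SIG_IGN or None: nothing further to do.

    signal.signal(signal.SIGTERM, handler)
    return handler
```

Chaining to the previous handler keeps the server's own shutdown logic intact, but the flush still blocks shutdown for up to `timeout` seconds and does nothing for SIGKILL or crashes.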
Deprioritizing this until we learn that it needs addressing. We may just want to leave an ADR. We may also want to consider the Celery worker case (workers get destroyed after a certain number of tasks).
We should probably just switch to using the Outbox pattern so we can do away with all these issues with shutdown timing, logging events so they can be re-produced, etc.
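For reference, a minimal sketch of the transactional outbox pattern, using `sqlite3` in place of the Django database (in Django the same idea would sit inside `transaction.atomic()`). The event row is written in the same transaction as the business change, so a process dying before the send cannot lose it; a separate relay publishes unsent rows and marks them sent only once a delivery callback confirms success. The table and function names (`event_outbox`, `record_event`, `relay_outbox`) are hypothetical; `producer` is assumed to expose `produce(..., on_delivery=...)` and `flush`, as confluent_kafka's `Producer` does.

```python
import json
import sqlite3


def init_outbox(conn):
    conn.execute(
        "CREATE TABLE IF NOT EXISTS event_outbox ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " topic TEXT NOT NULL,"
        " payload TEXT NOT NULL,"
        " sent INTEGER NOT NULL DEFAULT 0)"
    )


def record_event(conn, topic, payload):
    """Insert the event; call inside the same transaction as the business write."""
    conn.execute(
        "INSERT INTO event_outbox (topic, payload) VALUES (?, ?)",
        (topic, json.dumps(payload)),
    )


def relay_outbox(conn, producer, timeout=10.0):
    """Publish unsent rows; mark only confirmed deliveries as sent."""
    rows = conn.execute(
        "SELECT id, topic, payload FROM event_outbox WHERE sent = 0 ORDER BY id"
    ).fetchall()
    delivered = []

    for row_id, topic, payload in rows:
        # Bind row_id now so each callback records its own row.
        def on_delivery(err, msg, row_id=row_id):
            if err is None:
                delivered.append(row_id)

        producer.produce(topic, value=payload.encode("utf-8"), on_delivery=on_delivery)

    producer.flush(timeout)  # waits for delivery and serves the callbacks
    with conn:
        conn.executemany(
            "UPDATE event_outbox SET sent = 1 WHERE id = ?",
            [(row_id,) for row_id in delivered],
        )
    return len(delivered)
```

The trade-off is at-least-once delivery: if the relay dies between a successful send and the `UPDATE`, those rows are re-sent on the next run, so consumers need to tolerate duplicates.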
As outlined in #10 we should make sure to flush out any remaining unsent batch of events when the Django server is shutting down.
Also ensure that `linger.ms`, `batch.size`, and any librdkafka queue settings are set so that messages are sent more or less immediately, in case the server shuts down unexpectedly.

Reference:
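A sketch of producer settings that favor sending immediately over batching, assuming confluent_kafka, which passes these keys through to librdkafka. The keys are real librdkafka properties, but the values are illustrative assumptions rather than tuned recommendations, and `bootstrap.servers` is a placeholder.

```python
# Illustrative producer config favoring low latency over batching.
# Keys are librdkafka properties; values are assumptions, not tuned.
PRODUCER_CONFIG = {
    "bootstrap.servers": "localhost:9092",  # placeholder
    # Don't wait to accumulate more messages before sending a batch
    # (alias: queue.buffering.max.ms).
    "linger.ms": 0,
    # Upper bound, in bytes, on a single batch.
    "batch.size": 16384,
    # Bound the local queue so fewer events can be stranded by a crash;
    # produce() raises BufferError once this limit is reached.
    "queue.buffering.max.messages": 1000,
}
```

This would be passed as `Producer(PRODUCER_CONFIG)`. Lower latency means smaller batches and more requests to the broker, and a small local queue shifts back-pressure onto the caller, so these settings reduce the loss window on an unexpected shutdown rather than eliminate it.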
Acceptance Criteria