Declarative streams as class methods #163

Open
JeroennC opened this issue Jan 24, 2024 · 0 comments

Is your feature request related to a problem? Please describe.
The @stream decorator does not play nicely with methods defined on a class. The reason to use a method as the streaming function is to reuse state from the class instance, which removes the need to reinitialize for every processed event or to keep state in the module context.
Placing the @stream decorator directly on the method has the advantage that the declarative part of the streaming function sits right next to the code.
For example:

    @stream(
        "my-topic",
        group_id="consumer-invalidate-cache",
    )
    async def consume_event(self, cr: ConsumerRecord) -> None:
        self.do_something(cr)

As the @stream decorator performs the UDF typing check, it sees both the self and cr parameters and assumes it needs to pass both cr and stream when calling the stream function.
When the result is subsequently passed into stream_engine.add_stream(self.consume_event), the self argument is not bound to the class instance.
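
To illustrate the problem without depending on the library, here is a minimal sketch. `FakeStream` and `fake_stream` are hypothetical stand-ins, not the real kstreams classes; they only mimic a decorator that replaces the function with a wrapper object and inspects its signature at decoration time.

```python
import inspect


class FakeStream:
    """Hypothetical stand-in for the object a stream decorator returns."""

    def __init__(self, func):
        self.func = func
        # Inspect the raw function, as a typing check at decoration time would.
        self.param_names = list(inspect.signature(func).parameters)


def fake_stream(topic):
    """Hypothetical decorator: replaces the function with a FakeStream."""

    def decorator(func):
        return FakeStream(func)

    return decorator


class CacheInvalidator:
    @fake_stream("my-topic")
    async def consume_event(self, cr):
        return cr


obj = CacheInvalidator()

# The attribute is a FakeStream, not a bound method, so `self` is never bound.
print(type(obj.consume_event).__name__)     # FakeStream
print(obj.consume_event.param_names)        # ['self', 'cr']
print(inspect.ismethod(obj.consume_event))  # False
```

Because the wrapper object does not take part in Python's method binding, accessing it through an instance returns the same unbound wrapper, and the signature check counts `self` as a regular parameter.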

The workaround I found to still get the desired behaviour:

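    def add_stream(self, stream: Stream) -> None:
        # Bind the instance up front, since the decorator left `self` unbound.
        stream.func = partial(stream.func, self)  # requires: from functools import partial
        stream.udf_type = UDFType.CR_ONLY_TYPING  # Note: this removes the dynamic part of this check :(
        self.stream_engine.add_stream(stream)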

Describe the solution you'd like
It's not immediately clear to me what a nice approach would be, especially since applying the @stream decorator turns the method into a Stream object stored on the class; it is no longer a method. This means passing self.consume_event does not bind self to the method as you might expect.
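
One direction that might be worth exploring is Python's descriptor protocol: if the wrapper object defined `__get__`, accessing it through an instance could return a copy with `self` already bound. The sketch below is only an illustration under that assumption; `BindableStream` and `bindable_stream` are hypothetical names and do not reflect how the library is implemented.

```python
import asyncio
import functools
import inspect


class BindableStream:
    """Hypothetical stream wrapper that supports the descriptor protocol."""

    def __init__(self, func, topic):
        self.func = func
        self.topic = topic

    def __get__(self, instance, owner=None):
        if instance is None:
            # Accessed on the class itself: nothing to bind.
            return self
        # Accessed on an instance: return a copy with `self` bound.
        bound = functools.partial(self.func, instance)
        return BindableStream(bound, self.topic)

    @property
    def param_names(self):
        # Computed lazily, so a bound copy no longer reports `self`.
        return list(inspect.signature(self.func).parameters)


def bindable_stream(topic):
    """Hypothetical decorator returning a BindableStream."""

    def decorator(func):
        return BindableStream(func, topic)

    return decorator


class CacheInvalidator:
    def __init__(self):
        self.seen = []

    @bindable_stream("my-topic")
    async def consume_event(self, cr):
        self.seen.append(cr)


obj = CacheInvalidator()
stream = obj.consume_event          # triggers __get__, binds `obj`
print(stream.param_names)           # ['cr']
asyncio.run(stream.func("record"))  # calls consume_event(obj, "record")
print(obj.seen)                     # ['record']
```

With this shape, any signature-based check would have to run on the bound copy (at access or registration time) rather than once at decoration time, which is a trade-off the maintainers would need to weigh.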
