
Async Task with celery #173

Closed
asrocha opened this issue May 28, 2021 · 7 comments
asrocha commented May 28, 2021

Many Python applications (Django, Flask, and others) run tasks in the background (async) using Celery.
This architecture is very useful for long-running tasks.
I would like to take advantage of all the control (logs, monitoring, etc.) that already exists in Celery task code, so that the worker acts as the orchestrator that sends status updates (job/task started, job/task finished) to Zeebe (maybe via an integration with Kafka).
My bet is to use ZeebeTaskRouter to route tasks to Celery and use a kind of callback function to report job status back to the worker, or to use Kafka.
I'm opening this issue to discuss the subject and gather opinions on how to code it, and then create a pull request.

Thanks


asrocha commented May 28, 2021

[image attachment]

@JonatanMartens
Collaborator

You could catch the completion of the celery task using an intermediate message catch event.

The zeebe process would look like this:

[image: BPMN diagram of a service task followed by an intermediate message catch event]

Here is what the zeebe worker would look like:

from pyzeebe import ZeebeWorker

worker = ZeebeWorker()

@worker.task("start-task")
def start_task():
    # Kick off the Celery task (`celery` is your Celery app instance);
    # the process then waits at the message catch event
    celery.send_task("tasks.my_task")

And the celery task:

from celery import Celery
from pyzeebe import ZeebeClient

app = Celery()
zeebe_client = ZeebeClient()

@app.task
def my_task():
    do_stuff()
    # Publish a message to correlate back to the waiting process instance
    zeebe_client.publish_message("message-name", "correlation-key")

You can read more about Zeebe messages here.


asrocha commented May 31, 2021

@JonatanMartens,
Good idea! I had been trying something like that, but instead of having zeebe_client send the message, I publish on a Kafka topic; that way, if I create new tasks in Celery, I don't need to change the worker.
Thanks.


asrocha commented May 31, 2021

@JonatanMartens thanks again!!
This week I will work hard on it. I will use Celery task signals to send messages to Zeebe; after my tests I will post here how it ended up.
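The task-signal idea described above can be sketched in plain Python. This is a minimal, hedged sketch: `publish_message`, `on_task_success`, and `run_task` are stand-ins written here for illustration. With real Celery you would connect a handler via `celery.signals.task_success` and publish via pyzeebe's `ZeebeClient.publish_message` (or a Kafka producer) instead.

```python
# Sketch of "task success signal -> completion message": one handler
# covers every task, so adding new Celery tasks needs no worker changes.

published = []

def publish_message(name, correlation_key):
    # Stand-in for ZeebeClient.publish_message / a Kafka producer
    published.append((name, correlation_key))

# Stand-in for celery.signals.task_success.connect
success_handlers = []

def on_task_success(fn):
    success_handlers.append(fn)
    return fn

@on_task_success
def notify_zeebe(sender, result):
    # Publish a completion message named after the task
    publish_message(f"{sender}-finished", correlation_key=str(result))

def run_task(name, fn, *args):
    # Stand-in for Celery executing a task and firing the success signal
    result = fn(*args)
    for handler in success_handlers:
        handler(sender=name, result=result)
    return result

run_task("tasks.my_task", lambda x: x + 1, 41)
# published == [("tasks.my_task-finished", "42")]
```

The key design point is that the handler is attached once, at the signal level, rather than inside each task body.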


asrocha commented Jun 2, 2021

@JonatanMartens hello,
Do you have an idea how to create @worker.task("start-task") dynamically? I am trying to loop over the registered Celery tasks and create a handler for each one, like:

for task in celeryapp.control.inspect().registered_tasks()[f'celery@{os.uname().nodename}']:
    def start_task():
        # Call the celery task
        celery.send_task(f"tasks.{task}")

but it raises `no jobs available and at least one broker returned 'RESOURCE_EXHAUSTED'`.
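Independent of the broker error, the loop above also has a Python late-binding pitfall: every `start_task` closure reads the same loop variable, so all handlers would end up sending the last task name. A minimal sketch of one fix, binding the name as a default argument (here `register` and `send_task` are hypothetical stubs standing in for pyzeebe's `@worker.task` decorator and `celery.send_task`):

```python
# Sketch of dynamically registering one handler per Celery task name.

registry = {}

def register(task_type):
    # Stand-in for pyzeebe's @worker.task(task_type) decorator
    def decorator(fn):
        registry[task_type] = fn
        return fn
    return decorator

sent = []

def send_task(name):
    # Stand-in for celery.send_task
    sent.append(name)

task_names = ["alpha", "beta"]  # e.g. from registered_tasks()

for task in task_names:
    # Bind `task` as a default argument; a plain closure would capture
    # the loop variable by reference, so every handler would send the
    # last task name.
    @register(task)
    def start_task(task=task):
        send_task(f"tasks.{task}")

registry["alpha"]()
registry["beta"]()
# sent == ["tasks.alpha", "tasks.beta"]
```

`functools.partial` is an equivalent alternative to the default-argument trick.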


kbakk commented Jun 4, 2021

@asrocha That's not how you'd typically run Zeebe workers, I would say. Can you give an example of a BPMN used for this?

In general, you would have one worker that handles many different tasks. If different logic is to be performed, you can control that through the variables provided.

E.g. building on the quickstart example:

import requests
from pyzeebe import ZeebeWorker

worker = ZeebeWorker()

@worker.task(task_type="my_task")
def my_task(x: int, operation="addition"):
    if operation == "addition":
        return {"y": x + 1}
    elif operation == "subtraction":
        return {"y": x - 1}
    else:
        res = requests.get("https://httpbin.org/get").json()
        return {"http-result": res}

The RESOURCE_EXHAUSTED exception would need to be fixed via #140 I think.

@JonatanMartens
Collaborator

I'm closing this issue. If you have any more questions, feel free to start a new discussion.
