
Add kafka support for eventBus #1682

Closed
0xgj opened this issue Feb 28, 2022 · 22 comments
Labels: enhancement (New feature or request)
@0xgj

0xgj commented Feb 28, 2022

Is your feature request related to a problem? Please describe.
Kafka is widely used and is easy to get as a managed service from cloud vendors such as AWS, Google Cloud, Azure, Alibaba Cloud, Tencent Cloud, etc.
It would be good if we could add Kafka support for the eventBus.

Describe the solution you'd like
Add kafka as an alternative solution for eventbus.

Describe alternatives you've considered
In argo-events, NATS is used not only as the eventBus but also for leader election in Sensor deployments, so the first step should be to move leader election from NATS to Kubernetes. The second is to add Kafka support for the eventBus.

Additional context
As discussed in #1163, using Kubernetes for leader election involves extra configuration such as RBAC, which should also be taken into consideration. For me, data security matters more: I would rather have a cloud vendor guarantee our data than use self-maintained NATS.


Message from the maintainers:

If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.

@0xgj 0xgj added the enhancement New feature or request label Feb 28, 2022
@0xgj

0xgj commented Mar 10, 2022

@alexec @whynowy any ideas? This may be a big obstacle for people who want to adopt argo-events when a team already provides a message queue service. If it's possible, I'd like to implement it.

@whynowy

whynowy commented Mar 10, 2022

We are open to adopting a Kafka eventbus.

A few things need to be addressed for the implementation:

  1. How do we implement multiple-dependency triggering like A && B? I don't think there's a way to do that with Kafka alone; storage such as a database is needed to hold the dependency status.
  2. If extra storage is introduced, who manages it, and how?
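To make the first question concrete, here is a minimal, hypothetical sketch (not Argo Events code) of tracking an "A && B" condition in memory. The point of the question is that with Kafka alone this state lives only inside the consumer process and is lost on restart unless it is persisted somewhere:

```python
# Hypothetical sketch: tracking an "A && B" trigger condition in memory.
# With Kafka alone this state exists only in the consumer process, which
# is why the question above asks where to persist it.

class DependencyTracker:
    def __init__(self, dependencies):
        self.dependencies = set(dependencies)  # e.g. {"A", "B"}
        self.seen = set()

    def on_event(self, dep_name):
        """Record an event; return True once all dependencies are satisfied."""
        if dep_name in self.dependencies:
            self.seen.add(dep_name)
        if self.seen == self.dependencies:
            self.seen.clear()  # reset so the condition can fire again
            return True
        return False

tracker = DependencyTracker({"A", "B"})
assert tracker.on_event("A") is False  # only A seen so far
assert tracker.on_event("B") is True   # A && B satisfied -> trigger
assert tracker.on_event("B") is False  # state was reset after triggering
```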

@whynowy

whynowy commented Mar 21, 2022

Maybe we can assume both Kafka and the extra storage (e.g. a database) are managed by the providers (users).

@dfarr

dfarr commented Oct 14, 2022

We are open to adopting a Kafka eventbus.

A few things need to be addressed for the implementation:

  1. How do we implement multiple-dependency triggering like A && B? I don't think there's a way to do that with Kafka alone; storage such as a database is needed to hold the dependency status.
  2. If extra storage is introduced, who manages it, and how?

@whynowy what if the sensor itself were used to store the status? This has the benefit of mitigating your second point; it could also standardize the way information is persisted regardless of the event bus type.

An (over-simplified) example:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-jetstream
spec:
  dependencies:
  - name: d1
    eventSourceName: es
    eventName: e1
  - name: d2
    eventSourceName: es
    eventName: e2
status:
  dependencies:
    d1: true
    d2: true

@whynowy

whynowy commented Oct 14, 2022

We are open to adopting a Kafka eventbus.
A few things need to be addressed for the implementation:

  1. How do we implement multiple-dependency triggering like A && B? I don't think there's a way to do that with Kafka alone; storage such as a database is needed to hold the dependency status.
  2. If extra storage is introduced, who manages it, and how?

@whynowy what if the sensor itself were used to store the status? This has the benefit of mitigating your second point; it could also standardize the way information is persisted regardless of the event bus type.

An (over-simplified) example:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-jetstream
spec:
  dependencies:
  - name: d1
    eventSourceName: es
    eventName: e1
  - name: d2
    eventSourceName: es
    eventName: e2
status:
  dependencies:
    d1: true
    d2: true

This means the application running in the sensor pod has the privilege to watch Sensor objects, which is something we try to avoid.

@dfarr

dfarr commented Oct 14, 2022

Yes, however, the service account would only need namespace-scoped access to Sensor objects. I'm curious about the reason for avoiding this?

If information is not shared via the Sensor object, I think the only other solution would be to share state through a database, which, as you point out, would need to be managed and maintained. Currently, if JetStream is used as an eventbus, the JetStream key-value store is used for storage; this won't work with Kafka. Do you think it would be desirable to decouple this storage layer from the eventbus?

@whynowy

whynowy commented Oct 15, 2022

Yes, however, the service account would only need namespace-scoped access to Sensor objects. I'm curious about the reason for avoiding this?

  1. This violates the basic single-responsibility principle (SRP): imagine the controller watches the Sensor object, creates pods, and updates the Sensor object, while the created pods also watch and update the same object...
  2. It makes no sense to ask users to grant extra privileges for actions that don't require any of them. If I want to trigger a Workflow, which only requires Workflow write privileges, why do I need to grant Sensor read/write access? If I want to execute a Lambda or hit an HTTP endpoint, why do I need extra k8s RBAC?
  3. Even though it's only namespace scoped, consider the case where your pod is hijacked: all the Sensor objects in the same namespace would then be exposed, which is a big security issue.

If information is not shared via the Sensor object, I think the only other solution would be to share state through a database, which, as you point out, would need to be managed and maintained. Currently, if JetStream is used as an eventbus, the JetStream key-value store is used for storage; this won't work with Kafka. Do you think it would be desirable to decouple this storage layer from the eventbus?

Why not put the storage layer together with Kafka as the EventBus?
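One way to read this suggestion (purely illustrative, not the actual implementation) is that the state could live in Kafka itself, for example in a log-compacted topic where only the latest record per key survives, so replaying the topic rebuilds the dependency state:

```python
# Illustrative only: simulating a log-compacted Kafka topic used as a
# key-value store for dependency status. After compaction Kafka keeps the
# latest record per key, so a consumer replaying the topic rebuilds state.

def compact(records):
    """Return the state a consumer would rebuild after compaction."""
    state = {}
    for key, value in records:  # records in append order
        if value is None:       # a tombstone record deletes the key
            state.pop(key, None)
        else:
            state[key] = value
    return state

log = [("d1", "true"), ("d2", "true"), ("d1", None), ("d1", "true")]
assert compact(log) == {"d1": "true", "d2": "true"}
```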

@dfarr

dfarr commented Oct 15, 2022

Why not put the storage layer together with Kafka as the EventBus?

Yes, this is possible and perhaps the best route given the circumstances, but the drawback is tight coupling. If the storage layer and the eventbus are tightly coupled, then adding a new eventbus also requires adding a new storage technology. Decoupling would have the benefit of simplifying argo-events (e.g., one code path for maintaining this state, instead of duplicated code depending on whether the user chooses JetStream or Kafka as the eventbus).
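The decoupling being argued for could look something like this hypothetical interface (all names are made up for illustration): the sensor logic depends only on an abstract store, and each event bus plugs in its own backend:

```python
from abc import ABC, abstractmethod

# Hypothetical illustration of decoupling state storage from the event bus:
# sensor logic depends only on StateStore, and each backend (a JetStream
# K/V bucket, a database, a compacted Kafka topic) implements it.

class StateStore(ABC):
    @abstractmethod
    def get(self, key):
        ...

    @abstractmethod
    def put(self, key, value):
        ...

class InMemoryStore(StateStore):
    """Stand-in backend; a real one would wrap JetStream K/V, a DB, etc."""
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def put(self, key, value):
        self._data[key] = value

def record_dependency(store: StateStore, dep: str) -> None:
    # One shared code path, regardless of which backend is plugged in.
    store.put(dep, True)

store = InMemoryStore()
record_dependency(store, "d1")
assert store.get("d1") is True
```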

I understand and agree that mixing concerns between the controller and the sensor deployment is problematic. What I struggle to understand is why the sensor state is managed independently of the sensor object.

Events are consumed by the sensor deployment and transition the state of the sensor (and therefore its trigger conditions). However, the sensor state is opaque; it is not knowable by looking at the sensor object. This feels off to me: the sensor object itself feels like the most logical place to maintain this information. Do you think there is a way we could maintain the sensor state in the sensor object without violating SRP (and raising security concerns)? For example, could the controller manage the sensor state?

@whynowy

whynowy commented Oct 15, 2022

Why not put the storage layer together with Kafka as the EventBus?

Yes, this is possible and perhaps the best route given the circumstances, but the drawback is tight coupling. If the storage layer and the eventbus are tightly coupled, then adding a new eventbus also requires adding a new storage technology. Decoupling would have the benefit of simplifying argo-events (e.g., one code path for maintaining this state, instead of duplicated code depending on whether the user chooses JetStream or Kafka as the eventbus).

I think making it easy to use for users is more important than anything else.

I understand and agree that mixing concerns between the controller and the sensor deployment is problematic. What I struggle to understand is why the sensor state is managed independently of the sensor object.

Don't confuse the Sensor state with the state of the workload running in the sensor pod; they are two different things. The Sensor state represents the orchestration state. Once orchestration is done, changes on the control plane (including the k8s control plane and your application's control plane) should not impact your workload. Imagine you have an application running in a k8s Deployment and you somehow store your application status in the Deployment object's status: you would see your application become unreliable because of a control-plane upgrade or an etcd issue, and, conversely, other people might see the k8s control plane become unreliable because of the application's heavy operations.

Events are consumed by the sensor deployment and transition the state of the sensor (and therefore its trigger conditions). However, the sensor state is opaque; it is not knowable by looking at the sensor object. This feels off to me: the sensor object itself feels like the most logical place to maintain this information. Do you think there is a way we could maintain the sensor state in the sensor object without violating SRP (and raising security concerns)? For example, could the controller manage the sensor state?

The controller should only be responsible for orchestration.

@dfarr

dfarr commented Oct 17, 2022

Thanks @whynowy, we would like to tackle implementing this. From what I gather from our conversation:

  1. The most important design consideration is developer experience. IMO this means simplifying the prerequisites for using Kafka as an eventbus; as a user, I would prefer not to need an additional DB to use Kafka.
  2. The sensor (and eventsource, eventbus) objects should not contain application information/state.

@whynowy

whynowy commented Oct 17, 2022

I think making it easy to use for users is more important than anything else.

Thank you so much @dfarr !

@devops-42
Copy link

Hi, I notice that the Kafka EventBus is already described here, but the link to the full description is broken.
Moreover, when using the example I got an error:

 message: 'invalid spec: either "nats" or "jetstream" needs to be specified'

Cheers!

@dfarr

dfarr commented Mar 24, 2023

Sorry about the link. There's a PR open to fix it, the link is supposed to go here.

 message: 'invalid spec: either "nats" or "jetstream" needs to be specified'

What version of argo events have you deployed?

@devops-42

Hi @dfarr

thanks for sharing the links. I use the v1.7.6 release (chart version argo-events-2.1.4)

@dfarr

dfarr commented Mar 27, 2023

Hi @dfarr

thanks for sharing the links. I use the v1.7.6 release (chart version argo-events-2.1.4)

There hasn't been a release since the kafka eventbus feature was merged. You will need to use the master version of argo events until there is a release.

@devops-42

Hi @dfarr

have now deployed the latest tag, which I assume corresponds to the master branch:

kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/master/manifests/install.yaml 

customresourcedefinition.apiextensions.k8s.io/eventbus.argoproj.io created
customresourcedefinition.apiextensions.k8s.io/eventsources.argoproj.io created
customresourcedefinition.apiextensions.k8s.io/sensors.argoproj.io created
serviceaccount/argo-events-sa created
clusterrole.rbac.authorization.k8s.io/argo-events-aggregate-to-admin created
clusterrole.rbac.authorization.k8s.io/argo-events-aggregate-to-edit created
clusterrole.rbac.authorization.k8s.io/argo-events-aggregate-to-view created
clusterrole.rbac.authorization.k8s.io/argo-events-role created
clusterrolebinding.rbac.authorization.k8s.io/argo-events-binding created
configmap/argo-events-controller-config created
deployment.apps/controller-manager created

Then deployed the EventBus resource using the following spec:

cat <<EOM | kubectl apply -n argo-events -f -
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  kafka:
    url: my-cluster-kafka-brokers.kafka:9092 
    topic: argo.event.bus
EOM

This seems to work now, but when looking at the state of the bus, argo complains about an exotic config:

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
  namespace: argo-events
spec:
  kafka:
    topic: argo.event.bus
    url: my-cluster-kafka-brokers.kafka:9092
status:
  conditions:
  - lastTransitionTime: "2023-03-28T08:12:16Z"
    status: "True"
    type: Configured
  - lastTransitionTime: "2023-03-28T08:12:16Z"
    message: Skip deployment because of using exotic config.
    reason: Skipped
    status: "True"
    type: Deployed
  config:
    kafka:
      topic: argo.event.bus
      url: my-cluster-kafka-brokers.kafka:9092                                                                                                                                                                             

I then created an eventsource (also with Kafka) and a sensor. The eventsource pod gets notified when a message is published, but no workflow is triggered. I assume that is due to the skipped EventBus.

Any chance to get this setup working?

Thanks for your help!

@dfarr

dfarr commented Mar 28, 2023

Do you have a kafka broker available at my-cluster-kafka-brokers.kafka:9092? As mentioned in the docs, you are responsible for providing a kafka cluster, Argo Events doesn't create one.

If you don't have a kafka cluster (and topics created, unless you have configured auto-create), I would expect to see a connection failure in both the EventSource and Sensor. Can you post the logs from one or both of these?

@devops-42

Yes, I have a Kafka broker (from Strimzi) deployed in the namespace kafka. The corresponding topics were set up properly.

[screenshot showing the Kafka topics]

@dfarr

dfarr commented Mar 30, 2023

I cannot replicate this problem locally. Can you provide logs from both your EventSource and Sensor pods?

I am wondering:

  1. Is your kafka broker running in the same namespace as your eventsource/sensor?
  2. Have you set up RBAC for the eventsource pods to have access to k8s leases as described here?
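Regarding point 2, lease-based RBAC for the eventsource pods would typically look something like the following namespace-scoped Role (the name and namespace here are illustrative; adjust to your setup, and check the linked docs for the exact manifest Argo Events expects):

```
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: argo-events-leases   # illustrative name
  namespace: argo-events
rules:
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - get
  - list
  - watch
  - create
  - update
  - patch
  - delete
```

A RoleBinding would then attach this Role to the service account used by the eventsource and sensor pods.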

@dfarr

dfarr commented Mar 30, 2023

I then created an eventsource (also with Kafka) and a sensor. The eventsource pod gets notified when a message is published, but no workflow is triggered. I assume that is due to the skipped EventBus.

I forgot to mention that reason: Skipped is expected. Kafka requires you to bring your own Kafka cluster (in contrast to nats and non-exotic jetstream), so the step that deploys a cluster is skipped.

@whynowy

whynowy commented Mar 30, 2023

Closed by #2502.

@whynowy whynowy closed this as completed Mar 30, 2023
@devops-42

I forgot to mention that reason: Skipped is expected. Kafka requires you to bring your own Kafka cluster (in contrast to nats and non-exotic jetstream), so the step that deploys a cluster is skipped.

Ok, so the Skipped... message is a bit misleading :) In the end it was RBAC: I had missed adding the corresponding roles for handling workflows. Now everything works through Kafka.

Thanks for your help and patience!
