Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

监听event事件同一个reason会有两条类似的event数据 #236

Open
amberGao123 opened this issue May 25, 2022 · 2 comments
Open

监听event事件同一个reason会有两条类似的event数据 #236

amberGao123 opened this issue May 25, 2022 · 2 comments

Comments

@amberGao123
Copy link

本地参考这个 https://github.com/AliyunContainerService/kube-eventer/blob/0f5aaeee21d79da541b9245ee9bdbe83ef445aa6/sources/kubernetes/kubernetes_source.go
实现了对不同集群下的event事件的监听,测试的时候发现,监听的数据会有两条近乎一样的event数据,resourceVersion和annotations不一样,其他都一样,所以是k8s监听的时候每次会处理两次?事件记录两次?
image
image
代码如下:

// GetNewEvents consumes c.LocalEventsBuffer in an endless loop and prints each
// received event to stdout as JSON. When no event arrives within the idle
// timeout it prints a timeout marker instead. After every iteration the time
// spent waiting and printing is reported.
//
// The method never returns; run it in its own goroutine.
func (c *EventInterface) GetNewEvents() {
	// How long to wait for an event before reporting an idle timeout.
	const idleTimeout = 10 * time.Second

	for {
		start := time.Now()
		select {
		case event := <-c.LocalEventsBuffer:
			fmt.Println("------ event ------")
			data, err := json.Marshal(event)
			if err != nil {
				// A single event that cannot be serialized must not take
				// down the whole process; log it and keep consuming.
				klog.Errorf("Failed to marshal event: %v", err)
			} else {
				fmt.Printf("data: %s\n\n", string(data))
			}
		case <-time.After(idleTimeout):
			fmt.Println("------ timeout ------")
		}
		fmt.Println("time:", time.Since(start))
	}
}



// Watch lists the current events to obtain a starting resourceVersion, opens a
// watch from that version, and forwards every Added or Modified event to
// c.LocalEventsBuffer. If listing or opening the watch fails it retries after
// one second. If the watch channel closes or delivers an Error update, the
// watcher is stopped and the list+watch cycle starts over.
//
// Both Added and Modified updates are forwarded, so an Event object that is
// updated after creation is delivered again; consumers may therefore see
// near-identical entries that differ only in fields such as resourceVersion.
//
// The send to c.LocalEventsBuffer is non-blocking: when the buffer is full
// the event is dropped and an error is logged.
//
// The method never returns; run it in its own goroutine.
func (c *EventInterface) Watch() {
	for {
		events, err := c.EventInterface.List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			klog.Errorf("Failed to load events: %v", err)
			time.Sleep(time.Second)
			continue
		}
		// Start watching from the version returned by the list call.
		resourceVersion := events.ResourceVersion

		watcher, err := c.EventInterface.Watch(context.TODO(),
			metav1.ListOptions{
				Watch:           true,
				ResourceVersion: resourceVersion})
		if err != nil {
			klog.Errorf("Failed to start watch for new events: %v", err)
			time.Sleep(time.Second)
			continue
		}

		watchChannel := watcher.ResultChan()
	watchLoop:
		for {
			watchUpdate, ok := <-watchChannel
			if !ok {
				klog.Errorf("Event watch channel closed")
				break watchLoop
			}

			if watchUpdate.Type == kubewatch.Error {
				if status, ok := watchUpdate.Object.(*metav1.Status); ok {
					klog.Errorf("Error during watch: %#v", status)
					break watchLoop
				}
				klog.Errorf("Received unexpected error: %#v", watchUpdate.Object)
				break watchLoop
			}

			event, ok := watchUpdate.Object.(*kubeapi.Event)
			if !ok {
				klog.Errorf("Wrong object received: %v", watchUpdate)
				continue
			}

			switch watchUpdate.Type {
			case kubewatch.Added, kubewatch.Modified:
				klog.Infof("[Event = cluster -> %s] -> watchUpdate.", c.ClusterName)
				select {
				case c.LocalEventsBuffer <- event:
					// Ok, buffer not full.
				default:
					// Buffer full, need to drop the event.
					klog.Errorf("Event buffer full, dropping event")
				}
			case kubewatch.Deleted:
				// Deleted events are silently ignored.
			default:
				klog.Warningf("Unknown watchUpdate.Type: %#v", watchUpdate.Type)
			}
		}

		// Release the watch before re-listing. Without this, a watch that
		// ended on an Error update (channel still open) is never stopped.
		watcher.Stop()
	}
}
@ringtail
Copy link
Member

不同集群是指写了多个source吗

@KeyOfSpectator
Copy link
Member

麻烦提供更多kube-eventer的command配置

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants