Skip to content

Commit

Permalink
feat: EventSource and Sensor HA without extra RBAC (#1163)
Browse files Browse the repository at this point in the history
* feat: EventSource and Sensor HA without extra RBAC

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Apr 8, 2021
1 parent 1efd3de commit 5cd535b
Show file tree
Hide file tree
Showing 18 changed files with 242 additions and 270 deletions.
159 changes: 159 additions & 0 deletions common/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package leaderelection

import (
"context"

"github.com/fsnotify/fsnotify"
"github.com/nats-io/graft"
nats "github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventbusdriver "github.com/argoproj/argo-events/eventbus/driver"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

type Elector interface {
RunOrDie(context.Context, LeaderCallbacks)
}

type LeaderCallbacks struct {
OnStartedLeading func(context.Context)
OnStoppedLeading func()
}

func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int) (Elector, error) {
logger := logging.FromContext(ctx)
var eventBusType apicommon.EventBusType
var eventBusAuth *eventbusv1alpha1.AuthStrategy
if eventBusConfig.NATS != nil {
eventBusType = apicommon.EventBusNATS
eventBusAuth = eventBusConfig.NATS.Auth
} else {
return nil, errors.New("invalid event bus")
}
var auth *eventbusdriver.Auth
cred := &eventbusdriver.AuthCredential{}
if eventBusAuth == nil || *eventBusAuth == eventbusv1alpha1.AuthStrategyNone {
auth = &eventbusdriver.Auth{
Strategy: eventbusv1alpha1.AuthStrategyNone,
}
} else {
v := viper.New()
v.SetConfigName("auth")
v.SetConfigType("yaml")
v.AddConfigPath(common.EventBusAuthFileMountPath)
err := v.ReadInConfig()
if err != nil {
return nil, errors.Errorf("failed to load auth.yaml. err: %+v", err)
}
err = v.Unmarshal(cred)
if err != nil {
logger.Errorw("failed to unmarshal auth.yaml", zap.Error(err))
return nil, err
}
v.WatchConfig()
v.OnConfigChange(func(e fsnotify.Event) {
logger.Info("eventbus auth config file changed.")
err = v.Unmarshal(cred)
if err != nil {
logger.Errorw("failed to unmarshal auth.yaml after reloading", zap.Error(err))
}
})
auth = &eventbusdriver.Auth{
Strategy: *eventBusAuth,
Crendential: cred,
}
}
var elector Elector
switch eventBusType {
case apicommon.EventBusNATS:
elector = &natsEventBusElector{
clusterName: clusterName,
size: clusterSize,
url: eventBusConfig.NATS.URL,
auth: auth,
}
default:
return nil, errors.New("invalid eventbus type")
}
return elector, nil
}

type natsEventBusElector struct {
clusterName string
size int
url string
auth *eventbusdriver.Auth
}

func (e *natsEventBusElector) RunOrDie(ctx context.Context, callbacks LeaderCallbacks) {
log := logging.FromContext(ctx)
ci := graft.ClusterInfo{Name: e.clusterName, Size: e.size}
opts := &nats.DefaultOptions
opts.Url = e.url
if e.auth.Strategy == eventbusv1alpha1.AuthStrategyToken {
opts.Token = e.auth.Crendential.Token
}
rpc, err := graft.NewNatsRpc(opts)
if err != nil {
log.Fatalw("failed to new Nats Rpc", zap.Error(err))
}
errChan := make(chan error)
stateChangeChan := make(chan graft.StateChange)
handler := graft.NewChanHandler(stateChangeChan, errChan)
node, err := graft.New(ci, handler, rpc, "/tmp/graft.log")
if err != nil {
log.Fatalw("failed to new a node", zap.Error(err))
}
defer node.Close()

cctx, cancel := context.WithCancel(ctx)
defer cancel()

if node.State() == graft.LEADER {
log.Info("I'm the LEADER, starting ...")
go callbacks.OnStartedLeading(cctx)
} else {
log.Info("Not the LEADER, stand by ...")
}

handleStateChange := func(sc graft.StateChange) {
switch sc.To {
case graft.LEADER:
log.Info("I'm the LEADER, starting ...")
go callbacks.OnStartedLeading(cctx)
case graft.FOLLOWER, graft.CANDIDATE:
log.Infof("Becoming a %v, stand by ...", sc.To)
if sc.From == graft.LEADER {
cancel()
callbacks.OnStoppedLeading()
cctx, cancel = context.WithCancel(ctx)
}
case graft.CLOSED:
if sc.From == graft.LEADER {
cancel()
callbacks.OnStoppedLeading()
}
log.Fatal("Leader elector connection was CLOSED")
default:
log.Fatalf("Unknown state: %s", sc.To)
}
}

for {
select {
case <-ctx.Done():
log.Info("exiting...")
return
case sc := <-stateChangeChan:
handleStateChange(sc)
case err := <-errChan:
log.Errorw("Error happened", zap.Error(err))
}
}
}
14 changes: 0 additions & 14 deletions controllers/eventsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -71,9 +70,6 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
log.Info("deleting eventsource")
if controllerutil.ContainsFinalizer(eventSource, finalizerName) {
// Finalizer logic should be added here.
if err := r.finalize(ctx, eventSource); err != nil {
return err
}
controllerutil.RemoveFinalizer(eventSource, finalizerName)
}
return nil
Expand All @@ -97,16 +93,6 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
return Reconcile(r.client, args, log)
}

func (r *reconciler) finalize(ctx context.Context, eventSource *v1alpha1.EventSource) error {
// Clean up Lease objects if there's any
if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{},
client.InNamespace(eventSource.Namespace),
client.MatchingFields{"metadata.name": "eventsource-" + eventSource.Name}); err != nil {
return err
}
return nil
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.EventSource) bool {
if old == nil {
return true
Expand Down
27 changes: 5 additions & 22 deletions controllers/eventsource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (

"github.com/argoproj/argo-events/common"
controllerscommon "github.com/argoproj/argo-events/controllers/common"
"github.com/argoproj/argo-events/eventsources"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
)
Expand Down Expand Up @@ -221,9 +219,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
},
},
})
emptyDirVolName := "tmp"
volumes = append(volumes, corev1.Volume{
Name: emptyDirVolName, VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
})
deploymentSpec.Template.Spec.Volumes = volumes
volumeMounts := deploymentSpec.Template.Spec.Containers[0].VolumeMounts
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: "auth-volume", MountPath: common.EventBusAuthFileMountPath})
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"})
deploymentSpec.Template.Spec.Containers[0].VolumeMounts = volumeMounts
}
} else {
Expand Down Expand Up @@ -332,26 +335,6 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
spec.Template.Spec.PriorityClassName = args.EventSource.Spec.Template.PriorityClassName
spec.Template.Spec.Priority = args.EventSource.Spec.Template.Priority
}
allEventTypes := eventsources.GetEventingServers(args.EventSource, nil)
recreateTypes := make(map[apicommon.EventSourceType]bool)
for _, esType := range apicommon.RecreateStrategyEventSources {
recreateTypes[esType] = true
}
recreates := 0
for eventType := range allEventTypes {
if _, ok := recreateTypes[eventType]; ok {
recreates++
break
}
}
if recreates > 0 && replicas == 1 {
// For those event types, if there's only 1 replica, use recreate strategy.
// If replicas > 1, which means HA is available for them, rolling update strategy
// is better.
spec.Strategy = appv1.DeploymentStrategy{
Type: appv1.RecreateDeploymentStrategyType,
}
}
return spec, nil
}

Expand Down
14 changes: 0 additions & 14 deletions controllers/sensor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"

"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -87,9 +86,6 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
log.Info("deleting sensor")
if controllerutil.ContainsFinalizer(sensor, finalizerName) {
// Finalizer logic should be added here.
if err := r.finalize(ctx, sensor); err != nil {
return err
}
controllerutil.RemoveFinalizer(sensor, finalizerName)
}
return nil
Expand All @@ -113,16 +109,6 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
return Reconcile(r.client, args, log)
}

func (r *reconciler) finalize(ctx context.Context, sensor *v1alpha1.Sensor) error {
// Clean up Lease objects if there's any
if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{},
client.InNamespace(sensor.Namespace),
client.MatchingFields{"metadata.name": "sensor-" + sensor.Name}); err != nil {
return err
}
return nil
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.Sensor) bool {
if old == nil {
return true
Expand Down
9 changes: 5 additions & 4 deletions controllers/sensor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
},
},
})
emptyDirVolName := "tmp"
volumes = append(volumes, corev1.Volume{
Name: emptyDirVolName, VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
})
deploymentSpec.Template.Spec.Volumes = volumes
volumeMounts := deploymentSpec.Template.Spec.Containers[0].VolumeMounts
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: "auth-volume", MountPath: common.EventBusAuthFileMountPath})
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"})
deploymentSpec.Template.Spec.Containers[0].VolumeMounts = volumeMounts
}
} else {
Expand Down Expand Up @@ -270,10 +275,6 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
MatchLabels: args.Labels,
},
Replicas: &replicas,
Strategy: appv1.DeploymentStrategy{
// Event bus does not allow multiple clients with same clientID to connect to the server at the same time.
Type: appv1.RecreateDeploymentStrategyType,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podTemplateLabels,
Expand Down
25 changes: 0 additions & 25 deletions docs/eventsources/ha.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,6 @@ old one is gone.
- Calendar
- Generic

### RBAC

To achieve `Active-Passive` strategy for these EventSources, a Service Account
with extra RBAC settings is needed. The Service Account needs to be bound to a
Role like following, and specified in the spec through
`spec.template.serviceAccountName`.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: lease-role
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
resourceNames:
- eventsource-{event-source-name}
verbs:
- "*"
```
**NOTE: This is not requried if `spec.replicas = 1`.**

## More

Check [this](../dr_ha_recommendations.md) out to learn more information about
Expand Down
24 changes: 0 additions & 24 deletions docs/sensors/ha.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,6 @@ elected to be active if the old one is gone.
**Please DO NOT manually scale up the replicas, that might cause unexpected
behaviors!**

## RBAC

To achieve HA for Sensor Pods, a Service Account with extra RBAC settings is
needed. The Service Account needs to be bound to a Role like following, and
specified in the spec through `spec.template.serviceAccountName`.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: lease-role
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
resourceNames:
- sensor-{sensor-name)
verbs:
- "*"
```
**NOTE: This is not requried if `spec.replicas = 1`.**

## More

Check [this](../dr_ha_recommendations.md) out to learn more information about
Expand Down
10 changes: 4 additions & 6 deletions docs/service-accounts.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

A `Service Account` can be specified in the EventSource object with
`spec.template.serviceAccountName`, however it is not needed for all the
EventSource types except `resource`, unless you want to achieve
[HA](eventsources/ha.md) for some of them. For a `resource` EventSource, you
need to specify a Service Accout and give it `list` and `watch` permissions for
the resource being watched.
EventSource types except `resource`. For a `resource` EventSource, you need to
specify a Service Accout and give it `list` and `watch` permissions for the
resource being watched.

For example, if you want to watch actions on `Deployment` objects, you need to:

Expand All @@ -31,8 +30,7 @@ For example, if you want to watch actions on `Deployment` objects, you need to:

A `Service Account` also can be specified in a Sensor object via
`spec.template.serviceAccountName`, this is only needed when `k8s` trigger or
`argoWorkflow` trigger is defined in the Sensor object, or you want to run the
Sensor with [HA](sensors/ha.md).
`argoWorkflow` trigger is defined in the Sensor object.

The sensor examples provided by us use `argo-events-sa` service account to
execute the triggers, but it has more permissions than needed, and you may want
Expand Down
Loading

0 comments on commit 5cd535b

Please sign in to comment.