Skip to content

Commit

Permalink
feat(eventsource): Support NATS access with auth. Closes argoproj#1050
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Feb 5, 2021
1 parent 706b746 commit b9205da
Show file tree
Hide file tree
Showing 11 changed files with 1,077 additions and 351 deletions.
97 changes: 97 additions & 0 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 95 additions & 0 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 44 additions & 11 deletions eventsources/sources/nats/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,61 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt

natsEventSource := &el.NATSEventSource

var conn *natslib.Conn
var opt []natslib.Option
if natsEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(natsEventSource.TLS)
if err != nil {
return errors.Wrap(err, "failed to get the tls configuration")
}
opt = append(opt, natslib.Secure(tlsConfig))
}
switch natsEventSource.Auth {
case v1alpha1.NATSAuthBasic:
username, err := common.GetSecretFromVolume(natsEventSource.Username)
if err != nil {
return err
}
password, err := common.GetSecretFromVolume(natsEventSource.Password)
if err != nil {
return err
}
opt = append(opt, natslib.UserInfo(username, password))
case v1alpha1.NATSAuthToken:
token, err := common.GetSecretFromVolume(natsEventSource.Token)
if err != nil {
return err
}
opt = append(opt, natslib.Token(token))
case v1alpha1.NATSAuthNKEY:
nkeyFile, err := common.GetSecretVolumePath(natsEventSource.NKey)
if err != nil {
return err
}
o, err := natslib.NkeyOptionFromSeed(nkeyFile)
if err != nil {
return errors.Wrap(err, "failed to get NKey")
}
opt = append(opt, o)
case v1alpha1.NATSAuthCredential:
cFile, err := common.GetSecretVolumePath(natsEventSource.Credential)
if err != nil {
return err
}
opt = append(opt, natslib.UserCredentials(cFile))
}

var conn *natslib.Conn
log.Info("connecting to nats cluster...")
if err := common.Connect(common.GetConnectionBackoff(natsEventSource.ConnectionBackoff), func() error {
var err error
var opt []natslib.Option

if natsEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(natsEventSource.TLS)
if err != nil {
return errors.Wrap(err, "failed to get the tls configuration")
}
opt = append(opt, natslib.Secure(tlsConfig))
}

if conn, err = natslib.Connect(natsEventSource.URL, opt...); err != nil {
return err
}
return nil
}); err != nil {
return errors.Wrapf(err, "failed to connect to the nats server for event source %s", el.GetEventName())
}
defer conn.Close()

if natsEventSource.JSONBody {
log.Info("assuming all events have a json body...")
Expand Down
18 changes: 18 additions & 0 deletions eventsources/sources/nats/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,23 @@ func validate(eventSource *v1alpha1.NATSEventsSource) error {
if eventSource.TLS != nil {
return apicommon.ValidateTLSConfig(eventSource.TLS)
}
switch eventSource.Auth {
case v1alpha1.NATSAuthBasic:
if eventSource.Username == nil || eventSource.Password == nil {
return errors.New("Username and Password secrets must be specified")
}
case v1alpha1.NATSAuthToken:
if eventSource.Token == nil {
return errors.New("Token secret must be specified")
}
case v1alpha1.NATSAuthNKEY:
if eventSource.NKey == nil {
return errors.New("NKey secret must be specified")
}
case v1alpha1.NATSAuthCredential:
if eventSource.Credential == nil {
return errors.New("Credential secret must be specified")
}
}
return nil
}
25 changes: 25 additions & 0 deletions eventsources/sources/nats/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,28 @@ func TestValidateEventSource(t *testing.T) {
assert.NoError(t, err)
}
}

func TestValidateEventSourceMissingAuthSecret(t *testing.T) {
listener := &EventListener{}

err := listener.ValidateEventSource(context.Background())
assert.Error(t, err)
assert.Equal(t, "url must be specified", err.Error())

content, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", sources.EventSourceDir, "nats.yaml"))
assert.Nil(t, err)

var eventSource *v1alpha1.EventSource
err = yaml.Unmarshal(content, &eventSource)
assert.Nil(t, err)
assert.NotNil(t, eventSource.Spec.NATS)

for _, value := range eventSource.Spec.NATS {
value.Auth = v1alpha1.NATSAuthToken
l := &EventListener{
NATSEventSource: value,
}
err := l.ValidateEventSource(context.Background())
assert.Equal(t, err.Error(), "Token secret must be specified")
}
}
Loading

0 comments on commit b9205da

Please sign in to comment.