Skip to content

Commit

Permalink
feat: clean up openfaas trigger (#526)
Browse files Browse the repository at this point in the history
* feat(openfaas-trigger): added openfaas connector

* feat(openfaas-trigger): cleaned up trigger implementation

* feat(openfaas-trigger): update deps and fix tests
  • Loading branch information
VaibhavPage authored Mar 8, 2020
1 parent 698dd2a commit 1a70502
Show file tree
Hide file tree
Showing 11 changed files with 686 additions and 237 deletions.
758 changes: 590 additions & 168 deletions Gopkg.lock

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ required = [

[[override]]
name = "k8s.io/code-generator"
branch = "release-1.15"
version = "v0.17.3"

[[constraint]]
name = "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -125,15 +125,15 @@ required = [
version = "v1.0.8"

[[override]]
branch = "release-1.15"
version = "v0.17.3"
name = "k8s.io/api"

[[override]]
branch = "release-1.15"
version = "v0.17.3"
name = "k8s.io/apimachinery"

[[override]]
version = "v12.0.0"
version = "v0.17.3"
name = "k8s.io/client-go"

[prune]
Expand All @@ -144,6 +144,14 @@ required = [
name = "k8s.io/code-generator"
unused-packages = false

[[prune.project]]
name = "k8s.io/api"
unused-packages = false

[[prune.project]]
name = "k8s.io/apimachinery"
unused-packages = false

[[prune.project]]
name = "k8s.io/gengo"
unused-packages = false
Expand Down
6 changes: 4 additions & 2 deletions gateways/server/github/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package github
import (
"bytes"
"encoding/json"
"github.com/argoproj/argo-events/gateways/server/common/webhook"
"io/ioutil"
"net/http"
"testing"

"github.com/argoproj/argo-events/gateways/server/common/webhook"
"github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1"
"github.com/ghodss/yaml"
"github.com/google/go-github/github"
Expand Down Expand Up @@ -51,7 +51,8 @@ func TestGetCredentials(t *testing.T) {
convey.Convey("Given a kubernetes secret, get credentials", t, func() {
secret, err := router.k8sClient.CoreV1().Secrets(router.githubEventSource.Namespace).Create(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Name: secretName,
Namespace: router.githubEventSource.Namespace,
},
Data: map[string][]byte{
LabelAccessKey: []byte(accessKey),
Expand All @@ -77,6 +78,7 @@ func TestGetCredentials(t *testing.T) {
Name: "github-access",
},
},
Namespace: "fake",
}

creds, err := router.getCredentials(githubEventSource.APIToken, githubEventSource.Namespace)
Expand Down
12 changes: 7 additions & 5 deletions pkg/apis/sensor/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"hash/fnv"
"time"

"k8s.io/apimachinery/pkg/util/wait"

apicommon "github.com/argoproj/argo-events/pkg/apis/common"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
)

// NotificationType represent a type of notifications that are handled by a sensor
Expand Down Expand Up @@ -372,15 +371,18 @@ type OpenFaasTrigger struct {
// +listType=triggerParameters
// +optional
Parameters []TriggerParameter `json:"parameters,omitempty" protobuf:"bytes,3,rep,name=parameters"`
// Username refers to the Kubernetes secret that holds the username required to log into the gateway.
// +optional
Username *corev1.SecretKeySelector `json:"username,omitempty" protobuf:"bytes,4,opt,name=username"`
// Password refers to the Kubernetes secret that holds the password required to log into the gateway.
// +optional
Password *corev1.SecretKeySelector `json:"password,omitempty" protobuf:"bytes,4,opt,name=password"`
Password *corev1.SecretKeySelector `json:"password,omitempty" protobuf:"bytes,5,opt,name=password"`
// Namespace to read the password secret from.
// This is required if the password secret selector is specified.
// +optional
Namespace string `json:"namespace,omitempty" protobuf:"bytes,5,opt,name=namespace"`
Namespace string `json:"namespace,omitempty" protobuf:"bytes,6,opt,name=namespace"`
// FunctionName refers to the name of OpenFaas function that will be invoked once the trigger executes
FunctionName string `json:"functionName" protobuf:"bytes,6,name=functionName"`
FunctionName string `json:"functionName" protobuf:"bytes,7,name=functionName"`
}

// AWSLambdaTrigger refers to specification of the trigger to invoke an AWS Lambda function
Expand Down
8 changes: 7 additions & 1 deletion sensors/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sensors

import (
Expand All @@ -25,6 +24,8 @@ import (
"google.golang.org/grpc"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"net/http"
"time"
)

// SensorContext contains execution context for Sensor
Expand All @@ -47,6 +48,8 @@ type SensorContext struct {
Updated bool
// customTriggerClients holds the references to the gRPC clients for the custom trigger servers
customTriggerClients map[string]*grpc.ClientConn
// http client to invoke openfaas functions.
openfaasHttpClient *http.Client
}

// NewSensorContext returns a new sensor execution context.
Expand All @@ -60,5 +63,8 @@ func NewSensorContext(sensorClient sensorclientset.Interface, kubeClient kuberne
NotificationQueue: make(chan *types.Notification),
ControllerInstanceID: controllerInstanceID,
customTriggerClients: make(map[string]*grpc.ClientConn),
openfaasHttpClient: &http.Client{
Timeout: time.Minute * 5,
},
}
}
12 changes: 11 additions & 1 deletion sensors/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,28 @@ func (sensorCtx *SensorContext) GetTrigger(trigger *v1alpha1.Trigger) Trigger {
if trigger.Template.K8s != nil {
return standardk8s.NewStandardK8sTrigger(sensorCtx.KubeClient, sensorCtx.DynamicClient, sensorCtx.Sensor, trigger, sensorCtx.Logger)
}

if trigger.Template.ArgoWorkflow != nil {
return argoworkflow.NewArgoWorkflowTrigger(sensorCtx.KubeClient, sensorCtx.DynamicClient, sensorCtx.Sensor, trigger, sensorCtx.Logger)
}

if trigger.Template.OpenFaas != nil {
return openfaas.NewOpenFaasTrigger(sensorCtx.KubeClient, sensorCtx.Sensor, trigger, sensorCtx.Logger)
result, err := openfaas.NewOpenFaasTrigger(sensorCtx.KubeClient, sensorCtx.Sensor, trigger, sensorCtx.Logger, sensorCtx.openfaasHttpClient)
if err != nil {
sensorCtx.Logger.WithError(err).WithField("trigger", trigger.Template.Name).Errorln("failed to invoke the trigger")
return nil
}
return result
}

if trigger.Template.HTTP != nil {
return http.NewHTTPTrigger(sensorCtx.Sensor, trigger, sensorCtx.Logger)
}

if trigger.Template.AWSLambda != nil {
return awslambda.NewAWSLambdaTrigger(sensorCtx.KubeClient, sensorCtx.Sensor, trigger, sensorCtx.Logger)
}

if trigger.Template.CustomTrigger != nil {
result, err := customtrigger.NewCustomTrigger(sensorCtx.Sensor, trigger, sensorCtx.Logger, sensorCtx.customTriggerClients)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion sensors/triggers/custom-trigger/custom-trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package custom_trigger
package customtrigger

import (
"context"
Expand Down Expand Up @@ -41,6 +41,7 @@ type CustomTrigger struct {
triggerClient triggers.TriggerClient
}

// NewCustomTrigger returns a new custom trigger
func NewCustomTrigger(sensor *v1alpha1.Sensor, trigger *v1alpha1.Trigger, logger *logrus.Logger, customTriggerClients map[string]*grpc.ClientConn) (*CustomTrigger, error) {
customTrigger := &CustomTrigger{
Sensor: sensor,
Expand Down
2 changes: 1 addition & 1 deletion sensors/triggers/custom-trigger/custom-trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package custom_trigger
package customtrigger
90 changes: 37 additions & 53 deletions sensors/triggers/openfaas/openfaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,17 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/exec"
"time"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/argoproj/argo-events/sensors/policy"
"github.com/argoproj/argo-events/sensors/triggers"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
)

const (
EnvVarOpenFaasGatewayURL = "OPENFAAS_URL"
)

// OpenFaasTrigger holds the context to invoke OpenFaas functions
type OpenFaasTrigger struct {
// K8sClient is the Kubernetes client
Expand All @@ -48,16 +40,19 @@ type OpenFaasTrigger struct {
Trigger *v1alpha1.Trigger
// Logger to log stuff
Logger *logrus.Logger
// http client to invoke function.
httpClient *http.Client
}

// NewOpenFaasTrigger returns a new OpenFaas trigger context
func NewOpenFaasTrigger(k8sClient kubernetes.Interface, sensor *v1alpha1.Sensor, trigger *v1alpha1.Trigger, logger *logrus.Logger) *OpenFaasTrigger {
func NewOpenFaasTrigger(k8sClient kubernetes.Interface, sensor *v1alpha1.Sensor, trigger *v1alpha1.Trigger, logger *logrus.Logger, httpClient *http.Client) (*OpenFaasTrigger, error) {
return &OpenFaasTrigger{
K8sClient: k8sClient,
Sensor: sensor,
Trigger: trigger,
Logger: logger,
}
K8sClient: k8sClient,
Sensor: sensor,
Trigger: trigger,
Logger: logger,
httpClient: httpClient,
}, nil
}

func (t *OpenFaasTrigger) FetchResource() (interface{}, error) {
Expand Down Expand Up @@ -105,58 +100,47 @@ func (t *OpenFaasTrigger) Execute(resource interface{}) (interface{}, error) {
return nil, err
}

if err := os.Setenv(EnvVarOpenFaasGatewayURL, obj.GatewayURL); err != nil {
return nil, errors.Wrapf(err, "failed to set environment variable OPENFAAS_URL to %s", obj.GatewayURL)
}
username := "admin"
password := ""

openfaastrigger := t.Trigger.Template.OpenFaas

if obj.Password != nil {
password, err := common.GetSecrets(t.K8sClient, obj.Namespace, obj.Password)
if openfaastrigger.Username != nil {
password, err = common.GetSecrets(t.K8sClient, openfaastrigger.Namespace, openfaastrigger.Username)
if err != nil {
return nil, errors.Wrapf(err, "failed to retrieve the password from secret %s and namespace %s", obj.Password.Name, obj.Namespace)
return nil, errors.Wrapf(err, "failed to retrieve the username from secret %s and namespace %s", openfaastrigger.Username.Name, openfaastrigger.Namespace)
}
}

cmd := exec.Command("faas", "login", "--password", password)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return nil, errors.Wrap(err, "failed to login using faas client")
if openfaastrigger.Password != nil {
password, err = common.GetSecrets(t.K8sClient, openfaastrigger.Namespace, openfaastrigger.Password)
if err != nil {
return nil, errors.Wrapf(err, "failed to retrieve the password from secret %s and namespace %s", openfaastrigger.Password.Name, openfaastrigger.Namespace)
}
}

functionURL := fmt.Sprintf("%s/function/%s", obj.GatewayURL, obj.FunctionName)

parsedURL, err := url.Parse(functionURL)
request, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/function/%s", openfaastrigger.GatewayURL, openfaastrigger.FunctionName), bytes.NewReader(payload))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse url %s", parsedURL)
return nil, errors.Wrapf(err, "failed to construct request for function %s", openfaastrigger.FunctionName)
}
request.SetBasicAuth(username, password)

client := http.Client{
Timeout: 1 * time.Minute,
}
t.Logger.WithField("function", openfaastrigger.FunctionName).Infoln("invoking the function...")

request, err := http.NewRequest(http.MethodPost, parsedURL.String(), bytes.NewBuffer(payload))
if err != nil {
return nil, errors.Wrapf(err, "failed to create the function request %s", obj.FunctionName)
}
return t.httpClient.Do(request)
}

resp, err := client.Do(request)
if err != nil {
return nil, errors.Wrapf(err, "function invocation %s failed", obj.FunctionName)
// ApplyPolicy applies a policy on trigger execution response if any
func (t *OpenFaasTrigger) ApplyPolicy(resource interface{}) error {
if t.Trigger.Policy == nil || t.Trigger.Policy.Status == nil || t.Trigger.Policy.Status.Allow == nil {
return nil
}

defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "failed to read the response")
response, ok := resource.(*http.Response)
if !ok {
return errors.New("failed to interpret the trigger execution response")
}

t.Logger.WithField("body", string(body)).Infoln("response body")

return body, nil
}
p := policy.NewStatusPolicy(response.StatusCode, t.Trigger.Policy.Status.Allow)

// ApplyPolicy applies a policy on trigger execution response if any
func (t *OpenFaasTrigger) ApplyPolicy(resource interface{}) error {
return nil
return p.ApplyPolicy()
}
9 changes: 8 additions & 1 deletion sensors/triggers/openfaas/openfaas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
package openfaas

import (
"net/http"
"testing"

"github.com/argoproj/argo-events/common"
Expand Down Expand Up @@ -55,7 +56,13 @@ var sensorObj = &v1alpha1.Sensor{
}

func getOpenFaasTrigger() *OpenFaasTrigger {
return NewOpenFaasTrigger(fake.NewSimpleClientset(), sensorObj.DeepCopy(), sensorObj.Spec.Triggers[0].DeepCopy(), common.NewArgoEventsLogger())
return &OpenFaasTrigger{
K8sClient: fake.NewSimpleClientset(),
Sensor: sensorObj.DeepCopy(),
Trigger: sensorObj.Spec.Triggers[0].DeepCopy(),
Logger: common.NewArgoEventsLogger(),
httpClient: &http.Client{},
}
}

func TestOpenFaasTrigger_FetchResource(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions sensors/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package types
import (
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
openfaas "github.com/openfaas-incubator/connector-sdk/types"
)

// Notification to update event dependency's state or the sensor resource
Expand All @@ -34,3 +35,9 @@ type Notification struct {
// NotificationType for event notification and state update notification
NotificationType v1alpha1.NotificationType
}

// OpenFaasContext holds the context for the openfaas controller context
type OpenFaasContext struct {
Controller openfaas.Controller
ResponseCh chan openfaas.InvokerResponse
}

0 comments on commit 1a70502

Please sign in to comment.