diff --git a/pkg/interceptors/cel/cel.go b/pkg/interceptors/cel/cel.go index 2c5f9ba8a..a3b951439 100644 --- a/pkg/interceptors/cel/cel.go +++ b/pkg/interceptors/cel/cel.go @@ -17,6 +17,7 @@ limitations under the License. package cel import ( + "bytes" "encoding/json" "fmt" "io/ioutil" @@ -58,15 +59,15 @@ func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, err return nil, fmt.Errorf("error creating cel environment: %w", err) } - body, err := request.GetBody() - if err != nil { - return nil, fmt.Errorf("error getting request body: %w", err) - } - defer body.Close() - payload, err := ioutil.ReadAll(body) - if err != nil { - return nil, fmt.Errorf("error reading request body: %w", err) + var payload = []byte(`{}`) + if request.Body != nil { + defer request.Body.Close() + payload, err = ioutil.ReadAll(request.Body) + if err != nil { + return nil, fmt.Errorf("error reading request body: %w", err) + } } + evalContext, err := makeEvalContext(payload, request) if err != nil { return nil, fmt.Errorf("error making the evaluation context: %w", err) @@ -84,7 +85,7 @@ func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, err return &http.Response{ Header: request.Header, - Body: request.Body, + Body: ioutil.NopCloser(bytes.NewBuffer(payload)), }, nil } diff --git a/pkg/interceptors/cel/cel_test.go b/pkg/interceptors/cel/cel_test.go index 24bd094ad..c9605315a 100644 --- a/pkg/interceptors/cel/cel_test.go +++ b/pkg/interceptors/cel/cel_test.go @@ -15,13 +15,10 @@ import ( ) func TestInterceptor_ExecuteTrigger(t *testing.T) { - type args struct { - payload []byte - } tests := []struct { name string CEL *triggersv1.CELInterceptor - args args + payload io.ReadCloser want []byte wantErr bool }{ @@ -30,9 +27,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "body.value == 'testing'", }, - args: args{ - payload: []byte(`{"value":"testing"}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{"value":"testing"}`)), want: []byte(`{"value":"testing"}`), wantErr: false, }, @@ -41,9 +36,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "body.value == 'test'", }, - args: args{ - payload: []byte(`{"value":"testing"}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{"value":"testing"}`)), wantErr: true, }, { @@ -51,9 +44,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "header['X-Test'][0] == 'test-value'", }, - args: args{ - payload: []byte(`{}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{}`)), want: []byte(`{}`), wantErr: false, }, @@ -62,9 +53,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "header['X-Test'][0] == 'unknown'", }, - args: args{ - payload: []byte(`{}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{}`)), wantErr: true, }, { @@ -72,9 +61,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "header.match('x-test', 'no-match')", }, - args: args{ - payload: []byte(`{}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{}`)), wantErr: true, }, { @@ -82,9 +69,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "header.match('x-test', 'test-value')", }, - args: args{ - payload: []byte(`{}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{}`)), want: []byte(`{}`), wantErr: false, }, @@ -93,9 +78,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "header.match('x-test', 'test-value') && body.value == 'test'", }, - args: args{ - payload: []byte(`{"value":"test"}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{"value":"test"}`)), want: []byte(`{"value":"test"}`), wantErr: false, }, @@ -104,9 +87,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "header['X-Test", }, - args: args{ - payload: []byte(`{"value":"test"}`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{"value":"test"}`)), wantErr: true, }, { @@ -114,10 +95,14 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { CEL: &triggersv1.CELInterceptor{ Filter: "body.value == 'test'", }, - args: args{ - payload: []byte(`{]`), - }, + payload: ioutil.NopCloser(bytes.NewBufferString(`{}`)), wantErr: true, + }, { + name: "nil body does not panic", + CEL: &triggersv1.CELInterceptor{Filter: "header.match('x-test', 'test-value')"}, + payload: nil, + want: []byte(`{}`), + wantErr: false, }, } for _, tt := range tests { @@ -128,10 +113,7 @@ func TestInterceptor_ExecuteTrigger(t *testing.T) { Logger: logger, } request := &http.Request{ - Body: ioutil.NopCloser(bytes.NewReader(tt.args.payload)), - GetBody: func() (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(tt.args.payload)), nil - }, + Body: tt.payload, Header: http.Header{ "Content-Type": []string{"application/json"}, "X-Test": []string{"test-value"}, diff --git a/pkg/interceptors/github/github.go b/pkg/interceptors/github/github.go index f88addb11..a814f4708 100644 --- a/pkg/interceptors/github/github.go +++ b/pkg/interceptors/github/github.go @@ -17,6 +17,7 @@ limitations under the License. package github import ( + "bytes" "errors" "fmt" "io/ioutil" @@ -46,22 +47,23 @@ func NewInterceptor(gh *triggersv1.GitHubInterceptor, k kubernetes.Interface, ns } func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, error) { + payload := []byte{} + var err error + + if request.Body != nil { + defer request.Body.Close() + payload, err = ioutil.ReadAll(request.Body) + if err != nil { + return nil, fmt.Errorf("failed to read request body: %w", err) + } + } + // Validate secrets first before anything else, if set if w.GitHub.SecretRef != nil { header := request.Header.Get("X-Hub-Signature") if header == "" { return nil, errors.New("no X-Hub-Signature header set") } - - body, err := request.GetBody() - if err != nil { - return nil, err - } - defer body.Close() - payload, err := ioutil.ReadAll(body) - if err != nil { - return nil, err - } secretToken, err := interceptors.GetSecretToken(w.KubeClientSet, w.GitHub.SecretRef, w.EventListenerNamespace) if err != nil { return nil, err @@ -88,6 +90,6 @@ func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, err return &http.Response{ Header: request.Header, - Body: request.Body, + Body: ioutil.NopCloser(bytes.NewBuffer(payload)), }, nil } diff --git a/pkg/interceptors/github/github_test.go b/pkg/interceptors/github/github_test.go index ac7c14c8d..8331f49b6 100644 --- a/pkg/interceptors/github/github_test.go +++ b/pkg/interceptors/github/github_test.go @@ -18,7 +18,7 @@ import ( func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { type args struct { - payload []byte + payload io.ReadCloser secret *corev1.Secret signature string eventType string @@ -34,7 +34,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { name: "no secret", GitHub: &triggersv1.GitHubInterceptor{}, args: args{ - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), signature: "foo", }, want: []byte("somepayload"), @@ -58,7 +58,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { "token": []byte("secrettoken"), }, }, - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), }, wantErr: true, }, @@ -82,7 +82,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { "token": []byte("secret"), }, }, - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), }, wantErr: false, want: []byte("somepayload"), @@ -93,7 +93,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { EventTypes: []string{"MY_EVENT", "YOUR_EVENT"}, }, args: args{ - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), eventType: "YOUR_EVENT", }, wantErr: false, @@ -105,7 +105,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { EventTypes: []string{"MY_EVENT", "YOUR_EVENT"}, }, args: args{ - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), eventType: "OTHER_EVENT", }, wantErr: true, @@ -132,7 +132,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { }, }, eventType: "MY_EVENT", - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), }, wantErr: false, want: []byte("somepayload"), @@ -159,7 +159,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { }, }, eventType: "OTHER_EVENT", - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), }, wantErr: true, }, @@ -183,9 +183,18 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { }, }, eventType: "MY_EVENT", - payload: []byte("somepayload"), + payload: ioutil.NopCloser(bytes.NewBufferString("somepayload")), }, wantErr: true, + }, { + name: "nil body does not panic", + GitHub: &triggersv1.GitHubInterceptor{}, + args: args{ + payload: nil, + signature: "foo", + }, + want: []byte{}, + wantErr: false, }, } for _, tt := range tests { @@ -194,10 +203,7 @@ func TestInterceptor_ExecuteTrigger_Signature(t *testing.T) { logger, _ := logging.NewLogger("", "") kubeClient := fakekubeclient.Get(ctx) request := &http.Request{ - Body: ioutil.NopCloser(bytes.NewReader(tt.args.payload)), - GetBody: func() (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(tt.args.payload)), nil - }, + Body: tt.args.payload, Header: http.Header{ "Content-Type": []string{"application/json"}, }, diff --git a/pkg/resources/create.go b/pkg/resources/create.go index fc59ef130..2462d6dfa 100644 --- a/pkg/resources/create.go +++ b/pkg/resources/create.go @@ -88,7 +88,7 @@ func Create(logger *zap.SugaredLogger, rt json.RawMessage, triggerName, eventID, if name == "" { name = data.GetGenerateName() } - logger.Infof("Generating resource: kind: %+v, name: %s", apiResource, name) + logger.Infof("Generating resource: kind: %s, name: %s", apiResource, name) gvr := schema.GroupVersionResource{ Group: apiResource.Group, diff --git a/pkg/sink/sink.go b/pkg/sink/sink.go index ec1deaae2..c32adb305 100644 --- a/pkg/sink/sink.go +++ b/pkg/sink/sink.go @@ -17,6 +17,7 @@ limitations under the License. package sink import ( + "bytes" "encoding/json" "errors" "fmt" @@ -72,7 +73,6 @@ func (r Sink) HandleEvent(response http.ResponseWriter, request *http.Request) { response.WriteHeader(http.StatusInternalServerError) return } - event, err := ioutil.ReadAll(request.Body) if err != nil { r.Logger.Errorf("Error reading event body: %s", err) @@ -158,7 +158,11 @@ func (r Sink) executeInterceptors(t *triggersv1.EventListenerTrigger, in *http.R return event, in.Header, nil } - request := in + // The request body to the first interceptor in the chain should be the received event body. + request := &http.Request{ + Header: in.Header, + Body: ioutil.NopCloser(bytes.NewBuffer(event)), + } var resp *http.Response for _, i := range t.Interceptors { var interceptor interceptors.Interceptor @@ -174,7 +178,6 @@ func (r Sink) executeInterceptors(t *triggersv1.EventListenerTrigger, in *http.R default: return nil, nil, fmt.Errorf("unknown interceptor type: %v", i) } - var err error resp, err = interceptor.ExecuteTrigger(request) if err != nil { @@ -185,7 +188,7 @@ func (r Sink) executeInterceptors(t *triggersv1.EventListenerTrigger, in *http.R // request chaining. request = &http.Request{ Header: resp.Header, - Body: resp.Body, + Body: ioutil.NopCloser(resp.Body), } } payload, err := ioutil.ReadAll(resp.Body) diff --git a/pkg/sink/sink_test.go b/pkg/sink/sink_test.go index 432dab49e..57f65a958 100644 --- a/pkg/sink/sink_test.go +++ b/pkg/sink/sink_test.go @@ -231,6 +231,189 @@ func TestHandleEvent(t *testing.T) { } } +func TestHandleEventWithInterceptors(t *testing.T) { + namespace := "foo" + eventBody := json.RawMessage(`{"head_commit": {"id": "testrevision"}, "repository": {"url": "testurl"}, "foo": "bar\t\r\nbaz昨"}`) + + pipelineResource := pipelinev1.PipelineResource{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "tekton.dev/v1alpha1", + Kind: "PipelineResource", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pipelineresource", + Namespace: namespace, + }, + Spec: pipelinev1.PipelineResourceSpec{ + Type: pipelinev1.PipelineResourceTypeGit, + Params: []pipelinev1.ResourceParam{{ + Name: "url", + Value: "$(params.url)", + }}, + }, + } + pipelineResourceBytes, err := json.Marshal(pipelineResource) + if err != nil { + t.Fatalf("Error unmarshalling pipelineResource: %s", err) + } + + tt := bldr.TriggerTemplate("tt", namespace, + bldr.TriggerTemplateSpec( + bldr.TriggerTemplateParam("url", "", ""), + bldr.TriggerResourceTemplate(pipelineResourceBytes), + )) + tb := bldr.TriggerBinding("tb", namespace, + bldr.TriggerBindingSpec( + bldr.TriggerBindingParam("url", "$(body.repository.url)"), + )) + + el := &triggersv1.EventListener{ + ObjectMeta: metav1.ObjectMeta{ + Name: "el", + Namespace: namespace, + }, + Spec: triggersv1.EventListenerSpec{ + Triggers: []triggersv1.EventListenerTrigger{{ + Bindings: []*triggersv1.EventListenerBinding{{Name: "tb"}}, + Template: triggersv1.EventListenerTemplate{Name: "tt"}, + Interceptors: []*triggersv1.EventInterceptor{{ + GitHub: &triggersv1.GitHubInterceptor{ + SecretRef: &triggersv1.SecretRef{ + SecretKey: "secretKey", + SecretName: "secret", + Namespace: namespace, + }, + EventTypes: []string{"pull_request"}, + }, + }}, + }}, + }, + } + + kubeClient := fakekubeclientset.NewSimpleClientset() + test.AddTektonResources(kubeClient) + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret", + }, + Data: map[string][]byte{ + "secretKey": []byte("secret"), + }, + } + if _, err := kubeClient.CoreV1().Secrets(namespace).Create(secret); err != nil { + t.Fatalf("error creating secret: %v", secret) + } + triggersClient := faketriggersclientset.NewSimpleClientset() + if _, err := triggersClient.TektonV1alpha1().TriggerTemplates(namespace).Create(tt); err != nil { + t.Fatalf("Error creating TriggerTemplate: %s", err) + } + if _, err := triggersClient.TektonV1alpha1().TriggerBindings(namespace).Create(tb); err != nil { + t.Fatalf("Error creating TriggerBinding: %s", err) + } + el, err = triggersClient.TektonV1alpha1().EventListeners(namespace).Create(el) + if err != nil { + t.Fatalf("Error creating EventListener: %s", err) + } + + logger, _ := logging.NewLogger("", "") + + dynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicSet := dynamicclientset.New(tekton.WithClient(dynamicClient)) + + r := Sink{ + EventListenerName: el.Name, + EventListenerNamespace: namespace, + DynamicClient: dynamicSet, + DiscoveryClient: kubeClient.Discovery(), + TriggersClient: triggersClient, + KubeClientSet: kubeClient, + Logger: logger, + } + ts := httptest.NewServer(http.HandlerFunc(r.HandleEvent)) + defer ts.Close() + + var wg sync.WaitGroup + wg.Add(1) + + dynamicClient.PrependReactor("*", "*", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + defer wg.Done() + return false, nil, nil + }) + + req, err := http.NewRequest("POST", ts.URL, bytes.NewReader(eventBody)) + if err != nil { + t.Fatalf("Error creating Post request: %s", err) + } + req.Header.Add("Content-Type", "application/json") + req.Header.Add("X-Github-Event", "pull_request") + // This was generated by using SHA1 and hmac from go stdlib on secret and payload. + // https://play.golang.org/p/8D2E-Yz3zWf for a sample. + req.Header.Add("X-Hub-Signature", "sha1=c0f3a2bbd1cdb062ba4f54b2a1cad3d171b7a129") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Error sending Post request: %v", err) + } + + if resp.StatusCode != http.StatusCreated { + t.Fatalf("Response code doesn't match: %v", resp.Status) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Error reading response body: %s", err) + } + + wantBody := Response{ + EventListener: el.Name, + Namespace: el.Namespace, + EventID: eventID, + } + + got := Response{} + if err := json.Unmarshal(body, &got); err != nil { + t.Fatalf("Error unmarshalling response body: %s", err) + } + if diff := cmp.Diff(wantBody, got); diff != "" { + t.Errorf("did not get expected response back -want,+got: %s", diff) + } + + // We expect that the EventListener will be able to immediately handle the event so we + // can use a very short timeout + if waitTimeout(&wg, time.Second) { + t.Fatalf("timed out waiting for reactor to fire") + } + wantResource := pipelinev1.PipelineResource{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "tekton.dev/v1alpha1", + Kind: "PipelineResource", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pipelineresource", + Namespace: namespace, + Labels: map[string]string{ + resourceLabel: "el", + triggerLabel: el.Spec.Triggers[0].Name, + eventIDLabel: eventID, + }, + }, + Spec: pipelinev1.PipelineResourceSpec{ + Type: pipelinev1.PipelineResourceTypeGit, + Params: []pipelinev1.ResourceParam{ + {Name: "url", Value: "testurl"}, + }, + }, + } + gvr := schema.GroupVersionResource{ + Group: "tekton.dev", + Version: "v1alpha1", + Resource: "pipelineresources", + } + want := []ktesting.Action{ktesting.NewCreateAction(gvr, "foo", test.ToUnstructured(t, wantResource))} + if diff := cmp.Diff(want, dynamicClient.Actions()); diff != "" { + t.Error(diff) + } +} + // sequentialInterceptor is a HTTP server that will return sequential responses. // It expects a request of the form `{"i": n}`. // The response body will always return with the next value set, whereas the @@ -295,11 +478,11 @@ func TestExecuteInterceptor(t *testing.T) { Interceptors: []*triggersv1.EventInterceptor{a, a}, } - req, err := http.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{}`)) + req, err := http.NewRequest(http.MethodPost, "/", nil) if err != nil { t.Fatalf("http.NewRequest: %v", err) } - resp, header, err := r.executeInterceptors(trigger, req, nil, "", logger) + resp, header, err := r.executeInterceptors(trigger, req, []byte(`{}`), "", logger) if err != nil { t.Fatalf("executeInterceptors: %v", err) }