Skip to content

Commit

Permalink
fix panic in github interceptor
Browse files Browse the repository at this point in the history
We were passing the original request where the body had been drained
to the first interceptor meaning that the body to the first interceptor
in the chain was always empty. Further, we were using the `GetBody` which
can sometimes be empty leading to a panic.

fixes #355

Signed-off-by: Dibyo Mukherjee <[email protected]>
  • Loading branch information
dibyom committed Jan 17, 2020
1 parent bc6916b commit 761ec63
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 25 deletions.
11 changes: 4 additions & 7 deletions pkg/interceptors/cel/cel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cel

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -58,12 +59,8 @@ func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, err
return nil, fmt.Errorf("error creating cel environment: %w", err)
}

body, err := request.GetBody()
if err != nil {
return nil, fmt.Errorf("error getting request body: %w", err)
}
defer body.Close()
payload, err := ioutil.ReadAll(body)
defer request.Body.Close()
payload, err := ioutil.ReadAll(request.Body)
if err != nil {
return nil, fmt.Errorf("error reading request body: %w", err)
}
Expand All @@ -84,7 +81,7 @@ func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, err

return &http.Response{
Header: request.Header,
Body: request.Body,
Body: ioutil.NopCloser(bytes.NewBuffer(payload)),
}, nil
}

Expand Down
19 changes: 8 additions & 11 deletions pkg/interceptors/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package github

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -46,22 +47,18 @@ func NewInterceptor(gh *triggersv1.GitHubInterceptor, k kubernetes.Interface, ns
}

func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, error) {
defer request.Body.Close()
payload, err := ioutil.ReadAll(request.Body)
if err != nil {
return nil, fmt.Errorf("failed to read request body: %w", err)
}

// Validate secrets first before anything else, if set
if w.GitHub.SecretRef != nil {
header := request.Header.Get("X-Hub-Signature")
if header == "" {
return nil, errors.New("no X-Hub-Signature header set")
}

body, err := request.GetBody()
if err != nil {
return nil, err
}
defer body.Close()
payload, err := ioutil.ReadAll(body)
if err != nil {
return nil, err
}
secretToken, err := interceptors.GetSecretToken(w.KubeClientSet, w.GitHub.SecretRef, w.EventListenerNamespace)
if err != nil {
return nil, err
Expand All @@ -88,6 +85,6 @@ func (w *Interceptor) ExecuteTrigger(request *http.Request) (*http.Response, err

return &http.Response{
Header: request.Header,
Body: request.Body,
Body: ioutil.NopCloser(bytes.NewBuffer(payload)),
}, nil
}
2 changes: 1 addition & 1 deletion pkg/resources/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func Create(logger *zap.SugaredLogger, rt json.RawMessage, triggerName, eventID,
if name == "" {
name = data.GetGenerateName()
}
logger.Infof("Generating resource: kind: %+v, name: %s", apiResource, name)
logger.Infof("Generating resource: kind: %s, name: %s", apiResource, name)

gvr := schema.GroupVersionResource{
Group: apiResource.Group,
Expand Down
11 changes: 7 additions & 4 deletions pkg/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sink

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -72,7 +73,6 @@ func (r Sink) HandleEvent(response http.ResponseWriter, request *http.Request) {
response.WriteHeader(http.StatusInternalServerError)
return
}

event, err := ioutil.ReadAll(request.Body)
if err != nil {
r.Logger.Errorf("Error reading event body: %s", err)
Expand Down Expand Up @@ -158,7 +158,11 @@ func (r Sink) executeInterceptors(t *triggersv1.EventListenerTrigger, in *http.R
return event, in.Header, nil
}

request := in
// The request body to the first interceptor in the chain should be the received event body.
request := &http.Request{
Header: in.Header,
Body: ioutil.NopCloser(bytes.NewBuffer(event)),
}
var resp *http.Response
for _, i := range t.Interceptors {
var interceptor interceptors.Interceptor
Expand All @@ -174,7 +178,6 @@ func (r Sink) executeInterceptors(t *triggersv1.EventListenerTrigger, in *http.R
default:
return nil, nil, fmt.Errorf("unknown interceptor type: %v", i)
}

var err error
resp, err = interceptor.ExecuteTrigger(request)
if err != nil {
Expand All @@ -185,7 +188,7 @@ func (r Sink) executeInterceptors(t *triggersv1.EventListenerTrigger, in *http.R
// request chaining.
request = &http.Request{
Header: resp.Header,
Body: resp.Body,
Body: ioutil.NopCloser(resp.Body),
}
}
payload, err := ioutil.ReadAll(resp.Body)
Expand Down
187 changes: 185 additions & 2 deletions pkg/sink/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,189 @@ func TestHandleEvent(t *testing.T) {
}
}

func TestHandleEventWithInterceptors(t *testing.T) {
namespace := "foo"
eventBody := json.RawMessage(`{"head_commit": {"id": "testrevision"}, "repository": {"url": "testurl"}, "foo": "bar\t\r\nbaz昨"}`)

pipelineResource := pipelinev1.PipelineResource{
TypeMeta: metav1.TypeMeta{
APIVersion: "tekton.dev/v1alpha1",
Kind: "PipelineResource",
},
ObjectMeta: metav1.ObjectMeta{
Name: "my-pipelineresource",
Namespace: namespace,
},
Spec: pipelinev1.PipelineResourceSpec{
Type: pipelinev1.PipelineResourceTypeGit,
Params: []pipelinev1.ResourceParam{{
Name: "url",
Value: "$(params.url)",
}},
},
}
pipelineResourceBytes, err := json.Marshal(pipelineResource)
if err != nil {
t.Fatalf("Error unmarshalling pipelineResource: %s", err)
}

tt := bldr.TriggerTemplate("tt", namespace,
bldr.TriggerTemplateSpec(
bldr.TriggerTemplateParam("url", "", ""),
bldr.TriggerResourceTemplate(pipelineResourceBytes),
))
tb := bldr.TriggerBinding("tb", namespace,
bldr.TriggerBindingSpec(
bldr.TriggerBindingParam("url", "$(body.repository.url)"),
))

el := &triggersv1.EventListener{
ObjectMeta: metav1.ObjectMeta{
Name: "el",
Namespace: namespace,
},
Spec: triggersv1.EventListenerSpec{
Triggers: []triggersv1.EventListenerTrigger{{
Bindings: []*triggersv1.EventListenerBinding{{Name: "tb"}},
Template: triggersv1.EventListenerTemplate{Name: "tt"},
Interceptors: []*triggersv1.EventInterceptor{{
GitHub: &triggersv1.GitHubInterceptor{
SecretRef: &triggersv1.SecretRef{
SecretKey: "secretKey",
SecretName: "secret",
Namespace: namespace,
},
EventTypes: []string{"pull_request"},
},
}},
}},
},
}

kubeClient := fakekubeclientset.NewSimpleClientset()
test.AddTektonResources(kubeClient)
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret",
},
Data: map[string][]byte{
"secretKey": []byte("secret"),
},
}
if _, err := kubeClient.CoreV1().Secrets(namespace).Create(secret); err != nil {
t.Fatalf("error creating secret: %v", secret)
}
triggersClient := faketriggersclientset.NewSimpleClientset()
if _, err := triggersClient.TektonV1alpha1().TriggerTemplates(namespace).Create(tt); err != nil {
t.Fatalf("Error creating TriggerTemplate: %s", err)
}
if _, err := triggersClient.TektonV1alpha1().TriggerBindings(namespace).Create(tb); err != nil {
t.Fatalf("Error creating TriggerBinding: %s", err)
}
el, err = triggersClient.TektonV1alpha1().EventListeners(namespace).Create(el)
if err != nil {
t.Fatalf("Error creating EventListener: %s", err)
}

logger, _ := logging.NewLogger("", "")

dynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme())
dynamicSet := dynamicclientset.New(tekton.WithClient(dynamicClient))

r := Sink{
EventListenerName: el.Name,
EventListenerNamespace: namespace,
DynamicClient: dynamicSet,
DiscoveryClient: kubeClient.Discovery(),
TriggersClient: triggersClient,
KubeClientSet: kubeClient,
Logger: logger,
}
ts := httptest.NewServer(http.HandlerFunc(r.HandleEvent))
defer ts.Close()

var wg sync.WaitGroup
wg.Add(1)

dynamicClient.PrependReactor("*", "*", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
defer wg.Done()
return false, nil, nil
})

req, err := http.NewRequest("POST", ts.URL, bytes.NewReader(eventBody))
if err != nil {
t.Fatalf("Error creating Post request: %s", err)
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("X-Github-Event", "pull_request")
// This was generated by using SHA1 and hmac from go stdlib on secret and payload.
// https://play.golang.org/p/8D2E-Yz3zWf for a sample.
req.Header.Add("X-Hub-Signature", "sha1=c0f3a2bbd1cdb062ba4f54b2a1cad3d171b7a129")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Error sending Post request: %v", err)
}

if resp.StatusCode != http.StatusCreated {
t.Fatalf("Response code doesn't match: %v", resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading response body: %s", err)
}

wantBody := Response{
EventListener: el.Name,
Namespace: el.Namespace,
EventID: eventID,
}

got := Response{}
if err := json.Unmarshal(body, &got); err != nil {
t.Fatalf("Error unmarshalling response body: %s", err)
}
if diff := cmp.Diff(wantBody, got); diff != "" {
t.Errorf("did not get expected response back -want,+got: %s", diff)
}

// We expect that the EventListener will be able to immediately handle the event so we
// can use a very short timeout
if waitTimeout(&wg, time.Second) {
t.Fatalf("timed out waiting for reactor to fire")
}
wantResource := pipelinev1.PipelineResource{
TypeMeta: metav1.TypeMeta{
APIVersion: "tekton.dev/v1alpha1",
Kind: "PipelineResource",
},
ObjectMeta: metav1.ObjectMeta{
Name: "my-pipelineresource",
Namespace: namespace,
Labels: map[string]string{
resourceLabel: "el",
triggerLabel: el.Spec.Triggers[0].Name,
eventIDLabel: eventID,
},
},
Spec: pipelinev1.PipelineResourceSpec{
Type: pipelinev1.PipelineResourceTypeGit,
Params: []pipelinev1.ResourceParam{
{Name: "url", Value: "testurl"},
},
},
}
gvr := schema.GroupVersionResource{
Group: "tekton.dev",
Version: "v1alpha1",
Resource: "pipelineresources",
}
want := []ktesting.Action{ktesting.NewCreateAction(gvr, "foo", test.ToUnstructured(t, wantResource))}
if diff := cmp.Diff(want, dynamicClient.Actions()); diff != "" {
t.Error(diff)
}
}

// sequentialInterceptor is a HTTP server that will return sequential responses.
// It expects a request of the form `{"i": n}`.
// The response body will always return with the next value set, whereas the
Expand Down Expand Up @@ -295,11 +478,11 @@ func TestExecuteInterceptor(t *testing.T) {
Interceptors: []*triggersv1.EventInterceptor{a, a},
}

req, err := http.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{}`))
req, err := http.NewRequest(http.MethodPost, "/", nil)
if err != nil {
t.Fatalf("http.NewRequest: %v", err)
}
resp, header, err := r.executeInterceptors(trigger, req, nil, "", logger)
resp, header, err := r.executeInterceptors(trigger, req, []byte(`{}`), "", logger)
if err != nil {
t.Fatalf("executeInterceptors: %v", err)
}
Expand Down

0 comments on commit 761ec63

Please sign in to comment.