From 656b99ded768b48f823bb788f5bc03a28f1b4ca2 Mon Sep 17 00:00:00 2001 From: shirou Date: Tue, 24 May 2022 06:13:35 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20add=20support=20to=20parse=20QueryParam?= =?UTF-8?q?eter=20and=20PostForm=20on=20webhook=20eve=E2=80=A6=20(#1978)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add support to parse QueryParameter and PostForm on webhook eventsource Signed-off-by: shirou --- eventsources/sources/webhook/start.go | 71 +++++++++++-- eventsources/sources/webhook/start_test.go | 112 +++++++++++++++++++++ 2 files changed, 177 insertions(+), 6 deletions(-) create mode 100644 eventsources/sources/webhook/start_test.go diff --git a/eventsources/sources/webhook/start.go b/eventsources/sources/webhook/start.go index a634708146..fd8062728e 100644 --- a/eventsources/sources/webhook/start.go +++ b/eventsources/sources/webhook/start.go @@ -17,14 +17,14 @@ limitations under the License. package webhook import ( + "bytes" "context" "encoding/json" + "fmt" "io" "net/http" "time" - "go.uber.org/zap" - "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" @@ -33,6 +33,8 @@ import ( apicommon "github.com/argoproj/argo-events/pkg/apis/common" "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1" + "github.com/pkg/errors" + "go.uber.org/zap" ) var ( @@ -101,14 +103,19 @@ func (router *Router) HandleRoute(writer http.ResponseWriter, request *http.Requ return } + if route.Context.Method != request.Method { + logger.Info("http method does not match") + common.SendErrorResponse(writer, "http method does not match") + return + } + defer func(start time.Time) { route.Metrics.EventProcessingDuration(route.EventSourceName, route.EventName, float64(time.Since(start)/time.Millisecond)) }(time.Now()) - request.Body = http.MaxBytesReader(writer, request.Body, 65536) - body, err := io.ReadAll(request.Body) + body, err := GetBody(&writer, request, route, logger) if err != nil { - logger.Errorw("failed to parse request body", zap.Error(err)) + logger.Errorw("failed to get body", zap.Error(err)) common.SendErrorResponse(writer, err.Error()) route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName) return @@ -116,7 +123,7 @@ func (router *Router) HandleRoute(writer http.ResponseWriter, request *http.Requ payload := &events.WebhookEventData{ Header: request.Header, - Body: (*json.RawMessage)(&body), + Body: body, Metadata: route.Context.Metadata, } @@ -155,3 +162,55 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt route: route, }, controller, dispatch) } + +func GetBody(writer *http.ResponseWriter, request *http.Request, route *webhook.Route, logger *zap.SugaredLogger) (*json.RawMessage, error) { + switch request.Method { + case http.MethodGet: + body, _ := json.Marshal(request.URL.Query()) + ret := json.RawMessage(body) + return &ret, nil + case http.MethodPost: + contentType := "" + if len(request.Header["Content-Type"]) > 0 { + contentType = request.Header["Content-Type"][0] + } + + switch contentType { + case "application/x-www-form-urlencoded": + if err := request.ParseForm(); err != nil { + logger.Errorw("failed to parse form data", zap.Error(err)) + common.SendInternalErrorResponse(*writer, err.Error()) + route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName) + return nil, err + } + body, _ := json.Marshal(request.PostForm) + ret := json.RawMessage(body) + return &ret, nil + // default including "application/json" is parsing body as JSON + default: + request.Body = http.MaxBytesReader(*writer, request.Body, 65536) + body, err := getRequestBody(request) + if err != nil { + logger.Errorw("failed to read request body", zap.Error(err)) + common.SendErrorResponse(*writer, err.Error()) + route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName) + return nil, err + } + ret := json.RawMessage(body) + return &ret, nil + } + default: + return nil, fmt.Errorf("unsupoorted method: %s", request.Method) + } +} + +func getRequestBody(request *http.Request) ([]byte, error) { + // Read request payload + body, err := io.ReadAll(request.Body) + // Reset request.Body ReadCloser to prevent side-effect if re-read + request.Body = io.NopCloser(bytes.NewBuffer(body)) + if err != nil { + return nil, errors.Wrap(err, "failed to parse request body") + } + return body, nil +} diff --git a/eventsources/sources/webhook/start_test.go b/eventsources/sources/webhook/start_test.go new file mode 100644 index 0000000000..2c39923cb3 --- /dev/null +++ b/eventsources/sources/webhook/start_test.go @@ -0,0 +1,112 @@ +package webhook + +import ( + "bytes" + "io" + "net/http" + "net/url" + "strings" + "testing" + + "github.com/smartystreets/goconvey/convey" + + "github.com/argoproj/argo-events/eventsources/common/webhook" +) + +func TestHandleRoute(t *testing.T) { + convey.Convey("Given a route that receives HTTP access", t, func() { + router := &Router{ + route: webhook.GetFakeRoute(), + } + writer := &webhook.FakeHttpWriter{} + + convey.Convey("Test Get method with query parameters", func() { + url, _ := url.Parse("http://example.com/fake?aaa=b%20b&ccc=d%20d") + out := make(chan []byte) + router.route.Active = true + router.route.Context.Method = http.MethodGet + + go func() { + out <- <-router.route.DataCh + }() + + router.HandleRoute(writer, &http.Request{ + Method: http.MethodGet, + URL: url, + }) + result := <-out + convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) + convey.So(string(result), convey.ShouldContainSubstring, `"body":{"aaa":["b b"],"ccc":["d d"]}`) + }) + convey.Convey("Test Get method without query parameter", func() { + url, _ := url.Parse("http://example.com/fake") + out := make(chan []byte) + router.route.Active = true + router.route.Context.Method = http.MethodGet + + go func() { + out <- <-router.route.DataCh + }() + + router.HandleRoute(writer, &http.Request{ + Method: http.MethodGet, + URL: url, + }) + result := <-out + convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) + convey.So(string(result), convey.ShouldContainSubstring, `"body":{}`) + }) + convey.Convey("Test POST method with form-urlencoded", func() { + payload := []byte(`aaa=b%20b&ccc=d%20d`) + + out := make(chan []byte) + router.route.Active = true + router.route.Context.Method = http.MethodPost + + go func() { + out <- <-router.route.DataCh + }() + + var buf bytes.Buffer + buf.Write(payload) + + headers := make(map[string][]string) + headers["Content-Type"] = []string{"application/x-www-form-urlencoded"} + + router.HandleRoute(writer, &http.Request{ + Method: http.MethodPost, + Header: headers, + Body: io.NopCloser(strings.NewReader(buf.String())), + }) + result := <-out + convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) + convey.So(string(result), convey.ShouldContainSubstring, `"body":{"aaa":["b b"],"ccc":["d d"]}`) + }) + convey.Convey("Test POST method with json", func() { + payload := []byte(`{"aaa":["b b"],"ccc":["d d"]}`) + + out := make(chan []byte) + router.route.Active = true + router.route.Context.Method = http.MethodPost + + go func() { + out <- <-router.route.DataCh + }() + + var buf bytes.Buffer + buf.Write(payload) + + headers := make(map[string][]string) + headers["Content-Type"] = []string{"application/json"} + + router.HandleRoute(writer, &http.Request{ + Method: http.MethodPost, + Header: headers, + Body: io.NopCloser(strings.NewReader(buf.String())), + }) + result := <-out + convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK) + convey.So(string(result), convey.ShouldContainSubstring, `"body":{"aaa":["b b"],"ccc":["d d"]}`) + }) + }) +}