Skip to content

Commit

Permalink
feat: add support to parse QueryParameter and PostForm on webhook eve… (
Browse files Browse the repository at this point in the history
#1978)

* feat: add support to parse QueryParameter and PostForm on webhook eventsource

Signed-off-by: shirou <[email protected]>
  • Loading branch information
shirou authored and whynowy committed Jun 9, 2022
1 parent 0a9206d commit 656b99d
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 6 deletions.
71 changes: 65 additions & 6 deletions eventsources/sources/webhook/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ limitations under the License.
package webhook

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"go.uber.org/zap"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
Expand All @@ -33,6 +33,8 @@ import (
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
"github.com/argoproj/argo-events/pkg/apis/events"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
"github.com/pkg/errors"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -101,22 +103,27 @@ func (router *Router) HandleRoute(writer http.ResponseWriter, request *http.Requ
return
}

if route.Context.Method != request.Method {
logger.Info("http method does not match")
common.SendErrorResponse(writer, "http method does not match")
return
}

defer func(start time.Time) {
route.Metrics.EventProcessingDuration(route.EventSourceName, route.EventName, float64(time.Since(start)/time.Millisecond))
}(time.Now())

request.Body = http.MaxBytesReader(writer, request.Body, 65536)
body, err := io.ReadAll(request.Body)
body, err := GetBody(&writer, request, route, logger)
if err != nil {
logger.Errorw("failed to parse request body", zap.Error(err))
logger.Errorw("failed to get body", zap.Error(err))
common.SendErrorResponse(writer, err.Error())
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
return
}

payload := &events.WebhookEventData{
Header: request.Header,
Body: (*json.RawMessage)(&body),
Body: body,
Metadata: route.Context.Metadata,
}

Expand Down Expand Up @@ -155,3 +162,55 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
route: route,
}, controller, dispatch)
}

func GetBody(writer *http.ResponseWriter, request *http.Request, route *webhook.Route, logger *zap.SugaredLogger) (*json.RawMessage, error) {
switch request.Method {
case http.MethodGet:
body, _ := json.Marshal(request.URL.Query())
ret := json.RawMessage(body)
return &ret, nil
case http.MethodPost:
contentType := ""
if len(request.Header["Content-Type"]) > 0 {
contentType = request.Header["Content-Type"][0]
}

switch contentType {
case "application/x-www-form-urlencoded":
if err := request.ParseForm(); err != nil {
logger.Errorw("failed to parse form data", zap.Error(err))
common.SendInternalErrorResponse(*writer, err.Error())
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
return nil, err
}
body, _ := json.Marshal(request.PostForm)
ret := json.RawMessage(body)
return &ret, nil
// default including "application/json" is parsing body as JSON
default:
request.Body = http.MaxBytesReader(*writer, request.Body, 65536)
body, err := getRequestBody(request)
if err != nil {
logger.Errorw("failed to read request body", zap.Error(err))
common.SendErrorResponse(*writer, err.Error())
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
return nil, err
}
ret := json.RawMessage(body)
return &ret, nil
}
default:
return nil, fmt.Errorf("unsupoorted method: %s", request.Method)
}
}

func getRequestBody(request *http.Request) ([]byte, error) {
// Read request payload
body, err := io.ReadAll(request.Body)
// Reset request.Body ReadCloser to prevent side-effect if re-read
request.Body = io.NopCloser(bytes.NewBuffer(body))
if err != nil {
return nil, errors.Wrap(err, "failed to parse request body")
}
return body, nil
}
112 changes: 112 additions & 0 deletions eventsources/sources/webhook/start_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package webhook

import (
"bytes"
"io"
"net/http"
"net/url"
"strings"
"testing"

"github.com/smartystreets/goconvey/convey"

"github.com/argoproj/argo-events/eventsources/common/webhook"
)

func TestHandleRoute(t *testing.T) {
convey.Convey("Given a route that receives HTTP access", t, func() {
router := &Router{
route: webhook.GetFakeRoute(),
}
writer := &webhook.FakeHttpWriter{}

convey.Convey("Test Get method with query parameters", func() {
url, _ := url.Parse("http://example.com/fake?aaa=b%20b&ccc=d%20d")
out := make(chan []byte)
router.route.Active = true
router.route.Context.Method = http.MethodGet

go func() {
out <- <-router.route.DataCh
}()

router.HandleRoute(writer, &http.Request{
Method: http.MethodGet,
URL: url,
})
result := <-out
convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK)
convey.So(string(result), convey.ShouldContainSubstring, `"body":{"aaa":["b b"],"ccc":["d d"]}`)
})
convey.Convey("Test Get method without query parameter", func() {
url, _ := url.Parse("http://example.com/fake")
out := make(chan []byte)
router.route.Active = true
router.route.Context.Method = http.MethodGet

go func() {
out <- <-router.route.DataCh
}()

router.HandleRoute(writer, &http.Request{
Method: http.MethodGet,
URL: url,
})
result := <-out
convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK)
convey.So(string(result), convey.ShouldContainSubstring, `"body":{}`)
})
convey.Convey("Test POST method with form-urlencoded", func() {
payload := []byte(`aaa=b%20b&ccc=d%20d`)

out := make(chan []byte)
router.route.Active = true
router.route.Context.Method = http.MethodPost

go func() {
out <- <-router.route.DataCh
}()

var buf bytes.Buffer
buf.Write(payload)

headers := make(map[string][]string)
headers["Content-Type"] = []string{"application/x-www-form-urlencoded"}

router.HandleRoute(writer, &http.Request{
Method: http.MethodPost,
Header: headers,
Body: io.NopCloser(strings.NewReader(buf.String())),
})
result := <-out
convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK)
convey.So(string(result), convey.ShouldContainSubstring, `"body":{"aaa":["b b"],"ccc":["d d"]}`)
})
convey.Convey("Test POST method with json", func() {
payload := []byte(`{"aaa":["b b"],"ccc":["d d"]}`)

out := make(chan []byte)
router.route.Active = true
router.route.Context.Method = http.MethodPost

go func() {
out <- <-router.route.DataCh
}()

var buf bytes.Buffer
buf.Write(payload)

headers := make(map[string][]string)
headers["Content-Type"] = []string{"application/json"}

router.HandleRoute(writer, &http.Request{
Method: http.MethodPost,
Header: headers,
Body: io.NopCloser(strings.NewReader(buf.String())),
})
result := <-out
convey.So(writer.HeaderStatus, convey.ShouldEqual, http.StatusOK)
convey.So(string(result), convey.ShouldContainSubstring, `"body":{"aaa":["b b"],"ccc":["d d"]}`)
})
})
}

0 comments on commit 656b99d

Please sign in to comment.