Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports stream route API #2104

Merged
merged 37 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
22a8b1f
feat: add stream route module
Xu-Mj Aug 27, 2021
d825953
feat: add stream route test cases
Xu-Mj Aug 28, 2021
68199b0
fix: change error handling
Xu-Mj Aug 30, 2021
47078cd
feat: add license for stream route
Xu-Mj Aug 30, 2021
b28a045
fix: format imports
Xu-Mj Aug 31, 2021
0d49fea
fix: add stream route to schema check queue
Xu-Mj Sep 7, 2021
de766a4
feat: add e2e test
Xu-Mj Sep 7, 2021
b2f28ad
feat: add desc and plugins parameters
Xu-Mj Sep 7, 2021
51937a3
Merge branch 'master' into stream_route
Xu-Mj Sep 7, 2021
c4e2d9f
feat: add e2e test
Xu-Mj Sep 8, 2021
1647f57
feat: add license for e2e test
Xu-Mj Sep 8, 2021
4029a5b
fix: fix e2e test case body error
Xu-Mj Sep 9, 2021
95ac83a
fix: delete trailing whitespace at stream_route_test.go 45 line
Xu-Mj Sep 10, 2021
4b88275
Merge branch 'master' into stream_route
bzp2010 Oct 18, 2021
9c21778
chore: remove whitespace
bzp2010 Oct 18, 2021
734ca51
chore: update property order
bzp2010 Oct 18, 2021
806edbe
test: remove unused test
bzp2010 Oct 18, 2021
241d4a7
chore: reorder import
bzp2010 Oct 18, 2021
c3a877a
test: add ginkgo E2E test
bzp2010 Oct 18, 2021
0a929e1
test: update DP E2E test
bzp2010 Oct 18, 2021
4d19588
chore: add license header
bzp2010 Oct 18, 2021
bf6ac29
test: update E2E environment configure
bzp2010 Oct 18, 2021
cdb386a
test: fix typo
bzp2010 Oct 18, 2021
7f1f28f
test: add exception E2E test
bzp2010 Oct 18, 2021
fd97fa2
test: remove old E2E implementation
bzp2010 Oct 18, 2021
a60c504
test: fix typo
bzp2010 Oct 18, 2021
30cd147
feat: add upstream usage check
bzp2010 Oct 18, 2021
be74488
test: add stream route E2E with upstream
bzp2010 Oct 18, 2021
1ecd33b
test: fix unit test
bzp2010 Oct 18, 2021
2e1d987
test: update E2E case
bzp2010 Oct 19, 2021
df2dac8
test: update E2E environment configure
bzp2010 Oct 19, 2021
8f0d27c
test: add condition list unit test
bzp2010 Oct 19, 2021
7813825
Merge branch 'master' into stream_route
bzp2010 Oct 19, 2021
4506f43
test: add TLS with SNI DP E2E test
bzp2010 Oct 19, 2021
01e6d0e
chore: rename Sni to SNI
bzp2010 Oct 21, 2021
55476e6
test: add TCP and UDP E2E test
bzp2010 Oct 21, 2021
6cdc584
test: fix TCP read
bzp2010 Oct 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/internal/core/entity/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,16 @@ type Proto struct {
Desc string `json:"desc,omitempty"`
Content string `json:"content"`
}

// swagger:model StreamRoute
type StreamRoute struct {
BaseInfo
Desc string `json:"desc,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
ServerAddr string `json:"server_addr,omitempty"`
ServerPort int `json:"server_port,omitempty"`
Sni string `json:"sni,omitempty"`
bzp2010 marked this conversation as resolved.
Show resolved Hide resolved
Upstream *UpstreamDef `json:"upstream,omitempty"`
UpstreamID interface{} `json:"upstream_id,omitempty"`
Plugins map[string]interface{} `json:"plugins,omitempty"`
}
26 changes: 20 additions & 6 deletions api/internal/core/store/storehub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
HubKeyServerInfo HubKey = "server_info"
HubKeyPluginConfig HubKey = "plugin_config"
HubKeyProto HubKey = "proto"
HubKeyStreamRoute HubKey = "stream_route"
)

var (
Expand All @@ -47,12 +48,13 @@ var (

func InitStore(key HubKey, opt GenericStoreOption) error {
hubsNeedCheck := map[HubKey]bool{
HubKeyConsumer: true,
HubKeyRoute: true,
HubKeySsl: true,
HubKeyService: true,
HubKeyUpstream: true,
HubKeyGlobalRule: true,
HubKeyConsumer: true,
HubKeyRoute: true,
HubKeySsl: true,
HubKeyService: true,
HubKeyUpstream: true,
HubKeyGlobalRule: true,
HubKeyStreamRoute: true,
}
if _, ok := hubsNeedCheck[key]; ok {
validator, err := NewAPISIXJsonSchemaValidator("main." + string(key))
Expand Down Expand Up @@ -215,5 +217,17 @@ func InitStores() error {
return err
}

err = InitStore(HubKeyStreamRoute, GenericStoreOption{
BasePath: conf.ETCDConfig.Prefix + "/stream_routes",
ObjType: reflect.TypeOf(entity.StreamRoute{}),
KeyFunc: func(obj interface{}) string {
r := obj.(*entity.StreamRoute)
return utils.InterfaceToString(r.ID)
},
})
if err != nil {
return err
}

return nil
}
186 changes: 186 additions & 0 deletions api/internal/handler/stream_route/stream_route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package stream_route

import (
"fmt"
"net/http"
"reflect"
"strings"

"github.com/gin-gonic/gin"
"github.com/shiningrush/droplet"
"github.com/shiningrush/droplet/data"
"github.com/shiningrush/droplet/wrapper"
wgin "github.com/shiningrush/droplet/wrapper/gin"

"github.com/apisix/manager-api/internal/core/entity"
"github.com/apisix/manager-api/internal/core/store"
"github.com/apisix/manager-api/internal/handler"
"github.com/apisix/manager-api/internal/utils"
)
bzp2010 marked this conversation as resolved.
Show resolved Hide resolved

type Handler struct {
streamRouteStore store.Interface
upstreamStore store.Interface
}

func NewHandler() (handler.RouteRegister, error) {
return &Handler{
streamRouteStore: store.GetStore(store.HubKeyStreamRoute),
upstreamStore: store.GetStore(store.HubKeyUpstream),
}, nil
}

func (h *Handler) ApplyRoute(r *gin.Engine) {
r.GET("/apisix/admin/stream_routes/:id", wgin.Wraps(h.Get,
wrapper.InputType(reflect.TypeOf(GetInput{}))))
r.GET("/apisix/admin/stream_routes", wgin.Wraps(h.List,
wrapper.InputType(reflect.TypeOf(ListInput{}))))
r.POST("/apisix/admin/stream_routes", wgin.Wraps(h.Create,
wrapper.InputType(reflect.TypeOf(entity.StreamRoute{}))))
r.PUT("/apisix/admin/stream_routes", wgin.Wraps(h.Update,
wrapper.InputType(reflect.TypeOf(UpdateInput{}))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need this handler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little confused: how do we create a stream-route , we only support get and delete handler?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use POST to create a stream route. Right?

Refer: http://apisix.apache.org/docs/apisix/admin-api#stream-route

r.PUT("/apisix/admin/stream_routes/:id", wgin.Wraps(h.Update,
wrapper.InputType(reflect.TypeOf(UpdateInput{}))))
r.DELETE("/apisix/admin/stream_routes/:ids", wgin.Wraps(h.BatchDelete,
wrapper.InputType(reflect.TypeOf(BatchDelete{}))))
}

type GetInput struct {
ID string `auto_read:"id,path" validate:"required"`
}

func (h *Handler) Get(c droplet.Context) (interface{}, error) {
input := c.Input().(*GetInput)
streamRoute, err := h.streamRouteStore.Get(c.Context(), input.ID)
if err != nil {
return handler.SpecCodeResponse(err), err
}
return streamRoute, nil
}

type ListInput struct {
RemoteAddr string `auto_read:"remote_addr,query"`
ServerAddr string `auto_read:"server_addr,query"`
ServerPort int `auto_read:"server_port,query"`
Sni string `auto_read:"sni,query"`
store.Pagination
}

func (h *Handler) List(c droplet.Context) (interface{}, error) {
input := c.Input().(*ListInput)
ret, err := h.streamRouteStore.List(c.Context(), store.ListInput{
Predicate: func(obj interface{}) bool {
if input.RemoteAddr != "" && !strings.Contains(obj.(*entity.StreamRoute).RemoteAddr, input.RemoteAddr) {
return false
}

if input.ServerAddr != "" && !strings.Contains(obj.(*entity.StreamRoute).ServerAddr, input.ServerAddr) {
return false
}

if input.ServerPort != 0 && obj.(*entity.StreamRoute).ServerPort != input.ServerPort {
return false
}

if input.Sni != "" && strings.Contains(obj.(*entity.StreamRoute).Sni, input.Sni) {
bzp2010 marked this conversation as resolved.
Show resolved Hide resolved
return false
}

return true
},
PageSize: input.PageSize,
PageNumber: input.PageNumber,
})

if err != nil {
return nil, err
}
return ret, nil
}

func (h *Handler) Create(c droplet.Context) (interface{}, error) {
streamRoute := c.Input().(*entity.StreamRoute)
if streamRoute.UpstreamID != nil {
upstreamID := utils.InterfaceToString(streamRoute.UpstreamID)
_, err := h.upstreamStore.Get(c.Context(), upstreamID)
if err != nil {
if err == data.ErrNotFound {
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest},
fmt.Errorf("upstream id: %s not found", streamRoute.UpstreamID)
}
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest}, err
bzp2010 marked this conversation as resolved.
Show resolved Hide resolved
}
}
create, err := h.streamRouteStore.Create(c.Context(), streamRoute)
if err != nil {
return handler.SpecCodeResponse(err), err
}
return create, nil
}

type UpdateInput struct {
ID string `auto_read:"id,path"`
entity.StreamRoute
}

func (h *Handler) Update(c droplet.Context) (interface{}, error) {
input := c.Input().(*UpdateInput)

// check if ID in body is equal ID in path
if err := handler.IDCompare(input.ID, input.StreamRoute.ID); err != nil {
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest}, err
}

// if has id in path, use it
if input.ID != "" {
input.StreamRoute.ID = input.ID
}

if input.UpstreamID != nil {
upstreamID := utils.InterfaceToString(input.UpstreamID)
_, err := h.upstreamStore.Get(c.Context(), upstreamID)
if err != nil {
if err == data.ErrNotFound {
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest},
fmt.Errorf("upstream id: %s not found", input.UpstreamID)
}
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest}, err
}
}
res, err := h.streamRouteStore.Update(c.Context(), &input.StreamRoute, true)
if err != nil {
return handler.SpecCodeResponse(err), err
}

return res, nil
}

type BatchDelete struct {
IDs string `auto_read:"ids,path"`
}

func (h *Handler) BatchDelete(c droplet.Context) (interface{}, error) {
input := c.Input().(*BatchDelete)

if err := h.streamRouteStore.BatchDelete(c.Context(), strings.Split(input.IDs, ",")); err != nil {
return handler.SpecCodeResponse(err), err
}

return nil, nil
}
68 changes: 68 additions & 0 deletions api/internal/handler/stream_route/stream_route_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package stream_route

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"

"github.com/apisix/manager-api/internal/core/entity"
)

func TestStructUnmarshal(t *testing.T) {
// define and parse data
jsonStr := `{
"id": 1,
"create_time": 1700000000,
"update_time": 1700000000,
"desc": "desc",
"remote_addr": "1.1.1.1",
"server_addr": "2.2.2.2",
"server_port": 9080,
"sni": "example.com",
"upstream": {
"nodes": [
{
"host": "10.10.10.10",
"port": 8080,
"weight": 1
}
],
"type": "roundrobin",
"scheme": "http",
"pass_host": "pass"
},
"upstream_id": 1
}`
streamRoute := entity.StreamRoute{}
err := json.Unmarshal([]byte(jsonStr), &streamRoute)

// asserts
assert.Nil(t, err)
assert.Equal(t, streamRoute.ID, float64(1))
assert.Equal(t, streamRoute.CreateTime, int64(1700000000))
assert.Equal(t, streamRoute.UpdateTime, int64(1700000000))
assert.Equal(t, streamRoute.Desc, "desc")
assert.Equal(t, streamRoute.RemoteAddr, "1.1.1.1")
assert.Equal(t, streamRoute.ServerAddr, "2.2.2.2")
assert.Equal(t, streamRoute.ServerPort, 9080)
assert.Equal(t, streamRoute.Sni, "example.com")
assert.Equal(t, streamRoute.UpstreamID, float64(1))
assert.NotNil(t, streamRoute.Upstream)
}
bzp2010 marked this conversation as resolved.
Show resolved Hide resolved
35 changes: 29 additions & 6 deletions api/internal/handler/upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ import (
)

type Handler struct {
upstreamStore store.Interface
routeStore store.Interface
serviceStore store.Interface
upstreamStore store.Interface
routeStore store.Interface
serviceStore store.Interface
streamRouteStore store.Interface
}

func NewHandler() (handler.RouteRegister, error) {
return &Handler{
upstreamStore: store.GetStore(store.HubKeyUpstream),
routeStore: store.GetStore(store.HubKeyRoute),
serviceStore: store.GetStore(store.HubKeyService),
upstreamStore: store.GetStore(store.HubKeyUpstream),
routeStore: store.GetStore(store.HubKeyRoute),
serviceStore: store.GetStore(store.HubKeyService),
streamRouteStore: store.GetStore(store.HubKeyStreamRoute),
}, nil
}

Expand Down Expand Up @@ -260,6 +262,27 @@ func (h *Handler) BatchDelete(c droplet.Context) (interface{}, error) {
fmt.Errorf("service: %s is using this upstream", ret.Rows[0].(*entity.Service).Name)
}

ret, err = h.streamRouteStore.List(c.Context(), store.ListInput{
Predicate: func(obj interface{}) bool {
streamRoute := obj.(*entity.StreamRoute)
if _, exist := mp[utils.InterfaceToString(streamRoute.UpstreamID)]; exist {
return true
}

return false
},
PageSize: 0,
PageNumber: 0,
})
if err != nil {
return handler.SpecCodeResponse(err), err
}

if ret.TotalSize > 0 {
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest},
fmt.Errorf("stream route: %s is using this upstream", ret.Rows[0].(*entity.StreamRoute).ID)
}

if err = h.upstreamStore.BatchDelete(c.Context(), ids); err != nil {
return handler.SpecCodeResponse(err), err
}
Expand Down
Loading