Skip to content

Commit

Permalink
feat: supports stream route API (#2104)
Browse files Browse the repository at this point in the history
* feat: add stream route module

* feat: add stream route test cases

* fix: change error handling

* feat: add license for stream route

* fix: format imports

* fix: add stream route to schema check queue

* feat: add e2e test

* feat: add desc and plugins parameters

* feat: add e2e test

* feat: add license for e2e test

* fix: fix e2e test case body error

* fix: delete trailing whitespace at stream_route_test.go 45 line

* chore: remove whitespace

* chore: update property order

* test: remove unused test

* chore: reorder import

* test: add ginkgo E2E test

* test: update DP E2E test

* chore: add license header

* test: update E2E environment configure

* test: fix typo

* test: add exception E2E test

* test: remove old E2E implementation

* test: fix typo

* feat: add upstream usage check

* test: add stream route E2E with upstream

* test: fix unit test

* test: update E2E case

* test: update E2E environment configure

* test: add condition list unit test

* test: add TLS with SNI DP E2E test

* chore: rename Sni to SNI

* test: add TCP and UDP E2E test

* test: fix TCP read

Co-authored-by: bzp2010 <[email protected]>
  • Loading branch information
Xu-Mj and bzp2010 authored Nov 4, 2021
1 parent 5510ecb commit e53a607
Show file tree
Hide file tree
Showing 12 changed files with 864 additions and 13 deletions.
13 changes: 13 additions & 0 deletions api/internal/core/entity/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,16 @@ type Proto struct {
Desc string `json:"desc,omitempty"`
Content string `json:"content"`
}

// swagger:model StreamRoute
type StreamRoute struct {
BaseInfo
Desc string `json:"desc,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
ServerAddr string `json:"server_addr,omitempty"`
ServerPort int `json:"server_port,omitempty"`
SNI string `json:"sni,omitempty"`
Upstream *UpstreamDef `json:"upstream,omitempty"`
UpstreamID interface{} `json:"upstream_id,omitempty"`
Plugins map[string]interface{} `json:"plugins,omitempty"`
}
26 changes: 20 additions & 6 deletions api/internal/core/store/storehub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
HubKeyServerInfo HubKey = "server_info"
HubKeyPluginConfig HubKey = "plugin_config"
HubKeyProto HubKey = "proto"
HubKeyStreamRoute HubKey = "stream_route"
)

var (
Expand All @@ -47,12 +48,13 @@ var (

func InitStore(key HubKey, opt GenericStoreOption) error {
hubsNeedCheck := map[HubKey]bool{
HubKeyConsumer: true,
HubKeyRoute: true,
HubKeySsl: true,
HubKeyService: true,
HubKeyUpstream: true,
HubKeyGlobalRule: true,
HubKeyConsumer: true,
HubKeyRoute: true,
HubKeySsl: true,
HubKeyService: true,
HubKeyUpstream: true,
HubKeyGlobalRule: true,
HubKeyStreamRoute: true,
}
if _, ok := hubsNeedCheck[key]; ok {
validator, err := NewAPISIXJsonSchemaValidator("main." + string(key))
Expand Down Expand Up @@ -215,5 +217,17 @@ func InitStores() error {
return err
}

err = InitStore(HubKeyStreamRoute, GenericStoreOption{
BasePath: conf.ETCDConfig.Prefix + "/stream_routes",
ObjType: reflect.TypeOf(entity.StreamRoute{}),
KeyFunc: func(obj interface{}) string {
r := obj.(*entity.StreamRoute)
return utils.InterfaceToString(r.ID)
},
})
if err != nil {
return err
}

return nil
}
186 changes: 186 additions & 0 deletions api/internal/handler/stream_route/stream_route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package stream_route

import (
"fmt"
"net/http"
"reflect"
"strings"

"github.com/gin-gonic/gin"
"github.com/shiningrush/droplet"
"github.com/shiningrush/droplet/data"
"github.com/shiningrush/droplet/wrapper"
wgin "github.com/shiningrush/droplet/wrapper/gin"

"github.com/apisix/manager-api/internal/core/entity"
"github.com/apisix/manager-api/internal/core/store"
"github.com/apisix/manager-api/internal/handler"
"github.com/apisix/manager-api/internal/utils"
)

type Handler struct {
streamRouteStore store.Interface
upstreamStore store.Interface
}

func NewHandler() (handler.RouteRegister, error) {
return &Handler{
streamRouteStore: store.GetStore(store.HubKeyStreamRoute),
upstreamStore: store.GetStore(store.HubKeyUpstream),
}, nil
}

func (h *Handler) ApplyRoute(r *gin.Engine) {
r.GET("/apisix/admin/stream_routes/:id", wgin.Wraps(h.Get,
wrapper.InputType(reflect.TypeOf(GetInput{}))))
r.GET("/apisix/admin/stream_routes", wgin.Wraps(h.List,
wrapper.InputType(reflect.TypeOf(ListInput{}))))
r.POST("/apisix/admin/stream_routes", wgin.Wraps(h.Create,
wrapper.InputType(reflect.TypeOf(entity.StreamRoute{}))))
r.PUT("/apisix/admin/stream_routes", wgin.Wraps(h.Update,
wrapper.InputType(reflect.TypeOf(UpdateInput{}))))
r.PUT("/apisix/admin/stream_routes/:id", wgin.Wraps(h.Update,
wrapper.InputType(reflect.TypeOf(UpdateInput{}))))
r.DELETE("/apisix/admin/stream_routes/:ids", wgin.Wraps(h.BatchDelete,
wrapper.InputType(reflect.TypeOf(BatchDelete{}))))
}

type GetInput struct {
ID string `auto_read:"id,path" validate:"required"`
}

func (h *Handler) Get(c droplet.Context) (interface{}, error) {
input := c.Input().(*GetInput)
streamRoute, err := h.streamRouteStore.Get(c.Context(), input.ID)
if err != nil {
return handler.SpecCodeResponse(err), err
}
return streamRoute, nil
}

type ListInput struct {
RemoteAddr string `auto_read:"remote_addr,query"`
ServerAddr string `auto_read:"server_addr,query"`
ServerPort int `auto_read:"server_port,query"`
SNI string `auto_read:"sni,query"`
store.Pagination
}

func (h *Handler) List(c droplet.Context) (interface{}, error) {
input := c.Input().(*ListInput)
ret, err := h.streamRouteStore.List(c.Context(), store.ListInput{
Predicate: func(obj interface{}) bool {
if input.RemoteAddr != "" && !strings.Contains(obj.(*entity.StreamRoute).RemoteAddr, input.RemoteAddr) {
return false
}

if input.ServerAddr != "" && !strings.Contains(obj.(*entity.StreamRoute).ServerAddr, input.ServerAddr) {
return false
}

if input.ServerPort != 0 && obj.(*entity.StreamRoute).ServerPort != input.ServerPort {
return false
}

if input.SNI != "" && !strings.Contains(obj.(*entity.StreamRoute).SNI, input.SNI) {
return false
}

return true
},
PageSize: input.PageSize,
PageNumber: input.PageNumber,
})

if err != nil {
return nil, err
}
return ret, nil
}

func (h *Handler) Create(c droplet.Context) (interface{}, error) {
streamRoute := c.Input().(*entity.StreamRoute)
if streamRoute.UpstreamID != nil {
upstreamID := utils.InterfaceToString(streamRoute.UpstreamID)
_, err := h.upstreamStore.Get(c.Context(), upstreamID)
if err != nil {
if err == data.ErrNotFound {
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest},
fmt.Errorf("upstream id: %s not found", streamRoute.UpstreamID)
}
return handler.SpecCodeResponse(err), err
}
}
create, err := h.streamRouteStore.Create(c.Context(), streamRoute)
if err != nil {
return handler.SpecCodeResponse(err), err
}
return create, nil
}

type UpdateInput struct {
ID string `auto_read:"id,path"`
entity.StreamRoute
}

func (h *Handler) Update(c droplet.Context) (interface{}, error) {
input := c.Input().(*UpdateInput)

// check if ID in body is equal ID in path
if err := handler.IDCompare(input.ID, input.StreamRoute.ID); err != nil {
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest}, err
}

// if has id in path, use it
if input.ID != "" {
input.StreamRoute.ID = input.ID
}

if input.UpstreamID != nil {
upstreamID := utils.InterfaceToString(input.UpstreamID)
_, err := h.upstreamStore.Get(c.Context(), upstreamID)
if err != nil {
if err == data.ErrNotFound {
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest},
fmt.Errorf("upstream id: %s not found", input.UpstreamID)
}
return &data.SpecCodeResponse{StatusCode: http.StatusBadRequest}, err
}
}
res, err := h.streamRouteStore.Update(c.Context(), &input.StreamRoute, true)
if err != nil {
return handler.SpecCodeResponse(err), err
}

return res, nil
}

type BatchDelete struct {
IDs string `auto_read:"ids,path"`
}

func (h *Handler) BatchDelete(c droplet.Context) (interface{}, error) {
input := c.Input().(*BatchDelete)

if err := h.streamRouteStore.BatchDelete(c.Context(), strings.Split(input.IDs, ",")); err != nil {
return handler.SpecCodeResponse(err), err
}

return nil, nil
}
Loading

0 comments on commit e53a607

Please sign in to comment.