Skip to content

Commit

Permalink
Merge pull request #501 from micro/wrap
Browse files Browse the repository at this point in the history
add the wrappers back into the core router
  • Loading branch information
asim authored Jun 7, 2019
2 parents a2fbf19 + f5ac238 commit 95b8147
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
4 changes: 2 additions & 2 deletions server/rpc_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type rpcRequest struct {
codec codec.Codec
header map[string]string
body []byte
rawBody interface{}
stream bool
}

Expand Down Expand Up @@ -48,8 +49,7 @@ func (r *rpcRequest) Header() map[string]string {
}

func (r *rpcRequest) Body() interface{} {
// TODO: convert to interface value
return r.body
return r.rawBody
}

func (r *rpcRequest) Read() ([]byte, error) {
Expand Down
11 changes: 11 additions & 0 deletions server/rpc_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
method: req.msg.Method,
endpoint: req.msg.Endpoint,
body: req.msg.Body,
rawBody: argv.Interface(),
}

if !mtype.stream {
Expand All @@ -202,6 +203,11 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil
}

// wrap the handler
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}

// execute handler
if err := fn(ctx, r, replyv.Interface()); err != nil {
return err
Expand Down Expand Up @@ -235,6 +241,11 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
}
}

// wrap the handler
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}

// client.Stream request
r.stream = true

Expand Down
48 changes: 35 additions & 13 deletions server/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type rpcServer struct {

func newRpcServer(opts ...Option) Server {
options := newOptions(opts...)
router := newRpcRouter()
router.hdlrWrappers = options.HdlrWrappers

return &rpcServer{
opts: options,
router: DefaultRouter,
Expand All @@ -45,6 +48,14 @@ func newRpcServer(opts ...Option) Server {
}
}

type rpcRouter struct {
h func(context.Context, Request, interface{}) error
}

func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error {
return r.h(ctx, req, rsp)
}

// ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) {
defer func() {
Expand Down Expand Up @@ -143,24 +154,26 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
}

// set router
r := s.opts.Router
r := Router(s.router)

// if nil use default router
if s.opts.Router == nil {
r = s.router
}
// if not nil use the router specified
if s.opts.Router != nil {
// create a wrapped function
handler := func(ctx context.Context, req Request, rsp interface{}) error {
return s.opts.Router.ServeRequest(ctx, req, rsp.(Response))
}

// create a wrapped function
handler := func(ctx context.Context, req Request, rsp interface{}) error {
return r.ServeRequest(ctx, req, rsp.(Response))
}
// execute the wrapper for it
for i := len(s.opts.HdlrWrappers); i > 0; i-- {
handler = s.opts.HdlrWrappers[i-1](handler)
}

for i := len(s.opts.HdlrWrappers); i > 0; i-- {
handler = s.opts.HdlrWrappers[i-1](handler)
// set the router
r = rpcRouter{handler}
}

// TODO: handle error better
if err := handler(ctx, request, response); err != nil {
// serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil {
// write an error response
err = rcodec.Write(&codec.Message{
Header: msg.Header,
Expand Down Expand Up @@ -206,6 +219,15 @@ func (s *rpcServer) Init(opts ...Option) error {
for _, opt := range opts {
opt(&s.opts)
}

// update router if its the default
if s.opts.Router == nil {
r := newRpcRouter()
r.hdlrWrappers = s.opts.HdlrWrappers
r.serviceMap = s.router.serviceMap
s.router = r
}

s.Unlock()
return nil
}
Expand Down

0 comments on commit 95b8147

Please sign in to comment.