Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service pkg and deprecate micro pkg #1466

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 11 additions & 171 deletions micro/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,189 +15,29 @@ package micro

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/service"
)

type (
// Handler is used to respond to service requests.
Handler interface {
Handle(Request)
}

// HandlerFunc is a function implementing [Handler].
// It allows using a function as a request handler, without having to implement Handle
// on a separate type.
HandlerFunc func(Request)

// Request represents service request available in the service handler.
// It exposes methods to respond to the request, as well as
// getting the request data and headers.
Request interface {
// Respond sends the response for the request.
// Additional headers can be passed using [WithHeaders] option.
Respond([]byte, ...RespondOpt) error

// RespondJSON marshals the given response value and responds to the request.
// Additional headers can be passed using [WithHeaders] option.
RespondJSON(any, ...RespondOpt) error

// Error prepares and publishes error response from a handler.
// A response error should be set containing an error code and description.
// Optionally, data can be set as response payload.
Error(code, description string, data []byte, opts ...RespondOpt) error

// Data returns request data.
Data() []byte

// Headers returns request headers.
Headers() Headers

// Subject returns underlying NATS message subject.
Subject() string
}

// Headers is a wrapper around [*nats.Header]
Headers nats.Header

// RespondOpt is a function used to configure [Request.Respond] and [Request.RespondJSON] methods.
RespondOpt func(*nats.Msg)

// request is a default implementation of Request interface
request struct {
msg *nats.Msg
respondError error
}

serviceError struct {
Code string `json:"code"`
Description string `json:"description"`
}
Handler = service.Handler
HandlerFunc = service.HandlerFunc
Request = service.Request
Headers = service.Headers
RespondOpt = service.RespondOpt
)

var (
ErrRespond = errors.New("NATS error when sending response")
ErrMarshalResponse = errors.New("marshaling response")
ErrArgRequired = errors.New("argument required")
ErrRespond = service.ErrRespond
ErrMarshalResponse = service.ErrMarshalResponse
ErrArgRequired = service.ErrArgRequired
)

func (fn HandlerFunc) Handle(req Request) {
fn(req)
}

// ContextHandler is a helper function used to utilize [context.Context]
// in request handlers.
func ContextHandler(ctx context.Context, handler func(context.Context, Request)) Handler {
return HandlerFunc(func(req Request) {
handler(ctx, req)
})
}

// Respond sends the response for the request.
// Additional headers can be passed using [WithHeaders] option.
func (r *request) Respond(response []byte, opts ...RespondOpt) error {
respMsg := &nats.Msg{
Data: response,
}
for _, opt := range opts {
opt(respMsg)
}

if err := r.msg.RespondMsg(respMsg); err != nil {
r.respondError = fmt.Errorf("%w: %s", ErrRespond, err)
return r.respondError
}

return nil
}

// RespondJSON marshals the given response value and responds to the request.
// Additional headers can be passed using [WithHeaders] option.
func (r *request) RespondJSON(response any, opts ...RespondOpt) error {
resp, err := json.Marshal(response)
if err != nil {
return ErrMarshalResponse
}
return r.Respond(resp, opts...)
}

// Error prepares and publishes error response from a handler.
// A response error should be set containing an error code and description.
// Optionally, data can be set as response payload.
func (r *request) Error(code, description string, data []byte, opts ...RespondOpt) error {
if code == "" {
return fmt.Errorf("%w: error code", ErrArgRequired)
}
if description == "" {
return fmt.Errorf("%w: description", ErrArgRequired)
}
response := &nats.Msg{
Header: nats.Header{
ErrorHeader: []string{description},
ErrorCodeHeader: []string{code},
},
}
for _, opt := range opts {
opt(response)
}

response.Data = data
if err := r.msg.RespondMsg(response); err != nil {
r.respondError = err
return err
}
r.respondError = &serviceError{
Code: code,
Description: description,
}

return nil
return service.ContextHandler(ctx, handler)
}

// WithHeaders can be used to configure response with custom headers.
func WithHeaders(headers Headers) RespondOpt {
return func(m *nats.Msg) {
if m.Header == nil {
m.Header = nats.Header(headers)
return
}

for k, v := range headers {
m.Header[k] = v
}
}
}

// Data returns request data.
func (r *request) Data() []byte {
return r.msg.Data
}

// Headers returns request headers.
func (r *request) Headers() Headers {
return Headers(r.msg.Header)
}

// Subject returns underlying NATS message subject.
func (r *request) Subject() string {
return r.msg.Subject
}

// Get gets the first value associated with the given key.
// It is case-sensitive.
func (h Headers) Get(key string) string {
return nats.Header(h).Get(key)
}

// Values returns all values associated with the given key.
// It is case-sensitive.
func (h Headers) Values(key string) []string {
return nats.Header(h).Values(key)
}

func (e *serviceError) Error() string {
return fmt.Sprintf("%s:%s", e.Code, e.Description)
return service.WithHeaders(headers)
}
Loading