Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features for reverse calls, notifs, custom-params #88

Merged
merged 6 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 134 additions & 64 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,18 @@ func httpClient(ctx context.Context, addr string, namespace string, outs []inter
defer httpResp.Body.Close()

var resp clientResponse
if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return clientResponse{}, xerrors.Errorf("http status %s unmarshaling response: %w", httpResp.Status, err)
}
if cr.req.ID != nil { // non-notification
if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return clientResponse{}, xerrors.Errorf("http status %s unmarshaling response: %w", httpResp.Status, err)
}

if resp.ID, err = normalizeID(resp.ID); err != nil {
return clientResponse{}, xerrors.Errorf("failed to response ID: %w", err)
}
if resp.ID, err = normalizeID(resp.ID); err != nil {
return clientResponse{}, xerrors.Errorf("failed to response ID: %w", err)
}

if resp.ID != cr.req.ID {
return clientResponse{}, xerrors.New("request and response id didn't match")
if resp.ID != cr.req.ID {
return clientResponse{}, xerrors.New("request and response id didn't match")
}
}

return resp, nil
Expand Down Expand Up @@ -220,6 +222,45 @@ func websocketClient(ctx context.Context, addr string, namespace string, outs []
errors: config.errors,
}

requests := c.setupRequestChan()

stop := make(chan struct{})
exiting := make(chan struct{})
c.exiting = exiting

var hnd reqestHandler
if len(config.reverseHandlers) > 0 {
h := makeHandler(defaultServerConfig())
h.aliasedMethods = config.aliasedHandlerMethods
for _, reverseHandler := range config.reverseHandlers {
h.register(reverseHandler.ns, reverseHandler.hnd)
}
hnd = h
}

go (&wsConn{
conn: conn,
connFactory: connFactory,
reconnectBackoff: config.reconnectBackoff,
pingInterval: config.pingInterval,
timeout: config.timeout,
handler: hnd,
requests: requests,
stop: stop,
exiting: exiting,
}).handleWsConn(ctx)

if err := c.provide(outs); err != nil {
return nil, err
}

return func() {
close(stop)
<-exiting
}, nil
}

func (c *client) setupRequestChan() chan clientRequest {
requests := make(chan clientRequest)

c.doRequest = func(ctx context.Context, cr clientRequest) (clientResponse, error) {
Expand All @@ -245,12 +286,18 @@ func websocketClient(ctx context.Context, addr string, namespace string, outs []
case <-ctxDone: // send cancel request
ctxDone = nil

rp, err := json.Marshal([]param{{v: reflect.ValueOf(cr.req.ID)}})
if err != nil {
return clientResponse{}, xerrors.Errorf("marshalling cancel request: %w", err)
}

cancelReq := clientRequest{
req: request{
Jsonrpc: "2.0",
Method: wsCancel,
Params: []param{{v: reflect.ValueOf(cr.req.ID)}},
Params: rp,
},
ready: make(chan clientResponse, 1),
}
select {
case requests <- cancelReq:
Expand All @@ -264,30 +311,7 @@ func websocketClient(ctx context.Context, addr string, namespace string, outs []
return resp, nil
}

stop := make(chan struct{})
exiting := make(chan struct{})
c.exiting = exiting

go (&wsConn{
conn: conn,
connFactory: connFactory,
reconnectBackoff: config.reconnectBackoff,
pingInterval: config.pingInterval,
timeout: config.timeout,
handler: nil,
requests: requests,
stop: stop,
exiting: exiting,
}).handleWsConn(ctx)

if err := c.provide(outs); err != nil {
return nil, err
}

return func() {
close(stop)
<-exiting
}, nil
return requests
}

func (c *client) provide(outs []interface{}) error {
Expand Down Expand Up @@ -433,10 +457,15 @@ type rpcFunc struct {
valOut int
errOut int

hasCtx int
// hasCtx is 1 if the function has a context.Context as its first argument.
// Used as the number of the first non-context argument.
hasCtx int
Comment on lines +460 to +462

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for commenting this. Very confusing.


hasRawParams bool
returnValueIsChannel bool

retry bool
retry bool
notify bool
}

func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value {
Expand Down Expand Up @@ -471,21 +500,47 @@ func (fn *rpcFunc) processError(err error) []reflect.Value {
}

func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) {
var id interface{} = atomic.AddInt64(&fn.client.idCtr, 1)
params := make([]param, len(args)-fn.hasCtx)
for i, arg := range args[fn.hasCtx:] {
enc, found := fn.client.paramEncoders[arg.Type()]
if found {
// custom param encoder
var err error
arg, err = enc(arg)
if err != nil {
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}
var id interface{}
if !fn.notify {
id = atomic.AddInt64(&fn.client.idCtr, 1)

// Prepare the ID to send on the wire.
// We track int64 ids as float64 in the inflight map (because that's what
// they'll be decoded to). encoding/json outputs numbers with their minimal
// encoding, avoding the decimal point when possible, i.e. 3 will never get
// converted to 3.0.
var err error
id, err = normalizeID(id)
if err != nil {
return fn.processError(fmt.Errorf("failed to normalize id")) // should probably panic
}
}

params[i] = param{
v: arg,
var serializedParams json.RawMessage

if fn.hasRawParams {
serializedParams = json.RawMessage(args[fn.hasCtx].Interface().(RawParams))
} else {
params := make([]param, len(args)-fn.hasCtx)
for i, arg := range args[fn.hasCtx:] {
enc, found := fn.client.paramEncoders[arg.Type()]
if found {
// custom param encoder
var err error
arg, err = enc(arg)
if err != nil {
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}
}

params[i] = param{
v: arg,
}
}
var err error
serializedParams, err = json.Marshal(params)
if err != nil {
return fn.processError(fmt.Errorf("marshaling params failed: %w", err))
}
}

Expand All @@ -506,21 +561,11 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut)
}

// Prepare the ID to send on the wire.
// We track int64 ids as float64 in the inflight map (because that's what
// they'll be decoded to). encoding/json outputs numbers with their minimal
// encoding, avoding the decimal point when possible, i.e. 3 will never get
// converted to 3.0.
id, err := normalizeID(id)
if err != nil {
return fn.processError(fmt.Errorf("failed to normalize id")) // should probably panic
}

req := request{
Jsonrpc: "2.0",
ID: id,
Method: fn.client.namespace + "." + fn.name,
Params: params,
Method: fn.name,
Params: serializedParams,
}

if span != nil {
Expand All @@ -538,6 +583,7 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
minDelay: methodMinRetryDelay,
}

var err error
var resp clientResponse
// keep retrying if got a forced closed websocket conn and calling method
// has retry annotation
Expand All @@ -547,7 +593,7 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}

if resp.ID != req.ID {
if !fn.notify && resp.ID != req.ID {
return fn.processError(xerrors.New("request and response id didn't match"))
}

Expand Down Expand Up @@ -575,24 +621,48 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
return fn.processResponse(resp, retVal())
}

const (
ProxyTagRetry = "retry"
ProxyTagNotify = "notify"
ProxyTagRPCMethod = "rpc_method"
)

func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) {
ftyp := f.Type
if ftyp.Kind() != reflect.Func {
return reflect.Value{}, xerrors.New("handler field not a func")
}

name := c.namespace + "." + f.Name
if tag, ok := f.Tag.Lookup(ProxyTagRPCMethod); ok {
name = tag
}

fun := &rpcFunc{
client: c,
ftyp: ftyp,
name: f.Name,
retry: f.Tag.Get("retry") == "true",
name: name,
retry: f.Tag.Get(ProxyTagRetry) == "true",
notify: f.Tag.Get(ProxyTagNotify) == "true",
}
fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp)

if fun.valOut != -1 && fun.notify {
return reflect.Value{}, xerrors.New("notify methods cannot return values")
}

fun.returnValueIsChannel = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan

if ftyp.NumIn() > 0 && ftyp.In(0) == contextType {
fun.hasCtx = 1
}
fun.returnValueIsChannel = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan
// note: hasCtx is also the number of the first non-context argument
if ftyp.NumIn() > fun.hasCtx && ftyp.In(fun.hasCtx) == rtRawParams {
if ftyp.NumIn() > fun.hasCtx+1 {
return reflect.Value{}, xerrors.New("raw params can't be mixed with other arguments")
}
fun.hasRawParams = true
}

return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil
}
11 changes: 10 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/filecoin-project/go-jsonrpc

go 1.14
go 1.18

require (
github.com/google/uuid v1.1.1
Expand All @@ -12,3 +12,12 @@ require (
go.uber.org/zap v1.14.1
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
Loading