From fc6e0f98126cf3aea328d6de0859feced7e06238 Mon Sep 17 00:00:00 2001 From: lightclient <14004106+lightclient@users.noreply.github.com> Date: Tue, 20 Aug 2024 07:54:28 -0600 Subject: [PATCH] rpc: add timeout to rpc client Unsubscribe (#30318) Fixes #30156 This adds a repro of the linked issue. I fixed it by adding a timeout when issuing the call to unsubscribe. --- rpc/client.go | 1 + rpc/client_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++ rpc/subscription.go | 5 +++- 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/rpc/client.go b/rpc/client.go index 05b87ae96cb7a..f9a8f1116b2b1 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -45,6 +45,7 @@ var ( const ( defaultDialTimeout = 10 * time.Second // used if context has no deadline subscribeTimeout = 10 * time.Second // overall timeout eth_subscribe, rpc_modules calls + unsubscribeTimeout = 10 * time.Second // timeout for *_unsubscribe calls ) const ( diff --git a/rpc/client_test.go b/rpc/client_test.go index 01c326afb017e..b7607adfce9d2 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -518,6 +518,70 @@ func TestClientCloseUnsubscribeRace(t *testing.T) { } } +// unsubscribeBlocker will wait for the quit channel to process an unsubscribe +// request. +type unsubscribeBlocker struct { + ServerCodec + quit chan struct{} +} + +func (b *unsubscribeBlocker) readBatch() ([]*jsonrpcMessage, bool, error) { + msgs, batch, err := b.ServerCodec.readBatch() + for _, msg := range msgs { + if msg.isUnsubscribe() { + <-b.quit + } + } + return msgs, batch, err +} + +// TestUnsubscribeTimeout verifies that calling the client's Unsubscribe +// function will eventually timeout and not block forever in case the serve does +// not respond. +// It reproducers the issue https://github.com/ethereum/go-ethereum/issues/30156 +func TestUnsubscribeTimeout(t *testing.T) { + srv := NewServer() + srv.RegisterName("nftest", new(notificationTestService)) + + // Setup middleware to block on unsubscribe. + p1, p2 := net.Pipe() + blocker := &unsubscribeBlocker{ServerCodec: NewCodec(p1), quit: make(chan struct{})} + defer close(blocker.quit) + + // Serve the middleware. + go srv.ServeCodec(blocker, OptionMethodInvocation|OptionSubscriptions) + defer srv.Stop() + + // Create the client on the other end of the pipe. + cfg := new(clientConfig) + client, _ := newClient(context.Background(), cfg, func(context.Context) (ServerCodec, error) { + return NewCodec(p2), nil + }) + defer client.Close() + + // Start subscription. + sub, err := client.Subscribe(context.Background(), "nftest", make(chan int), "someSubscription", 1, 1) + if err != nil { + t.Fatalf("failed to subscribe: %v", err) + } + + // Now on a separate thread, attempt to unsubscribe. Since the middleware + // won't return, the function will only return if it times out on the request. + done := make(chan struct{}) + go func() { + sub.Unsubscribe() + done <- struct{}{} + }() + + // Wait for the timeout. If the expected time for the timeout elapses, the + // test is considered failed. + select { + case <-done: + case <-time.After(unsubscribeTimeout + 3*time.Second): + t.Fatalf("Unsubscribe did not return within %s", unsubscribeTimeout) + } +} + // unsubscribeRecorder collects the subscription IDs of *_unsubscribe calls. type unsubscribeRecorder struct { ServerCodec diff --git a/rpc/subscription.go b/rpc/subscription.go index d77c655bf9007..9e400c8b6080e 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -371,5 +371,8 @@ func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, e func (sub *ClientSubscription) requestUnsubscribe() error { var result interface{} - return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) + ctx, cancel := context.WithTimeout(context.Background(), unsubscribeTimeout) + defer cancel() + err := sub.client.CallContext(ctx, &result, sub.namespace+unsubscribeMethodSuffix, sub.subid) + return err }