From 7b60f8c59dcc497b3b367deaa14553f831268477 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 17 Dec 2020 15:58:43 +0700 Subject: [PATCH] change OpenStream to accept a context --- .github/workflows/interop.yml | 5 +++- cmd/client/main.go | 2 +- conn.go | 4 +-- conn_test.go | 6 ++--- go.mod | 2 +- go.sum | 4 +-- integrationtests/conn/conn.go | 14 +++++++++++ .../conn/conn_without_stream_open_context.go | 14 +++++++++++ integrationtests/main.go | 25 ++++++++++--------- .../stream/stream_old_interface.go | 2 +- 10 files changed, 55 insertions(+), 23 deletions(-) create mode 100644 integrationtests/conn/conn.go create mode 100644 integrationtests/conn/conn_without_stream_open_context.go diff --git a/.github/workflows/interop.yml b/.github/workflows/interop.yml index d5f311f..b98ae14 100644 --- a/.github/workflows/interop.yml +++ b/.github/workflows/interop.yml @@ -60,7 +60,10 @@ jobs: git reflog --decorate -1 TAGS=() if [[ `git merge-base --is-ancestor HEAD 126c64772ba0aef0b2b6d58ff36e55a93f9253a7; echo $?` != "1" ]]; then - TAGS+=("oldstream") + TAGS+=("old_stream_close") + fi + if [[ `git merge-base --is-ancestor HEAD 3123af36d6cec13e31dac75058c8046e6e4a6690; echo $?` != "1" ]]; then + TAGS+=("stream_open_no_context") fi if [[ "${{ matrix.cfg.retireBugBackwardsCompatiblityMode }}" == "true" ]]; then TAGS+=("retirebugcompatmode") diff --git a/cmd/client/main.go b/cmd/client/main.go index b17f47b..bc033e8 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -49,7 +49,7 @@ func run(raddr string, p string) error { return err } defer conn.Close() - str, err := conn.OpenStream() + str, err := conn.OpenStream(context.Background()) if err != nil { return err } diff --git a/conn.go b/conn.go index 9ba8593..4f74d25 100644 --- a/conn.go +++ b/conn.go @@ -37,8 +37,8 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (mux.MuxedStream, error) { - qstr, err := c.sess.OpenStreamSync(context.Background()) +func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) { + qstr, err := c.sess.OpenStreamSync(ctx) return &stream{Stream: qstr}, err } diff --git a/conn_test.go b/conn_test.go index 172acff..0eee557 100644 --- a/conn_test.go +++ b/conn_test.go @@ -130,7 +130,7 @@ var _ = Describe("Connection", func() { Expect(err).ToNot(HaveOccurred()) defer serverConn.Close() - str, err := conn.OpenStream() + str, err := conn.OpenStream(context.Background()) Expect(err).ToNot(HaveOccurred()) _, err = str.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) @@ -251,7 +251,7 @@ var _ = Describe("Connection", func() { for _, c := range []tpt.CapableConn{serverConn1, serverConn2} { go func(conn tpt.CapableConn) { defer GinkgoRecover() - str, err := conn.OpenStream() + str, err := conn.OpenStream(context.Background()) Expect(err).ToNot(HaveOccurred()) defer str.Close() _, err = str.Write(data) @@ -315,7 +315,7 @@ var _ = Describe("Connection", func() { defer GinkgoRecover() conn, err := ln.Accept() Expect(err).ToNot(HaveOccurred()) - str, err := conn.OpenStream() + str, err := conn.OpenStream(context.Background()) Expect(err).ToNot(HaveOccurred()) str.Write([]byte("foobar")) }() diff --git a/go.mod b/go.mod index e160895..5dc7713 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.14 require ( github.com/golang/mock v1.4.4 github.com/ipfs/go-log v1.0.4 - github.com/libp2p/go-libp2p-core v0.7.0 + github.com/libp2p/go-libp2p-core v0.8.0 github.com/libp2p/go-libp2p-tls v0.1.3 github.com/libp2p/go-netroute v0.1.3 github.com/lucas-clemente/quic-go v0.19.3 diff --git a/go.sum b/go.sum index e3116a7..a472091 100644 --- a/go.sum +++ b/go.sum @@ -126,8 +126,8 @@ github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoR github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= -github.com/libp2p/go-libp2p-core v0.7.0 h1:4a0TMjrWNTZlNvcqxZmrMRDi/NQWrhwO2pkTuLSQ/IQ= -github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.0 h1:5K3mT+64qDTKbV3yTdbMCzJ7O6wbNsavAEb8iqBvBcI= +github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-tls v0.1.3 h1:twKMhMu44jQO+HgQK9X8NHO5HkeJu2QbhLzLJpa8oNM= github.com/libp2p/go-libp2p-tls v0.1.3/go.mod h1:wZfuewxOndz5RTnCAxFliGjvYSDA40sKitV4c50uI1M= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= diff --git a/integrationtests/conn/conn.go b/integrationtests/conn/conn.go new file mode 100644 index 0000000..92a7d92 --- /dev/null +++ b/integrationtests/conn/conn.go @@ -0,0 +1,14 @@ +// +build !stream_open_no_context + +package conn + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/mux" + tpt "github.com/libp2p/go-libp2p-core/transport" +) + +func OpenStream(ctx context.Context, c tpt.CapableConn) (mux.MuxedStream, error) { + return c.OpenStream(ctx) +} diff --git a/integrationtests/conn/conn_without_stream_open_context.go b/integrationtests/conn/conn_without_stream_open_context.go new file mode 100644 index 0000000..1e2cfc2 --- /dev/null +++ b/integrationtests/conn/conn_without_stream_open_context.go @@ -0,0 +1,14 @@ +// +build stream_open_no_context + +package conn + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/mux" + tpt "github.com/libp2p/go-libp2p-core/transport" +) + +func OpenStream(_ context.Context, c tpt.CapableConn) (mux.MuxedStream, error) { + return c.OpenStream() +} diff --git a/integrationtests/main.go b/integrationtests/main.go index c6a4634..6d454f1 100644 --- a/integrationtests/main.go +++ b/integrationtests/main.go @@ -19,6 +19,7 @@ import ( ma "github.com/multiformats/go-multiaddr" "golang.org/x/sync/errgroup" + "github.com/libp2p/go-libp2p-quic-transport/integrationtests/conn" "github.com/libp2p/go-libp2p-quic-transport/integrationtests/stream" ) @@ -151,18 +152,18 @@ func testSingleFileTransfer(tr transport.Transport, serverKey crypto.PubKey, add } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - conn, err := tr.Dial(ctx, addr, serverPeerID) + c, err := tr.Dial(ctx, addr, serverPeerID) if err != nil { return fmt.Errorf("Dial failed: %w", err) } - defer conn.Close() - if !conn.RemotePublicKey().Equals(serverKey) { + defer c.Close() + if !c.RemotePublicKey().Equals(serverKey) { return errors.New("mismatching public keys") } - if conn.RemotePeer() != serverPeerID { - return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", conn.RemotePeer().Pretty(), serverPeerID.Pretty()) + if c.RemotePeer() != serverPeerID { + return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", c.RemotePeer().Pretty(), serverPeerID.Pretty()) } - st, err := conn.OpenStream() + st, err := conn.OpenStream(context.Background(), c) if err != nil { return err } @@ -192,21 +193,21 @@ func testMultipleFileTransfer(tr transport.Transport, serverKey crypto.PubKey, a } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - conn, err := tr.Dial(ctx, addr, serverPeerID) + c, err := tr.Dial(ctx, addr, serverPeerID) if err != nil { return fmt.Errorf("Dial failed: %w", err) } - defer conn.Close() - if !conn.RemotePublicKey().Equals(serverKey) { + defer c.Close() + if !c.RemotePublicKey().Equals(serverKey) { return errors.New("mismatching public keys") } - if conn.RemotePeer() != serverPeerID { - return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", conn.RemotePeer().Pretty(), serverPeerID.Pretty()) + if c.RemotePeer() != serverPeerID { + return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", c.RemotePeer().Pretty(), serverPeerID.Pretty()) } var g errgroup.Group for i := 0; i < 2000; i++ { g.Go(func() error { - st, err := conn.OpenStream() + st, err := conn.OpenStream(context.Background(), c) if err != nil { return err } diff --git a/integrationtests/stream/stream_old_interface.go b/integrationtests/stream/stream_old_interface.go index fb8dc14..65ed999 100644 --- a/integrationtests/stream/stream_old_interface.go +++ b/integrationtests/stream/stream_old_interface.go @@ -1,4 +1,4 @@ -// +build oldstream +// +build old_stream_close package stream