[Question] stream write data slow #432

Closed
ziranliu opened this issue Sep 23, 2018 · 6 comments
Labels
kind/support A question or request for support

Comments

@ziranliu

Hi,
I'm working on a project using go-libp2p, and I've found that data transmission between nodes is very slow. My service is deployed on AWS in different regions. I measured the network bandwidth and latency with tools such as iperf and found nothing wrong, so I wrote some code to test directly.
Here is the test code using the standard library:

// Server
package main

import (
	"fmt"
	"io"
	"net"
)

func read(conn net.Conn) {
	data := make([]byte, 300000)
	for {
		n, err := io.ReadFull(conn, data)
		if err != nil {
			fmt.Println("read error:", err)
			break
		}
		fmt.Println("read", n, "bytes")
	}

}

func main() {
	l, err := net.Listen("tcp", "0.0.0.0:30001")
	if err != nil {
		panic(err)
	}
	conn, err := l.Accept()
	if err != nil {
		panic(err)
	}
	read(conn)
}


// Client
package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	conn, err := net.Dial("tcp", "x.x.x.x:30001")
	if err != nil {
		panic(err)
	}
	data := make([]byte, 300000)
	for {
		s := time.Now().UnixNano()
		n, err := conn.Write(data)
		if err != nil {
			panic(err)
		}
		fmt.Printf("write %d bytes, time:%d ns\n", n, time.Now().UnixNano()-s)
		time.Sleep(time.Second)
	}
}

The output is as follows, and the speed seems very fast.

write 300000 bytes, time:506510804 ns
write 300000 bytes, time:85992 ns
write 300000 bytes, time:85395 ns
write 300000 bytes, time:94354 ns
write 300000 bytes, time:91790 ns
write 300000 bytes, time:89546 ns
write 300000 bytes, time:94254 ns
write 300000 bytes, time:75717 ns
write 300000 bytes, time:91903 ns
...

Here is the test code using go-libp2p:

// Server
package main

import (
	"context"
	"fmt"
	"io"
	mrand "math/rand"

	libp2p "github.com/libp2p/go-libp2p"
	crypto "github.com/libp2p/go-libp2p-crypto"
	host "github.com/libp2p/go-libp2p-host"
	net "github.com/libp2p/go-libp2p-net"
)

func makeBasicHost(listenPort int) (host.Host, error) {

	// A deterministic reader gives the host a fixed peer ID across runs
	// (insecure; for testing only, so the client can hardcode the address).
	priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, mrand.New(mrand.NewSource(0)))
	if err != nil {
		return nil, err
	}

	opts := []libp2p.Option{
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", listenPort)),
		libp2p.Identity(priv),
	}

	basicHost, err := libp2p.New(context.Background(), opts...)
	if err != nil {
		return nil, err
	}

	fmt.Println("id:", basicHost.ID().Pretty())

	return basicHost, nil
}

func main() {
	ha, err := makeBasicHost(30000)
	if err != nil {
		panic(err)
	}

	ha.SetStreamHandler("test", func(s net.Stream) {
		fmt.Println("Got a new stream!")
		buf := make([]byte, 300000)
		for {
			n, err := io.ReadFull(s, buf)
			if err != nil {
				fmt.Println("read error:", err)
				break
			}
			fmt.Println("read", n, "bytes")
		}
	})
	select {}
}


// Client
package main

import (
	"context"
	"crypto/rand"
	"fmt"
	"time"

	libp2p "github.com/libp2p/go-libp2p"
	crypto "github.com/libp2p/go-libp2p-crypto"
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

func makeBasicHost(listenPort int) (host.Host, error) {
	priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, rand.Reader)
	if err != nil {
		return nil, err
	}

	opts := []libp2p.Option{
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
		libp2p.Identity(priv),
	}

	basicHost, err := libp2p.New(context.Background(), opts...)
	if err != nil {
		return nil, err
	}

	fmt.Println("id: ", basicHost.ID().Pretty())

	return basicHost, nil
}

func main() {

	ha, err := makeBasicHost(30012)
	if err != nil {
		panic(err)
	}

	// The server's multiaddr; the /ipfs/ component is the fixed peer ID the server prints at startup.
	ipfsaddr, err := ma.NewMultiaddr("/ip4/x.x.x.x/tcp/30000/ipfs/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw")
	if err != nil {
		panic(err)
	}

	pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
	if err != nil {
		panic(err)
	}

	peerid, err := peer.IDB58Decode(pid)
	if err != nil {
		panic(err)
	}

	targetPeerAddr, _ := ma.NewMultiaddr(
		fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
	targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)

	ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)

	s, err := ha.NewStream(context.Background(), peerid, "test")
	if err != nil {
		panic(err)
	}
	fmt.Println("open a stream")

	a := make([]byte, 300000)
	for {
		start := time.Now().UnixNano()
		n, err := s.Write(a)
		if err != nil {
			panic(err)
		}
		fmt.Printf("write %d bytes, time:%d ns\n", n, time.Now().UnixNano()-start)
		time.Sleep(time.Second)
	}

}

The output is as follows:

open a stream
write 300000 bytes, time:678537074 ns
write 300000 bytes, time:172239022 ns
write 300000 bytes, time:172187839 ns
write 300000 bytes, time:171730614 ns
write 300000 bytes, time:173011585 ns
write 300000 bytes, time:172056535 ns
write 300000 bytes, time:171954690 ns
write 300000 bytes, time:171592410 ns
write 300000 bytes, time:172407346 ns
...

As you can see, the performance of the second test is much worse.
Maybe I did something wrong or missed something. I'd appreciate your help!

@Stebalien
Member

So, you're writing a tiny amount each time and then pausing for a second. The data is probably still sitting in a buffer somewhere when you stop the clock. The difference between libp2p and a raw TCP socket is that libp2p will encrypt and multiplex your data; this will add significant overhead over just dumping to a buffer somewhere.

Try measuring the bandwidth of a sustained read. That is, have one node continuously write to a remote node and measure the bandwidth on the remote node; a sketch follows. libp2p will still be slower (we have quite a bit more overhead than a simple TCP socket and haven't spent enough time optimizing our stack), but it shouldn't be that slow.
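
Something along these lines for the reading side. This is a rough sketch using the standard library; the buffer size and the reporting interval are arbitrary, and measure accepts any io.Reader, so a libp2p stream can be passed in just as well as a TCP connection:

// Throughput measurement (reading side)
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// measure reads from r as fast as possible and periodically reports the
// cumulative average throughput.
func measure(r io.Reader) {
	buf := make([]byte, 64*1024)
	var total int64
	start := time.Now()
	next := start.Add(time.Second)
	for {
		n, err := r.Read(buf)
		total += int64(n)
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		if now := time.Now(); now.After(next) {
			secs := now.Sub(start).Seconds()
			fmt.Printf("%.2f MB/s over %.0f s\n", float64(total)/1e6/secs, secs)
			next = now.Add(time.Second)
		}
	}
}

func main() {
	l, err := net.Listen("tcp", "0.0.0.0:30001")
	if err != nil {
		panic(err)
	}
	conn, err := l.Accept()
	if err != nil {
		panic(err)
	}
	measure(conn)
}

The writing side is just your client loop with the time.Sleep removed, so the sender writes continuously.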

@Stebalien Stebalien added the kind/support A question or request for support label Sep 24, 2018
@ziranliu
Author

ziranliu commented Sep 24, 2018

@Stebalien Thanks for your response! I have followed your advice and run more tests.
First, I measured the interval between the start of a write on the client and the end of the corresponding read on the server. As you can see below, I added one line of code to each side and manually calculated the difference between the two timestamps.

// Server
func read(conn net.Conn) {
	data := make([]byte, 300000)
	for {
		n, err := io.ReadFull(conn, data)
		if err != nil {
			fmt.Println("read error:", err)
			break
		}
		fmt.Println(time.Now().UnixNano()) // the added line
		fmt.Println("read", n, "bytes")
	}
}

// Client
	for {
		s := time.Now().UnixNano()
		fmt.Println(s) // the added line
		n, err := conn.Write(data)
		if err != nil {
			panic(err)
		}
		fmt.Printf("write %d bytes, time:%d ns\n", n, time.Now().UnixNano()-s)
		time.Sleep(time.Second)
	}

Here's the result:

| | 300 KB packet | 3 MB packet |
| --- | --- | --- |
| raw TCP | 91 ms | 280 ms |
| libp2p | 264 ms | 2 s |
| libp2p without encryption | 261 ms | 2 s |

Second, I measured the bandwidth of a sustained read as you suggested: I kept the client node continuously writing to the server node and measured the time taken to read each 3 MB of data on the server.
Here's the result:

| | time per 3 MB read |
| --- | --- |
| raw TCP | 354 ms |
| libp2p | 2 s |
| libp2p without encryption | 2 s |

libp2p is much slower, and the result matches my production environment (one packet is about 3 MB and the latency is 1-2 seconds). It seems that multiplexing carries a performance cost. If that's true, what's the benefit of multiplexing and how can I make full use of it? Can you give me some advice on accelerating the data transmission speed?

@Stebalien
Member

If that's true, what's the benefit of multiplex and how can I make full use of it?

It allows one to have multiple streams between two endpoints without their congestion control algorithms fighting each other (see the sketch below). In this case, it won't help you.
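
For illustration, both streams in the fragment below share the single secured TCP connection to the peer, each with its own flow control, so the second transfer doesn't need another dial or handshake. This is a sketch reusing ha and peerid from your client code; bulkA and bulkB are placeholder payloads, and the bytes, context, and io imports are assumed:

	// Two logical streams over one physical connection.
	s1, err := ha.NewStream(context.Background(), peerid, "test")
	if err != nil {
		panic(err)
	}
	s2, err := ha.NewStream(context.Background(), peerid, "test")
	if err != nil {
		panic(err)
	}
	go io.Copy(s1, bytes.NewReader(bulkA)) // first bulk transfer...
	go io.Copy(s2, bytes.NewReader(bulkB)) // ...and a concurrent second one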

This may be ipfs/kubo#4280 but it may also be an issue with the congestion control built into the default stream muxer.

Try using mplex instead of yamux.

  1. Import mplex "github.com/whyrusleeping/go-smux-multiplex".
  2. Pass libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport) as an option to libp2p.New.

This will override the default multiplexers and disable yamux; a sketch of the change follows. Mplex is significantly simpler, so it may perform better in this instance. If it does, we can see what we can do to improve yamux.
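
Concretely, the change in your makeBasicHost would look roughly like this (a sketch; only the import block and the option list change):

import (
	libp2p "github.com/libp2p/go-libp2p"
	mplex "github.com/whyrusleeping/go-smux-multiplex"
)

	opts := []libp2p.Option{
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", listenPort)),
		libp2p.Identity(priv),
		// Override the default muxer (yamux) with mplex.
		libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
	}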


Thanks for taking the time to look into this.

@ziranliu
Author

Mplex's performance is very close to that of the raw TCP socket. I've replaced the default muxer with it in my project, and the service is now running smoothly. Thank you very much for the big help!

libp2p is really great work, with which people like me can build distributed systems efficiently without worrying about annoying network problems. Wishing libp2p continued success!

@vyzo
Contributor

vyzo commented Sep 24, 2018

This definitely suggests that we should put some effort into measuring and optimizing yamux.

@Stebalien
Copy link
Member

Closing as resolved in favor of #435.
