Skip to content

Commit

Permalink
Merge pull request ethereum#206 from ethersphere/swarm-network-rewrit…
Browse files Browse the repository at this point in the history
…e-syncer-refactor

Swarm network rewrite syncer refactor
  • Loading branch information
gbalint authored Jan 22, 2018
2 parents 4e032ae + 98ba78c commit 111b53d
Show file tree
Hide file tree
Showing 28 changed files with 2,371 additions and 1,844 deletions.
2 changes: 1 addition & 1 deletion p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
MaxPeers: math.MaxInt32,
NoDiscovery: true,
Dialer: s,
EnableMsgEvents: true,
EnableMsgEvents: false,
},
NoUSB: true,
Logger: log.New("node.id", id.String()),
Expand Down
8 changes: 4 additions & 4 deletions swarm/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

// discovery bzz extension for requesting and relaying node address records

// discPeer wraps bzzPeer and embeds an Overlay connectivity driver
// discPeer wraps BzzPeer and embeds an Overlay connectivity driver
type discPeer struct {
*bzzPeer
*BzzPeer
overlay Overlay
sentPeers bool // whether we already sent peer closer to this address
mtx sync.Mutex
Expand All @@ -36,10 +36,10 @@ type discPeer struct {
}

// NewDiscovery constructs a discovery peer
func newDiscovery(p *bzzPeer, o Overlay) *discPeer {
func newDiscovery(p *BzzPeer, o Overlay) *discPeer {
d := &discPeer{
overlay: o,
bzzPeer: p,
BzzPeer: p,
peers: make(map[string]bool),
}
// record remote as seen so we never send a peer its own record
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestDiscovery(t *testing.T) {
addr := RandomAddr()
to := NewKademlia(addr.OAddr, NewKadParams())

run := func(p *bzzPeer) error {
run := func(p *BzzPeer) error {
dp := newDiscovery(p, to)
to.On(p)
defer to.Off(p)
Expand Down
4 changes: 2 additions & 2 deletions swarm/network/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (h *Hive) connect() {
}

// Run protocol run function
func (h *Hive) Run(p *bzzPeer) error {
func (h *Hive) Run(p *BzzPeer) error {
dp := newDiscovery(p, h)
depth, changed := h.On(dp)
// if we want discovery, advertise changed depth of depth
Expand Down Expand Up @@ -191,7 +191,7 @@ func ToAddr(pa OverlayPeer) *BzzAddr {
if p, ok := pa.(*discPeer); ok {
return p.BzzAddr
}
return pa.(*bzzPeer).BzzAddr
return pa.(*BzzPeer).BzzAddr
}

// loadPeers, savePeer implement persistence callback/
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newTestKademlia(b string) *testKademlia {
}

func (k *testKademlia) newTestKadPeer(s string) Peer {
return &testDropPeer{&bzzPeer{BzzAddr: testKadPeerAddr(s)}, k.dropc}
return &testDropPeer{&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k.dropc}
}

func (k *testKademlia) On(ons ...string) *testKademlia {
Expand Down
55 changes: 28 additions & 27 deletions swarm/network/lightnode.go → swarm/network/light/lightnode.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 The go-ethereum Authors
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.d
//
// The go-ethereum library is free software: you can redistribute it and/or modify
Expand All @@ -14,17 +14,18 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network
package light

import (
"errors"

"github.com/ethereum/go-ethereum/swarm/network/stream"
"github.com/ethereum/go-ethereum/swarm/storage"
)

// RemoteReader implements IncomingStreamer
type RemoteSectionReader struct {
db *DbAccess
db *storage.DBAPI
start uint64
end uint64
hashes chan []byte
Expand All @@ -35,7 +36,7 @@ type RemoteSectionReader struct {
}

// NewRemoteReader is the constructor for RemoteReader
func NewRemoteSectionReader(root []byte, db *DbAccess) *RemoteSectionReader {
func NewRemoteSectionReader(root []byte, db *storage.DBAPI) *RemoteSectionReader {
return &RemoteSectionReader{
db: db,
root: root,
Expand All @@ -45,7 +46,7 @@ func NewRemoteSectionReader(root []byte, db *DbAccess) *RemoteSectionReader {
}

func (r *RemoteSectionReader) NeedData(key []byte) func() {
chunk, created := r.db.getOrCreateRequest(storage.Key(key))
chunk, created := r.db.GetOrCreateRequest(storage.Key(key))
// TODO: we may want to request from this peer anyway even if the request exists
if chunk.ReqC == nil || !created {
return nil
Expand All @@ -58,7 +59,7 @@ func (r *RemoteSectionReader) NeedData(key []byte) func() {
}
}

func (r *RemoteSectionReader) BatchDone(s string, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
func (r *RemoteSectionReader) BatchDone(s string, from uint64, hashes []byte, root []byte) func() (*stream.TakeoverProof, error) {
r.hashes <- hashes
return nil
}
Expand All @@ -75,9 +76,9 @@ func (r *RemoteSectionReader) Read(b []byte) (n int64, err error) {
return l, nil
}
var end bool
for i := 0; !end && i < len(r.currentHashes); i += HashSize {
hash := r.currentHashes[i : i+HashSize]
chunk, err := r.db.get(hash)
for i := 0; !end && i < len(r.currentHashes); i += stream.HashSize {
hash := r.currentHashes[i : i+stream.HashSize]
chunk, err := r.db.Get(hash)
if err != nil {
return n, err
}
Expand All @@ -96,9 +97,9 @@ func (r *RemoteSectionReader) Read(b []byte) (n int64, err error) {
return n, errors.New("aborted")
case hashes := <-r.hashes:
var i int
for ; !end && i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
chunk, err := r.db.get(hash)
for ; !end && i < len(hashes); i += stream.HashSize {
hash := hashes[i : i+stream.HashSize]
chunk, err := r.db.Get(hash)
if err != nil {
return n, err
}
Expand All @@ -120,12 +121,12 @@ func (r *RemoteSectionReader) Read(b []byte) (n int64, err error) {
type RemoteSectionServer struct {
// quit chan struct{}
root []byte
db *DbAccess
db *storage.DBAPI
r *storage.LazyChunkReader
}

// NewRemoteReader is the constructor for RemoteReader
func NewRemoteSectionServer(db *DbAccess, r *storage.LazyChunkReader) *RemoteSectionServer {
func NewRemoteSectionServer(db *storage.DBAPI, r *storage.LazyChunkReader) *RemoteSectionServer {
return &RemoteSectionServer{
db: db,
r: r,
Expand All @@ -134,51 +135,51 @@ func NewRemoteSectionServer(db *DbAccess, r *storage.LazyChunkReader) *RemoteSec

// GetData retrieves the actual chunk from localstore
func (s *RemoteSectionServer) GetData(key []byte) []byte {
chunk, err := s.db.get(storage.Key(key))
chunk, err := s.db.Get(storage.Key(key))
if err != nil {
return nil
}
return chunk.SData
}

// GetBatch retrieves the next batch of hashes from the dbstore
func (s *RemoteSectionServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
if to > from+batchSize {
to = from + batchSize
func (s *RemoteSectionServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *stream.HandoverProof, error) {
if to > from+stream.BatchSize {
to = from + stream.BatchSize
}
batch := make([]byte, (to-from)*HashSize)
batch := make([]byte, (to-from)*stream.HashSize)
s.r.ReadAt(batch, int64(from))
return batch, from, to, nil, nil
}

// RegisterRemoteSectionReader registers RemoteSectionReader on light downstream node
func RegisterRemoteSectionReader(s *Streamer, db *DbAccess) {
s.RegisterIncomingStreamer("REMOTE_SECTION", func(p *StreamerPeer, t []byte) (IncomingStreamer, error) {
func RegisterRemoteSectionReader(s *stream.Registry, db *storage.DBAPI) {
s.RegisterClientFunc("REMOTE_SECTION", func(p *stream.Peer, t []byte) (stream.Client, error) {
return NewRemoteSectionReader(t, db), nil
})
}

// RegisterRemoteSectionServer registers RemoteSectionServer outgoing streamer on
// upstream light server node
func RegisterRemoteSectionServer(s *Streamer, db *DbAccess, rf func([]byte) *storage.LazyChunkReader) {
s.RegisterOutgoingStreamer("REMOTE_SECTION", func(p *StreamerPeer, t []byte) (OutgoingStreamer, error) {
func RegisterRemoteSectionServer(s *stream.Registry, db *storage.DBAPI, rf func([]byte) *storage.LazyChunkReader) {
s.RegisterServerFunc("REMOTE_SECTION", func(p *stream.Peer, t []byte) (stream.Server, error) {
r := rf(t)
return NewRemoteSectionServer(db, r), nil
})
}

// RegisterRemoteDownloader registers RemoteDownloader incoming streamer
// on downstream light node
// func RegisterRemoteDownloader(s *Streamer, db *DbAccess) {
// s.RegisterIncomingStreamer("REMOTE_DOWNLOADER", func(p *StreamerPeer, t []byte) (IncomingStreamer, error) {
// func RegisterRemoteDownloader(s *Streamer, db *storage.DBAPI) {
// s.RegisterIncomingStreamer("REMOTE_DOWNLOADER", func(p *stream.Peer, t []byte) (IncomingStreamer, error) {
// return NewRemoteDownloader(t, db), nil
// })
// }
//
// // RegisterRemoteDownloadServer registers RemoteDownloadServer outgoing streamer on
// // upstream light server node
// func RegisterRemoteDownloadServer(s *Streamer, db *DbAccess, rf func([]byte) *storage.LazyChunkReader) {
// s.RegisterOutgoingStreamer("REMOTE_DOWNLOADER", func(p *StreamerPeer, t []byte) (OutgoingStreamer, error) {
// func RegisterRemoteDownloadServer(s *Streamer, db *storage.DBAPI, rf func([]byte) *storage.LazyChunkReader) {
// s.RegisterOutgoingStreamer("REMOTE_DOWNLOADER", func(p *stream.Peer, t []byte) (OutgoingStreamer, error) {
// r := rf(t)
// return NewRemoteDownloadServer(db, r), nil
// })
Expand Down
43 changes: 19 additions & 24 deletions swarm/network/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ type BzzConfig struct {

// Bzz is the swarm protocol bundle
type Bzz struct {
Streamer *Streamer
*Hive
localAddr *BzzAddr
mtx sync.Mutex
Expand All @@ -115,9 +114,8 @@ type Bzz struct {
// * bzz config
// * overlay driver
// * peer store
func NewBzz(config *BzzConfig, kad Overlay, store StateStore, streamer *Streamer) *Bzz {
func NewBzz(config *BzzConfig, kad Overlay, store StateStore) *Bzz {
return &Bzz{
Streamer: streamer,
Hive: NewHive(config.HiveParams, kad, store),
localAddr: &BzzAddr{config.OverlayAddr, config.UnderlayAddr},
handshakes: make(map[discover.NodeID]*HandshakeMsg),
Expand All @@ -143,7 +141,7 @@ func (b *Bzz) NodeInfo() interface{} {
// * handshake/hive
// * discovery
func (b *Bzz) Protocols() []p2p.Protocol {
protocols := []p2p.Protocol{
return []p2p.Protocol{
{
Name: BzzSpec.Name,
Version: BzzSpec.Version,
Expand All @@ -160,17 +158,6 @@ func (b *Bzz) Protocols() []p2p.Protocol {
PeerInfo: b.Hive.PeerInfo,
},
}
if b.Streamer != nil {
protocols = append(protocols, p2p.Protocol{
Name: StreamerSpec.Name,
Version: StreamerSpec.Version,
Length: StreamerSpec.Length(),
Run: b.RunProtocol(StreamerSpec, b.Streamer.Run),
NodeInfo: b.Streamer.NodeInfo,
PeerInfo: b.Streamer.PeerInfo,
})
}
return protocols
}

// APIs returns the APIs offered by bzz
Expand All @@ -188,12 +175,12 @@ func (b *Bzz) APIs() []rpc.API {
// returns a p2p protocol run function that can be assigned to p2p.Protocol#Run field
// arguments:
// * p2p protocol spec
// * run function taking bzzPeer as argument
// * run function taking BzzPeer as argument
// this run function is meant to block for the duration of the protocol session
// on return the session is terminated and the peer is disconnected
// the protocol waits for the bzz handshake is negotiated
// the overlay address on the bzzPeer is set from the remote handshake
func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*bzzPeer) error) func(*p2p.Peer, p2p.MsgReadWriter) error {
// the overlay address on the BzzPeer is set from the remote handshake
func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*BzzPeer) error) func(*p2p.Peer, p2p.MsgReadWriter) error {
return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
// wait for the bzz protocol to perform the handshake
handshake, _ := b.GetHandshake(p.ID())
Expand All @@ -206,8 +193,8 @@ func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*bzzPeer) error) func(*
if handshake.err != nil {
return fmt.Errorf("%08x: %s protocol closed: %v", b.BaseAddr()[:4], spec.Name, handshake.err)
}
// the handshake has succeeded so construct the bzzPeer and run the protocol
peer := &bzzPeer{
// the handshake has succeeded so construct the BzzPeer and run the protocol
peer := &BzzPeer{
Peer: protocols.NewPeer(p, rw, spec),
localAddr: b.localAddr,
BzzAddr: handshake.peerAddr,
Expand Down Expand Up @@ -257,22 +244,30 @@ func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error {
return errors.New("received multiple handshakes")
}

// bzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer)
// BzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer)
// implements the Peer interface and all interfaces Peer implements: Addr, OverlayPeer
type bzzPeer struct {
type BzzPeer struct {
*protocols.Peer // represents the connection for online peers
localAddr *BzzAddr // local Peers address
*BzzAddr // remote address -> implements Addr interface = protocols.Peer
lastActive time.Time // time is updated whenever mutexes are releasing
}

func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer {
return &BzzPeer{
Peer: p,
localAddr: addr,
BzzAddr: NewAddrFromNodeID(p.ID()),
}
}

// Off returns the overlay peer record for offline persistance
func (p *bzzPeer) Off() OverlayAddr {
func (p *BzzPeer) Off() OverlayAddr {
return p.BzzAddr
}

// LastActive returns the time the peer was last active
func (p *bzzPeer) LastActive() time.Time {
func (p *BzzPeer) LastActive() time.Time {
return p.lastActive
}

Expand Down
21 changes: 17 additions & 4 deletions swarm/network/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,29 @@
package network

import (
"flag"
"fmt"
"os"
"sync"
"testing"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/protocols"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
)

var (
adapter = flag.String("adapter", "sim", "type of simulation: sim|socket|exec|docker")
loglevel = flag.Int("loglevel", 2, "verbosity of logs")
)

func init() {
flag.Parse()
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
}

type testStore struct {
sync.Mutex

Expand Down Expand Up @@ -78,16 +91,16 @@ func HandshakeMsgExchange(lhs, rhs *HandshakeMsg, id discover.NodeID) []p2ptest.
}
}

func newBzzBaseTester(t *testing.T, n int, addr *BzzAddr, spec *protocols.Spec, run func(*bzzPeer) error) *bzzTester {
func newBzzBaseTester(t *testing.T, n int, addr *BzzAddr, spec *protocols.Spec, run func(*BzzPeer) error) *bzzTester {
cs := make(map[string]chan bool)

srv := func(p *bzzPeer) error {
srv := func(p *BzzPeer) error {
defer close(cs[p.ID().String()])
return run(p)
}

protocall := func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
return srv(&bzzPeer{
return srv(&BzzPeer{
Peer: protocols.NewPeer(p, rw, spec),
localAddr: addr,
BzzAddr: NewAddrFromNodeID(p.ID()),
Expand Down Expand Up @@ -115,7 +128,7 @@ type bzzTester struct {

func newBzzTester(t *testing.T, n int, addr *BzzAddr, pp *p2ptest.TestPeerPool, spec *protocols.Spec, services func(Peer) error) *bzzTester {

extraservices := func(p *bzzPeer) error {
extraservices := func(p *BzzPeer) error {
pp.Add(p)
defer pp.Remove(p)
if services == nil {
Expand Down
Loading

0 comments on commit 111b53d

Please sign in to comment.