Skip to content

Commit

Permalink
Merge pull request #811 from i-hate-nicknames/feature/snet-rewrite
Browse files Browse the repository at this point in the history
Feature/snet rewrite
  • Loading branch information
jdknives authored Jul 8, 2021
2 parents a663372 + 6bb7875 commit 4a13052
Show file tree
Hide file tree
Showing 67 changed files with 1,578 additions and 3,636 deletions.
15 changes: 7 additions & 8 deletions cmd/skywire-cli/commands/visor/transports.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"text/tabwriter"
"time"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/spf13/cobra"

"github.com/skycoin/skywire/cmd/skywire-cli/internal"
"github.com/skycoin/skywire/pkg/snet/directtp/tptypes"
"github.com/skycoin/skywire/pkg/transport/network"
"github.com/skycoin/skywire/pkg/visor"
)

Expand Down Expand Up @@ -113,15 +112,15 @@ var addTpCmd = &cobra.Command{

logger.Infof("Established %v transport to %v", transportType, pk)
} else {
transportTypes := []string{
tptypes.STCP,
tptypes.STCPR,
tptypes.SUDPH,
dmsg.Type,
transportTypes := []network.Type{
network.STCP,
network.STCPR,
network.SUDPH,
network.DMSG,
}

for _, transportType := range transportTypes {
tp, err = rpcClient().AddTransport(pk, transportType, public, timeout)
tp, err = rpcClient().AddTransport(pk, string(transportType), public, timeout)
if err == nil {
logger.Infof("Established %v transport to %v", transportType, pk)
break
Expand Down
26 changes: 26 additions & 0 deletions pkg/app/appevent/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package appevent

import "context"

// SendTCPDial sends tcp dial event
func (eb *Broadcaster) SendTCPDial(ctx context.Context, remoteNet, remoteAddr string) {
data := TCPDialData{RemoteNet: remoteNet, RemoteAddr: remoteAddr}
event := NewEvent(TCPDial, data)
eb.sendEvent(ctx, event)
}

// SendTPClose sends transport close event
func (eb *Broadcaster) SendTPClose(ctx context.Context, netType, addr string) {
data := TCPCloseData{RemoteNet: string(netType), RemoteAddr: addr}
event := NewEvent(TCPClose, data)
if err := eb.Broadcast(context.Background(), event); err != nil {
eb.log.WithError(err).Errorln("Failed to broadcast TCPClose event")
}
}

func (eb *Broadcaster) sendEvent(_ context.Context, event *Event) {
err := eb.Broadcast(context.Background(), event) //nolint:errcheck
if err != nil {
eb.log.Warn("Failed to broadcast event: %v", event)
}
}
6 changes: 3 additions & 3 deletions pkg/app/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/skycoin/skywire/pkg/app/appserver"
"github.com/skycoin/skywire/pkg/app/idmanager"
"github.com/skycoin/skywire/pkg/routing"
"github.com/skycoin/skywire/pkg/snet/snettest"
"github.com/skycoin/skywire/pkg/util/cipherutil"
)

func TestConn_Read(t *testing.T) {
Expand Down Expand Up @@ -175,7 +175,7 @@ func (p *wrappedConn) RemoteAddr() net.Addr {
func TestConn_TestConn(t *testing.T) {
mp := func() (net.Conn, net.Conn, func(), error) {
netType := appnet.TypeSkynet
keys := snettest.GenKeyPairs(2)
keys := cipherutil.GenKeyPairs(2)
fmt.Printf("C1 Local: %s\n", keys[0].PK)
fmt.Printf("C2 Local: %s\n", keys[1].PK)
p1, p2 := net.Pipe()
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestConn_TestConn(t *testing.T) {

rpcS := rpc.NewServer()

appKeys := snettest.GenKeyPairs(2)
appKeys := cipherutil.GenKeyPairs(2)

var (
procKey1 appcommon.ProcKey
Expand Down
42 changes: 42 additions & 0 deletions pkg/dmsgc/dmsgc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package dmsgc

import (
"context"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/disc"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app/appevent"
)

// DmsgConfig defines config for Dmsg network.
type DmsgConfig struct {
Discovery string `json:"discovery"`
SessionsCount int `json:"sessions_count"`
}

// New makes new dmsg client from configuration
func New(pk cipher.PubKey, sk cipher.SecKey, eb *appevent.Broadcaster, conf *DmsgConfig) *dmsg.Client {
dmsgConf := &dmsg.Config{
MinSessions: conf.SessionsCount,
Callbacks: &dmsg.ClientCallbacks{
OnSessionDial: func(network, addr string) error {
data := appevent.TCPDialData{RemoteNet: network, RemoteAddr: addr}
event := appevent.NewEvent(appevent.TCPDial, data)
_ = eb.Broadcast(context.Background(), event) //nolint:errcheck
// @evanlinjin: An error is not returned here as this will cancel the session dial.
return nil
},
OnSessionDisconnect: func(network, addr string, _ error) {
data := appevent.TCPCloseData{RemoteNet: network, RemoteAddr: addr}
event := appevent.NewEvent(appevent.TCPClose, data)
_ = eb.Broadcast(context.Background(), event) //nolint:errcheck
},
},
}
dmsgC := dmsg.NewClient(pk, sk, disc.NewHTTP(conf.Discovery), dmsgConf)
dmsgC.SetLogger(logging.MustGetLogger("dmsgC"))
return dmsgC
}
Loading

0 comments on commit 4a13052

Please sign in to comment.