Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
- Fix app stop bug
- Fix app restart bug
- Fix start of 2+ apps bug
- Improve linter
  • Loading branch information
nkryuchkov committed Jan 18, 2020
1 parent d659046 commit 7f84e84
Show file tree
Hide file tree
Showing 21 changed files with 286 additions and 200 deletions.
25 changes: 21 additions & 4 deletions cmd/apps/skysocks/skysocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package main
import (
"flag"
"fmt"
"os"
"os/signal"

"github.com/SkycoinProject/skycoin/src/util/logging"

Expand All @@ -16,16 +18,17 @@ import (
)

const (
appName = "skysocks"
netType = appnet.TypeSkynet
port = routing.Port(3)
appName = "skysocks"
netType = appnet.TypeSkynet
port routing.Port = 3
)

func main() {
log := app.NewLogger(appName)
skysocks.Log = log.PackageLogger("skysocks")

var passcode = flag.String("passcode", "", "Authorize user against this passcode")

flag.Parse()

config, err := app.ClientConfigFromEnv()
Expand All @@ -37,6 +40,7 @@ func main() {
if err != nil {
log.Fatal("Setup failure: ", err)
}

defer func() {
socksApp.Close()
}()
Expand All @@ -53,5 +57,18 @@ func main() {

log.Infoln("Starting serving proxy server")

log.Fatal(srv.Serve(l))
termCh := make(chan os.Signal, 1)
signal.Notify(termCh, os.Interrupt)

go func() {
<-termCh

if err := srv.Close(); err != nil {
log.Fatalf("Failed to close server: %v", err)
}
}()

if err := srv.Serve(l); err != nil {
log.Fatal(err)
}
}
93 changes: 53 additions & 40 deletions internal/skysocks/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// Log is skysocks package level logger, it can be replaced with a different one from outside the package
var Log = logging.MustGetLogger("skysocks")
var Log = logging.MustGetLogger("skysocks") // nolint: gochecknoglobals

// Client implement multiplexing proxy client using yamux.
type Client struct {
Expand All @@ -19,20 +19,22 @@ type Client struct {
}

// NewClient constructs a new Client.
func NewClient(conn net.Conn) (*Client, error) {
func NewClient(conn io.ReadWriteCloser) (*Client, error) {
session, err := yamux.Client(conn, nil)
if err != nil {
return nil, fmt.Errorf("error creating client: yamux: %s", err)
}

return &Client{session: session}, nil
c := &Client{
session: session,
}

return c, nil
}

// ListenAndServe start tcp listener on addr and proxies incoming
// connection to a remote proxy server.
func (c *Client) ListenAndServe(addr string) error {
var err error

l, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("listen: %s", err)
Expand All @@ -41,6 +43,7 @@ func (c *Client) ListenAndServe(addr string) error {
Log.Printf("Listening skysocks client on %s", addr)

c.listener = l

for {
conn, err := l.Accept()
if err != nil {
Expand All @@ -49,6 +52,7 @@ func (c *Client) ListenAndServe(addr string) error {
}

Log.Println("Accepted skysocks client")

stream, err := c.session.Open()
if err != nil {
return fmt.Errorf("error on `ListenAndServe`: yamux: %s", err)
Expand All @@ -57,51 +61,60 @@ func (c *Client) ListenAndServe(addr string) error {
Log.Println("Opened session skysocks client")

go func() {
errCh := make(chan error, 2)

go func() {
_, err := io.Copy(stream, conn)
errCh <- err
}()

go func() {
_, err := io.Copy(conn, stream)
errCh <- err
}()

var connClosed, streamClosed bool
for err := range errCh {
if !connClosed {
if err := conn.Close(); err != nil {
Log.WithError(err).Warn("Failed to close connection")
}

connClosed = true
}

if !streamClosed {
if err := stream.Close(); err != nil {
Log.WithError(err).Warn("Failed to close stream")
}

streamClosed = true
}

if err != nil {
Log.Error("Copy error:", err)
}
c.handleStream(conn, stream)
}()
}
}

func (c *Client) handleStream(conn, stream net.Conn) {
const errorCount = 2

errCh := make(chan error, errorCount)

go func() {
_, err := io.Copy(stream, conn)
errCh <- err
}()

go func() {
_, err := io.Copy(conn, stream)
errCh <- err
}()

var connClosed, streamClosed bool

for err := range errCh {
if !connClosed {
if err := conn.Close(); err != nil {
Log.WithError(err).Warn("Failed to close connection")
}

close(errCh)
}()
connClosed = true
}

if !streamClosed {
if err := stream.Close(); err != nil {
Log.WithError(err).Warn("Failed to close stream")
}

streamClosed = true
}

if err != nil {
Log.Error("Copy error:", err)
}
}

close(errCh)
}

// Close implement io.Closer.
func (c *Client) Close() error {
Log.Infoln("Closing proxy client")

if c == nil {
return nil
}

return c.listener.Close()
}
23 changes: 23 additions & 0 deletions internal/skysocks/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package skysocks
import (
"fmt"
"net"
"sync/atomic"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/armon/go-socks5"
Expand All @@ -14,6 +15,7 @@ type Server struct {
socks *socks5.Server
listener net.Listener
log *logging.MasterLogger
closed uint32
}

// NewServer constructs a new Server.
Expand All @@ -35,9 +37,18 @@ func NewServer(passcode string, l *logging.MasterLogger) (*Server, error) {
// the incoming connections.
func (s *Server) Serve(l net.Listener) error {
s.listener = l

for {
if s.isClosed() {
return nil
}

conn, err := l.Accept()
if err != nil {
if s.isClosed() {
return nil
}

return fmt.Errorf("accept: %s", err)
}

Expand All @@ -59,14 +70,26 @@ func (s *Server) Close() error {
if s == nil {
return nil
}

s.close()

return s.listener.Close()
}

func (s *Server) close() {
atomic.StoreUint32(&s.closed, 1)
}

func (s *Server) isClosed() bool {
return atomic.LoadUint32(&s.closed) != 0
}

type passcodeCredentials string

func (s passcodeCredentials) Valid(user, password string) bool {
if len(s) == 0 {
return true
}

return user == string(s) || password == string(s)
}
9 changes: 7 additions & 2 deletions internal/skysocks/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestMain(m *testing.M) {
if err != nil {
Log.Fatal(err)
}

logging.SetLevel(lvl)
} else {
logging.Disable()
Expand All @@ -40,11 +41,14 @@ func TestProxy(t *testing.T) {
require.NoError(t, err)

errChan := make(chan error)

go func() {
errChan <- srv.Serve(l)
}()

time.Sleep(100 * time.Millisecond)
const delay = 100 * time.Millisecond

time.Sleep(delay)

conn, err := net.Dial("tcp", l.Addr().String())
require.NoError(t, err)
Expand All @@ -53,11 +57,12 @@ func TestProxy(t *testing.T) {
require.NoError(t, err)

errChan2 := make(chan error)

go func() {
errChan2 <- client.ListenAndServe(":10080")
}()

time.Sleep(100 * time.Millisecond)
time.Sleep(delay)

proxyDial, err := proxy.SOCKS5("tcp", ":10080", nil, proxy.Direct)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/app/appnet/addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func TestConvertAddr(t *testing.T) {
}

pk, _ := cipher.GenerateKeyPair()
port := uint16(100)

const port uint16 = 100

tt := []struct {
name string
Expand Down
3 changes: 2 additions & 1 deletion pkg/app/appnet/dmsg_networker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (n *DmsgNetworker) DialContext(ctx context.Context, addr Addr) (net.Conn, e
PK: addr.PubKey,
Port: uint16(addr.Port),
}

return n.dmsgC.Dial(ctx, remote)
}

Expand All @@ -39,6 +40,6 @@ func (n *DmsgNetworker) Listen(addr Addr) (net.Listener, error) {
}

// ListenContext starts listening on local `addr` in the dmsg network with context.
func (n *DmsgNetworker) ListenContext(ctx context.Context, addr Addr) (net.Listener, error) {
func (n *DmsgNetworker) ListenContext(_ context.Context, addr Addr) (net.Listener, error) {
return n.dmsgC.Listen(uint16(addr.Port))
}
8 changes: 0 additions & 8 deletions pkg/app/appnet/errors.go

This file was deleted.

1 change: 1 addition & 0 deletions pkg/app/appnet/networker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
ErrNetworkerAlreadyExists = errors.New("networker already exists")
)

// nolint: gochecknoglobals
var (
networkers = make(map[Type]Networker)
networkersMx sync.RWMutex
Expand Down
3 changes: 2 additions & 1 deletion pkg/app/appnet/networker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestListen(t *testing.T) {

func prepAddr() Addr {
addrPK, _ := cipher.GenerateKeyPair()
addrPort := routing.Port(100)

const addrPort routing.Port = 100

return Addr{
Net: TypeDmsg,
Expand Down
9 changes: 8 additions & 1 deletion pkg/app/appnet/skywire_networker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
)

var (
// ErrPortAlreadyBound is being returned when the desired port is already bound to.
ErrPortAlreadyBound = errors.New("port already bound")
)

// SkywireNetworker implements `Networker` for skynet.
type SkywireNetworker struct {
log *logging.Logger
Expand Down Expand Up @@ -62,10 +67,12 @@ func (r *SkywireNetworker) Listen(addr Addr) (net.Listener, error) {

// ListenContext starts listening on local `addr` in the skynet with context.
func (r *SkywireNetworker) ListenContext(ctx context.Context, addr Addr) (net.Listener, error) {
const bufSize = 1000000

lis := &skywireListener{
addr: addr,
// TODO: pass buf size
connsCh: make(chan net.Conn, 1000000),
connsCh: make(chan net.Conn, bufSize),
freePort: nil,
}

Expand Down
1 change: 1 addition & 0 deletions pkg/app/appnet/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (n Type) IsValid() bool {
return ok
}

// nolint: gochecknoglobals
var (
validNetworks = map[Type]struct{}{
TypeDmsg: {},
Expand Down
Loading

0 comments on commit 7f84e84

Please sign in to comment.