Skip to content

Commit

Permalink
Fix issue skycoin#360
Browse files Browse the repository at this point in the history
  • Loading branch information
iketheadore committed Jul 8, 2017
1 parent 1375dd8 commit 73f2e90
Show file tree
Hide file tree
Showing 17 changed files with 501 additions and 340 deletions.
26 changes: 21 additions & 5 deletions cmd/skycoin/skycoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,16 @@ func Run(c *Config) {
return
}

go d.Run(quit)
errC := make(chan error, 1)

go func() {
errC <- d.Run()
}()

var rpc *webrpc.WebRPC
// start the webrpc
if c.RPCInterface {
rpc, err := webrpc.New(
rpc, err = webrpc.New(
fmt.Sprintf("%v:%v", c.RPCInterfaceAddr, c.RPCInterfacePort),
webrpc.ChanBuffSize(1000),
webrpc.ThreadNum(c.RPCThreadNum),
Expand All @@ -548,7 +553,9 @@ func Run(c *Config) {
return
}

go rpc.Run(quit)
go func() {
errC <- rpc.Run()
}()
}

// Debug only - forces connection on start. Violates thread safety.
Expand Down Expand Up @@ -623,9 +630,18 @@ func Run(c *Config) {
}
*/

<-quit
select {
case <-quit:
case err := <-errC:
logger.Error("%v", err)
}

logger.Info("Shutting down...")

if rpc != nil {
rpc.Shutdown()
}

logger.Info("Shutting down")
gui.Shutdown()
d.Shutdown()
logger.Info("Goodbye")
Expand Down
13 changes: 11 additions & 2 deletions src/api/webrpc/gatewayer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (m *GatewayerMock) GetTransaction(p0 cipher.SHA256) (*visor.Transaction, er
}

// GetUnspentOutputs mocked method
func (m *GatewayerMock) GetUnspentOutputs(p0 ...daemon.OutputsFilter) visor.ReadableOutputSet {
func (m *GatewayerMock) GetUnspentOutputs(p0 ...daemon.OutputsFilter) (visor.ReadableOutputSet, error) {

ret := m.Called(p0)

Expand All @@ -165,7 +165,16 @@ func (m *GatewayerMock) GetUnspentOutputs(p0 ...daemon.OutputsFilter) visor.Read
panic(fmt.Sprintf("unexpected type: %v", res))
}

return r0
var r1 error
switch res := ret.Get(1).(type) {
case nil:
case error:
r1 = res
default:
panic(fmt.Sprintf("unexpected type: %v", res))
}

return r0, r1

}

Expand Down
36 changes: 18 additions & 18 deletions src/api/webrpc/webrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type WebRPC struct {
mux *http.ServeMux
handlers map[string]HandlerFunc
gateway Gatewayer
listener net.Listener
quit chan struct{}
}

// Option is the argument type for creating webrpc instance.
Expand All @@ -126,6 +128,7 @@ type Option func(*WebRPC)
func New(addr string, ops ...Option) (*WebRPC, error) {
rpc := &WebRPC{
addr: addr,
quit: make(chan struct{}),
}

for _, opt := range ops {
Expand Down Expand Up @@ -176,40 +179,37 @@ func (rpc *WebRPC) initHandlers() error {
}

// Run starts the webrpc service.
func (rpc *WebRPC) Run(quit chan struct{}) {
func (rpc *WebRPC) Run() error {
logger.Infof("start webrpc on http://%s", rpc.addr)
defer logger.Info("webrpc service closed")

l, err := net.Listen("tcp", rpc.addr)
if err != nil {
logger.Error("%v", err)
close(quit)
return
return err
}

c := make(chan struct{})
q := make(chan struct{}, 1)
rpc.listener = l

errC := make(chan error, 1)
go func() {
if err := http.Serve(l, rpc); err != nil {
select {
case <-c:
case <-rpc.quit:
return
default:
// the webrpc service failed unexpectly, notify the
logger.Error("%v", err)
q <- struct{}{}
errC <- err
}
}
}()

select {
case <-quit:
close(c)
l.Close()
case <-q:
close(quit)
}
logger.Info("webrpc quit")
return
return <-errC
}

// Shutdown close the webrpc service
func (rpc *WebRPC) Shutdown() {
close(rpc.quit)
rpc.listener.Close()
}

// HandleFunc registers handler function
Expand Down
22 changes: 14 additions & 8 deletions src/api/webrpc/webrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net/http/httptest"
"testing"

"time"

"github.com/skycoin/skycoin/src/cipher"
"github.com/skycoin/skycoin/src/coin"
"github.com/skycoin/skycoin/src/daemon"
Expand All @@ -17,10 +19,9 @@ import (

func setup() (*WebRPC, func()) {
c := make(chan struct{})
f := func() {
close(c)
}

rpc, err := New(
"0.0.0.0:8081",
ChanBuffSize(1),
ThreadNum(1),
Gateway(&fakeGateway{}),
Expand All @@ -29,7 +30,9 @@ func setup() (*WebRPC, func()) {
panic(err)
}

return rpc, f
return rpc, func() {
rpc.Shutdown()
}
}

type fakeGateway struct {
Expand Down Expand Up @@ -65,14 +68,14 @@ func (fg fakeGateway) GetBlocksInDepth(vs []uint64) *visor.ReadableBlocks {
return nil
}

func (fg fakeGateway) GetUnspentOutputs(filters ...daemon.OutputsFilter) visor.ReadableOutputSet {
func (fg fakeGateway) GetUnspentOutputs(filters ...daemon.OutputsFilter) (visor.ReadableOutputSet, error) {
v := decodeOutputStr(outputStr)
for _, f := range filters {
v.HeadOutputs = f(v.HeadOutputs)
v.OutgoingOutputs = f(v.OutgoingOutputs)
v.IncommingOutputs = f(v.IncommingOutputs)
}
return v
return v, nil
}

func (fg fakeGateway) GetTransaction(txid cipher.SHA256) (*visor.Transaction, error) {
Expand Down Expand Up @@ -100,15 +103,15 @@ func (fg fakeGateway) GetTimeNow() uint64 {
}

func TestNewWebRPC(t *testing.T) {
rpc1, err := New(ChanBuffSize(1), ThreadNum(1), Gateway(&fakeGateway{}), Quit(make(chan struct{})))
rpc1, err := New("0.0.0.0:8080", ChanBuffSize(1), ThreadNum(1), Gateway(&fakeGateway{}), Quit(make(chan struct{})))
assert.Nil(t, err)
assert.NotNil(t, rpc1.mux)
assert.NotNil(t, rpc1.handlers)
assert.NotNil(t, rpc1.gateway)
}

func Test_rpcHandler_HandlerFunc(t *testing.T) {
rpc, err := New(ChanBuffSize(1), ThreadNum(1), Gateway(&fakeGateway{}), Quit(make(chan struct{})))
rpc, err := New("0.0.0.0:8080", ChanBuffSize(1), ThreadNum(1), Gateway(&fakeGateway{}), Quit(make(chan struct{})))
assert.Nil(t, err)
rpc.HandleFunc("get_status", getStatusHandler)
err = rpc.HandleFunc("get_status", getStatusHandler)
Expand All @@ -118,6 +121,9 @@ func Test_rpcHandler_HandlerFunc(t *testing.T) {
func Test_rpcHandler_Handler(t *testing.T) {
rpc, teardown := setup()
defer teardown()
go rpc.Run()

time.Sleep(50 * time.Millisecond)

type args struct {
httpMethod string
Expand Down
84 changes: 62 additions & 22 deletions src/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ type Daemon struct {
ipCounts *IPCount
// Message handling queue
messageEvents chan MessageEvent
// quit channel
quitC chan chan struct{}
}

// NewDaemon returns a Daemon with primitives allocated
Expand Down Expand Up @@ -241,8 +243,8 @@ func NewDaemon(config Config) (*Daemon, error) {
connectionErrors: make(chan ConnectionError, config.Daemon.OutgoingMax),
outgoingConnections: NewOutgoingConnections(config.Daemon.OutgoingMax),
pendingConnections: NewPendingConnections(config.Daemon.PendingMax),
messageEvents: make(chan MessageEvent,
config.Pool.EventChannelSize),
messageEvents: make(chan MessageEvent, config.Pool.EventChannelSize),
quitC: make(chan chan struct{}),
}

d.Gateway = NewGateway(config.Gateway, d)
Expand Down Expand Up @@ -280,32 +282,36 @@ type MessageEvent struct {
// over the quit channel provided to Init. The Daemon run loop must be stopped
// before calling this function.
func (dm *Daemon) Shutdown() {
// close the daemon loop first
q := make(chan struct{}, 1)
dm.quitC <- q
<-q

dm.Pool.Shutdown()
dm.Peers.Shutdown()
dm.Visor.Shutdown()
}

// Run main loop for peer/connection management. Send anything to quit to shut it
// down
func (dm *Daemon) Run(quit chan struct{}) {
func (dm *Daemon) Run() (err error) {
defer func() {
if r := recover(); r != nil {
logger.Error("recover:%v\n stack:%v", r, string(debug.Stack()))
}

// close quit to notify the caller this daemon running loop is stopped
if quit != nil {
close(quit)
logger.Errorf("recover:%v\n stack:%v", r, string(debug.Stack()))
}
}()

c := make(chan struct{})
errC := make(chan error)

// start visor
go dm.Visor.Run(c)
go func() {
errC <- dm.Visor.Run()
}()

if !dm.Config.DisableIncomingConnections {
go dm.Pool.Run(c)
go func() {
errC <- dm.Pool.Run()
}()
}

// TODO -- run blockchain stuff in its own goroutine
Expand Down Expand Up @@ -335,7 +341,11 @@ func (dm *Daemon) Run(quit chan struct{}) {

for {
select {
case <-c:
case err = <-errC:
return
case qc := <-dm.quitC:
qc <- struct{}{}
logger.Info("Daemon closed")
return
// Remove connections that failed to complete the handshake
case <-cullInvalidTicker:
Expand Down Expand Up @@ -476,7 +486,12 @@ func (dm *Daemon) connectToPeer(p *pex.Peer) error {
return errors.New("Not localhost")
}

if dm.Pool.Pool.IsConnExist(p.Addr) {
conned, err := dm.Pool.Pool.IsConnExist(p.Addr)
if err != nil {
return err
}

if conned {
return errors.New("Already connected")
}

Expand Down Expand Up @@ -570,21 +585,40 @@ func (dm *Daemon) cullInvalidConnections() {
// This method only handles the erroneous people from the DHT, but not
// malicious nodes
now := util.Now()
addrs := dm.expectingIntroductions.CullInvalidConns(func(addr string, t time.Time) bool {
if !dm.Pool.Pool.IsConnExist(addr) {
return true
addrs, err := dm.expectingIntroductions.CullInvalidConns(func(addr string, t time.Time) (bool, error) {
conned, err := dm.Pool.Pool.IsConnExist(addr)
if err != nil {
return false, err
}

if !conned {
return true, nil
}

if t.Add(dm.Config.IntroductionWait).Before(now) {
return true
return true, nil
}
return false
return false, nil
})

if err != nil {
logger.Error("expectingIntroduction cull invalid connections failed: %v", err)
return
}

for _, a := range addrs {
if dm.Pool.Pool.IsConnExist(a) {
exist, err := dm.Pool.Pool.IsConnExist(a)
if err != nil {
logger.Error("%v", err)
return
}

if exist {
logger.Info("Removing %s for not sending a version", a)
dm.Pool.Pool.Disconnect(a, ErrDisconnectIntroductionTimeout)
if err := dm.Pool.Pool.Disconnect(a, ErrDisconnectIntroductionTimeout); err != nil {
logger.Error("%v", err)
return
}
dm.Peers.RemovePeer(a)
}
}
Expand Down Expand Up @@ -633,7 +667,13 @@ func (dm *Daemon) onConnect(e ConnectEvent) {

dm.pendingConnections.Remove(a)

if !dm.Pool.Pool.IsConnExist(a) {
exist, err := dm.Pool.Pool.IsConnExist(a)
if err != nil {
logger.Error("%v", err)
return
}

if !exist {
logger.Warning("While processing an onConnect event, no pool " +
"connection was found")
return
Expand Down
Loading

0 comments on commit 73f2e90

Please sign in to comment.