Skip to content

Commit

Permalink
Fix merge conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
志宇 committed Mar 4, 2020
2 parents bf93616 + b9ff295 commit 33ec06d
Show file tree
Hide file tree
Showing 45 changed files with 4,602 additions and 52,821 deletions.
13 changes: 11 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@ DOCKER_IMAGE?=skywire-runner # docker image to use for running skywire-visor.`go
DOCKER_NETWORK?=SKYNET
DOCKER_NODE?=SKY01
DOCKER_OPTS?=GO111MODULE=on GOOS=linux # go options for compiling for docker container
TEST_OPTS?=-race -tags no_ci -cover -timeout=5m
TEST_OPTS_NOCI?=-race -cover -timeout=5m -v

GO_VERSION=$(shell go version)
# TODO: Remove after https://github.com/etcd-io/bbolt/pull/201 is closed.
DISABLE_CHECKPTR=-gcflags=all=-d=checkptr=0
OPTIONAL_FLAGS=
ifneq (,$(findstring go1.14,$(GO_VERSION)))
OPTIONAL_FLAGS=$(DISABLE_CHECKPTR)
endif

TEST_OPTS?=-race $(OPTIONAL_FLAGS) -tags no_ci -cover -timeout=5m
TEST_OPTS_NOCI?=-race $(OPTIONAL_FLAGS) -cover -timeout=5m -v

BUILDINFO_PATH := $(PROJECT_BASE)/pkg/util/buildinfo

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ require (
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0
go.etcd.io/bbolt v1.3.3
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d // indirect
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
golang.org/x/net v0.0.0-20191204025024-5ee1b9f4859a
golang.org/x/sys v0.0.0-20200301040627-c5d0d7b4ec88 // indirect
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
)

//replace github.com/SkycoinProject/dmsg => ../dmsg
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d h1:1ZiEyfaQIg3Qh0EoqpwAakHVhecoE5wlSg5GjnafJGw=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 h1:xMPOj6Pz6UipU1wXLkrtqpHbR0AVFnyPEQq/wRWz9lM=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
Expand Down Expand Up @@ -263,6 +265,8 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200301040627-c5d0d7b4ec88 h1:LNVdAhESTW4gWDhYvciNcGoS9CEcxRiUKE9kSgw+X3s=
golang.org/x/sys v0.0.0-20200301040627-c5d0d7b4ec88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0RIXVLwsHlnvJ+cT1So=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
12 changes: 5 additions & 7 deletions pkg/restart/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
)

var (
// ErrAlreadyStarting is returned on starting attempt when starting is in progress.
ErrAlreadyStarting = errors.New("already starting")
// ErrAlreadyStarted is returned when Start is already called.
ErrAlreadyStarted = errors.New("already started")
)

const (
Expand All @@ -28,7 +28,7 @@ type Context struct {
log logrus.FieldLogger
cmd *exec.Cmd
checkDelay time.Duration
isStarting int32
isStarted int32
appendDelay bool // disabled in tests
}

Expand Down Expand Up @@ -71,12 +71,10 @@ func (c *Context) CmdPath() string {

// Start starts a new executable using Context.
func (c *Context) Start() error {
if !atomic.CompareAndSwapInt32(&c.isStarting, 0, 1) {
return ErrAlreadyStarting
if !atomic.CompareAndSwapInt32(&c.isStarted, 0, 1) {
return ErrAlreadyStarted
}

defer atomic.StoreInt32(&c.isStarting, 0)

errCh := c.startExec()

ticker := time.NewTicker(c.checkDelay)
Expand Down
14 changes: 10 additions & 4 deletions pkg/restart/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func TestContext_RegisterLogger(t *testing.T) {
}

func TestContext_Start(t *testing.T) {
cc := CaptureContext()
assert.NotZero(t, len(cc.cmd.Args))

t.Run("executable started", func(t *testing.T) {
cc := CaptureContext()
assert.NotZero(t, len(cc.cmd.Args))

cmd := "touch"
path := "/tmp/test_start"
cc.cmd = exec.Command(cmd, path) // nolint:gosec
Expand All @@ -47,6 +47,9 @@ func TestContext_Start(t *testing.T) {
})

t.Run("bad args", func(t *testing.T) {
cc := CaptureContext()
assert.NotZero(t, len(cc.cmd.Args))

cmd := "bad_command"
cc.cmd = exec.Command(cmd) // nolint:gosec

Expand All @@ -60,6 +63,9 @@ func TestContext_Start(t *testing.T) {
})

t.Run("already starting", func(t *testing.T) {
cc := CaptureContext()
assert.NotZero(t, len(cc.cmd.Args))

cmd := "touch"
path := "/tmp/test_start"
cc.cmd = exec.Command(cmd, path) // nolint:gosec
Expand All @@ -74,7 +80,7 @@ func TestContext_Start(t *testing.T) {
err2 := <-errCh
errors := []error{err1, err2}

assert.Contains(t, errors, ErrAlreadyStarting)
assert.Contains(t, errors, ErrAlreadyStarted)
assert.Contains(t, errors, nil)

assert.NoError(t, os.Remove(path))
Expand Down
24 changes: 19 additions & 5 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (rg *RouteGroup) write(data []byte, tp *transport.ManagedTransport, rule ro

ctx, cancel := context.WithCancel(context.Background())

errCh := rg.writePacketAsync(ctx, tp, packet)
errCh := rg.writePacketAsync(ctx, tp, packet, rule.KeyRouteID())
defer cancel()

select {
Expand All @@ -290,11 +290,12 @@ func (rg *RouteGroup) write(data []byte, tp *transport.ManagedTransport, rule ro
}
}

func (rg *RouteGroup) writePacketAsync(ctx context.Context, tp *transport.ManagedTransport, packet routing.Packet) chan error {
func (rg *RouteGroup) writePacketAsync(ctx context.Context, tp *transport.ManagedTransport, packet routing.Packet,
ruleID routing.RouteID) chan error {
errCh := make(chan error)
go func() {
defer close(errCh)
err := tp.WritePacket(ctx, packet)
err := rg.writePacket(ctx, tp, packet, ruleID)
select {
case <-ctx.Done():
return
Expand All @@ -306,6 +307,19 @@ func (rg *RouteGroup) writePacketAsync(ctx context.Context, tp *transport.Manage
return errCh
}

func (rg *RouteGroup) writePacket(ctx context.Context, tp *transport.ManagedTransport, packet routing.Packet,
ruleID routing.RouteID) error {
err := tp.WritePacket(ctx, packet)
// note equality here. update activity only if there was NO error
if err == nil {
if err := rg.rt.UpdateActivity(ruleID); err != nil {
rg.logger.WithError(err).Errorf("error updating activity of rule %d", ruleID)
}
}

return err
}

// rule fetches first available forward rule.
// NOTE: not thread-safe.
func (rg *RouteGroup) rule() (routing.Rule, error) {
Expand Down Expand Up @@ -374,7 +388,7 @@ func (rg *RouteGroup) sendKeepAlive() error {
}

packet := routing.MakeKeepAlivePacket(rule.NextRouteID())
if err := tp.WritePacket(context.Background(), packet); err != nil {
if err := rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()); err != nil {
return err
}

Expand Down Expand Up @@ -473,7 +487,7 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) {
for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].NextRouteID(), code)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
if err := rg.writePacket(context.Background(), rg.tps[i], packet, rg.fwd[i].KeyRouteID()); err != nil {
rg.logger.WithError(err).Errorf("Failed to send close packet to %s", rg.tps[i].Remote())
}
}
Expand Down
71 changes: 50 additions & 21 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,22 @@ func (r *router) SaveRule(rule routing.Rule) error {

// DelRules removes rules associated with `ids` from the routing table.
func (r *router) DelRules(ids []routing.RouteID) {
rules := make([]routing.Rule, 0, len(ids))
for _, id := range ids {
rule, err := r.rt.Rule(id)
if err != nil {
r.logger.WithError(err).Errorf("Failed to get rule with ID %d on rule removal", id)
continue
}

rules = append(rules, rule)
}

r.rt.DelRules(ids)

for _, rule := range rules {
r.removeRouteGroupOfRule(rule)
}
}

func (r *router) rulesGCLoop() {
Expand All @@ -744,32 +759,46 @@ func (r *router) rulesGCLoop() {
}

func (r *router) rulesGC() {
log := r.logger.WithField("_src", "rulesGC")
log := r.logger.WithField("func", "router.rulesGC")

removedRules := r.rt.CollectGarbage()
log.WithField("rules_count", len(removedRules)).
Debug("Removed rules.")

for _, rule := range removedRules {
// we need to process only consume rules, cause we don't
// really care about the other ones, other rules removal
// doesn't affect our work here
if rule.Type() == routing.RuleConsume {
rDesc := rule.RouteDescriptor()
log := log.
WithField("rule_type", rule.Type().String()).
WithField("rule_desc", rDesc.String())
rg, ok := r.popRouteGroup(rDesc)
if !ok {
log.Debug("No route group associated with expired rule. Continuing...")
continue
}
if rg.isClosed() {
log.Debug("Route group already closed. Continuing...")
continue
}
log.WithError(rg.Close()).
Debug("Route group closed.")
}
r.removeRouteGroupOfRule(rule)
}
}

func (r *router) removeRouteGroupOfRule(rule routing.Rule) {
log := r.logger.
WithField("func", "router.removeRouteGroupOfRule").
WithField("rule_type", rule.Type().String()).
WithField("rule_keyRtID", rule.KeyRouteID())

// we need to process only consume rules, cause we don't
// really care about the other ones, other rules removal
// doesn't affect our work here
if rule.Type() != routing.RuleConsume {
log.Debug("Nothing to be done.")
}

rDesc := rule.RouteDescriptor()
log.WithField("rt_desc", rDesc.String()).
Debug("Closing route group associated with rule...")

rg, ok := r.popRouteGroup(rDesc)
if !ok {
log.Debug("No route group associated with expired rule. Nothing to be done.")
return
}
if rg.isClosed() {
log.Debug("Route group already closed. Nothing to be done.")
return
}
if err := rg.Close(); err != nil {
log.WithError(err).Error("Failed to close route group.")
return
}
log.Debug("Route group closed.")
}
26 changes: 22 additions & 4 deletions pkg/util/pathutil/configpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,18 @@ func (dp ConfigPaths) String() string {
if err != nil {
log.Fatalf("cannot marshal default paths: %s", err.Error())
}

return string(raw)
}

// Get obtains a path stored under given configuration location type.
func (dp ConfigPaths) Get(cpType ConfigLocationType) string {
if path, ok := dp[cpType]; ok {
return path
path, ok := dp[cpType]
if !ok {
log.Fatalf("invalid config type '%s' provided. Valid types: %v", cpType, AllConfigLocationTypes())
}
log.Fatalf("invalid config type '%s' provided. Valid types: %v", cpType, AllConfigLocationTypes())
return ""

return path
}

// VisorDefaults returns the default config paths for skywire-visor.
Expand All @@ -77,8 +79,10 @@ func VisorDefaults() ConfigPaths {
if wd, err := os.Getwd(); err == nil {
paths[WorkingDirLoc] = filepath.Join(wd, "skywire-config.json")
}

paths[HomeLoc] = filepath.Join(HomeDir(), ".skycoin/skywire/skywire-config.json")
paths[LocalLoc] = "/usr/local/SkycoinProject/skywire-mainnet/skywire-config.json"

return paths
}

Expand All @@ -88,8 +92,10 @@ func HypervisorDefaults() ConfigPaths {
if wd, err := os.Getwd(); err == nil {
paths[WorkingDirLoc] = filepath.Join(wd, "hypervisor-config.json")
}

paths[HomeLoc] = filepath.Join(HomeDir(), ".skycoin/hypervisor/hypervisor-config.json")
paths[LocalLoc] = "/usr/local/SkycoinProject/hypervisor/hypervisor-config.json"

return paths
}

Expand All @@ -102,29 +108,37 @@ func FindConfigPath(args []string, argsIndex int, env string, defaults ConfigPat
if argsIndex >= 0 && len(args) > argsIndex {
path := args[argsIndex]
log.Infof("using args[%d] as config path: %s", argsIndex, path)

return path
}

if env != "" {
if path, ok := os.LookupEnv(env); ok {
log.Infof("using $%s as config path: %s", env, path)
return path
}
}

log.Debugf("config path is not explicitly specified, trying default paths...")

for i, cpType := range []ConfigLocationType{WorkingDirLoc, HomeLoc, LocalLoc} {
path, ok := defaults[cpType]
if !ok {
continue
}

if _, err := os.Stat(path); err != nil {
log.Debugf("- [%d/%d] '%s' cannot be accessed: %s", i+1, len(defaults), path, err.Error())
} else {
log.Debugf("- [%d/%d] '%s' is found", i+1, len(defaults), path)
log.Printf("using fallback config path: %s", path)

return path
}
}

log.Fatalf("config not found in any of the following paths: %s", defaults.String())

return ""
}

Expand All @@ -136,14 +150,18 @@ func WriteJSONConfig(conf interface{}, output string, replace bool) {
if err != nil {
log.WithError(err).Fatal("unexpected error, report to dev")
}

if _, err := os.Stat(output); !replace && err == nil {
log.Fatalf("file %s already exists, stopping as 'replace,r' flag is not set", output)
}

if err := os.MkdirAll(filepath.Dir(output), 0750); err != nil {
log.WithError(err).Fatalln("failed to create output directory")
}

if err := ioutil.WriteFile(output, raw, 0744); err != nil {
log.WithError(err).Fatalln("failed to write file")
}

log.Infof("Wrote %d bytes to %s\n%s", len(raw), output, string(raw))
}
Loading

0 comments on commit 33ec06d

Please sign in to comment.