Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dmsgpty-cli to skywire-cli #1250

Merged
merged 6 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions cmd/skywire-cli/commands/dmsgpty/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package dmsgpty

import (
"context"
"fmt"
"net"
"os"
"strconv"
"time"

"github.com/skycoin/dmsg/pkg/dmsgpty"
"github.com/skycoin/skycoin/src/util/logging"
"github.com/spf13/cobra"

"github.com/skycoin/skywire-utilities/pkg/cmdutil"
"github.com/skycoin/skywire/cmd/skywire-cli/internal"
"github.com/skycoin/skywire/pkg/visor"
)

var rpcAddr string
var ptyPort string
var masterLogger = logging.NewMasterLogger()
var packageLogger = masterLogger.PackageLogger("dmsgpty")

func init() {
RootCmd.PersistentFlags().StringVarP(&rpcAddr, "rpc", "", "localhost:3435", "RPC server address")
RootCmd.PersistentFlags().StringVarP(&ptyPort, "port", "p", "22", "port of remote visor dmsgpty")
}

// RootCmd is the command that contains sub-commands which interacts with dmsgpty.
var RootCmd = &cobra.Command{
Use: "dmsgpty",
Short: "Some simple commands of dmsgpty to remote visor",
}

func init() {
RootCmd.AddCommand(
listOfVisors,
executeCommand,
)
}

var listOfVisors = &cobra.Command{
Use: "list",
Short: "fetch list of connected visor's PKs",
Run: func(_ *cobra.Command, _ []string) {
remoteVisors := rpcClient().RemoteVisors()
var msg string
for idx, pk := range remoteVisors {
msg += fmt.Sprintf("%d. %s\n", idx+1, pk)
}
if _, err := os.Stdout.Write([]byte(msg)); err != nil {
packageLogger.Fatal("Failed to output build info:", err)
}
},
}

var executeCommand = &cobra.Command{
Use: "start <pk>",
Short: "start dmsgpty for specific visor by its dmsg address pk:port",
Args: cobra.MinimumNArgs(1),
RunE: func(_ *cobra.Command, args []string) error {
cli := dmsgpty.DefaultCLI()
addr := internal.ParsePK("pk", args[0])
port, _ := strconv.ParseUint(ptyPort, 10, 16) //nolint
ctx, cancel := cmdutil.SignalContext(context.Background(), nil)
defer cancel()
return cli.StartRemotePty(ctx, addr, uint16(port), dmsgpty.DefaultCmd)
},
}

func rpcClient() visor.API {
const rpcDialTimeout = time.Second * 5
conn, err := net.DialTimeout("tcp", rpcAddr, rpcDialTimeout)
if err != nil {
packageLogger.Fatal("RPC connection failed:", err)
}
return visor.NewRPCClient(packageLogger, conn, visor.RPCPrefix, 0)
}
2 changes: 2 additions & 0 deletions cmd/skywire-cli/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/skycoin/skywire/cmd/skywire-cli/commands/completion"
"github.com/skycoin/skywire/cmd/skywire-cli/commands/config"
"github.com/skycoin/skywire/cmd/skywire-cli/commands/dmsgpty"
"github.com/skycoin/skywire/cmd/skywire-cli/commands/mdisc"
"github.com/skycoin/skywire/cmd/skywire-cli/commands/rtfind"
"github.com/skycoin/skywire/cmd/skywire-cli/commands/visor"
Expand All @@ -32,6 +33,7 @@ func init() {
rtfind.RootCmd,
mdisc.RootCmd,
completion.RootCmd,
dmsgpty.RootCmd,
)
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/visor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type API interface {
GetAppStats(appName string) (appserver.AppStats, error)
GetAppError(appName string) (string, error)
GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error)
RemoteVisors() []string

TransportTypes() ([]string, error)
Transports(types []string, pks []cipher.PubKey, logs bool) ([]*TransportSummary, error)
Expand Down Expand Up @@ -572,6 +573,15 @@ func (v *Visor) GetAppConnectionsSummary(appName string) ([]appserver.Connection
return nil, ErrProcNotAvailable
}

// RemoteVisors return list of connected remote visors
func (v *Visor) RemoteVisors() []string {
var visors []string
for _, conn := range v.remoteVisors {
visors = append(visors, conn.Addr.PK.String())
}
return visors
}

// TransportTypes implements API.
func (v *Visor) TransportTypes() ([]string, error) {
var types []string
Expand Down
28 changes: 15 additions & 13 deletions pkg/visor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type Conn struct {
type Hypervisor struct {
c hypervisorconfig.Config
visor *Visor
remoteVisors map[cipher.PubKey]Conn // connected remote visors to hypervisor
dmsgC *dmsg.Client
visors map[cipher.PubKey]Conn // connected remote visors
users *usermanager.UserManager
mu *sync.RWMutex
visorMu sync.Mutex
Expand Down Expand Up @@ -87,13 +87,14 @@ func New(config hypervisorconfig.Config, visor *Visor, dmsgC *dmsg.Client) (*Hyp
mLogger := logging.NewMasterLogger()
if visor != nil {
mLogger = visor.MasterLogger()
visor.remoteVisors = make(map[cipher.PubKey]Conn)
}

hv := &Hypervisor{
c: config,
visor: visor,
remoteVisors: make(map[cipher.PubKey]Conn),
dmsgC: dmsgC,
visors: make(map[cipher.PubKey]Conn),
users: usermanager.NewUserManager(mLogger, singleUserDB, config.Cookies),
mu: new(sync.RWMutex),
visorChanMux: make(map[cipher.PubKey]*chanMux),
Expand Down Expand Up @@ -146,7 +147,8 @@ func (hv *Hypervisor) ServeRPC(ctx context.Context, dmsgPort uint16) error {
log.Info("Accepted.")

hv.mu.Lock()
hv.visors[addr.PK] = *visorConn
hv.visor.remoteVisors[addr.PK] = *visorConn
hv.remoteVisors[addr.PK] = *visorConn
hv.mu.Unlock()
}
}
Expand Down Expand Up @@ -175,7 +177,7 @@ func (hv *Hypervisor) AddMockData(config MockConfig) error {
}

hv.mu.Lock()
hv.visors[pk] = Conn{
hv.remoteVisors[pk] = Conn{
Addr: dmsg.Addr{
PK: pk,
Port: uint16(i),
Expand Down Expand Up @@ -321,13 +323,13 @@ func (hv *Hypervisor) getDmsgSummary() []dmsgtracker.DmsgClientSummary {
hv.mu.RLock()
defer hv.mu.RUnlock()

pks := make([]cipher.PubKey, 0, len(hv.visors)+1)
pks := make([]cipher.PubKey, 0, len(hv.remoteVisors)+1)
if hv.visor != nil {
// Add hypervisor node.
pks = append(pks, hv.visor.conf.PK)
}

for pk := range hv.visors {
for pk := range hv.remoteVisors {
pks = append(pks, pk)
}
if hv.visor.isDTMReady() {
Expand Down Expand Up @@ -395,14 +397,14 @@ func (hv *Hypervisor) getVisors() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
hv.mu.RLock()
wg := new(sync.WaitGroup)
wg.Add(len(hv.visors))
wg.Add(len(hv.remoteVisors))

i := 0
if hv.visor != nil {
i++
}

overviews := make([]Overview, len(hv.visors)+i)
overviews := make([]Overview, len(hv.remoteVisors)+i)

if hv.visor != nil {
overview, err := hv.visor.Overview()
Expand All @@ -414,7 +416,7 @@ func (hv *Hypervisor) getVisors() http.HandlerFunc {
overviews[0] = *overview
}

for pk, c := range hv.visors {
for pk, c := range hv.remoteVisors {
go func(pk cipher.PubKey, c Conn, i int) {
log := hv.log(r).
WithField("visor_addr", c.Addr).
Expand Down Expand Up @@ -490,7 +492,7 @@ func (hv *Hypervisor) getAllVisorsSummary() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
hv.mu.RLock()
wg := new(sync.WaitGroup)
wg.Add(len(hv.visors))
wg.Add(len(hv.remoteVisors))

i := 1

Expand All @@ -504,7 +506,7 @@ func (hv *Hypervisor) getAllVisorsSummary() http.HandlerFunc {
wg.Done()
}()

summaries := make([]Summary, len(hv.visors)+i)
summaries := make([]Summary, len(hv.remoteVisors)+i)

summary, err := hv.visor.Summary()
if err != nil {
Expand All @@ -519,7 +521,7 @@ func (hv *Hypervisor) getAllVisorsSummary() http.HandlerFunc {

summaries[0] = makeSummaryResp(err == nil, true, summary)

for pk, c := range hv.visors {
for pk, c := range hv.remoteVisors {
go func(pk cipher.PubKey, c Conn, i int) {
log := hv.log(r).
WithField("visor_addr", c.Addr).
Expand Down Expand Up @@ -1409,7 +1411,7 @@ func (hv *Hypervisor) getPersistentTransports() http.HandlerFunc {

func (hv *Hypervisor) visorConn(pk cipher.PubKey) (Conn, bool) {
hv.mu.RLock()
conn, ok := hv.visors[pk]
conn, ok := hv.remoteVisors[pk]
hv.mu.RUnlock()

return conn, ok
Expand Down
12 changes: 12 additions & 0 deletions pkg/visor/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,15 @@ func (r *RPC) SetPublicAutoconnect(pAc *bool, _ *struct{}) (err error) {
err = r.visor.SetPublicAutoconnect(*pAc)
return err
}

// RemoteVisors return connected remote visors
func (r *RPC) RemoteVisors(_ *struct{}, out *[]string) (err error) {
defer rpcutil.LogCall(r.log, "RemoteVisor", nil)(out, &err)

remoteVisors := r.visor.RemoteVisors()
if remoteVisors != nil {
*out = remoteVisors
}

return err
}
13 changes: 13 additions & 0 deletions pkg/visor/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type rpcClient struct {
conn io.ReadWriteCloser
client *rpc.Client
prefix string
FixGob bool
}

// NewRPCClient creates a new API.
Expand Down Expand Up @@ -491,6 +492,13 @@ func (rc *rpcClient) UpdateStatus() (string, error) {
return result, err
}

// RemoteVisors calls RemoteVisors.
func (rc *rpcClient) RemoteVisors() []string {
output := []string{}
rc.Call("RemoteVisors", &struct{}{}, &output) // nolint
return output
}

// MockRPCClient mocks API.
type mockRPCClient struct {
startedAt time.Time
Expand Down Expand Up @@ -1047,3 +1055,8 @@ func (mc *mockRPCClient) SetPersistentTransports(_ []transport.PersistentTranspo
func (mc *mockRPCClient) GetPersistentTransports() ([]transport.PersistentTransports, error) {
return []transport.PersistentTransports{}, nil
}

// RemoteVisors implements API
func (mc *mockRPCClient) RemoteVisors() []string {
return []string{}
}
3 changes: 3 additions & 0 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/skycoin/dmsg/pkg/dmsg"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire/pkg/app/appdisc"
"github.com/skycoin/skywire/pkg/app/appevent"
"github.com/skycoin/skywire/pkg/app/appserver"
Expand Down Expand Up @@ -90,6 +91,8 @@ type Visor struct {
runtimeErrors chan error

isServicesHealthy *internalHealthInfo

remoteVisors map[cipher.PubKey]Conn // copy of connected remote visors to hypervisor
}

// todo: consider moving module closing to the module system
Expand Down