Skip to content

Commit

Permalink
Log collect by sk (#1566)
Browse files Browse the repository at this point in the history
* optimizations for skywire-cli log

* make format

* fix variable name

* remove unused func & minor change on help menu
  • Loading branch information
0pcom authored May 18, 2023
1 parent 1b5e708 commit 10eb8aa
Showing 1 changed file with 64 additions and 47 deletions.
111 changes: 64 additions & 47 deletions cmd/skywire-cli/commands/log/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,55 @@ import (
"github.com/skycoin/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire-utilities/pkg/cmdutil"
"github.com/skycoin/skywire-utilities/pkg/logging"
"github.com/skycoin/skywire-utilities/pkg/skyenv"
)

var (
env string
duration int
minv string
allVisors bool
batchSize int
maxFileSize int64
utAddr string
env string
duration int
minv string
allVisors bool
batchSize int
maxFileSize int64
utAddr string
sk cipher.SecKey
dmsgDisc string
logOnly bool
surveyOnly bool
deleteOnErrors bool
)

func init() {
logCmd.Flags().SortFlags = false
logCmd.Flags().StringVarP(&env, "env", "e", "prod", "selecting env to fetch uptimes, default is prod")
logCmd.Flags().BoolVarP(&logOnly, "log", "l", false, "fetch only transport logs")
logCmd.Flags().BoolVarP(&surveyOnly, "survey", "v", false, "fetch only surveys")
logCmd.Flags().BoolVarP(&deleteOnErrors, "clean", "c", false, "delete files and folders on errors")
logCmd.Flags().StringVar(&minv, "minv", "v1.3.4", "minimum version for get logs, default is 1.3.4")
logCmd.Flags().IntVarP(&duration, "duration", "d", 1, "count of days before today to fetch logs")
logCmd.Flags().BoolVar(&allVisors, "all", false, "consider all visors, actually skip filtering on version")
logCmd.Flags().IntVarP(&duration, "duration", "n", 1, "numberof days before today to fetch transport logs for")
logCmd.Flags().BoolVar(&allVisors, "all", false, "consider all visors ; no version filtering")
logCmd.Flags().IntVar(&batchSize, "batchSize", 50, "number of visor in each batch, default is 50")
logCmd.Flags().Int64Var(&maxFileSize, "maxfilesize", 30, "maximum file size allowed to download during collecting logs, on KB")
logCmd.Flags().StringVar(&utAddr, "ut", "", "custom uptime tracker url, usable for get specific(s) visors log data")
logCmd.Flags().Int64Var(&maxFileSize, "maxfilesize", 30, "maximum file size allowed to download during collecting logs, in KB")
logCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", skyenv.DmsgDiscAddr, "dmsg discovery url\n")
logCmd.Flags().StringVarP(&utAddr, "ut", "u", "", "custom uptime tracker url")
if os.Getenv("DMSGGET_SK") != "" {
sk.Set(os.Getenv("DMSGGET_SK")) //nolint
}
logCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r")
}

// RootCmd is surveyCmd
var RootCmd = logCmd

var logCmd = &cobra.Command{
Use: "log collecting",
Short: "collecting logs",
Long: "collecting logs from all visors to calculate rewards",
Use: "log",
Short: "survey & transport log collection",
Long: "collect surveys and transport logging from visors which are online in the uptime tracker",
Run: func(cmd *cobra.Command, args []string) {
log := logging.MustGetLogger("log-collecting")
if logOnly && surveyOnly {
log.Fatal("use of mutually exclusive flags --log and --survey")
}

// Preparing directories
if _, err := os.ReadDir("log_collecting"); err != nil {
Expand All @@ -76,24 +93,29 @@ var logCmd = &cobra.Command{
cancel()
os.Exit(1)
}()
// Fetch visors data from uptime tracker
endpoint := "https://ut.skywire.skycoin.com/uptimes?v=v2"
// Set the uptime tracker to fetch data from
endpoint := skyenv.UptimeTrackerAddr + "/uptimes?v=v2"
if env == "test" {
endpoint = "https://ut.skywire.dev/uptimes?v=v2"
endpoint = skyenv.TestUptimeTrackerAddr + "/uptimes?v=v2"
}
if utAddr != "" {
endpoint = utAddr
}
//Fetch the uptime data over http
uptimes, err := getUptimes(endpoint, log)
if err != nil {
log.WithError(err).Panic("Unable to get data from uptime tracker.")
}
//randomize the order of the fetching - prevents observed hanging
//randomize the order of the survey collection - workaround for hanging
rand.Shuffle(len(uptimes), func(i, j int) {
uptimes[i], uptimes[j] = uptimes[j], uptimes[i]
})
// Create dmsg http client
pk, sk, _ := genKeys("") //nolint
pk, err := sk.PubKey()
if err != nil {
pk, sk = cipher.GenerateKeyPair()
}

dmsgC, closeDmsg, err := dg.StartDmsg(ctx, log, pk, sk)
if err != nil {
log.WithError(err).Panic(err)
Expand Down Expand Up @@ -127,25 +149,33 @@ var logCmd = &cobra.Command{
}
deleteOnError = true
}

err = download(ctx, log, httpC, "node-info.json", "node-info.json", key, maxFileSize)
if err != nil {
if deleteOnError {
bulkFolders = append(bulkFolders, key)
if !logOnly {
err = download(ctx, log, httpC, "node-info.json", "node-info.json", key, maxFileSize)
if err != nil {
// This logic saves time, however it potentially
// omits or deletes transport logs for visors
// that are not seeking rewards which we still should collect
// so it is made optional via flag
if deleteOnErrors {
if deleteOnError {
bulkFolders = append(bulkFolders, key)
}
return
}
}
return
}

if duration == 1 {
yesterday := time.Now().AddDate(0, 0, -1).UTC().Format("2006-01-02")
download(ctx, log, httpC, "transport_logs/"+yesterday+".csv", yesterday+".csv", key, maxFileSize) //nolint
} else {
for i := 1; i <= duration; i++ {
date := time.Now().AddDate(0, 0, -i).UTC().Format("2006-01-02")
download(ctx, log, httpC, "transport_logs/"+date+".csv", date+".csv", key, maxFileSize) //nolint
if !surveyOnly {
if duration == 1 {
yesterday := time.Now().AddDate(0, 0, -1).UTC().Format("2006-01-02")
download(ctx, log, httpC, "transport_logs/"+yesterday+".csv", yesterday+".csv", key, maxFileSize) //nolint
} else {
for i := 1; i <= duration; i++ {
date := time.Now().AddDate(0, 0, -i).UTC().Format("2006-01-02")
download(ctx, log, httpC, "transport_logs/"+date+".csv", date+".csv", key, maxFileSize) //nolint
}
}
}

}(v.PubKey, &wg)
batchSize--
if batchSize == 0 {
Expand Down Expand Up @@ -184,7 +214,6 @@ func getUptimes(endpoint string, log *logging.Logger) ([]VisorUptimeResponse, er
log.Error("Error while fetching data from uptime service. Error: ", err)
return results, errors.New("Cannot get Uptime data")
}

defer response.Body.Close() //nolint
body, err := io.ReadAll(response.Body)
if err != nil {
Expand All @@ -209,22 +238,10 @@ type VisorUptimeResponse struct { //nolint
Version string `json:"version"`
}

func genKeys(skStr string) (pk cipher.PubKey, sk cipher.SecKey, err error) {
if skStr == "" {
pk, sk = cipher.GenerateKeyPair()
return
}
if err = sk.Set(skStr); err != nil {
return
}
pk, err = sk.PubKey()
return
}

func getAllDMSGServers() []dmsgServer {
var results []dmsgServer

response, err := http.Get("https://dmsgd.skywire.skycoin.com/dmsg-discovery/all_servers") //nolint
response, err := http.Get(dmsgDisc + "/dmsg-discovery/all_servers") //nolint
if err != nil {
return results
}
Expand Down

0 comments on commit 10eb8aa

Please sign in to comment.