Skip to content

Commit

Permalink
add options to find logs within completed tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
gondor committed Dec 20, 2016
1 parent 60a67ac commit 63150c8
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 33 deletions.
28 changes: 20 additions & 8 deletions mesoslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package main

import (
"fmt"
"github.com/spf13/cobra"
ml "github.com/ContainX/go-mesoslog/mesoslog"
"github.com/spf13/cobra"
"io"
"os"
"strconv"
Expand All @@ -14,6 +14,11 @@ import (
const (
// StdErrFlag is a flag to output stderr logs vs stdout if true
StdErrFlag string = "stderr"
// CompletedFlag is a flag to output completed tasks if true
CompletedFlag string = "completed"
// LatestFlag is a flag to only capture the latest instance.
// This applies to completed and non-completed tasks
LatestFlag string = "latest"
// MasterFlag is the mesos master host:port flag
MasterFlag string = "master"
// DurationFlag is how often to poll in seconds
Expand Down Expand Up @@ -56,6 +61,8 @@ var appsCmd = &cobra.Command{

func main() {
rootCmd.PersistentFlags().Bool(StdErrFlag, false, "Output stderr log instead of default stdout")
printCmd.PersistentFlags().Bool(CompletedFlag, false, "Use completed tasks (default: running tasks)")
printCmd.PersistentFlags().Bool(LatestFlag, false, "Use the latest instance only")
rootCmd.PersistentFlags().StringP(MasterFlag, "m", "", "Mesos Master host:port (eg. 192.168.2.1:5050) or ENV [MESOS_MASTER]")
tailCmd.Flags().IntP(DurationFlag, "d", 5, "Log poll time (duration) in seconds")
rootCmd.AddCommand(appsCmd, printCmd, tailCmd, fileCmd)
Expand All @@ -69,7 +76,10 @@ func printLog(cmd *cobra.Command, args []string) {
return
}

logs, err := client().GetLog(args[0], getLogType(), "")
completed, _ := cmd.Flags().GetBool(CompletedFlag)
latest, _ := cmd.Flags().GetBool(LatestFlag)

logs, err := client(&ml.MesosClientOptions{SearchCompletedTasks: completed, ShowLatestOnly: latest}).GetLog(args[0], getLogType(), "")
if err != nil {
fmt.Printf("%s", err.Error())
return
Expand All @@ -87,8 +97,9 @@ func tailLog(cmd *cobra.Command, args []string) {
fmt.Println("ERROR: An [appId] must be specified")
return
}

duration, _ := cmd.Flags().GetInt(DurationFlag)
err := client().TailLog(args[0], getLogType(), duration)
err := client(nil).TailLog(args[0], getLogType(), duration)
if err != nil {
fmt.Printf("%s", err.Error())
return
Expand All @@ -101,7 +112,9 @@ func fileLog(cmd *cobra.Command, args []string) {
fmt.Println("ERROR: An [appId] and [output_dir] must be specified")
return
}
logs, err := client().GetLog(args[0], getLogType(), args[1])

completed, _ := cmd.Flags().GetBool(CompletedFlag)
logs, err := client(&ml.MesosClientOptions{SearchCompletedTasks: completed}).GetLog(args[0], getLogType(), args[1])
if err != nil {
fmt.Printf("%s", err.Error())
return
Expand All @@ -112,7 +125,7 @@ func fileLog(cmd *cobra.Command, args []string) {
}

func listApps(cmd *cobra.Command, args []string) {
apps, err := client().GetAppNames()
apps, err := client(nil).GetAppNames()
if err != nil {
fmt.Printf("%s", err.Error())
return
Expand All @@ -134,7 +147,7 @@ func getLogType() ml.LogType {
return ml.STDOUT
}

func client() *ml.MesosClient {
func client(options *ml.MesosClientOptions) *ml.MesosClient {
var host string
var port = 5050
master, err := rootCmd.PersistentFlags().GetString(MasterFlag)
Expand All @@ -145,7 +158,6 @@ func client() *ml.MesosClient {
os.Exit(1)
}
master = os.Getenv(EnvMesosMaster)
// master = "internal-lt-mesos-privatee-mw3lkjkwdyni-213084364.us-west-2.elb.amazonaws.com:5050"
}

if strings.Contains(master, ":") {
Expand All @@ -160,7 +172,7 @@ func client() *ml.MesosClient {
host = master
}

c, err := ml.NewMesosClient(host, port)
c, err := ml.NewMesosClientWithOptions(host, port, options)
if err != nil {
printErr(err)
os.Exit(1)
Expand Down
107 changes: 90 additions & 17 deletions mesoslog/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
Expand All @@ -30,22 +31,46 @@ type MesosClient struct {
Port int
MasterURL string
State *masterState
Options *MesosClientOptions
}

type MesosClientOptions struct {
SearchCompletedTasks bool
ShowLatestOnly bool
}

// NewMesosClient - Creates a new MesosClient
// {host} - the host/ip of the mesos master node
// {port} - the port # of the mesos master node
func NewMesosClient(host string, port int) (*MesosClient, error) {
return NewMesosClientWithOptions(host, port, nil)
}

// NewMesosClient - Creates a new MesosClient
// {host} - the host/ip of the mesos master node
// {port} - the port # of the mesos master node
// {options} - client options - optional
func NewMesosClientWithOptions(host string, port int, options *MesosClientOptions) (*MesosClient, error) {
masterURL, err := getMasterRedirect(host, port)
if err != nil {
return nil, err
}

if options == nil {
options = &MesosClientOptions{}
}

state, err := getMasterState(masterURL)
if err != nil {
return nil, err
}
return &MesosClient{Host: host, Port: port, MasterURL: masterURL, State: state}, nil
return &MesosClient{
Host: host,
Port: port,
MasterURL: masterURL,
State: state,
Options: options,
}, nil
}

// GetAppNames - List all unique app names aka task names running in the Mesos cluster
Expand All @@ -63,13 +88,24 @@ func (c *MesosClient) GetAppNames() (map[string]int, error) {
// {dir} - optional output dir which is used to download vs stdout
func (c *MesosClient) GetLog(appID string, logtype LogType, dir string) ([]*LogOut, error) {
var result []*LogOut
tasks := findTask(c.State, appID)

taskInfo := findTask(c.State, appID)
tasks := taskInfo.Tasks
if c.Options.SearchCompletedTasks {
tasks = taskInfo.CompletedTasks
}

sort.Sort(SortTasksByLatestTimestamp(tasks))

if c.Options.ShowLatestOnly {
tasks = tasks[:1]
}

if tasks == nil || len(tasks) == 0 {
return nil, fmt.Errorf("application could not be found")
}

for _, task := range tasks {

slaveInfo, err := c.getSlaveInfo(task)
if err != nil {
return nil, err
Expand All @@ -95,7 +131,8 @@ func (c *MesosClient) GetLog(appID string, logtype LogType, dir string) ([]*LogO
// {logtype} - the desired log type STDOUT | STDERR
// {duration} - poll frequency in seconds
func (c *MesosClient) TailLog(appID string, logtype LogType, duration int) error {
tasks := findTask(c.State, appID)
tasks := findTask(c.State, appID).Tasks

if tasks == nil || len(tasks) == 0 {
return fmt.Errorf("application could not be found")
}
Expand All @@ -121,10 +158,16 @@ func (c *MesosClient) TailLog(appID string, logtype LogType, duration int) error
// GetAppNameForTaskID - Attempts to find the Mesos Application name for the given TaskID
// {taskID} - the task identifier
func (c *MesosClient) GetAppNameForTaskID(taskID string) (string, error) {
tasks := findTask(c.State, taskID)
if tasks != nil {
for _, task := range tasks {
return task.Name, nil
ti := findTask(c.State, taskID)
if ti != nil {
if c.Options.SearchCompletedTasks == false {
for _, task := range ti.Tasks {
return task.Name, nil
}
} else {
for _, task := range ti.CompletedTasks {
return task.Name, nil
}
}
}
return "", fmt.Errorf("application could not be found")
Expand Down Expand Up @@ -200,6 +243,7 @@ func decorateLog(name, data string) string {
}

func (c *MesosClient) getSlaveInfo(task *mstateTask) (*slaveInfo, error) {

slave := findSlave(c.State, task.SlaveID)
if slave == nil {
return nil, fmt.Errorf("invalid state.json; referenced slave not present")
Expand All @@ -215,7 +259,8 @@ func (c *MesosClient) getSlaveInfo(task *mstateTask) (*slaveInfo, error) {
return nil, err
}

directory := findDirectory(slaveState, task.FrameworkID, task.ID, task.ExecutorID)
directory := findDirectory(slaveState, task, c.Options.SearchCompletedTasks)

if directory == "" {
return nil, fmt.Errorf("couldn't locate directory on slave")
}
Expand Down Expand Up @@ -259,16 +304,38 @@ func getMasterState(masterURL string) (*masterState, error) {
return &mstate, nil
}

func findTask(state *masterState, appID string) map[string]*mstateTask {
m := make(map[string]*mstateTask)
func findTask(state *masterState, appID string) *taskInfo {
taskInfo := &taskInfo{
Tasks: []*mstateTask{},
CompletedTasks: []*mstateTask{},
}
for _, framework := range state.Frameworks {
for _, task := range framework.Tasks {
if task.Name == appID || task.ID == appID {
m[task.ID] = task
task.UpdateLastState(findTaskLastState(task))
taskInfo.Tasks = append(taskInfo.Tasks, task)
}
}

if framework.CompletedTasks != nil {
for _, task := range framework.CompletedTasks {
if task.Name == appID || task.ID == appID {
task.UpdateLastState(findTaskLastState(task))
taskInfo.CompletedTasks = append(taskInfo.CompletedTasks, task)
}
}
}
}
return m
return taskInfo
}

func findTaskLastState(task *mstateTask) *mstateTaskStatus {
statuses := task.Statuses

if statuses != nil && len(statuses) > 0 {
return statuses[len(statuses)-1]
}
return nil
}

func findApps(state *masterState) map[string]int {
Expand Down Expand Up @@ -337,14 +404,20 @@ func getSlaveState(slaveURL *url.URL) (*slaveState, error) {
return &sstate, nil
}

func findDirectory(sstate *slaveState, frameworkID, taskID, executorID string) string {
func findDirectory(sstate *slaveState, task *mstateTask, completedTasks bool) string {

for _, framework := range sstate.Frameworks {
if framework.ID != frameworkID {
if framework.ID != task.FrameworkID {
continue
}
for _, executor := range framework.Executors {

if executor.ID == executorID || executor.ID == taskID {
executors := framework.Executors
if completedTasks {
executors = framework.CompletedExecutors
}

for _, executor := range executors {
if executor.ID == task.ExecutorID || executor.ID == task.ID {
return executor.Directory
}
}
Expand Down
13 changes: 13 additions & 0 deletions mesoslog/sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package mesoslog

type SortTasksByLatestTimestamp []*mstateTask

func (s SortTasksByLatestTimestamp) Len() int {
return len(s)
}
func (s SortTasksByLatestTimestamp) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s SortTasksByLatestTimestamp) Less(i, j int) bool {
return s[j].LastTimestamp < s[i].LastTimestamp
}
38 changes: 30 additions & 8 deletions mesoslog/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ type masterState struct {
Slaves []*mstateSlave `json:"slaves"`
}

type taskInfo struct {
Tasks []*mstateTask
CompletedTasks []*mstateTask
}

type mstateFramework struct {
Tasks []*mstateTask `json:"tasks"`
Tasks []*mstateTask `json:"tasks"`
CompletedTasks []*mstateTask `json:"completed_tasks"`
}

type mstateSlave struct {
Expand All @@ -35,11 +41,19 @@ type mstateSlave struct {
}

type mstateTask struct {
ID string `json:"id"`
FrameworkID string `json:"framework_id"`
ExecutorID string `json:"executor_id"`
SlaveID string `json:"slave_id"`
Name string `json:"name"`
ID string `json:"id"`
FrameworkID string `json:"framework_id"`
ExecutorID string `json:"executor_id"`
SlaveID string `json:"slave_id"`
Name string `json:"name"`
Statuses []*mstateTaskStatus `json:"statuses"`
LastTimestamp float64 `json:"-"`
LastState string `json:"-"`
}

type mstateTaskStatus struct {
State string `json:"state"`
Timestamp float64 `json:"timestamp"`
}

type slaveState struct {
Expand All @@ -48,8 +62,9 @@ type slaveState struct {
}

type sstateFramework struct {
ID string `json:"id"`
Executors []*sstateExecutor `json:"executors"`
ID string `json:"id"`
Executors []*sstateExecutor `json:"executors"`
CompletedExecutors []*sstateExecutor `json:"completed_executors"`
}

type sstateExecutor struct {
Expand Down Expand Up @@ -78,3 +93,10 @@ type LogOut struct {
// Log - filename of the outputted log when in download more or RAW log if request is to print to stdout
Log string
}

func (t *mstateTask) UpdateLastState(state *mstateTaskStatus) {
if state != nil {
t.LastState = state.State
t.LastTimestamp = state.Timestamp
}
}

0 comments on commit 63150c8

Please sign in to comment.