Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Log Plugin v1 (#26)
Browse files Browse the repository at this point in the history
1. Adds log plumbing for basic container tasks only.
2. Sets up some basic helper code that reads configs in order to create log links.  This will be refactored later.
  • Loading branch information
matthewphsmith authored and lyft-buildnotify-5 committed Dec 27, 2018
1 parent 01a95bf commit 37a7609
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 26 deletions.
8 changes: 5 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions go/tasks/v1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,35 @@ package v1

import "github.com/lyft/flytestdlib/config"

// TODO: This should all be part of flytestdlib
const configSectionKey = "plugins"
const logConfigSectionKey = "logs"

type Config struct {
IsDebugMode bool `json:"debug-mode" pflag:",Enable debug mode"`
}

type LogConfig struct {
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
AwsRegion string `json:"aws-region" pflag:",Region in which Cloudwatch logs are stored."`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
KubernetesUrl string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
}

func init() {
if err := config.RegisterSection(configSectionKey, &Config{}); err != nil {
panic(err)
}
if err := config.RegisterSection(logConfigSectionKey, &LogConfig{}); err != nil {
panic(err)
}
}

func GetConfig() *Config {
return config.GetSection(configSectionKey).(*Config)
}

func GetLogConfig() *LogConfig {
return config.GetSection(logConfigSectionKey).(*LogConfig)
}
21 changes: 16 additions & 5 deletions go/tasks/v1/k8splugins/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,29 @@ type containerTaskExecutor struct {

func (containerTaskExecutor) GetTaskStatus(r flytek8s.K8sResource) (types.TaskStatus, *events.TaskEventInfo, error) {
pod := r.(*v1.Pod)

logs, err := GetLogsForContainerInPod(pod, 0, " (User)")

if err != nil {
return types.TaskStatusPermanentFailure(err), nil, nil
}

info := &events.TaskEventInfo{
Logs: logs,
}

switch pod.Status.Phase {
case v1.PodSucceeded:
return types.TaskStatusSucceeded, nil, nil
return types.TaskStatusSucceeded, info, nil
case v1.PodFailed:
return types.TaskStatusPermanentFailure(errors.Errorf(pod.Status.Reason, pod.Status.Message)), nil, nil
return types.TaskStatusPermanentFailure(errors.Errorf(pod.Status.Reason, pod.Status.Message)), info, nil
case v1.PodPending:
status, err := DemystifyPending(pod.Status)
return status, nil, err
return status, info, err
case v1.PodUnknown:
return types.TaskStatusUnknown, nil, nil
return types.TaskStatusUnknown, info, nil
}
return types.TaskStatusRunning, nil, nil
return types.TaskStatusRunning, info, nil
}

// Creates a new Pod that will Exit on completion. The pods have no retries by design
Expand Down
8 changes: 4 additions & 4 deletions go/tasks/v1/k8splugins/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ func TestContainerTaskExecutor_GetTaskPhase(t *testing.T) {
t.Run("running", func(t *testing.T) {
s, i, err := c.GetTaskStatus(j)
assert.NoError(t, err)
assert.Nil(t, i)
assert.NotNil(t, i)
assert.Equal(t, types.TaskPhaseRunning, s.Phase)
})

t.Run("failNoCondition", func(t *testing.T) {
j.Status.Phase = v1.PodFailed
s, i, err := c.GetTaskStatus(j)
assert.NoError(t, err)
assert.Nil(t, i)
assert.NotNil(t, i)
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase)
})

Expand All @@ -107,7 +107,7 @@ func TestContainerTaskExecutor_GetTaskPhase(t *testing.T) {
}
s, i, err := c.GetTaskStatus(j)
assert.NoError(t, err)
assert.Nil(t, i)
assert.NotNil(t, i)
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase)
})

Expand All @@ -116,6 +116,6 @@ func TestContainerTaskExecutor_GetTaskPhase(t *testing.T) {
s, i, err := c.GetTaskStatus(j)
assert.NoError(t, err)
assert.Equal(t, types.TaskPhaseSucceeded, s.Phase)
assert.Nil(t, i)
assert.NotNil(t, i)
})
}
45 changes: 45 additions & 0 deletions go/tasks/v1/k8splugins/logging_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package k8splugins

import (
logUtils "github.com/lyft/flyteidl/clients/go/coreutils/logs"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
plugins "github.com/lyft/flyteplugins/go/tasks/v1"
"k8s.io/api/core/v1"
)

var GetLogConfig = plugins.GetLogConfig

func GetLogsForContainerInPod(pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
var logs []*core.TaskLog
logConfig := GetLogConfig()

if logConfig.IsKubernetesEnabled {
k8sLog, err := logUtils.NewKubernetesLogPlugin(logConfig.KubernetesUrl).GetTaskLog(
pod.Name,
pod.Namespace,
pod.Spec.Containers[index].Name,
pod.Status.ContainerStatuses[index].ContainerID,
"Kubernetes Logs"+nameSuffix)

if err != nil {
return nil, err
}
logs = append(logs, &k8sLog)
}

if logConfig.IsCloudwatchEnabled {
cwLogs, err := logUtils.NewCloudwatchLogPlugin(logConfig.AwsRegion).GetTaskLog(
pod.Name,
pod.Namespace,
pod.Spec.Containers[index].Name,
pod.Status.ContainerStatuses[index].ContainerID,
"Cloudwatch Logs"+nameSuffix)

if err != nil {
return nil, err
}
logs = append(logs, &cwLogs)
}

return logs, nil
}
133 changes: 133 additions & 0 deletions go/tasks/v1/k8splugins/logging_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package k8splugins

import (
plugins "github.com/lyft/flyteplugins/go/tasks/v1"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
"testing"
)

func TestGetLogsForContainerInPod_NoLogs(t *testing.T) {
oldGetConfig := GetLogConfig

// TODO: Refactor for dependency injection and remove
defer func() { GetLogConfig = oldGetConfig }()

GetLogConfig = func() *plugins.LogConfig {
return &plugins.LogConfig{}
}

logs, err := GetLogsForContainerInPod(nil, 0, " Suffix")
assert.Nil(t, err)
assert.Len(t, logs, 0)
}

func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) {
oldGetConfig := GetLogConfig

// TODO: Refactor for dependency injection and remove
defer func() { GetLogConfig = oldGetConfig }()

GetLogConfig = func() *plugins.LogConfig {
return &plugins.LogConfig{
IsCloudwatchEnabled: true,
AwsRegion: "us-east-1",
}
}

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "ContainerName",
},
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "ContainerID",
},
},
},
}
pod.Name = "PodName"

logs, err := GetLogsForContainerInPod(pod, 0, " Suffix")
assert.Nil(t, err)
assert.Len(t, logs, 1)
}

func TestGetLogsForContainerInPod_K8s(t *testing.T) {
oldGetConfig := GetLogConfig

// TODO: Refactor for dependency injection and remove
defer func() { GetLogConfig = oldGetConfig }()

GetLogConfig = func() *plugins.LogConfig {
return &plugins.LogConfig{
IsKubernetesEnabled: true,
KubernetesUrl: "k8s.com",
}
}

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "ContainerName",
},
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "ContainerID",
},
},
},
}
pod.Name = "PodName"

logs, err := GetLogsForContainerInPod(pod, 0, " Suffix")
assert.Nil(t, err)
assert.Len(t, logs, 1)
}

func TestGetLogsForContainerInPod_All(t *testing.T) {
oldGetConfig := GetLogConfig

// TODO: Refactor for dependency injection and remove
defer func() { GetLogConfig = oldGetConfig }()

GetLogConfig = func() *plugins.LogConfig {
return &plugins.LogConfig{
IsKubernetesEnabled: true,
KubernetesUrl: "k8s.com",
IsCloudwatchEnabled: true,
AwsRegion: "us-east-1",
}
}

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "ContainerName",
},
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "ContainerID",
},
},
},
}
pod.Name = "PodName"

logs, err := GetLogsForContainerInPod(pod, 0, " Suffix")
assert.Nil(t, err)
assert.Len(t, logs, 2)
}
13 changes: 9 additions & 4 deletions go/tasks/v1/k8splugins/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,20 @@ func (sparkResourceHandler) BuildIdentityResource(taskCtx types.TaskContext) (fl
func (sparkResourceHandler) GetTaskStatus(r flytek8s.K8sResource) (types.TaskStatus, *events.TaskEventInfo, error) {
app := r.(*sparkOp.SparkApplication)

// TODO: Get logs for Spark application
info := &events.TaskEventInfo{
Logs: nil,
}

switch app.Status.AppState.State {
case sparkOp.NewState, sparkOp.SubmittedState:
return types.TaskStatusQueued, nil, nil
return types.TaskStatusQueued, info, nil
case sparkOp.FailedState:
return types.TaskStatusPermanentFailure(errors.Errorf("UnknownError", "spark job exited with failure")), nil, nil
return types.TaskStatusPermanentFailure(errors.Errorf("UnknownError", "spark job exited with failure")), info, nil
case sparkOp.CompletedState:
return types.TaskStatusSucceeded, nil, nil
return types.TaskStatusSucceeded, info, nil
default:
return types.TaskStatusRunning, nil, nil
return types.TaskStatusRunning, info, nil
}
}

Expand Down
Loading

0 comments on commit 37a7609

Please sign in to comment.