Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor task logs framework #4396

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions flyteplugins/go/tasks/logs/config.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,34 @@
package logs

import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
)

//go:generate pflags LogConfig --default-var=DefaultConfig

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

// LogConfig encapsulates plugins' log configs
type LogConfig struct {
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI tasklog.TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
// Deprecated: Please use KubernetesTemplateURI
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI tasklog.TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`

IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"`
// Deprecated: Please use StackDriverTemplateURI
GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"`
// Deprecated: Please use StackDriverTemplateURI
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"`
}
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

type TemplateLogPluginConfig struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
Scheme tasklog.TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."`
Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"`
}

var (
Expand Down
53 changes: 17 additions & 36 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
"github.com/flyteorg/flyte/flytestdlib/logger"
)

type logPlugin struct {
Name string
Plugin tasklog.Plugin
}

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
if logPlugin == nil {
Expand Down Expand Up @@ -66,67 +61,53 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
return logs.TaskLogs, nil
}

type taskLogPluginWrapper struct {
logPlugins []logPlugin
type templateLogPluginCollection struct {
plugins []tasklog.TemplateLogPlugin
}

func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) {
logs := make([]*core.TaskLog, 0, len(t.logPlugins))
suffix := input.LogName
func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) {
var taskLogs []*core.TaskLog

for _, plugin := range t.logPlugins {
input.LogName = plugin.Name + suffix
o, err := plugin.Plugin.GetTaskLogs(input)
for _, plugin := range t.plugins {
o, err := plugin.GetTaskLogs(input)
if err != nil {
return tasklog.Output{}, err
}

logs = append(logs, o.TaskLogs...)
taskLogs = append(taskLogs, o.TaskLogs...)
}

return tasklog.Output{
TaskLogs: logs,
}, nil
return tasklog.Output{TaskLogs: taskLogs}, nil
}

// InitializeLogPlugins initializes log plugin based on config.
func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
// Use a list to maintain order.
logPlugins := make([]logPlugin, 0, 2)
var plugins []tasklog.TemplateLogPlugin

if cfg.IsKubernetesEnabled {
if len(cfg.KubernetesTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsCloudwatchEnabled {
if len(cfg.CloudwatchTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsStackDriverEnabled {
if len(cfg.StackDriverTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON})
}
}

if len(cfg.Templates) > 0 {
for _, cfg := range cfg.Templates {
logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.Scheme, cfg.TemplateURIs, cfg.MessageFormat)})
}
}

if len(logPlugins) == 0 {
return nil, nil
}

return taskLogPluginWrapper{logPlugins: logPlugins}, nil
plugins = append(plugins, cfg.Templates...)
return templateLogPluginCollection{plugins: plugins}, nil
}
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c

func TestGetLogsForContainerInPod_Templates(t *testing.T) {
assertTestSucceeded(t, &LogConfig{
Templates: []TemplateLogPluginConfig{
Templates: []tasklog.TemplateLogPlugin{
{
DisplayName: "StackDriver",
TemplateURIs: []string{
Expand Down
10 changes: 10 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
TemplateSchemeTaskExecution
)

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

type TemplateVar struct {
Regex *regexp.Regexp
Value string
Expand Down Expand Up @@ -57,3 +60,10 @@ type Plugin interface {
// Generates a TaskLog object given necessary computation information
GetTaskLogs(i Input) (logs Output, err error)
}

type TemplateLogPlugin struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
Scheme TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."`
}
57 changes: 12 additions & 45 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type templateRegexes struct {
ExecutionName *regexp.Regexp
ExecutionProject *regexp.Regexp
ExecutionDomain *regexp.Regexp
GeneratedName *regexp.Regexp
}

func initDefaultRegexes() templateRegexes {
Expand All @@ -58,6 +59,7 @@ func initDefaultRegexes() templateRegexes {
MustCreateRegex("executionName"),
MustCreateRegex("executionProject"),
MustCreateRegex("executionDomain"),
MustCreateRegex("generatedName"),
}
}

Expand Down Expand Up @@ -121,6 +123,10 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
defaultRegexes.NodeID,
input.TaskExecutionID.GetUniqueNodeID(),
},
TemplateVar{
defaultRegexes.GeneratedName,
input.TaskExecutionID.GetGeneratedName(),
},
TemplateVar{
defaultRegexes.TaskRetryAttempt,
strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10),
Expand Down Expand Up @@ -172,55 +178,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
return vars
}

// A simple log plugin that supports templates in urls to build the final log link.
// See `defaultRegexes` for supported templates.
type TemplateLogPlugin struct {
scheme TemplateScheme
templateUris []string
messageFormat core.TaskLog_MessageFormat
}

func (s TemplateLogPlugin) GetTaskLog(podName, podUID, namespace, containerName, containerID, logName string, podRFC3339StartTime string, podRFC3339FinishTime string, podUnixStartTime, podUnixFinishTime int64) (core.TaskLog, error) {
o, err := s.GetTaskLogs(Input{
LogName: logName,
Namespace: namespace,
PodName: podName,
PodUID: podUID,
ContainerName: containerName,
ContainerID: containerID,
PodRFC3339StartTime: podRFC3339StartTime,
PodRFC3339FinishTime: podRFC3339FinishTime,
PodUnixStartTime: podUnixStartTime,
PodUnixFinishTime: podUnixFinishTime,
})

if err != nil || len(o.TaskLogs) == 0 {
return core.TaskLog{}, err
}

return *o.TaskLogs[0], nil
}

func (s TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme(s.scheme)
taskLogs := make([]*core.TaskLog, 0, len(s.templateUris))
for _, templateURI := range s.templateUris {
func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme(p.Scheme)
taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs))
for _, templateURI := range p.TemplateURIs {
taskLogs = append(taskLogs, &core.TaskLog{
Uri: replaceAll(templateURI, templateVars),
Name: input.LogName,
MessageFormat: s.messageFormat,
Name: p.DisplayName + input.LogName,
MessageFormat: p.MessageFormat,
})
}

return Output{TaskLogs: taskLogs}, nil
}

// NewTemplateLogPlugin creates a template-based log plugin with the provided template Uri and message format.
// See `defaultRegexes` for supported templates.
func NewTemplateLogPlugin(scheme TemplateScheme, templateUris []string, messageFormat core.TaskLog_MessageFormat) TemplateLogPlugin {
return TemplateLogPlugin{
scheme: scheme,
templateUris: templateUris,
messageFormat: messageFormat,
}
}
Loading
Loading