-
Notifications
You must be signed in to change notification settings - Fork 674
/
logging_utils.go
130 lines (109 loc) · 5.27 KB
/
logging_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package logs
import (
	"context"
	"fmt"
	"sort"
	"time"

	v1 "k8s.io/api/core/v1"

	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
	"github.com/flyteorg/flyte/flytestdlib/logger"
)
// GetLogsForContainerInPod builds the task log links for the container at
// position index of pod by delegating to logPlugin.
//
// nameSuffix is forwarded to the plugin as the log name, and
// extraLogTemplateVars and taskTemplate are forwarded unchanged. The pod's
// creation timestamp is reported as the start time and the current wall-clock
// time as the finish time.
//
// It returns (nil, nil) without calling the plugin when logPlugin is nil, when
// pod is nil, or when index is not a valid position in pod.Spec.Containers;
// the latter two cases are logged as errors. An error returned by the plugin
// is passed through unchanged.
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVars []tasklog.TemplateVar, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) {
	if logPlugin == nil {
		return nil, nil
	}

	if pod == nil {
		logger.Error(ctx, "cannot extract logs for a nil pod")
		return nil, nil
	}

	if uint32(len(pod.Spec.Containers)) <= index {
		logger.Errorf(ctx, "container IndexOutOfBound, requested [%d], but total containers [%d] in pod phase [%v]", index, len(pod.Spec.Containers), pod.Status.Phase)
		return nil, nil
	}

	// A missing container status is not fatal: it is logged and the plugin is
	// still invoked, with an empty container ID.
	var containerID string
	if uint32(len(pod.Status.ContainerStatuses)) > index {
		containerID = pod.Status.ContainerStatuses[index].ContainerID
	} else {
		logger.Errorf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase)
	}

	startTime := pod.CreationTimestamp.Unix()
	finishTime := time.Now().Unix()

	logs, err := logPlugin.GetTaskLogs(
		tasklog.Input{
			PodName:              pod.Name,
			PodUID:               string(pod.GetUID()),
			Namespace:            pod.Namespace,
			ContainerName:        pod.Spec.Containers[index].Name,
			ContainerID:          containerID,
			LogName:              nameSuffix,
			PodRFC3339StartTime:  time.Unix(startTime, 0).Format(time.RFC3339),
			PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
			PodUnixStartTime:     startTime,
			PodUnixFinishTime:    finishTime,
			TaskExecutionID:      taskExecID,
			ExtraTemplateVars:    extraLogTemplateVars,
			TaskTemplate:         taskTemplate,
			HostName:             pod.Spec.Hostname,
		},
	)
	if err != nil {
		return nil, err
	}

	return logs.TaskLogs, nil
}
// templateLogPluginCollection groups template-based log plugins so they can be
// queried together through a single GetTaskLogs call.
type templateLogPluginCollection struct {
	// plugins is filled by InitializeLogPlugins from the built-in provider
	// settings and cfg.Templates; dynamicPlugins is filled from
	// cfg.DynamicLogLinks.
	plugins, dynamicPlugins []tasklog.TemplateLogPlugin
}
// GetTaskLogs asks every plugin in the collection for the log links matching
// input and returns them concatenated: the entries of t.plugins first, then
// those of t.dynamicPlugins, each group in slice order.
//
// The first error returned by a plugin aborts the call and is returned
// together with an empty Output.
func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) {
	var taskLogs []*core.TaskLog

	// Walk the two slices in turn rather than joining them with
	// append(t.plugins, t.dynamicPlugins...). When t.plugins has spare
	// capacity, that append writes into the backing array shared by every copy
	// of the collection, which is a data race under concurrent calls; when it
	// does not, it allocates and copies on every call.
	for _, group := range [][]tasklog.TemplateLogPlugin{t.plugins, t.dynamicPlugins} {
		for _, plugin := range group {
			o, err := plugin.GetTaskLogs(input)
			if err != nil {
				return tasklog.Output{}, err
			}

			taskLogs = append(taskLogs, o.TaskLogs...)
		}
	}

	return tasklog.Output{TaskLogs: taskLogs}, nil
}
// InitializeLogPlugins builds the tasklog.Plugin described by cfg.
//
// Static plugins are added in a fixed order: Kubernetes, Cloudwatch and
// Stackdriver (each only when its Is...Enabled flag is set), followed by
// cfg.Templates. For each built-in provider the configured template URI is
// used when non-empty; otherwise a default URI is derived from the provider's
// other settings. One dynamic plugin is created per entry of
// cfg.DynamicLogLinks, ordered by name.
//
// The returned error is always nil. cfg must not be nil.
func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
	// Use a list to maintain order.
	var plugins []tasklog.TemplateLogPlugin

	if cfg.IsKubernetesEnabled {
		uri := cfg.KubernetesTemplateURI
		if len(uri) == 0 {
			uri = fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)
		}
		plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", TemplateURIs: []tasklog.TemplateURI{uri}, MessageFormat: core.TaskLog_JSON})
	}

	if cfg.IsCloudwatchEnabled {
		uri := cfg.CloudwatchTemplateURI
		if len(uri) == 0 {
			uri = fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)
		}
		plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", TemplateURIs: []tasklog.TemplateURI{uri}, MessageFormat: core.TaskLog_JSON})
	}

	if cfg.IsStackDriverEnabled {
		uri := cfg.StackDriverTemplateURI
		if len(uri) == 0 {
			uri = fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)
		}
		plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", TemplateURIs: []tasklog.TemplateURI{uri}, MessageFormat: core.TaskLog_JSON})
	}

	dynamicPlugins := make([]tasklog.TemplateLogPlugin, 0, len(cfg.DynamicLogLinks))
	for logLinkType, dynamicLogLink := range cfg.DynamicLogLinks {
		dynamicPlugins = append(
			dynamicPlugins,
			tasklog.TemplateLogPlugin{
				Name:                logLinkType,
				DisplayName:         dynamicLogLink.DisplayName,
				DynamicTemplateURIs: dynamicLogLink.TemplateURIs,
				MessageFormat:       core.TaskLog_JSON,
			})
	}
	// Go does not define the iteration order of a map, so sort by name to keep
	// the order of the dynamic plugins (and of the links they produce) stable
	// across runs. Names come from the range keys and are therefore unique.
	sort.Slice(dynamicPlugins, func(i, j int) bool {
		return dynamicPlugins[i].Name < dynamicPlugins[j].Name
	})

	plugins = append(plugins, cfg.Templates...)

	return templateLogPluginCollection{plugins: plugins, dynamicPlugins: dynamicPlugins}, nil
}