From 3ccf7f4904d7c0af791954b86dc234004a19f045 Mon Sep 17 00:00:00 2001 From: Anmol Khurana Date: Wed, 8 Jan 2020 16:08:26 -0800 Subject: [PATCH] Add logs links for Spark-Submit/other pods --- flyteplugins/go/tasks/plugins/k8s/spark/spark.go | 11 ++++++++++- flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go | 6 ++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark.go index 81d86cff57..48d72c33c6 100755 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark.go @@ -244,9 +244,18 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, Name: "System Logs (via Cloudwatch)", MessageFormat: core.TaskLog_JSON, } + allUserLogs := core.TaskLog{ + Uri: fmt.Sprintf( + "https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix", + logConfig.CloudwatchRegion, + logConfig.CloudwatchLogGroup, + sj.Name), + Name: "Spark-Submit/All User Logs (via Cloudwatch)", + MessageFormat: core.TaskLog_JSON, + } taskLogs = append(taskLogs, &cwUserLogs) taskLogs = append(taskLogs, &cwSystemLogs) - + taskLogs = append(taskLogs, &allUserLogs) } // Spark UI. diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go index d7289f0a56..9cc60894c3 100755 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go @@ -70,7 +70,8 @@ func TestGetEventInfo(t *testing.T) { assert.Equal(t, "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", info.Logs[0].Uri) assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix", info.Logs[1].Uri) assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=system_log.var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[2].Uri) - assert.Equal(t, "https://spark-ui.flyte", info.Logs[3].Uri) + assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[3].Uri) + assert.Equal(t, "https://spark-ui.flyte", info.Logs[4].Uri) assert.NoError(t, setSparkConfig(&Config{ SparkHistoryServerURL: "spark-history.flyte", @@ -82,7 +83,8 @@ func TestGetEventInfo(t *testing.T) { assert.Equal(t, "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", info.Logs[0].Uri) assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix", info.Logs[1].Uri) assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=system_log.var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[2].Uri) - assert.Equal(t, "spark-history.flyte/history/app-id", info.Logs[3].Uri) + assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[3].Uri) + assert.Equal(t, "spark-history.flyte/history/app-id", info.Logs[4].Uri) } func TestGetTaskPhase(t *testing.T) {