Skip to content

Commit

Permalink
Add More Template Strings for Email Notifications (#24)
Browse files Browse the repository at this point in the history
* Add more template strings for email notifications
  • Loading branch information
matthewphsmith authored Nov 5, 2019
1 parent 6a7b144 commit a72fb01
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 33 deletions.
100 changes: 86 additions & 14 deletions flyteadmin/pkg/async/notifications/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,108 @@ import (

"strings"

"github.com/lyft/flyteadmin/pkg/repositories/models"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
)

type GetTemplateValue func(admin.WorkflowExecutionEventRequest, *admin.Execution) string

const executionError = " The execution failed with error: [%s]."

const substitutionParam = "{{ %s }}"
const substitutionParamNoSpaces = "{{%s}}"
const project = "project"
const domain = "domain"
const name = "name"
const phase = "phase"
const errorPlaceholder = "error"
const workflowProject = "workflow.project"
const workflowDomain = "workflow.domain"
const workflowName = "workflow.name"
const workflowVersion = "workflow.version"
const launchPlanProject = "launch_plan.project"
const launchPlanDomain = "launch_plan.domain"
const launchPlanName = "launch_plan.name"
const launchPlanVersion = "launch_plan.version"
const replaceAllInstances = -1

func substituteEmailParameters(message string, request admin.WorkflowExecutionEventRequest, execution models.Execution) string {
response := strings.Replace(message, fmt.Sprintf(substitutionParam, project), execution.Project, replaceAllInstances)
response = strings.Replace(response, fmt.Sprintf(substitutionParam, domain), execution.Domain, replaceAllInstances)
response = strings.Replace(response, fmt.Sprintf(substitutionParam, name), execution.Name, replaceAllInstances)
response = strings.Replace(response, fmt.Sprintf(substitutionParam, phase),
strings.ToLower(request.Event.Phase.String()), replaceAllInstances)
func getProject(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Id.Project
}

func getDomain(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Id.Domain
}

func getName(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Id.Name
}

func getPhase(request admin.WorkflowExecutionEventRequest, _ *admin.Execution) string {
return strings.ToLower(request.Event.Phase.String())
}

func getError(request admin.WorkflowExecutionEventRequest, _ *admin.Execution) string {
if request.Event.GetError() != nil {
response = strings.Replace(response, fmt.Sprintf(substitutionParam, errorPlaceholder),
fmt.Sprintf(executionError, request.Event.GetError().Message), replaceAllInstances)
} else {
// Replace the optional error placeholder with an empty string.
response = strings.Replace(response, fmt.Sprintf(substitutionParam, errorPlaceholder), "", replaceAllInstances)
return fmt.Sprintf(executionError, request.Event.GetError().Message)
}
return ""
}

func getWorkflowProject(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Closure.WorkflowId.Project
}

return response
func getWorkflowDomain(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Closure.WorkflowId.Domain
}

func getWorkflowName(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Closure.WorkflowId.Name
}

func getWorkflowVersion(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Closure.WorkflowId.Version
}

func getLaunchPlanProject(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Spec.LaunchPlan.Project
}

func getLaunchPlanDomain(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Spec.LaunchPlan.Domain
}

func getLaunchPlanName(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Spec.LaunchPlan.Name
}

func getLaunchPlanVersion(_ admin.WorkflowExecutionEventRequest, exec *admin.Execution) string {
return exec.Spec.LaunchPlan.Version
}

var getTemplateValueFuncs = map[string]GetTemplateValue{
project: getProject,
domain: getDomain,
name: getName,
phase: getPhase,
errorPlaceholder: getError,
workflowProject: getWorkflowProject,
workflowDomain: getWorkflowDomain,
workflowName: getWorkflowName,
workflowVersion: getWorkflowVersion,
launchPlanProject: getLaunchPlanProject,
launchPlanDomain: getLaunchPlanDomain,
launchPlanName: getLaunchPlanName,
launchPlanVersion: getLaunchPlanVersion,
}

func substituteEmailParameters(message string, request admin.WorkflowExecutionEventRequest, execution *admin.Execution) string {
for template, function := range getTemplateValueFuncs {
message = strings.Replace(message, fmt.Sprintf(substitutionParam, template), function(request, execution), replaceAllInstances)
message = strings.Replace(message, fmt.Sprintf(substitutionParamNoSpaces, template), function(request, execution), replaceAllInstances)
}
return message
}

// Converts a terminal execution event and existing execution model to an admin.EmailMessage proto, substituting parameters
Expand All @@ -43,7 +115,7 @@ func ToEmailMessageFromWorkflowExecutionEvent(
config runtimeInterfaces.NotificationsConfig,
emailNotification admin.EmailNotification,
request admin.WorkflowExecutionEventRequest,
execution models.Execution) *admin.EmailMessage {
execution *admin.Execution) *admin.EmailMessage {

return &admin.EmailMessage{
SubjectLine: substituteEmailParameters(config.NotificationsEmailerConfig.Subject, request, execution),
Expand Down
118 changes: 100 additions & 18 deletions flyteadmin/pkg/async/notifications/email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,127 @@ import (
"fmt"
"testing"

"strings"

"github.com/gogo/protobuf/proto"
"github.com/lyft/flyteadmin/pkg/repositories/models"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/stretchr/testify/assert"
)

const executionProjectValue = "proj"
const executionDomainValue = "prod"
const executionNameValue = "e124"
const launchPlanProjectValue = "lp_proj"
const launchPlanDomainValue = "lp_domain"
const launchPlanNameValue = "lp_name"
const launchPlanVersionValue = "lp_version"
const workflowProjectValue = "wf_proj"
const workflowDomainValue = "wf_domain"
const workflowNameValue = "wf_name"
const workflowVersionValue = "wf_version"

var workflowExecution = &admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Project: executionProjectValue,
Domain: executionDomainValue,
Name: executionNameValue,
},
Spec: &admin.ExecutionSpec{
LaunchPlan: &core.Identifier{
Project: launchPlanProjectValue,
Domain: launchPlanDomainValue,
Name: launchPlanNameValue,
Version: launchPlanVersionValue,
},
},
Closure: &admin.ExecutionClosure{
WorkflowId: &core.Identifier{
Project: workflowProjectValue,
Domain: workflowDomainValue,
Name: workflowNameValue,
Version: workflowVersionValue,
},
Phase: core.WorkflowExecution_SUCCEEDED,
},
}

func TestSubstituteEmailParameters(t *testing.T) {
message := "{{ unused }}. {{project }} and {{ domain }} and {{ name }} ended up in {{ phase }}.{{ error }}"
request := admin.WorkflowExecutionEventRequest{
Event: &event.WorkflowExecutionEvent{
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
model := models.Execution{
ExecutionKey: models.ExecutionKey{
Project: "proj",
Domain: "prod",
Name: "e124",
},
}
assert.Equal(t, "{{ unused }}. {{project }} and prod and e124 ended up in succeeded.",
substituteEmailParameters(message, request, model))
substituteEmailParameters(message, request, workflowExecution))
request.Event.OutputResult = &event.WorkflowExecutionEvent_Error{
Error: &core.ExecutionError{
Message: "uh-oh",
},
}
assert.Equal(t, "{{ unused }}. {{project }} and prod and e124 ended up in succeeded. The execution failed with error: [uh-oh].",
substituteEmailParameters(message, request, model))
substituteEmailParameters(message, request, workflowExecution))
}

func TestSubstituteAllTemplates(t *testing.T) {
templateVars := map[string]string{
fmt.Sprintf(substitutionParam, project): executionProjectValue,
fmt.Sprintf(substitutionParam, domain): executionDomainValue,
fmt.Sprintf(substitutionParam, name): executionNameValue,
fmt.Sprintf(substitutionParam, launchPlanProject): launchPlanProjectValue,
fmt.Sprintf(substitutionParam, launchPlanDomain): launchPlanDomainValue,
fmt.Sprintf(substitutionParam, launchPlanName): launchPlanNameValue,
fmt.Sprintf(substitutionParam, launchPlanVersion): launchPlanVersionValue,
fmt.Sprintf(substitutionParam, workflowProject): workflowProjectValue,
fmt.Sprintf(substitutionParam, workflowDomain): workflowDomainValue,
fmt.Sprintf(substitutionParam, workflowName): workflowNameValue,
fmt.Sprintf(substitutionParam, workflowVersion): workflowVersionValue,
fmt.Sprintf(substitutionParam, phase): strings.ToLower(core.WorkflowExecution_SUCCEEDED.String()),
}
var messageTemplate, desiredResult []string
for template, result := range templateVars {
messageTemplate = append(messageTemplate, template)
desiredResult = append(desiredResult, result)
}
request := admin.WorkflowExecutionEventRequest{
Event: &event.WorkflowExecutionEvent{
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
assert.Equal(t, strings.Join(desiredResult, ","),
substituteEmailParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
}

func TestSubstituteAllTemplatesNoSpaces(t *testing.T) {
templateVars := map[string]string{
fmt.Sprintf(substitutionParamNoSpaces, project): executionProjectValue,
fmt.Sprintf(substitutionParamNoSpaces, domain): executionDomainValue,
fmt.Sprintf(substitutionParamNoSpaces, name): executionNameValue,
fmt.Sprintf(substitutionParamNoSpaces, launchPlanProject): launchPlanProjectValue,
fmt.Sprintf(substitutionParamNoSpaces, launchPlanDomain): launchPlanDomainValue,
fmt.Sprintf(substitutionParamNoSpaces, launchPlanName): launchPlanNameValue,
fmt.Sprintf(substitutionParamNoSpaces, launchPlanVersion): launchPlanVersionValue,
fmt.Sprintf(substitutionParamNoSpaces, workflowProject): workflowProjectValue,
fmt.Sprintf(substitutionParamNoSpaces, workflowDomain): workflowDomainValue,
fmt.Sprintf(substitutionParamNoSpaces, workflowName): workflowNameValue,
fmt.Sprintf(substitutionParamNoSpaces, workflowVersion): workflowVersionValue,
fmt.Sprintf(substitutionParamNoSpaces, phase): strings.ToLower(core.WorkflowExecution_SUCCEEDED.String()),
}
var messageTemplate, desiredResult []string
for template, result := range templateVars {
messageTemplate = append(messageTemplate, template)
desiredResult = append(desiredResult, result)
}
request := admin.WorkflowExecutionEventRequest{
Event: &event.WorkflowExecutionEvent{
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
assert.Equal(t, strings.Join(desiredResult, ","),
substituteEmailParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
}

func TestToEmailMessageFromWorkflowExecutionEvent(t *testing.T) {
Expand All @@ -58,14 +147,7 @@ func TestToEmailMessageFromWorkflowExecutionEvent(t *testing.T) {
Phase: core.WorkflowExecution_ABORTED,
},
}
model := models.Execution{
ExecutionKey: models.ExecutionKey{
Project: "proj",
Domain: "prod",
Name: "e124",
},
}
emailMessage := ToEmailMessageFromWorkflowExecutionEvent(notificationsConfig, emailNotification, request, model)
emailMessage := ToEmailMessageFromWorkflowExecutionEvent(notificationsConfig, emailNotification, request, workflowExecution)
assert.True(t, proto.Equal(emailMessage, &admin.EmailMessage{
RecipientsEmail: []string{
"[email protected]", "[email protected]",
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (m *ExecutionManager) publishNotifications(ctx context.Context, request adm
// Currently there are no possible errors while creating an email message.
// Once customizable content is specified, errors are possible.
email := notifications.ToEmailMessageFromWorkflowExecutionEvent(
*m.config.ApplicationConfiguration().GetNotificationsConfig(), emailNotification, request, execution)
*m.config.ApplicationConfiguration().GetNotificationsConfig(), emailNotification, request, adminExecution)
// Errors seen while publishing a message are considered non-fatal to the method and will not result
// in the method returning an error.
if err = m.notificationClient.Publish(ctx, proto.MessageName(&emailNotification), email); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,13 @@ func TestExecutionManager_PublishNotifications(t *testing.T) {
}
var execClosure = admin.ExecutionClosure{
Notifications: testutils.GetExecutionRequest().Spec.GetNotifications().Notifications,
WorkflowId: &core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: "wf_project",
Domain: "wf_domain",
Name: "wf_name",
Version: "wf_version",
},
}
var extraNotifications = []*admin.Notification{
{
Expand Down Expand Up @@ -1618,6 +1625,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) {
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))
}
Expand Down Expand Up @@ -1709,6 +1717,13 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro
}
var execClosure = admin.ExecutionClosure{
Notifications: testutils.GetExecutionRequest().Spec.GetNotifications().Notifications,
WorkflowId: &core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: "wf_project",
Domain: "wf_domain",
Name: "wf_name",
Version: "wf_version",
},
}
execClosureBytes, _ := proto.Marshal(&execClosure)
executionModel := models.Execution{
Expand All @@ -1721,6 +1736,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))

Expand Down

0 comments on commit a72fb01

Please sign in to comment.