diff --git a/auth/handlers.go b/auth/handlers.go index 43bf3cc01f..26c9469c01 100644 --- a/auth/handlers.go +++ b/auth/handlers.go @@ -12,7 +12,6 @@ import ( "golang.org/x/oauth2" - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flyteadmin/pkg/common" "google.golang.org/grpc/peer" @@ -35,6 +34,13 @@ const ( type HTTPRequestToMetadataAnnotator func(ctx context.Context, request *http.Request) metadata.MD +type AuthenticatedClientMeta struct { + ClientIds []string + TokenIssuedAt time.Time + ClientIP string + Subject string +} + func RegisterHandlers(ctx context.Context, handler interfaces.HandlerRegisterer, authCtx interfaces.AuthenticationContext) { // Add HTTP handlers for OAuth2 endpoints handler.HandleFunc("/login", RefreshTokensIfExists(ctx, authCtx, @@ -180,7 +186,11 @@ func AuthenticationLoggingInterceptor(ctx context.Context, req interface{}, info // Invoke 'handler' to use your gRPC server implementation and get // the response. identityContext := IdentityContextFromContext(ctx) - logger.Debugf(ctx, "gRPC server info in logging interceptor [%s] method [%s]\n", identityContext.UserID(), info.FullMethod) + var emailPlaceholder string + if len(identityContext.UserInfo().GetEmail()) > 0 { + emailPlaceholder = fmt.Sprintf(" (%s) ", identityContext.UserInfo().GetEmail()) + } + logger.Debugf(ctx, "gRPC server info in logging interceptor [%s]%smethod [%s]\n", identityContext.UserID(), emailPlaceholder, info.FullMethod) return handler(ctx, req) } @@ -214,7 +224,7 @@ func SetContextForIdentity(ctx context.Context, identityContext interfaces.Ident email := identityContext.UserInfo().GetEmail() newCtx := identityContext.WithContext(ctx) if len(email) > 0 { - newCtx = WithUserEmail(newCtx, identityContext.UserID()) + newCtx = WithUserEmail(newCtx, email) } return WithAuditFields(newCtx, identityContext.UserID(), []string{identityContext.AppID()}, identityContext.AuthenticatedAt()) @@ -263,7 +273,7 @@ func WithAuditFields(ctx context.Context, subject string, clientIds []string, to if ok { clientIP = peerInfo.Addr.String() } - return context.WithValue(ctx, common.AuditFieldsContextKey, audit.AuthenticatedClientMeta{ + return context.WithValue(ctx, common.AuditFieldsContextKey, AuthenticatedClientMeta{ ClientIds: clientIds, TokenIssuedAt: tokenIssuedAt, ClientIP: clientIP, diff --git a/pkg/audit/common.go b/pkg/audit/common.go deleted file mode 100644 index 79941ac057..0000000000 --- a/pkg/audit/common.go +++ /dev/null @@ -1,12 +0,0 @@ -package audit - -import ( - "time" -) - -type AuthenticatedClientMeta struct { - ClientIds []string - TokenIssuedAt time.Time - ClientIP string - Subject string -} diff --git a/pkg/audit/log.go b/pkg/audit/log.go deleted file mode 100644 index f62411b422..0000000000 --- a/pkg/audit/log.go +++ /dev/null @@ -1,96 +0,0 @@ -package audit - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/flyteorg/flyteadmin/pkg/common" - "github.com/flyteorg/flyteadmin/pkg/errors" - "github.com/flyteorg/flytestdlib/logger" - "google.golang.org/grpc/codes" -) - -type LogBuilder interface { - WithAuthenticatedCtx(ctx context.Context) LogBuilder - WithRequest(method string, parameters map[string]string, mode AccessMode, requestedAt time.Time) LogBuilder - WithResponse(sentAt time.Time, err error) LogBuilder - Log(ctx context.Context) -} - -type logBuilder struct { - auditLog Message - readOnly bool -} - -func (b *logBuilder) WithAuthenticatedCtx(ctx context.Context) LogBuilder { - clientMeta := ctx.Value(common.AuditFieldsContextKey) - switch m := clientMeta.(type) { - case AuthenticatedClientMeta: - b.auditLog.Principal = Principal{ - Subject: m.Subject, - TokenIssuedAt: m.TokenIssuedAt, - } - if len(m.ClientIds) > 0 { - b.auditLog.Principal.ClientID = m.ClientIds[0] - } - b.auditLog.Client = Client{ - ClientIP: m.ClientIP, - } - default: - logger.Warningf(ctx, "Failed to parse authenticated client metadata when creating audit log") - } - return b -} - -// TODO: Also look into passing down HTTP verb -func (b *logBuilder) WithRequest(method string, parameters map[string]string, mode AccessMode, - requestedAt time.Time) LogBuilder { - b.auditLog.Request = Request{ - Method: method, - Parameters: parameters, - Mode: mode, - ReceivedAt: requestedAt, - } - return b -} - -func (b *logBuilder) WithResponse(sentAt time.Time, err error) LogBuilder { - responseCode := codes.OK.String() - if err != nil { - switch err := err.(type) { - case errors.FlyteAdminError: - responseCode = err.Code().String() - default: - responseCode = codes.Internal.String() - } - } - b.auditLog.Response = Response{ - ResponseCode: responseCode, - SentAt: sentAt, - } - return b -} - -func (b *logBuilder) formatLogString(ctx context.Context) string { - auditLog, err := json.Marshal(&b.auditLog) - if err != nil { - logger.Warningf(ctx, "Failed to marshal audit log to protobuf with err: %v", err) - } - return fmt.Sprintf("Recording request: [%s]", auditLog) -} - -func (b *logBuilder) Log(ctx context.Context) { - if b.readOnly { - logger.Warningf(ctx, "Attempting to record audit log for request: [%+v] more than once. Aborting.", b.auditLog.Request) - } - defer func() { - b.readOnly = true - }() - logger.Info(ctx, b.formatLogString(ctx)) -} - -func NewLogBuilder() LogBuilder { - return &logBuilder{} -} diff --git a/pkg/audit/log_test.go b/pkg/audit/log_test.go deleted file mode 100644 index 02faeb11f7..0000000000 --- a/pkg/audit/log_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package audit - -import ( - "context" - "testing" - "time" - - "github.com/flyteorg/flyteadmin/pkg/common" - "github.com/flyteorg/flyteadmin/pkg/errors" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc/codes" -) - -func TestLogBuilderLog(t *testing.T) { - ctx := context.Background() - ctx = context.WithValue(ctx, common.PrincipalContextKey, "prince") - tokenIssuedAt := time.Date(2020, time.January, 5, 10, 15, 0, 0, time.UTC) - requestedAt := time.Date(2020, time.January, 5, 10, 30, 0, 0, time.UTC) - sentAt := time.Date(2020, time.January, 5, 10, 31, 0, 0, time.UTC) - err := errors.NewFlyteAdminError(codes.AlreadyExists, "womp womp") - ctx = context.WithValue(ctx, common.AuditFieldsContextKey, AuthenticatedClientMeta{ - ClientIds: []string{"12345"}, - TokenIssuedAt: tokenIssuedAt, - ClientIP: "192.0.2.1:25", - Subject: "prince", - }) - builder := NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "my_method", map[string]string{ - "my": "params", - }, ReadWrite, requestedAt).WithResponse(sentAt, err) - assert.EqualValues(t, "Recording request: [{\"Principal\":{\"Subject\":\"prince\",\"ClientID\":\"12345\","+ - "\"TokenIssuedAt\":\"2020-01-05T10:15:00Z\"},\"Client\":{\"ClientIP\":\"192.0.2.1:25\"},\"Request\":"+ - "{\"Method\":\"my_method\",\"Parameters\":{\"my\":\"params\"},\"Mode\":1,\"ReceivedAt\":"+ - "\"2020-01-05T10:30:00Z\"},\"Response\":{\"ResponseCode\":\"AlreadyExists\",\"SentAt\":"+ - "\"2020-01-05T10:31:00Z\"}}]", builder.(*logBuilder).formatLogString(context.TODO())) -} diff --git a/pkg/audit/message.go b/pkg/audit/message.go deleted file mode 100644 index 733264acca..0000000000 --- a/pkg/audit/message.go +++ /dev/null @@ -1,55 +0,0 @@ -package audit - -import "time" - -type Principal struct { - // Identifies authenticated end-user - Subject string - - // The client that initiated the auth flow. - ClientID string - - TokenIssuedAt time.Time -} - -type Client struct { - ClientIP string -} - -type AccessMode int - -const ( - ReadOnly AccessMode = iota - ReadWrite -) - -// Details about a specific request issued by a user. -type Request struct { - // Service method endpoint e.g. GetWorkflowExecution - Method string - - // Includes parameters submitted in the request. - Parameters map[string]string - - Mode AccessMode - - ReceivedAt time.Time -} - -// Summary of service response details. -type Response struct { - // e.g. gRPC status code - ResponseCode string - - SentAt time.Time -} - -type Message struct { - Principal Principal - - Client Client - - Request Request - - Response Response -} diff --git a/pkg/audit/util.go b/pkg/audit/util.go deleted file mode 100644 index b727ca96d3..0000000000 --- a/pkg/audit/util.go +++ /dev/null @@ -1,93 +0,0 @@ -package audit - -import ( - "fmt" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" -) - -type requestParameters = map[string]string - -const ( - Project = "project" - Domain = "domain" - Name = "name" - Version = "version" - NodeID = "node_id" - RetryAttempt = "retry_attempt" - ResourceType = "resource" - - TaskProject = "task_project" - TaskDomain = "task_domain" - TaskName = "task_name" - TaskVersion = "task_version" -) - -func ParametersFromIdentifier(identifier *core.Identifier) requestParameters { - if identifier == nil { - return requestParameters{} - } - return requestParameters{ - Project: identifier.Project, - Domain: identifier.Domain, - Name: identifier.Name, - Version: identifier.Version, - } -} - -func ParametersFromNamedEntityIdentifier(identifier *admin.NamedEntityIdentifier) requestParameters { - if identifier == nil { - return requestParameters{} - } - return requestParameters{ - Project: identifier.Project, - Domain: identifier.Domain, - Name: identifier.Name, - } -} - -func ParametersFromNamedEntityIdentifierAndResource(identifier *admin.NamedEntityIdentifier, resourceType core.ResourceType) requestParameters { - if identifier == nil { - return requestParameters{} - } - parameters := ParametersFromNamedEntityIdentifier(identifier) - parameters[ResourceType] = resourceType.String() - return parameters -} - -func ParametersFromExecutionIdentifier(identifier *core.WorkflowExecutionIdentifier) requestParameters { - if identifier == nil { - return requestParameters{} - } - return requestParameters{ - Project: identifier.Project, - Domain: identifier.Domain, - Name: identifier.Name, - } -} - -func ParametersFromNodeExecutionIdentifier(identifier *core.NodeExecutionIdentifier) requestParameters { - if identifier == nil || identifier.ExecutionId == nil { - return requestParameters{} - } - return requestParameters{ - Project: identifier.ExecutionId.Project, - Domain: identifier.ExecutionId.Domain, - Name: identifier.ExecutionId.Name, - NodeID: identifier.NodeId, - } -} - -func ParametersFromTaskExecutionIdentifier(identifier *core.TaskExecutionIdentifier) requestParameters { - if identifier == nil || identifier.NodeExecutionId == nil || identifier.TaskId == nil { - return requestParameters{} - } - params := ParametersFromNodeExecutionIdentifier(identifier.NodeExecutionId) - params[RetryAttempt] = fmt.Sprint(identifier.RetryAttempt) - params[TaskProject] = identifier.TaskId.Project - params[TaskDomain] = identifier.TaskId.Domain - params[TaskName] = identifier.TaskId.Name - params[TaskVersion] = identifier.TaskId.Version - return params -} diff --git a/pkg/audit/util_test.go b/pkg/audit/util_test.go deleted file mode 100644 index 9d9acbbee7..0000000000 --- a/pkg/audit/util_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package audit - -import ( - "testing" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/stretchr/testify/assert" -) - -func TestParametersFromIdentifier(t *testing.T) { - assert.EqualValues(t, map[string]string{ - "project": "proj", - "domain": "development", - "name": "foo", - "version": "123", - }, ParametersFromIdentifier(&core.Identifier{ - Project: "proj", - Domain: "development", - Name: "foo", - Version: "123", - })) -} - -func TestParametersFromNamedEntityIdentifier(t *testing.T) { - assert.EqualValues(t, map[string]string{ - "project": "proj", - "domain": "development", - "name": "foo", - }, ParametersFromNamedEntityIdentifier(&admin.NamedEntityIdentifier{ - Project: "proj", - Domain: "development", - Name: "foo", - })) -} - -func TestParametersFromNamedEntityIdentifierAndResource(t *testing.T) { - assert.EqualValues(t, map[string]string{ - "project": "proj", - "domain": "development", - "name": "foo", - "resource": "LAUNCH_PLAN", - }, ParametersFromNamedEntityIdentifierAndResource(&admin.NamedEntityIdentifier{ - Project: "proj", - Domain: "development", - Name: "foo", - }, core.ResourceType_LAUNCH_PLAN)) -} - -func TestParametersFromExecutionIdentifier(t *testing.T) { - assert.EqualValues(t, map[string]string{ - "project": "proj", - "domain": "development", - "name": "foo", - }, ParametersFromExecutionIdentifier(&core.WorkflowExecutionIdentifier{ - Project: "proj", - Domain: "development", - Name: "foo", - })) -} - -func TestParametersFromNodeExecutionIdentifier(t *testing.T) { - assert.EqualValues(t, map[string]string{ - "project": "proj", - "domain": "development", - "name": "foo", - "node_id": "nodey", - }, ParametersFromNodeExecutionIdentifier(&core.NodeExecutionIdentifier{ - NodeId: "nodey", - ExecutionId: &core.WorkflowExecutionIdentifier{ - Project: "proj", - Domain: "development", - Name: "foo", - }, - })) -} - -func TestParametersFromTaskExecutionIdentifier(t *testing.T) { - assert.EqualValues(t, map[string]string{ - "project": "proj", - "domain": "development", - "name": "foo", - "node_id": "nodey", - "retry_attempt": "1", - "task_project": "proj2", - "task_domain": "production", - "task_name": "bar", - "task_version": "version", - }, ParametersFromTaskExecutionIdentifier(&core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - Project: "proj2", - Domain: "production", - Name: "bar", - Version: "version", - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - NodeId: "nodey", - ExecutionId: &core.WorkflowExecutionIdentifier{ - Project: "proj", - Domain: "development", - Name: "foo", - }, - }, - RetryAttempt: 1, - })) -} diff --git a/pkg/rpc/adminservice/attributes.go b/pkg/rpc/adminservice/attributes.go index 477379c600..f65df0ed29 100644 --- a/pkg/rpc/adminservice/attributes.go +++ b/pkg/rpc/adminservice/attributes.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -15,7 +12,6 @@ import ( func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesUpdateRequest) ( *admin.WorkflowAttributesUpdateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -24,16 +20,6 @@ func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *ad m.Metrics.workflowAttributesEndpointMetrics.update.Time(func() { response, err = m.ResourceManager.UpdateWorkflowAttributes(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "UpdateWorkflowAttributes", - map[string]string{ - audit.Project: request.Attributes.Project, - audit.Domain: request.Attributes.Domain, - audit.Name: request.Attributes.Workflow, - }, - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.update) } @@ -44,7 +30,6 @@ func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *ad func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesGetRequest) ( *admin.WorkflowAttributesGetResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -53,16 +38,6 @@ func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin m.Metrics.workflowAttributesEndpointMetrics.get.Time(func() { response, err = m.ResourceManager.GetWorkflowAttributes(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetWorkflowAttributes", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - audit.Name: request.Workflow, - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.get) } @@ -73,7 +48,6 @@ func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesDeleteRequest) ( *admin.WorkflowAttributesDeleteResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -82,16 +56,6 @@ func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *ad m.Metrics.workflowAttributesEndpointMetrics.delete.Time(func() { response, err = m.ResourceManager.DeleteWorkflowAttributes(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "DeleteWorkflowAttributes", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - audit.Name: request.Workflow, - }, - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.delete) } @@ -102,7 +66,6 @@ func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *ad func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesUpdateRequest) ( *admin.ProjectDomainAttributesUpdateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -111,15 +74,6 @@ func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, reques m.Metrics.projectDomainAttributesEndpointMetrics.update.Time(func() { response, err = m.ResourceManager.UpdateProjectDomainAttributes(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "UpdateProjectDomainAttributes", - map[string]string{ - audit.Project: request.Attributes.Project, - audit.Domain: request.Attributes.Domain, - }, - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectDomainAttributesEndpointMetrics.update) } @@ -130,7 +84,6 @@ func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, reques func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesGetRequest) ( *admin.ProjectDomainAttributesGetResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -139,15 +92,6 @@ func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request * m.Metrics.workflowAttributesEndpointMetrics.get.Time(func() { response, err = m.ResourceManager.GetProjectDomainAttributes(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetProjectDomainAttributes", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.get) } @@ -158,7 +102,6 @@ func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request * func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesDeleteRequest) ( *admin.ProjectDomainAttributesDeleteResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -167,15 +110,6 @@ func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, reques m.Metrics.workflowAttributesEndpointMetrics.delete.Time(func() { response, err = m.ResourceManager.DeleteProjectDomainAttributes(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "DeleteProjectDomainAttributes", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - }, - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.delete) } @@ -186,7 +120,6 @@ func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, reques func (m *AdminService) ListMatchableAttributes(ctx context.Context, request *admin.ListMatchableAttributesRequest) ( *admin.ListMatchableAttributesResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -195,14 +128,6 @@ func (m *AdminService) ListMatchableAttributes(ctx context.Context, request *adm m.Metrics.matchableAttributesEndpointMetrics.list.Time(func() { response, err = m.ResourceManager.ListAll(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListMatchableAttributes", - map[string]string{ - audit.ResourceType: request.ResourceType.String(), - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.matchableAttributesEndpointMetrics.list) } diff --git a/pkg/rpc/adminservice/execution.go b/pkg/rpc/adminservice/execution.go index 5a3d21f04a..57680a58f5 100644 --- a/pkg/rpc/adminservice/execution.go +++ b/pkg/rpc/adminservice/execution.go @@ -4,8 +4,6 @@ import ( "context" "time" - "github.com/flyteorg/flyteadmin/pkg/audit" - "github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "google.golang.org/grpc/codes" @@ -24,16 +22,6 @@ func (m *AdminService) CreateExecution( m.Metrics.executionEndpointMetrics.create.Time(func() { response, err = m.ExecutionManager.CreateExecution(ctx, *request, requestedAt) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ExecutionCreateRequest", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - audit.Name: request.Name, - }, - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.create) } @@ -53,12 +41,6 @@ func (m *AdminService) RelaunchExecution( m.Metrics.executionEndpointMetrics.relaunch.Time(func() { response, err = m.ExecutionManager.RelaunchExecution(ctx, *request, requestedAt) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ExecutionCreateRequest", - audit.ParametersFromExecutionIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.relaunch) } @@ -78,12 +60,6 @@ func (m *AdminService) RecoverExecution( m.Metrics.executionEndpointMetrics.recover.Time(func() { response, err = m.ExecutionManager.RecoverExecution(ctx, *request, requestedAt) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ExecutionCreateRequest", - audit.ParametersFromExecutionIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.relaunch) } @@ -94,7 +70,6 @@ func (m *AdminService) RecoverExecution( func (m *AdminService) CreateWorkflowEvent( ctx context.Context, request *admin.WorkflowExecutionEventRequest) (*admin.WorkflowExecutionEventResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -103,12 +78,6 @@ func (m *AdminService) CreateWorkflowEvent( m.Metrics.executionEndpointMetrics.createEvent.Time(func() { response, err = m.ExecutionManager.CreateWorkflowEvent(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "CreateWorkflowEvent", - audit.ParametersFromExecutionIdentifier(request.Event.ExecutionId), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.createEvent) } @@ -120,7 +89,6 @@ func (m *AdminService) CreateWorkflowEvent( func (m *AdminService) GetExecution( ctx context.Context, request *admin.WorkflowExecutionGetRequest) (*admin.Execution, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -129,12 +97,6 @@ func (m *AdminService) GetExecution( m.Metrics.executionEndpointMetrics.get.Time(func() { response, err = m.ExecutionManager.GetExecution(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetExecution", - audit.ParametersFromExecutionIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.get) } @@ -154,12 +116,6 @@ func (m *AdminService) UpdateExecution( m.Metrics.executionEndpointMetrics.update.Time(func() { response, err = m.ExecutionManager.UpdateExecution(ctx, *request, requestedAt) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "UpdateExecution", - audit.ParametersFromExecutionIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.update) } @@ -170,7 +126,6 @@ func (m *AdminService) UpdateExecution( func (m *AdminService) GetExecutionData( ctx context.Context, request *admin.WorkflowExecutionGetDataRequest) (*admin.WorkflowExecutionGetDataResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -179,12 +134,6 @@ func (m *AdminService) GetExecutionData( m.Metrics.executionEndpointMetrics.get.Time(func() { response, err = m.ExecutionManager.GetExecutionData(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetExecutionData", - audit.ParametersFromExecutionIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.getData) } @@ -195,7 +144,6 @@ func (m *AdminService) GetExecutionData( func (m *AdminService) ListExecutions( ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -204,12 +152,6 @@ func (m *AdminService) ListExecutions( m.Metrics.executionEndpointMetrics.list.Time(func() { response, err = m.ExecutionManager.ListExecutions(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListExecutions", - audit.ParametersFromNamedEntityIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.list) } @@ -220,7 +162,6 @@ func (m *AdminService) ListExecutions( func (m *AdminService) TerminateExecution( ctx context.Context, request *admin.ExecutionTerminateRequest) (*admin.ExecutionTerminateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -229,12 +170,6 @@ func (m *AdminService) TerminateExecution( m.Metrics.executionEndpointMetrics.terminate.Time(func() { response, err = m.ExecutionManager.TerminateExecution(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "TerminateExecution", - audit.ParametersFromExecutionIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.terminate) } diff --git a/pkg/rpc/adminservice/launch_plan.go b/pkg/rpc/adminservice/launch_plan.go index 8c20ebb6b6..6a7dae4f2a 100644 --- a/pkg/rpc/adminservice/launch_plan.go +++ b/pkg/rpc/adminservice/launch_plan.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/logger" @@ -18,7 +15,6 @@ import ( func (m *AdminService) CreateLaunchPlan( ctx context.Context, request *admin.LaunchPlanCreateRequest) (*admin.LaunchPlanCreateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -27,12 +23,6 @@ func (m *AdminService) CreateLaunchPlan( m.Metrics.launchPlanEndpointMetrics.create.Time(func() { response, err = m.LaunchPlanManager.CreateLaunchPlan(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "CreateLaunchPlan", - audit.ParametersFromIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.create) } @@ -42,7 +32,6 @@ func (m *AdminService) CreateLaunchPlan( func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectGetRequest) (*admin.LaunchPlan, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -57,12 +46,6 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG m.Metrics.launchPlanEndpointMetrics.get.Time(func() { response, err = m.LaunchPlanManager.GetLaunchPlan(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetLaunchPlan", - audit.ParametersFromIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.get) } @@ -73,7 +56,6 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.ActiveLaunchPlanRequest) (*admin.LaunchPlan, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -82,12 +64,6 @@ func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.A m.Metrics.launchPlanEndpointMetrics.getActive.Time(func() { response, err = m.LaunchPlanManager.GetActiveLaunchPlan(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetActiveLaunchPlan", - audit.ParametersFromNamedEntityIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.getActive) } @@ -98,7 +74,6 @@ func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.A func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.LaunchPlanUpdateRequest) ( *admin.LaunchPlanUpdateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -113,12 +88,6 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun m.Metrics.launchPlanEndpointMetrics.update.Time(func() { response, err = m.LaunchPlanManager.UpdateLaunchPlan(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "UpdateLaunchPlan", - audit.ParametersFromIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.update) } @@ -129,7 +98,6 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.ResourceListRequest) ( *admin.LaunchPlanList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -138,12 +106,6 @@ func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.Resou m.Metrics.launchPlanEndpointMetrics.list.Time(func() { response, err = m.LaunchPlanManager.ListLaunchPlans(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListLaunchPlans", - audit.ParametersFromNamedEntityIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.list) } @@ -155,7 +117,6 @@ func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.Resou func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin.ActiveLaunchPlanListRequest) ( *admin.LaunchPlanList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -164,15 +125,6 @@ func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin m.Metrics.launchPlanEndpointMetrics.listActive.Time(func() { response, err = m.LaunchPlanManager.ListActiveLaunchPlans(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListActiveLaunchPlans", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.listActive) } @@ -184,7 +136,6 @@ func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin func (m *AdminService) ListLaunchPlanIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) ( *admin.NamedEntityIdentifierList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -194,15 +145,6 @@ func (m *AdminService) ListLaunchPlanIds(ctx context.Context, request *admin.Nam m.Metrics.launchPlanEndpointMetrics.listIds.Time(func() { response, err = m.LaunchPlanManager.ListLaunchPlanIds(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListLaunchPlanIds", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.listIds) } diff --git a/pkg/rpc/adminservice/named_entity.go b/pkg/rpc/adminservice/named_entity.go index bcebd81af1..f1541d4096 100644 --- a/pkg/rpc/adminservice/named_entity.go +++ b/pkg/rpc/adminservice/named_entity.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -14,7 +11,6 @@ import ( func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedEntityGetRequest) (*admin.NamedEntity, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -24,12 +20,6 @@ func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedE m.Metrics.namedEntityEndpointMetrics.get.Time(func() { response, err = m.NamedEntityManager.GetNamedEntity(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetNamedEntity", - audit.ParametersFromNamedEntityIdentifierAndResource(request.Id, request.ResourceType), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.namedEntityEndpointMetrics.get) } @@ -41,7 +31,6 @@ func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedE func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.NamedEntityUpdateRequest) ( *admin.NamedEntityUpdateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -51,12 +40,6 @@ func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.Nam m.Metrics.namedEntityEndpointMetrics.update.Time(func() { response, err = m.NamedEntityManager.UpdateNamedEntity(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "UpdateNamedEntity", - audit.ParametersFromNamedEntityIdentifierAndResource(request.Id, request.ResourceType), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.namedEntityEndpointMetrics.update) } @@ -67,7 +50,6 @@ func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.Nam func (m *AdminService) ListNamedEntities(ctx context.Context, request *admin.NamedEntityListRequest) ( *admin.NamedEntityList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -77,16 +59,6 @@ func (m *AdminService) ListNamedEntities(ctx context.Context, request *admin.Nam m.Metrics.namedEntityEndpointMetrics.list.Time(func() { response, err = m.NamedEntityManager.ListNamedEntities(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListNamedEntities", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - audit.ResourceType: request.ResourceType.String(), - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.namedEntityEndpointMetrics.list) } diff --git a/pkg/rpc/adminservice/node_execution.go b/pkg/rpc/adminservice/node_execution.go index bfb351e59e..c5479eda29 100644 --- a/pkg/rpc/adminservice/node_execution.go +++ b/pkg/rpc/adminservice/node_execution.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flytestdlib/logger" @@ -18,7 +15,6 @@ import ( func (m *AdminService) CreateNodeEvent( ctx context.Context, request *admin.NodeExecutionEventRequest) (*admin.NodeExecutionEventResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -27,12 +23,6 @@ func (m *AdminService) CreateNodeEvent( m.Metrics.nodeExecutionEndpointMetrics.createEvent.Time(func() { response, err = m.NodeExecutionManager.CreateNodeEvent(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "CreateNodeEvent", - audit.ParametersFromNodeExecutionIdentifier(request.Event.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.createEvent) } @@ -43,7 +33,6 @@ func (m *AdminService) CreateNodeEvent( func (m *AdminService) GetNodeExecution( ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -52,12 +41,6 @@ func (m *AdminService) GetNodeExecution( m.Metrics.nodeExecutionEndpointMetrics.get.Time(func() { response, err = m.NodeExecutionManager.GetNodeExecution(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetNodeExecution", - audit.ParametersFromNodeExecutionIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.get) } @@ -68,7 +51,6 @@ func (m *AdminService) GetNodeExecution( func (m *AdminService) ListNodeExecutions( ctx context.Context, request *admin.NodeExecutionListRequest) (*admin.NodeExecutionList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -77,12 +59,6 @@ func (m *AdminService) ListNodeExecutions( m.Metrics.nodeExecutionEndpointMetrics.list.Time(func() { response, err = m.NodeExecutionManager.ListNodeExecutions(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListNodeExecutions", - audit.ParametersFromExecutionIdentifier(request.WorkflowExecutionId), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.list) } @@ -93,7 +69,6 @@ func (m *AdminService) ListNodeExecutions( func (m *AdminService) ListNodeExecutionsForTask( ctx context.Context, request *admin.NodeExecutionForTaskListRequest) (*admin.NodeExecutionList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -109,12 +84,6 @@ func (m *AdminService) ListNodeExecutionsForTask( m.Metrics.nodeExecutionEndpointMetrics.listChildren.Time(func() { response, err = m.NodeExecutionManager.ListNodeExecutionsForTask(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListNodeExecutionsForTask", - audit.ParametersFromTaskExecutionIdentifier(request.TaskExecutionId), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.listChildren) } @@ -125,7 +94,6 @@ func (m *AdminService) ListNodeExecutionsForTask( func (m *AdminService) GetNodeExecutionData( ctx context.Context, request *admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -134,12 +102,6 @@ func (m *AdminService) GetNodeExecutionData( m.Metrics.nodeExecutionEndpointMetrics.getData.Time(func() { response, err = m.NodeExecutionManager.GetNodeExecutionData(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetNodeExecutionData", - audit.ParametersFromNodeExecutionIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.getData) } diff --git a/pkg/rpc/adminservice/project.go b/pkg/rpc/adminservice/project.go index 6612ee96af..e3b7a478e0 100644 --- a/pkg/rpc/adminservice/project.go +++ b/pkg/rpc/adminservice/project.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -15,7 +12,6 @@ import ( func (m *AdminService) RegisterProject(ctx context.Context, request *admin.ProjectRegisterRequest) ( *admin.ProjectRegisterResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -24,14 +20,6 @@ func (m *AdminService) RegisterProject(ctx context.Context, request *admin.Proje m.Metrics.projectEndpointMetrics.register.Time(func() { response, err = m.ProjectManager.CreateProject(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "RegisterProject", - map[string]string{ - audit.Project: request.Project.Id, - }, - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectEndpointMetrics.register) } @@ -41,7 +29,6 @@ func (m *AdminService) RegisterProject(ctx context.Context, request *admin.Proje func (m *AdminService) ListProjects(ctx context.Context, request *admin.ProjectListRequest) (*admin.Projects, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -50,12 +37,6 @@ func (m *AdminService) ListProjects(ctx context.Context, request *admin.ProjectL m.Metrics.projectEndpointMetrics.list.Time(func() { response, err = m.ProjectManager.ListProjects(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListProjects", - map[string]string{}, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectEndpointMetrics.list) } @@ -67,7 +48,6 @@ func (m *AdminService) ListProjects(ctx context.Context, request *admin.ProjectL func (m *AdminService) UpdateProject(ctx context.Context, request *admin.Project) ( *admin.ProjectUpdateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -76,14 +56,6 @@ func (m *AdminService) UpdateProject(ctx context.Context, request *admin.Project m.Metrics.projectEndpointMetrics.register.Time(func() { response, err = m.ProjectManager.UpdateProject(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "UpdateProject", - map[string]string{ - audit.Project: request.Id, - }, - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectEndpointMetrics.update) } diff --git a/pkg/rpc/adminservice/task.go b/pkg/rpc/adminservice/task.go index 690605d64b..d1d4089ee3 100644 --- a/pkg/rpc/adminservice/task.go +++ b/pkg/rpc/adminservice/task.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/logger" @@ -19,7 +16,6 @@ func (m *AdminService) CreateTask( ctx context.Context, request *admin.TaskCreateRequest) (*admin.TaskCreateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -28,12 +24,6 @@ func (m *AdminService) CreateTask( m.Metrics.taskEndpointMetrics.create.Time(func() { response, err = m.TaskManager.CreateTask(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "CreateTask", - audit.ParametersFromIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.create) } @@ -43,7 +33,6 @@ func (m *AdminService) CreateTask( func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequest) (*admin.Task, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -58,12 +47,6 @@ func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequ m.Metrics.taskEndpointMetrics.get.Time(func() { response, err = m.TaskManager.GetTask(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetTask", - audit.ParametersFromIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.get) } @@ -74,7 +57,6 @@ func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequ func (m *AdminService) ListTaskIds( ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (*admin.NamedEntityIdentifierList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -83,15 +65,6 @@ func (m *AdminService) ListTaskIds( m.Metrics.taskEndpointMetrics.listIds.Time(func() { response, err = m.TaskManager.ListUniqueTaskIdentifiers(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListTaskIds", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.listIds) } @@ -102,7 +75,6 @@ func (m *AdminService) ListTaskIds( func (m *AdminService) ListTasks(ctx context.Context, request *admin.ResourceListRequest) (*admin.TaskList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -111,12 +83,6 @@ func (m *AdminService) ListTasks(ctx context.Context, request *admin.ResourceLis m.Metrics.taskEndpointMetrics.list.Time(func() { response, err = m.TaskManager.ListTasks(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListTasks", - audit.ParametersFromNamedEntityIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.list) } diff --git a/pkg/rpc/adminservice/task_execution.go b/pkg/rpc/adminservice/task_execution.go index baf9577e9f..a45b671060 100644 --- a/pkg/rpc/adminservice/task_execution.go +++ b/pkg/rpc/adminservice/task_execution.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flytestdlib/logger" @@ -19,7 +16,6 @@ import ( func (m *AdminService) CreateTaskEvent( ctx context.Context, request *admin.TaskExecutionEventRequest) (*admin.TaskExecutionEventResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -29,16 +25,6 @@ func (m *AdminService) CreateTaskEvent( m.Metrics.taskExecutionEndpointMetrics.createEvent.Time(func() { response, err = m.TaskExecutionManager.CreateTaskExecutionEvent(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "CreateTask", - audit.ParametersFromTaskExecutionIdentifier(&core.TaskExecutionIdentifier{ - TaskId: request.Event.TaskId, - NodeExecutionId: request.Event.ParentNodeExecutionId, - RetryAttempt: request.Event.RetryAttempt, - }), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.createEvent) } @@ -49,7 +35,6 @@ func (m *AdminService) CreateTaskEvent( func (m *AdminService) GetTaskExecution( ctx context.Context, request *admin.TaskExecutionGetRequest) (*admin.TaskExecution, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -68,12 +53,6 @@ func (m *AdminService) GetTaskExecution( m.Metrics.taskExecutionEndpointMetrics.get.Time(func() { response, err = m.TaskExecutionManager.GetTaskExecution(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetTaskExecution", - audit.ParametersFromTaskExecutionIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.get) } @@ -84,7 +63,6 @@ func (m *AdminService) GetTaskExecution( func (m *AdminService) ListTaskExecutions( ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Nil request") } @@ -97,12 +75,6 @@ func (m *AdminService) ListTaskExecutions( m.Metrics.taskExecutionEndpointMetrics.list.Time(func() { response, err = m.TaskExecutionManager.ListTaskExecutions(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListTaskExecutions", - audit.ParametersFromNodeExecutionIdentifier(request.NodeExecutionId), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.list) } @@ -113,7 +85,6 @@ func (m *AdminService) ListTaskExecutions( func (m *AdminService) GetTaskExecutionData( ctx context.Context, request *admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -128,12 +99,6 @@ func (m *AdminService) GetTaskExecutionData( m.Metrics.taskExecutionEndpointMetrics.getData.Time(func() { response, err = m.TaskExecutionManager.GetTaskExecutionData(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetTaskExecutionData", - audit.ParametersFromTaskExecutionIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.getData) } diff --git a/pkg/rpc/adminservice/workflow.go b/pkg/rpc/adminservice/workflow.go index 86d0eec88a..ae9c5ceb5f 100644 --- a/pkg/rpc/adminservice/workflow.go +++ b/pkg/rpc/adminservice/workflow.go @@ -2,9 +2,6 @@ package adminservice import ( "context" - "time" - - "github.com/flyteorg/flyteadmin/pkg/audit" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/logger" @@ -20,7 +17,6 @@ func (m *AdminService) CreateWorkflow( ctx context.Context, request *admin.WorkflowCreateRequest) (*admin.WorkflowCreateResponse, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -29,12 +25,6 @@ func (m *AdminService) CreateWorkflow( m.Metrics.workflowEndpointMetrics.create.Time(func() { response, err = m.WorkflowManager.CreateWorkflow(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "CreateWorkflow", - audit.ParametersFromIdentifier(request.Id), - audit.ReadWrite, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.create) } @@ -44,7 +34,6 @@ func (m *AdminService) CreateWorkflow( func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGetRequest) (*admin.Workflow, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -59,12 +48,6 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet m.Metrics.workflowEndpointMetrics.get.Time(func() { response, err = m.WorkflowManager.GetWorkflow(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "GetWorkflow", - audit.ParametersFromIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.get) } @@ -75,7 +58,6 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) ( *admin.NamedEntityIdentifierList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -85,15 +67,6 @@ func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.Named m.Metrics.workflowEndpointMetrics.listIds.Time(func() { response, err = m.WorkflowManager.ListWorkflowIdentifiers(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListWorkflowIds", - map[string]string{ - audit.Project: request.Project, - audit.Domain: request.Domain, - }, - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.listIds) } @@ -104,7 +77,6 @@ func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.Named func (m *AdminService) ListWorkflows(ctx context.Context, request *admin.ResourceListRequest) (*admin.WorkflowList, error) { defer m.interceptPanic(ctx, request) - requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -113,12 +85,6 @@ func (m *AdminService) ListWorkflows(ctx context.Context, request *admin.Resourc m.Metrics.workflowEndpointMetrics.list.Time(func() { response, err = m.WorkflowManager.ListWorkflows(ctx, *request) }) - audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( - "ListWorkflows", - audit.ParametersFromNamedEntityIdentifier(request.Id), - audit.ReadOnly, - requestedAt, - ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.list) }