Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Plugins for flyteadmin server middleware #420

Merged
merged 14 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions auth/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package auth

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func BlanketAuthorization(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
resp interface{}, err error) {
identityContext := IdentityContextFromContext(ctx)
if identityContext.IsEmpty() {
return handler(ctx, req)
}

if !identityContext.Scopes().Has(ScopeAll) {
return nil, status.Errorf(codes.Unauthenticated, "authenticated user doesn't have required scope")
}

return handler(ctx, req)
}
61 changes: 61 additions & 0 deletions auth/interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package auth

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"
)

func TestBlanketAuthorization(t *testing.T) {
t.Run("authenticated and authorized", func(t *testing.T) {
allScopes := sets.NewString(ScopeAll)
identityCtx := IdentityContext{
audience: "aud",
userID: "uid",
appID: "appid",
scopes: &allScopes,
}
handlerCalled := false
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
return nil, nil
}
ctx := context.WithValue(context.TODO(), ContextKeyIdentityContext, identityCtx)
_, err := BlanketAuthorization(ctx, nil, nil, handler)
assert.NoError(t, err)
assert.True(t, handlerCalled)
})
t.Run("unauthenticated", func(t *testing.T) {
handlerCalled := false
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
return nil, nil
}
ctx := context.TODO()
_, err := BlanketAuthorization(ctx, nil, nil, handler)
assert.NoError(t, err)
assert.True(t, handlerCalled)
})
t.Run("authenticated and not authorized", func(t *testing.T) {
identityCtx := IdentityContext{
audience: "aud",
userID: "uid",
appID: "appid",
}
handlerCalled := false
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
return nil, nil
}
ctx := context.WithValue(context.TODO(), ContextKeyIdentityContext, identityCtx)
_, err := BlanketAuthorization(ctx, nil, nil, handler)
asStatus, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, asStatus.Code(), codes.Unauthenticated)
assert.False(t, handlerCalled)
})
}
7 changes: 7 additions & 0 deletions pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"fmt"
"runtime/debug"

grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"

"github.com/flyteorg/flyteadmin/auth"

"github.com/flyteorg/flyteadmin/plugins"

"github.com/flyteorg/flyteadmin/pkg/async/cloudevent"
Expand Down Expand Up @@ -96,6 +100,9 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi
logger.Info(ctx, "Successfully created a workflow executor engine")
pluginRegistry.RegisterDefault(plugins.PluginIDWorkflowExecutor, workflowExecutor)

logger.Infof(ctx, "Registering default middleware with blanket auth validation")
pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer(auth.BlanketAuthorization))

publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope)
Expand Down
20 changes: 2 additions & 18 deletions pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)

var defaultCorsHeaders = []string{"Content-Type"}
Expand All @@ -55,21 +53,6 @@ func Serve(ctx context.Context, pluginRegistry *plugins.Registry, additionalHand
return serveGatewayInsecure(ctx, pluginRegistry, serverConfig, authConfig.GetConfig(), storage.GetConfig(), additionalHandlers, adminScope)
}

func blanketAuthorization(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
resp interface{}, err error) {

identityContext := auth.IdentityContextFromContext(ctx)
if identityContext.IsEmpty() {
return handler(ctx, req)
}

if !identityContext.Scopes().Has(auth.ScopeAll) {
return nil, status.Errorf(codes.Unauthenticated, "authenticated user doesn't have required scope")
}

return handler(ctx, req)
}

// Creates a new gRPC Server with all the configuration
func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig,
storageCfg *storage.Config, authCtx interfaces.AuthenticationContext,
Expand All @@ -78,11 +61,12 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c
var chainedUnaryInterceptors grpc.UnaryServerInterceptor
if cfg.Security.UseAuth {
logger.Infof(ctx, "Creating gRPC server with authentication")
middlewareInterceptors := plugins.Get[grpc.UnaryServerInterceptor](pluginRegistry, plugins.PluginIDUnaryServiceMiddleware)
chainedUnaryInterceptors = grpcmiddleware.ChainUnaryServer(grpcprometheus.UnaryServerInterceptor,
auth.GetAuthenticationCustomMetadataInterceptor(authCtx),
grpcauth.UnaryServerInterceptor(auth.GetAuthenticationInterceptor(authCtx)),
auth.AuthenticationLoggingInterceptor,
blanketAuthorization,
middlewareInterceptors,
)
} else {
logger.Infof(ctx, "Creating gRPC server without authentication")
Expand Down
5 changes: 3 additions & 2 deletions plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
type PluginID = string

const (
PluginIDWorkflowExecutor PluginID = "WorkflowExecutor"
PluginIDDataProxy PluginID = "DataProxy"
PluginIDWorkflowExecutor PluginID = "WorkflowExecutor"
PluginIDDataProxy PluginID = "DataProxy"
PluginIDUnaryServiceMiddleware PluginID = "UnaryServiceMiddleware"
)

type AtomicRegistry struct {
Expand Down
1 change: 1 addition & 0 deletions tests/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package tests
import (
"context"
"fmt"

"github.com/flyteorg/flytestdlib/database"

"github.com/flyteorg/flyteadmin/pkg/repositories"
Expand Down