Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Plugins for flyteadmin server middleware (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored May 19, 2022
1 parent b3bce05 commit d0bda09
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 20 deletions.
23 changes: 23 additions & 0 deletions auth/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package auth

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func BlanketAuthorization(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
resp interface{}, err error) {
identityContext := IdentityContextFromContext(ctx)
if identityContext.IsEmpty() {
return handler(ctx, req)
}

if !identityContext.Scopes().Has(ScopeAll) {
return nil, status.Errorf(codes.Unauthenticated, "authenticated user doesn't have required scope")
}

return handler(ctx, req)
}
61 changes: 61 additions & 0 deletions auth/interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package auth

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"
)

func TestBlanketAuthorization(t *testing.T) {
t.Run("authenticated and authorized", func(t *testing.T) {
allScopes := sets.NewString(ScopeAll)
identityCtx := IdentityContext{
audience: "aud",
userID: "uid",
appID: "appid",
scopes: &allScopes,
}
handlerCalled := false
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
return nil, nil
}
ctx := context.WithValue(context.TODO(), ContextKeyIdentityContext, identityCtx)
_, err := BlanketAuthorization(ctx, nil, nil, handler)
assert.NoError(t, err)
assert.True(t, handlerCalled)
})
t.Run("unauthenticated", func(t *testing.T) {
handlerCalled := false
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
return nil, nil
}
ctx := context.TODO()
_, err := BlanketAuthorization(ctx, nil, nil, handler)
assert.NoError(t, err)
assert.True(t, handlerCalled)
})
t.Run("authenticated and not authorized", func(t *testing.T) {
identityCtx := IdentityContext{
audience: "aud",
userID: "uid",
appID: "appid",
}
handlerCalled := false
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
return nil, nil
}
ctx := context.WithValue(context.TODO(), ContextKeyIdentityContext, identityCtx)
_, err := BlanketAuthorization(ctx, nil, nil, handler)
asStatus, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, asStatus.Code(), codes.Unauthenticated)
assert.False(t, handlerCalled)
})
}
7 changes: 7 additions & 0 deletions pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"fmt"
"runtime/debug"

grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"

"github.com/flyteorg/flyteadmin/auth"

"github.com/flyteorg/flyteadmin/plugins"

"github.com/flyteorg/flyteadmin/pkg/async/cloudevent"
Expand Down Expand Up @@ -96,6 +100,9 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi
logger.Info(ctx, "Successfully created a workflow executor engine")
pluginRegistry.RegisterDefault(plugins.PluginIDWorkflowExecutor, workflowExecutor)

logger.Infof(ctx, "Registering default middleware with blanket auth validation")
pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer(auth.BlanketAuthorization))

publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope)
Expand Down
20 changes: 2 additions & 18 deletions pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)

var defaultCorsHeaders = []string{"Content-Type"}
Expand All @@ -55,21 +53,6 @@ func Serve(ctx context.Context, pluginRegistry *plugins.Registry, additionalHand
return serveGatewayInsecure(ctx, pluginRegistry, serverConfig, authConfig.GetConfig(), storage.GetConfig(), additionalHandlers, adminScope)
}

func blanketAuthorization(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
resp interface{}, err error) {

identityContext := auth.IdentityContextFromContext(ctx)
if identityContext.IsEmpty() {
return handler(ctx, req)
}

if !identityContext.Scopes().Has(auth.ScopeAll) {
return nil, status.Errorf(codes.Unauthenticated, "authenticated user doesn't have required scope")
}

return handler(ctx, req)
}

// Creates a new gRPC Server with all the configuration
func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig,
storageCfg *storage.Config, authCtx interfaces.AuthenticationContext,
Expand All @@ -78,11 +61,12 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c
var chainedUnaryInterceptors grpc.UnaryServerInterceptor
if cfg.Security.UseAuth {
logger.Infof(ctx, "Creating gRPC server with authentication")
middlewareInterceptors := plugins.Get[grpc.UnaryServerInterceptor](pluginRegistry, plugins.PluginIDUnaryServiceMiddleware)
chainedUnaryInterceptors = grpcmiddleware.ChainUnaryServer(grpcprometheus.UnaryServerInterceptor,
auth.GetAuthenticationCustomMetadataInterceptor(authCtx),
grpcauth.UnaryServerInterceptor(auth.GetAuthenticationInterceptor(authCtx)),
auth.AuthenticationLoggingInterceptor,
blanketAuthorization,
middlewareInterceptors,
)
} else {
logger.Infof(ctx, "Creating gRPC server without authentication")
Expand Down
5 changes: 3 additions & 2 deletions plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
type PluginID = string

const (
PluginIDWorkflowExecutor PluginID = "WorkflowExecutor"
PluginIDDataProxy PluginID = "DataProxy"
PluginIDWorkflowExecutor PluginID = "WorkflowExecutor"
PluginIDDataProxy PluginID = "DataProxy"
PluginIDUnaryServiceMiddleware PluginID = "UnaryServiceMiddleware"
)

type AtomicRegistry struct {
Expand Down
1 change: 1 addition & 0 deletions tests/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package tests
import (
"context"
"fmt"

"github.com/flyteorg/flytestdlib/database"

"github.com/flyteorg/flyteadmin/pkg/repositories"
Expand Down

0 comments on commit d0bda09

Please sign in to comment.