Skip to content

Commit

Permalink
Experimental cloud operations client (#1462)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jul 8, 2024
1 parent c0af8a4 commit 79cd73a
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ jobs:
TEMPORAL_NAMESPACE: sdk-ci.a2dd6
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
TEMPORAL_CLIENT_CLOUD_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
TEMPORAL_CLIENT_CLOUD_API_VERSION: 2024-05-13-00
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -114,6 +116,9 @@ jobs:
- name: Single integration test against cloud
run: 'go test -v --count 1 -p 1 . -run "TestIntegrationSuite/TestBasic$"'
working-directory: test
- name: Cloud operations tests
run: 'go test -v --count 1 -p 1 . -run "TestCloudOperationsSuite/.*" -cloud-operations-tests'
working-directory: test

features-test:
uses: temporalio/features/.github/workflows/go.yaml@main
Expand Down
35 changes: 31 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"crypto/tls"
"io"

"go.temporal.io/api/cloud/cloudservice/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
Expand Down Expand Up @@ -146,6 +147,11 @@ type (
// Options are optional parameters for Client creation.
Options = internal.ClientOptions

// CloudOperationsClientOptions are parameters for CloudOperationsClient creation.
//
// WARNING: Cloud operations client is currently experimental.
CloudOperationsClientOptions = internal.CloudOperationsClientOptions

// ConnectionOptions are optional parameters that can be specified in ClientOptions
ConnectionOptions = internal.ConnectionOptions

Expand Down Expand Up @@ -830,6 +836,17 @@ type (
Close()
}

// CloudOperationsClient is the client for cloud operations.
//
// WARNING: Cloud operations client is currently experimental.
CloudOperationsClient interface {
// CloudService provides access to the underlying gRPC service.
CloudService() cloudservice.CloudServiceClient

// Close client and clean up underlying resources.
Close()
}

// NamespaceClient is the client for managing operations on the namespace.
// CLI, tools, ... can use this layer to manager operations on namespace.
NamespaceClient interface {
Expand Down Expand Up @@ -946,6 +963,14 @@ func NewClientFromExistingWithContext(ctx context.Context, existingClient Client
return internal.NewClientFromExisting(ctx, existingClient, options)
}

// DialCloudOperationsClient creates a cloud client to perform cloud-management
// operations. Users should provide Credentials in the options.
//
// WARNING: Cloud operations client is currently experimental.
func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClientOptions) (CloudOperationsClient, error) {
return internal.DialCloudOperationsClient(ctx, options)
}

// NewNamespaceClient creates an instance of a namespace client, to manage
// lifecycle of namespaces. This will not attempt to connect to the server
// eagerly and therefore may not fail for an unreachable server until a call is
Expand All @@ -956,10 +981,12 @@ func NewNamespaceClient(options Options) (NamespaceClient, error) {

// make sure if new methods are added to internal.Client they are also added to public Client.
var (
_ Client = internal.Client(nil)
_ internal.Client = Client(nil)
_ NamespaceClient = internal.NamespaceClient(nil)
_ internal.NamespaceClient = NamespaceClient(nil)
_ Client = internal.Client(nil)
_ internal.Client = Client(nil)
_ CloudOperationsClient = internal.CloudOperationsClient(nil)
_ internal.CloudOperationsClient = CloudOperationsClient(nil)
_ NamespaceClient = internal.NamespaceClient(nil)
_ internal.NamespaceClient = NamespaceClient(nil)
)

// NewValue creates a new [converter.EncodedValue] which can be used to decode binary data returned by Temporal. For example:
Expand Down
112 changes: 104 additions & 8 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sync/atomic"
"time"

"go.temporal.io/api/cloud/cloudservice/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
Expand Down Expand Up @@ -489,6 +490,48 @@ type (
DisableErrorCodeMetricTags bool
}

CloudOperationsClient interface {
CloudService() cloudservice.CloudServiceClient
Close()
}

// CloudOperationsClientOptions are parameters for CloudOperationsClient creation.
//
// WARNING: Cloud operations client is currently experimental.
CloudOperationsClientOptions struct {
// Optional: The credentials for this client. This is essentially required.
// See [go.temporal.io/sdk/client.NewAPIKeyStaticCredentials],
// [go.temporal.io/sdk/client.NewAPIKeyDynamicCredentials], and
// [go.temporal.io/sdk/client.NewMTLSCredentials].
// Default: No credentials.
Credentials Credentials

// Optional: Version header for safer mutations. May or may not be required
// depending on cloud settings.
// Default: No header.
Version string

// Optional: Advanced server connection options such as TLS settings. Not
// usually needed.
ConnectionOptions ConnectionOptions

// Optional: Logger framework can use to log.
// Default: Default logger provided.
Logger log.Logger

// Optional: Metrics handler for reporting metrics.
// Default: No metrics
MetricsHandler metrics.Handler

// Optional: Overrides the specific host to connect to. Not usually needed.
// Default: saas-api.tmprl.cloud:443
HostPort string

// Optional: Disable TLS.
// Default: false (i.e. TLS enabled)
DisableTLS bool
}

// HeadersProvider returns a map of gRPC headers that should be used on every request.
HeadersProvider interface {
GetHeaders(ctx context.Context) (map[string]string, error)
Expand Down Expand Up @@ -728,7 +771,7 @@ type (

// Credentials are optional credentials that can be specified in ClientOptions.
type Credentials interface {
applyToOptions(*ClientOptions) error
applyToOptions(*ConnectionOptions) error
// Can return nil to have no interceptor
gRPCInterceptor() grpc.UnaryClientInterceptor
}
Expand Down Expand Up @@ -783,7 +826,7 @@ func newClient(ctx context.Context, options ClientOptions, existing *WorkflowCli
}

if options.Credentials != nil {
if err := options.Credentials.applyToOptions(&options); err != nil {
if err := options.Credentials.applyToOptions(&options.ConnectionOptions); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -897,6 +940,59 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
return client
}

// DialCloudOperationsClient creates a cloud client to perform cloud-management
// operations.
func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClientOptions) (CloudOperationsClient, error) {
// Set defaults
if options.MetricsHandler == nil {
options.MetricsHandler = metrics.NopHandler
}
if options.Logger == nil {
options.Logger = ilog.NewDefaultLogger()
}
if options.HostPort == "" {
options.HostPort = "saas-api.tmprl.cloud:443"
}
if options.Version != "" {
options.ConnectionOptions.DialOptions = append(
options.ConnectionOptions.DialOptions,
grpc.WithChainUnaryInterceptor(func(
ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, "temporal-cloud-api-version", options.Version)
return invoker(ctx, method, req, reply, cc, opts...)
}),
)
}
if options.Credentials != nil {
if err := options.Credentials.applyToOptions(&options.ConnectionOptions); err != nil {
return nil, err
}
}
if options.ConnectionOptions.TLS == nil && !options.DisableTLS {
options.ConnectionOptions.TLS = &tls.Config{}
}
// Exclude internal from retry by default
options.ConnectionOptions.excludeInternalFromRetry = &atomic.Bool{}
options.ConnectionOptions.excludeInternalFromRetry.Store(true)
// TODO(cretz): Pass through context on dial
conn, err := dial(newDialParameters(&ClientOptions{
HostPort: options.HostPort,
ConnectionOptions: options.ConnectionOptions,
MetricsHandler: options.MetricsHandler,
Credentials: options.Credentials,
}, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
}
return &cloudOperationsClient{
conn: conn,
logger: options.Logger,
cloudServiceClient: cloudservice.NewCloudServiceClient(conn),
}, nil
}

// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces.
func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) {
// Initialize root tags
Expand Down Expand Up @@ -964,7 +1060,7 @@ func NewAPIKeyDynamicCredentials(apiKeyCallback func(context.Context) (string, e
return apiKeyCredentials(apiKeyCallback)
}

func (apiKeyCredentials) applyToOptions(*ClientOptions) error { return nil }
func (apiKeyCredentials) applyToOptions(*ConnectionOptions) error { return nil }

func (a apiKeyCredentials) gRPCInterceptor() grpc.UnaryClientInterceptor { return a.gRPCIntercept }

Expand Down Expand Up @@ -992,13 +1088,13 @@ type mTLSCredentials tls.Certificate

func NewMTLSCredentials(certificate tls.Certificate) Credentials { return mTLSCredentials(certificate) }

func (m mTLSCredentials) applyToOptions(opts *ClientOptions) error {
if opts.ConnectionOptions.TLS == nil {
opts.ConnectionOptions.TLS = &tls.Config{}
} else if len(opts.ConnectionOptions.TLS.Certificates) != 0 {
func (m mTLSCredentials) applyToOptions(opts *ConnectionOptions) error {
if opts.TLS == nil {
opts.TLS = &tls.Config{}
} else if len(opts.TLS.Certificates) != 0 {
return fmt.Errorf("cannot apply mTLS credentials, certificates already exist on TLS options")
}
opts.ConnectionOptions.TLS.Certificates = append(opts.ConnectionOptions.TLS.Certificates, tls.Certificate(m))
opts.TLS.Certificates = append(opts.TLS.Certificates, tls.Certificate(m))
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,14 +536,14 @@ func TestCredentialsMTLS(t *testing.T) {
// No TLS set
var clientOptions ClientOptions
creds := NewMTLSCredentials(tls.Certificate{Certificate: [][]byte{[]byte("somedata1")}})
require.NoError(t, creds.applyToOptions(&clientOptions))
require.NoError(t, creds.applyToOptions(&clientOptions.ConnectionOptions))
require.Equal(t, "somedata1", string(clientOptions.ConnectionOptions.TLS.Certificates[0].Certificate[0]))

// TLS already set
clientOptions = ClientOptions{}
clientOptions.ConnectionOptions.TLS = &tls.Config{ServerName: "my-server-name"}
creds = NewMTLSCredentials(tls.Certificate{Certificate: [][]byte{[]byte("somedata2")}})
require.NoError(t, creds.applyToOptions(&clientOptions))
require.NoError(t, creds.applyToOptions(&clientOptions.ConnectionOptions))
require.Equal(t, "my-server-name", clientOptions.ConnectionOptions.TLS.ServerName)
require.Equal(t, "somedata2", string(clientOptions.ConnectionOptions.TLS.Certificates[0].Certificate[0]))

Expand All @@ -553,7 +553,7 @@ func TestCredentialsMTLS(t *testing.T) {
Certificates: []tls.Certificate{{Certificate: [][]byte{[]byte("somedata3")}}},
}
creds = NewMTLSCredentials(tls.Certificate{Certificate: [][]byte{[]byte("somedata4")}})
require.Error(t, creds.applyToOptions(&clientOptions))
require.Error(t, creds.applyToOptions(&clientOptions.ConnectionOptions))
}

type testGRPCServer struct {
Expand Down
18 changes: 18 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

"go.temporal.io/api/cloud/cloudservice/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
Expand Down Expand Up @@ -100,6 +101,13 @@ type (
unclosedClients *int32
}

// cloudOperationsClient is the client for managing cloud.
cloudOperationsClient struct {
conn *grpc.ClientConn
logger log.Logger
cloudServiceClient cloudservice.CloudServiceClient
}

// namespaceClient is the client for managing namespaces.
namespaceClient struct {
workflowService workflowservice.WorkflowServiceClient
Expand Down Expand Up @@ -1289,6 +1297,16 @@ func (wc *WorkflowClient) Close() {
}
}

func (c *cloudOperationsClient) CloudService() cloudservice.CloudServiceClient {
return c.cloudServiceClient
}

func (c *cloudOperationsClient) Close() {
if err := c.conn.Close(); err != nil {
c.logger.Warn("unable to close connection", tagError, err)
}
}

// Register a namespace with temporal server
// The errors it can throw:
// - NamespaceAlreadyExistsError
Expand Down
Loading

0 comments on commit 79cd73a

Please sign in to comment.