Skip to content

Commit

Permalink
Rework health check (#2579)
Browse files Browse the repository at this point in the history
The standard gRPC Health Check Service implementation is used instead of the custom implementation in Workflow Handler. This allows each separate service/handler to announce the state.
  • Loading branch information
Spikhalskiy authored Mar 8, 2022
1 parent 9d69465 commit 5fb4f1c
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 168 deletions.
13 changes: 9 additions & 4 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"
sdkclient "go.temporal.io/sdk/client"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.temporal.io/server/api/adminservice/v1"
clusterspb "go.temporal.io/server/api/cluster/v1"
Expand Down Expand Up @@ -111,6 +113,7 @@ type (
saProvider searchattribute.Provider
saManager searchattribute.Manager
clusterMetadata cluster.Metadata
healthServer *health.Server
}

NewAdminHandlerArgs struct {
Expand Down Expand Up @@ -138,6 +141,7 @@ type (
SaManager searchattribute.Manager
ClusterMetadata cluster.Metadata
ArchivalMetadata archiver.ArchivalMetadata
HealthServer *health.Server
}
)

Expand Down Expand Up @@ -196,17 +200,18 @@ func NewAdminHandler(
saProvider: args.SaProvider,
saManager: args.SaManager,
clusterMetadata: args.ClusterMetadata,
healthServer: args.HealthServer,
}
}

// Start starts the handler
func (adh *AdminHandler) Start() {
if !atomic.CompareAndSwapInt32(
if atomic.CompareAndSwapInt32(
&adh.status,
common.DaemonStatusInitialized,
common.DaemonStatusStarted,
) {
return
adh.healthServer.SetServingStatus(AdminServiceName, healthpb.HealthCheckResponse_SERVING)
}

// Start namespace replication queue cleanup
Expand All @@ -216,12 +221,12 @@ func (adh *AdminHandler) Start() {

// Stop stops the handler
func (adh *AdminHandler) Stop() {
if !atomic.CompareAndSwapInt32(
if atomic.CompareAndSwapInt32(
&adh.status,
common.DaemonStatusStarted,
common.DaemonStatusStopped,
) {
return
adh.healthServer.SetServingStatus(AdminServiceName, healthpb.HealthCheckResponse_NOT_SERVING)
}

// Calling stop if the queue does not start is ok
Expand Down
3 changes: 3 additions & 0 deletions service/frontend/adminHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"fmt"
"testing"

"google.golang.org/grpc/health"

"go.temporal.io/server/api/adminservicemock/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -145,6 +147,7 @@ func (s *adminHandlerSuite) SetupTest() {
s.mockResource.GetSearchAttributesManager(),
s.mockMetadata,
s.mockResource.GetArchivalMetadata(),
health.NewServer(),
}
s.handler = NewAdminHandler(args)
s.handler.Start()
Expand Down
16 changes: 0 additions & 16 deletions service/frontend/dcRedirectionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"time"

"go.temporal.io/api/workflowservice/v1"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.temporal.io/server/client"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -104,21 +103,6 @@ func (handler *DCRedirectionHandlerImpl) GetConfig() *Config {
return handler.frontendHandler.GetConfig()
}

// UpdateHealthStatus sets the health status for this rpc handler.
// This health status will be used within the rpc health check handler
func (handler *DCRedirectionHandlerImpl) UpdateHealthStatus(status HealthStatus) {
handler.frontendHandler.UpdateHealthStatus(status)
}

// Check is for health check
func (handler *DCRedirectionHandlerImpl) Check(ctx context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return handler.frontendHandler.Check(ctx, request)
}

func (handler *DCRedirectionHandlerImpl) Watch(request *healthpb.HealthCheckRequest, server healthpb.Health_WatchServer) error {
return handler.frontendHandler.Watch(request, server)
}

// Namespace APIs, namespace APIs does not require redirection

// DeprecateNamespace API call
Expand Down
14 changes: 2 additions & 12 deletions service/frontend/dcRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/api/workflowservicemock/v1"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/health"

tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -125,6 +125,7 @@ func (s *dcRedirectionHandlerSuite) SetupTest() {
s.mockResource.GetSearchAttributesProvider(),
s.mockResource.GetClusterMetadata(),
s.mockResource.GetArchivalMetadata(),
health.NewServer(),
)

s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceServer(s.controller)
Expand Down Expand Up @@ -902,17 +903,6 @@ func (serverHandler *testServerHandler) Start() {
func (serverHandler *testServerHandler) Stop() {
}

func (serverHandler *testServerHandler) Check(context.Context, *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return nil, nil
}

func (serverHandler *testServerHandler) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error {
return nil
}

func (serverHandler *testServerHandler) UpdateHealthStatus(status HealthStatus) {
}

func (serverHandler *testServerHandler) GetConfig() *Config {
return nil
}
16 changes: 15 additions & 1 deletion service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
sdkclient "go.temporal.io/sdk/client"
"go.uber.org/fx"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/keepalive"

"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -83,8 +84,9 @@ var Module = fx.Options(
fx.Provide(ThrottledLoggerRpsFnProvider),
fx.Provide(PersistenceMaxQpsProvider),
fx.Provide(FEReplicatorNamespaceReplicationQueueProvider),
fx.Provide(HandlerProvider),
fx.Provide(func(so []grpc.ServerOption) *grpc.Server { return grpc.NewServer(so...) }),
fx.Provide(healthServerProvider),
fx.Provide(HandlerProvider),
fx.Provide(AdminHandlerProvider),
fx.Provide(OperatorHandlerProvider),
fx.Provide(NewVersionChecker),
Expand All @@ -96,6 +98,7 @@ var Module = fx.Options(
func NewServiceProvider(
serviceConfig *Config,
server *grpc.Server,
healthServer *health.Server,
handler Handler,
adminHandler *AdminHandler,
operatorHandler *OperatorHandlerImpl,
Expand All @@ -109,6 +112,7 @@ func NewServiceProvider(
return NewService(
serviceConfig,
server,
healthServer,
handler,
adminHandler,
operatorHandler,
Expand Down Expand Up @@ -346,6 +350,10 @@ func ServiceResolverProvider(membershipMonitor membership.Monitor) (membership.S
return membershipMonitor.GetResolver(common.FrontendServiceName)
}

func healthServerProvider() *health.Server {
return health.NewServer()
}

func AdminHandlerProvider(
params *resource.BootstrapParams,
config *Config,
Expand All @@ -371,6 +379,7 @@ func AdminHandlerProvider(
saManager searchattribute.Manager,
clusterMetadata cluster.Metadata,
archivalMetadata archiver.ArchivalMetadata,
healthServer *health.Server,
) *AdminHandler {
args := NewAdminHandlerArgs{
params,
Expand All @@ -397,6 +406,7 @@ func AdminHandlerProvider(
saManager,
clusterMetadata,
archivalMetadata,
healthServer,
}
return NewAdminHandler(args)
}
Expand All @@ -409,6 +419,7 @@ func OperatorHandlerProvider(
metricsClient metrics.Client,
saProvider searchattribute.Provider,
saManager searchattribute.Manager,
healthServer *health.Server,
) *OperatorHandlerImpl {
args := NewOperatorHandlerImplArgs{
esConfig,
Expand All @@ -418,6 +429,7 @@ func OperatorHandlerProvider(
metricsClient,
saProvider,
saManager,
healthServer,
}
return NewOperatorHandlerImpl(args)
}
Expand Down Expand Up @@ -445,6 +457,7 @@ func HandlerProvider(
saProvider searchattribute.Provider,
clusterMetadata cluster.Metadata,
archivalMetadata archiver.ArchivalMetadata,
healthServer *health.Server,
) Handler {
wfHandler := NewWorkflowHandler(
serviceConfig,
Expand All @@ -464,6 +477,7 @@ func HandlerProvider(
saProvider,
clusterMetadata,
archivalMetadata,
healthServer,
)
handler := NewDCRedirectionHandler(wfHandler, params.DCRedirectionPolicy, logger, clientBean, metricsClient, timeSource, namespaceRegistry, clusterMetadata)
return handler
Expand Down
14 changes: 6 additions & 8 deletions service/frontend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ import (
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"

healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.temporal.io/server/common"
)

const (
WorkflowServiceName = "temporal.api.workflowservice.v1.WorkflowService"
OperatorServiceName = "temporal.api.operatorservice.v1.OperatorService"
AdminServiceName = "temporal.api.adminservice.v1.AdminService"
)

type (
// Handler is interface wrapping frontend workflow handler
Handler interface {
workflowservice.WorkflowServiceServer
common.Daemon

// HealthServer is the health check method for the whole frontend server
healthpb.HealthServer
// UpdateHealthStatus sets the health status for this rpc handler.
// This health status will be used within the rpc health check handler
UpdateHealthStatus(status HealthStatus)

GetConfig() *Config
}

Expand Down
42 changes: 0 additions & 42 deletions service/frontend/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5fb4f1c

Please sign in to comment.