Skip to content

Commit

Permalink
fix(engine): manage signout (#5467)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault authored Oct 2, 2020
1 parent 2d8dd4a commit 983bb56
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/content/docs/concepts/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Here is the list of builtin variables, generated for every build:
- `{{.cds.job}}` The name of the current job
- `{{.cds.manual}}` true if current pipeline is manually run, false otherwise
- `{{.cds.pipeline}}` The name of the current pipeline
- `{{.cds.project}}` The name of the current project
- `{{.cds.project}}` The key of the current project
- `{{.cds.run}}` Run Number of current workflow, example: 3.0
- `{{.cds.run.number}}` Number of current workflow, example: 3 if `{{.cds.run}} = 3.0`
- `{{.cds.run.subnumber}}` Sub Number of current workflow, example: 4 if `{{.cds.run}} = 3.4`
Expand Down
4 changes: 2 additions & 2 deletions engine/api/worker/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func Initialize(c context.Context, DBFunc func() *gorp.DbMap, store cache.Store)
select {
case <-c.Done():
if c.Err() != nil {
log.Error(c, "Exiting workflow ticker: %v", c.Err())
log.Error(c, "Exiting worker ticker: %v", c.Err())
return nil
}
case <-tickHeart.C:
Expand All @@ -31,7 +31,7 @@ func Initialize(c context.Context, DBFunc func() *gorp.DbMap, store cache.Store)

go func() {
if err := DisableDeadWorkers(c, db); err != nil {
log.Warning(c, "workflow.disableDeadWorkers> Error on disableDeadWorkers : %v", err)
log.Warning(c, "worker.disableDeadWorkers> Error on disableDeadWorkers : %v", err)
}
}()
}
Expand Down
50 changes: 32 additions & 18 deletions engine/cmd_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ var (
flagStartVaultToken string
)

type serviceConf struct {
arg string
service service.Service
cfg interface{}
}

var startCmd = &cobra.Command{
Use: "start",
Short: "Start CDS",
Expand Down Expand Up @@ -113,21 +119,6 @@ See $ engine config command for more details.
// initialize context
defer cancel()

// gracefully shutdown all
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
signal.Stop(c)
cancel()
}()

type serviceConf struct {
arg string
service service.Service
cfg interface{}
}

var (
serviceConfs []serviceConf
names []string
Expand Down Expand Up @@ -255,6 +246,16 @@ See $ engine config command for more details.
}
}

// gracefully shutdown all
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func(ctx context.Context) {
<-c
unregisterServices(ctx, serviceConfs)
signal.Stop(c)
cancel()
}(ctx)

//Initialize logs
logConf := log.Conf{
Level: conf.Log.Level,
Expand Down Expand Up @@ -313,12 +314,25 @@ See $ engine config command for more details.

//Wait for the end
<-ctx.Done()
if ctx.Err() != nil {
fmt.Printf("Exiting (%v)\n", ctx.Err())
}

},
}

func unregisterServices(ctx context.Context, serviceConfs []serviceConf) {
// unregister all services
for i := range serviceConfs {
s := serviceConfs[i]
fmt.Printf("Unregister (%v)\n", s.service.Name())
if err := s.service.Unregister(ctx); err != nil {
log.Error(ctx, "%s> Unable to unregister: %v", s.service.Name(), err)
}
}

if ctx.Err() != nil {
fmt.Printf("Exiting (%v)\n", ctx.Err())
}
}

func start(c context.Context, s service.Service, cfg interface{}, serviceName string) {
if err := serve(c, s, serviceName, cfg); err != nil {
fmt.Printf("Service has been stopped: %s %+v", serviceName, err)
Expand Down
12 changes: 12 additions & 0 deletions engine/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ loop:
return nil
}

// Register registers a new service on API
func (c *Common) Register(ctx context.Context, cfg sdk.ServiceConfig) error {
log.Info(ctx, "Registing service %s(%T) %s", c.Type(), c, c.Name())

Expand Down Expand Up @@ -140,6 +141,17 @@ func (c *Common) Register(ctx context.Context, cfg sdk.ServiceConfig) error {
return nil
}

// Unregister logout the service
func (c *Common) Unregister(ctx context.Context) error {
// no logout needed for api
if c.ServiceType == "api" {
return nil
}

log.Info(ctx, "Unregisting service %s(%T) %s", c.Type(), c, c.Name())
return c.Client.AuthConsumerSignout()
}

// Heartbeat have to be launch as a goroutine, call DoHeartBeat each 30s
func (c *Common) Heartbeat(ctx context.Context, status func(ctx context.Context) *sdk.MonitoringStatus) error {
// no heartbeat for api
Expand Down
1 change: 1 addition & 0 deletions engine/service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type Service interface {
Start(ctx context.Context, cfg cdsclient.ServiceConfig) error
Init(cfg interface{}) (cdsclient.ServiceConfig, error)
Register(ctx context.Context, cfg sdk.ServiceConfig) error
Unregister(ctx context.Context) error
Heartbeat(ctx context.Context, status func(ctx context.Context) *sdk.MonitoringStatus) error
Status(ctx context.Context) *sdk.MonitoringStatus
NamedService
Expand Down
2 changes: 1 addition & 1 deletion sdk/cdsclient/http_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *client) RequestWebsocket(ctx context.Context, goRoutines *sdk.GoRoutine
for {
select {
case <-ctx.Done():
log.Warning(wsContext, "Leaving....")
log.Warning(wsContext, "websocket goroutine leaving....")
return
case m := <-msgToSend:
if err := con.WriteJSON(m); err != nil {
Expand Down

0 comments on commit 983bb56

Please sign in to comment.