Skip to content

Commit

Permalink
fix(hook): gerrit reconnection (#5413)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Sep 4, 2020
1 parent 7ed8d51 commit cd6e2a1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
26 changes: 15 additions & 11 deletions engine/hooks/gerrit.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,12 @@ func (s *Service) ComputeGerritStreamEvent(ctx context.Context, vcsServer string
}

// ListenGerritStreamEvent listen the gerrit event stream
func ListenGerritStreamEvent(ctx context.Context, store cache.Store, v sdk.VCSConfiguration, gerritEventChan chan<- GerritEvent) {
func ListenGerritStreamEvent(ctx context.Context, store cache.Store, v sdk.VCSConfiguration, gerritEventChan chan<- GerritEvent) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
signer, err := ssh.ParsePrivateKey([]byte(v.Password))
if err != nil {
log.Error(ctx, "unable to read ssh key: %v", err)
return
return sdk.WithStack(err)
}

// Create config
Expand All @@ -265,37 +266,40 @@ func ListenGerritStreamEvent(ctx context.Context, store cache.Store, v sdk.VCSCo
// Dial TCP
conn, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", URL.Hostname(), v.SSHPort), config)
if err != nil {
log.Error(ctx, "ListenGerritStreamEvent> unable to open ssh connection to gerrit: %v", err)
return
return sdk.WithStack(err)
}
defer conn.Close()

session, err := conn.NewSession()
if err != nil {
log.Error(ctx, "ListenGerritStreamEvent> unable to create new session: %v", err)
return
return sdk.WithStack(err)
}
defer session.Close()

r, w := io.Pipe()
defer r.Close()
defer w.Close()
session.Stdout = w

stdoutreader := bufio.NewReader(r)

go func() {
sdk.GoRoutine(ctx, "gerrit-ssh-run", func(ctx context.Context) {
// Run command
log.Debug("Listening to gerrit event stream %s", v.URL)
if err := session.Run("gerrit stream-events"); err != nil {
log.Error(ctx, "ListenGerritStreamEvent> unable to run gerrit stream-events command: %v", err)
}
}()
cancel()
r.Close()
return
})

lockKey := cache.Key("gerrit", "event", "lock")
tick := time.NewTicker(50 * time.Millisecond)
for {
select {
case <-ctx.Done():
session.Close()
conn.Close()
return ctx.Err()
case <-tick.C:
line, errs := stdoutreader.ReadString('\n')
if errs == io.EOF {
Expand Down
16 changes: 15 additions & 1 deletion engine/hooks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,21 @@ func (s *Service) initGerritStreamEvent(ctx context.Context, vcsName string, vcs
gerritEventChan := make(chan GerritEvent, 20)
// Listen to gerrit event stream
sdk.GoRoutine(ctx, "gerrit.EventStream."+vcsName, func(ctx context.Context) {
ListenGerritStreamEvent(ctx, s.Cache, vcsConfig[vcsName], gerritEventChan)
for {
select {
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "hook:initGerritStreamEvent: %v", ctx.Err())
}
return
default:
if err := ListenGerritStreamEvent(ctx, s.Cache, vcsConfig[vcsName], gerritEventChan); err != nil {
log.Error(ctx, "hook:initGerritStreamEvent: failed listening gerrit event stream: %v", err)
}
time.Sleep(10 * time.Second)
}
}

})
// Listen to gerrit event stream
sdk.GoRoutine(ctx, "gerrit.EventStreamCompute."+vcsName, func(ctx context.Context) {
Expand Down

0 comments on commit cd6e2a1

Please sign in to comment.