Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
fleetctl: call cAPI.UnitStates() only once before iteration
Browse files Browse the repository at this point in the history
As it's basically not efficient to call cAPI.UnitStates() for each unit,
let's call it only once outside the iteration over units, which means
calling in waitForSystemdActiveState(), not in getSingleUnitState().
To do that, we need to share the result apiStates across all units.

Also make use of goroutines in waitForSystemdActiveState(), instead of
sequential iteration over each unit.

Note that we still need to keep calling UnitStates() when
assertSystemdActiveState() failed, because in the next attempt the old
apiState is not necessarily going to be valid. Ideally in that case, we
should be able to fetch the state only for a single unit. However, we
cannot do that for now, because cAPI.UnitState() is not available. In
the future we would need to implement cAPI.UnitState() and all
dependent parts all over the tree in fleet, e.g. schema, etcdRegistry,
rpcRegistry, etc. to replace UnitStates() in this place with the new
method UnitState(). In practice, calling UnitStates() here is not as
inefficient as it looks, because it is called only in the rare case
where the assertion has failed.
  • Loading branch information
Dongsu Park committed Sep 7, 2016
1 parent 6a64a4e commit d919ee8
Showing 1 changed file with 74 additions and 29 deletions.
103 changes: 74 additions & 29 deletions fleetctl/fleetctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,42 +1081,92 @@ func tryWaitForSystemdActiveState(units []string, maxAttempts int) (err error) {
return nil
}

for _, uName := range units {
err := waitForSystemdActiveState(uName)
if err != nil {
stderr("cannot find an active unit: %v", err)
return err
}
errchan := waitForSystemdActiveState(units, maxAttempts)
for err := range errchan {
stderr("Error waiting for units: %v", err)
return err
}

return nil
}

// waitForSystemdActiveState tries to assert that the given unit becomes
// active, retrying periodically until the unit gets active. It will retry
// up to 15 seconds.
func waitForSystemdActiveState(unitName string) (err error) {
return func() error {
timeout := 15 * time.Second
alarm := time.After(timeout)
ticker := time.Tick(250 * time.Millisecond)
// active, making use of multiple goroutines that check unit states.
func waitForSystemdActiveState(units []string, maxAttempts int) (errch chan error) {
apiStates, err := cAPI.UnitStates()
if err != nil {
errch <- fmt.Errorf("Error retrieving list of units: %v", err)
return
}

errchan := make(chan error)
var wg sync.WaitGroup
for _, name := range units {
wg.Add(1)
go checkSystemdActiveState(apiStates, name, maxAttempts, &wg, errchan)
}

go func() {
wg.Wait()
close(errchan)
}()

return errchan
}

func checkSystemdActiveState(apiStates []*schema.UnitState, name string, maxAttempts int, wg *sync.WaitGroup, errchan chan error) {
defer wg.Done()

if maxAttempts < 1 {
for {
select {
case <-alarm:
return fmt.Errorf("Failed to reach expected state within %v.", timeout)
case <-ticker:
if errA := assertSystemdActiveState(unitName); errA == nil {
return nil
}
if err := assertSystemdActiveState(apiStates, name); err == nil {
return
}

// If the assertion failed, we again need to get unit states via cAPI,
// to retry the assertion repeatedly.
//
// NOTE: Ideally we should be able to fetch the state only for a single
// unit. However, we cannot do that for now, because cAPI.UnitState()
// is not available. In the future we need to implement cAPI.UnitState()
// and all dependendent parts all over the tree in fleet, (schema,
// etcdRegistry, rpcRegistry, etc) to replace UnitStates() in this place
// with the new method UnitState(). In practice, calling UnitStates() here
// is not as badly inefficient as it looks, because it will be anyway
// rarely called only when the assertion failed. - dpark 20160907

time.Sleep(defaultSleepTime)

var errU error
apiStates, errU = cAPI.UnitStates()
if errU != nil {
errchan <- fmt.Errorf("Error retrieving list of units: %v", errU)
}
}
}()
} else {
for attempt := 0; attempt < maxAttempts; attempt++ {

if err := assertSystemdActiveState(apiStates, name); err == nil {
return
}

// Assertion failed. See the description above.
time.Sleep(defaultSleepTime)

var errU error
apiStates, errU = cAPI.UnitStates()
if errU != nil {
errchan <- fmt.Errorf("Error retrieving list of units: %v", errU)
}
}
errchan <- fmt.Errorf("timed out waiting for unit %s to report active state", name)
}
}

// assertSystemdActiveState determines if a given systemd unit is actually
// in the active state, making use of cAPI
func assertSystemdActiveState(unitName string) (err error) {
uState, err := getSingleUnitState(unitName)
func assertSystemdActiveState(apiStates []*schema.UnitState, unitName string) error {
uState, err := getSingleUnitState(apiStates, unitName)
if err != nil {
return err
}
Expand All @@ -1131,12 +1181,7 @@ func assertSystemdActiveState(unitName string) (err error) {

// getSingleUnitState returns a single uState of type suState, which consists
// of necessary systemd states, only for a given unit name.
func getSingleUnitState(unitName string) (suState, error) {
apiStates, err := cAPI.UnitStates()
if err != nil {
return suState{}, fmt.Errorf("Error retrieving list of units: %v", err)
}

func getSingleUnitState(apiStates []*schema.UnitState, unitName string) (suState, error) {
for _, us := range apiStates {
if us.Name == unitName {
return suState{
Expand Down

0 comments on commit d919ee8

Please sign in to comment.