Skip to content

Commit

Permalink
feat(worker): add log on start job about prj... (#3539)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault authored and bnjjj committed Nov 2, 2018
1 parent 49bac8b commit 1287ec1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 12 deletions.
17 changes: 14 additions & 3 deletions engine/worker/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,16 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI
}

result, err := actionPluginClient.Run(ctx, &query)
pluginDetails := fmt.Sprintf("plugin %s v%s", manifest.Name, manifest.Version)
if err != nil {
log.Error("plugin failure %s v%s err: %v", manifest.Name, manifest.Version, err)
t := fmt.Sprintf("failure %s err: %v", pluginDetails, err)
syncStd(t)
log.Error(t)
pluginFail(chanRes, sendLog, fmt.Sprintf("Error running action: %v", err))
return
}

_ = os.Stdout.Sync()
_ = os.Stderr.Sync()
syncStd(pluginDetails)

chanRes <- sdk.Result{
Status: result.GetStatus(),
Expand All @@ -182,6 +184,15 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI
}
}

func syncStd(p string) {
if err := os.Stdout.Sync(); err != nil {
log.Error("os.Stdout.Sync %s err:%v", p, err)
}
if err := os.Stderr.Sync(); err != nil {
log.Error("os.Stderr.Sync %s err:%v", p, err)
}
}

func pluginFail(chanRes chan<- sdk.Result, sendLog LoggerFunc, reason string) {
res := sdk.Result{
Reason: reason,
Expand Down
12 changes: 12 additions & 0 deletions engine/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,23 @@ func (w *currentWorker) processActionVariables(a *sdk.Action, parent *sdk.Action

func (w *currentWorker) startAction(ctx context.Context, a *sdk.Action, buildID int64, params *[]sdk.Parameter, secrets []sdk.Variable, stepOrder int, stepName string) sdk.Result {
// Process action build arguments
var project, workflow, node, job string
for _, abp := range *params {
// Process build variable for root action
for j := range a.Parameters {
if abp.Name == a.Parameters[j].Name {
a.Parameters[j].Value = abp.Value
}
switch abp.Name {
case "cds.project":
project = abp.Value
case "cds.worklow":
workflow = abp.Value
case "cds.node":
node = abp.Value
case "cds.job":
job = abp.Value
}
}
}

Expand All @@ -113,6 +124,7 @@ func (w *currentWorker) startAction(ctx context.Context, a *sdk.Action, buildID
}
}

log.Info("startAction> project:%s workflow:%s node:%s job:%s", project, workflow, node, job)
return w.runJob(ctx, a, buildID, params, secrets, stepOrder, stepName)
}

Expand Down
18 changes: 9 additions & 9 deletions engine/worker/run_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func startGRPCPlugin(ctx context.Context, pluginName string, w *currentWorker, p
var errBi error
binary, errBi = w.client.PluginGetBinaryInfos(pluginName, currentOS, currentARCH)
if errBi != nil || binary == nil {
return nil, sdk.WrapError(errBi, "Unable to get plugin binary infos... Aborting")
return nil, sdk.WrapError(errBi, "plugin:%s Unable to get plugin binary infos... Aborting", pluginName)
}
}

Expand All @@ -106,17 +106,17 @@ func startGRPCPlugin(ctx context.Context, pluginName string, w *currentWorker, p
log.Info("Starting GRPC Plugin %s in dir %s", binary.Name, dir)
fileContent, err := ioutil.ReadFile(path.Join(w.basedir, binary.GetName()))
if err != nil {
return nil, sdk.WrapError(err, "Unable to get plugin binary file... Aborting")
return nil, sdk.WrapError(err, "plugin:%s unable to get plugin binary file... Aborting", pluginName)
}

switch {
case sdk.IsTar(fileContent):
if err := sdk.Untar(w.basedir, bytes.NewReader(fileContent)); err != nil {
return nil, sdk.WrapError(err, "Unable to untar binary file")
return nil, sdk.WrapError(err, "plugin:%s unable to untar binary file", pluginName)
}
case sdk.IsGz(fileContent):
if err := sdk.UntarGz(w.basedir, bytes.NewReader(fileContent)); err != nil {
return nil, sdk.WrapError(err, "Unable to untarGz binary file")
return nil, sdk.WrapError(err, "plugin:%s unable to untarGz binary file", pluginName)
}
}

Expand All @@ -129,13 +129,13 @@ func startGRPCPlugin(ctx context.Context, pluginName string, w *currentWorker, p
cmd = path.Join(w.basedir, cmd)
_, err = exec.LookPath(cmd)
if err != nil {
return nil, sdk.WrapError(err, "Unable to start GRPC plugin, binary command not found.")
return nil, sdk.WrapError(err, "plugin:%s unable to start GRPC plugin, binary command not found.", pluginName)
}
}
args := append(binary.Entrypoints, binary.Args...)

if err := grpcplugin.StartPlugin(ctx, dir, cmd, args, envs, mOut, mErr); err != nil {
return nil, sdk.WrapError(err, "Unable to start GRPC plugin... Aborting")
return nil, sdk.WrapError(err, "plugin:%s unable to start GRPC plugin... Aborting", pluginName)
}
log.Info("GRPC Plugin %s started", binary.Name)

Expand All @@ -149,15 +149,15 @@ func startGRPCPlugin(ctx context.Context, pluginName string, w *currentWorker, p
if err != nil && len(buff.String()) > 0 {
buff.Reset()
if time.Now().Before(tsStart.Add(5 * time.Second)) {
log.Warning("Error on ReadByte, retry in 500ms...")
log.Warning("plugin:%s error on ReadByte, retry in 500ms...", pluginName)
time.Sleep(500 * time.Millisecond)
continue
}
log.Error("error on ReadByte(len buff %d, content: %s): %v", len(buff.String()), buff.String(), err)
log.Error("plugin:%s error on ReadByte(len buff %d, content: %s): %v", pluginName, len(buff.String()), buff.String(), err)
return nil, fmt.Errorf("unable to get socket address from started binary")
}
if err := buff.WriteByte(b); err != nil {
log.Error("error on write byte: %v", err)
log.Error("plugin:%s error on write byte: %v", pluginName, err)
break
}
if strings.HasSuffix(buff.String(), "is ready to accept new connection\n") {
Expand Down

0 comments on commit 1287ec1

Please sign in to comment.