Skip to content

Commit

Permalink
Fix eclipse-jkube#364: jkube.watch.postExec property/parameter/config…
Browse files Browse the repository at this point in the history
…uration is ignored

Added support for copying changed files and executing a command provided in postExec option after copying files
to the application pod.
  • Loading branch information
rohanKanojia committed Sep 23, 2020
1 parent e36a9b0 commit 7c2d103
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Usage:
* Fix #381: Remove root as default user in AssemblyConfigurationUtils#getAssemblyConfigurationOrCreateDefault
* Fix #358: Prometheus is enabled by default, opt-out via AB_PROMETHEUS_OFF required to disable (like in FMP)
* Fix #384: Enricher defined Container environment variables get merged with vars defined in Image Build Configuration
* Fix #364: jkube.watch.postExec property/parameter/configuration is ignored

### 1.0.0 (2020-09-09)
* Fix #351: Fix AutoTLSEnricher - add annotation + volume config to resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
Expand Down Expand Up @@ -201,7 +202,8 @@ public File createChangedFilesArchive(
File archiveDir = createArchiveDir(dirs);
for (AssemblyFileEntry entry : entries) {
File dest = prepareChangedFilesArchivePath(archiveDir, entry.getDest(), assemblyDirectory);
Files.copy(Paths.get(entry.getSource().getAbsolutePath()), Paths.get(dest.getAbsolutePath()));
Files.createDirectories(dest.getParentFile().toPath());
Files.copy(Paths.get(entry.getSource().getAbsolutePath()), Paths.get(dest.getAbsolutePath()), StandardCopyOption.REPLACE_EXISTING);
}
return JKubeTarArchiver.createTarBallOfDirectory(archive, archiveDir, ArchiveCompression.none);
} catch (IOException exp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.eclipse.jkube.kit.build.core.GavLabel;
import org.eclipse.jkube.kit.build.service.docker.access.DockerAccessException;
import org.eclipse.jkube.kit.common.AssemblyFileEntry;
import org.eclipse.jkube.kit.config.image.RunImageConfiguration;
import org.eclipse.jkube.kit.config.image.WaitConfiguration;
import org.eclipse.jkube.kit.config.image.build.JKubeConfiguration;
import org.eclipse.jkube.kit.build.api.assembly.AssemblyFiles;
import org.eclipse.jkube.kit.build.service.docker.access.DockerAccess;
import org.eclipse.jkube.kit.build.service.docker.access.DockerAccessException;
import org.eclipse.jkube.kit.build.service.docker.access.ExecException;
import org.eclipse.jkube.kit.build.service.docker.access.PortMapping;
import org.eclipse.jkube.kit.build.service.docker.access.log.LogDispatcher;
import org.eclipse.jkube.kit.config.image.WatchImageConfiguration;
Expand Down Expand Up @@ -95,7 +97,7 @@ public synchronized void watch(WatchContext context, JKubeConfiguration buildCon
imageConfig.getBuildConfiguration().getAssembly() != null) {
if (watcher.isCopy()) {
String containerBaseDir = imageConfig.getBuildConfiguration().getAssembly().getTargetDir();
schedule(executor, createCopyWatchTask(watcher, context.getBuildContext(), containerBaseDir), interval);
schedule(executor, createCopyWatchTask(watcher, context.getBuildContext()), interval);
tasks.add("copying artifacts");
}

Expand Down Expand Up @@ -134,7 +136,7 @@ private void schedule(ScheduledExecutorService executor, Runnable runnable, long
}

private Runnable createCopyWatchTask(final ImageWatcher watcher,
final JKubeConfiguration jKubeConfiguration, final String containerBaseDir) throws IOException {
final JKubeConfiguration jKubeConfiguration) throws IOException {
final ImageConfiguration imageConfig = watcher.getImageConfiguration();

final AssemblyFiles files = archiveService.getAssemblyFiles(imageConfig, jKubeConfiguration);
Expand All @@ -148,9 +150,9 @@ public void run() {

File changedFilesArchive = archiveService.createChangedFilesArchive(entries, files.getAssemblyDirectory(),
imageConfig.getName(), jKubeConfiguration);
dockerAccess.copyArchive(watcher.getContainerId(), changedFilesArchive, containerBaseDir);
copyFilesToContainer(changedFilesArchive, watcher);
callPostExec(watcher);
} catch (IOException | ExecException e) {
} catch (Exception e) {
log.error("%s: Error when copying files to container %s: %s",
imageConfig.getDescription(), watcher.getContainerId(), e.getMessage());
}
Expand All @@ -159,13 +161,44 @@ public void run() {
};
}

private void callPostExec(ImageWatcher watcher) throws DockerAccessException, ExecException {
private void copyFilesToContainer(File changedFilesArchive, ImageWatcher watcher) {
Predicate<File> copyTask = watcher.getWatchContext().getContainerCopyTask();
if (copyTask != null) {
boolean copyStatus = copyTask.test(changedFilesArchive);
if (!copyStatus) {
log.warn("Unable to copy files into container");
return;
}
log.info("Files successfully coped to the container..");
} else {
log.warn("No copy task found for copy mode. Ignoring..");
}
}


private void callPostExec(ImageWatcher watcher) throws Exception {
if (watcher.getPostExec() != null) {
String containerId = watcher.getContainerId();
runService.execInContainer(containerId, watcher.getPostExec(), watcher.getImageConfiguration());
Function<ImageWatcher, String> execTask = watcher.getWatchContext().getContainerCommandExecutor();
if (execTask == null) {
execTask = getDefaultContainerExecTask();
}
String execOutput = execTask.apply(watcher);
log.info("postExec output: " + execOutput);
}
}

private Function<ImageWatcher, String> getDefaultContainerExecTask() throws Exception {
return watcher -> {
String containerId = watcher.getContainerId();
try {
return runService.execInContainer(containerId, watcher.getPostExec(), watcher.getImageConfiguration());
} catch (Exception e) {
log.info("Not able to execute command specified in postExec ", e.getMessage());
}
return null;
};
}

private Runnable createBuildWatchTask(final ImageWatcher watcher,
final JKubeConfiguration mojoParameters, final boolean doRestart, final JKubeConfiguration buildContext)
throws IOException {
Expand Down Expand Up @@ -408,6 +441,8 @@ public static class WatchContext implements Serializable {
private boolean autoCreateCustomNetworks;
private Task<ImageConfiguration> imageCustomizer;
private Task<ImageWatcher> containerRestarter;
private Function<ImageWatcher, String> containerCommandExecutor;
private Predicate<File> containerCopyTask;

private transient ServiceHub hub;
private transient ServiceHubFactory serviceHubFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jkube.kit.common;

import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -66,4 +67,23 @@ public static String formatDurationTill(long start) {

return res.toString();
}

/**
* Waits until a condition is satisfied upto a certain amount of time.
*
* @param predicate condition which is tested after each second
* @param item item which needs to be acted upon
* @param timeInMillis max timeout in seconds
* @param pollIntervalInMillis Poll interval for checking if condition is satisfied
* @param <T> type for item
* @throws InterruptedException in case interrupted while waiting
*/
public static <T> void waitUntilCondition(Predicate<T> predicate, T item, int timeInMillis, int pollIntervalInMillis) throws InterruptedException {
for (int timeWaited = 0; timeWaited < timeInMillis; timeWaited += pollIntervalInMillis) {
if (predicate.test(item)) {
break;
}
Thread.sleep(pollIntervalInMillis);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.fabric8.kubernetes.api.model.batch.Job;
import io.fabric8.kubernetes.api.model.batch.JobSpec;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
Expand Down Expand Up @@ -888,5 +889,11 @@ public static Map<File, String> getCustomResourcesFileToNameMap(
}
return fileToCrdGroupMap;
}

public static String getNewestApplicationPodName(KubernetesClient client, String namespace, Set<HasMetadata> resources) {
LabelSelector selector = KubernetesHelper.getPodLabelSelector(resources);
PodList pods = client.pods().inNamespace(namespace).withLabelSelector(selector).list();
return KubernetesHelper.getNewestPod(pods.getItems()).getMetadata().getName();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright (c) 2019 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at:
*
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.jkube.kit.common.util;

import org.eclipse.jkube.kit.common.TimeUtil;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;

public class TimeUtilTest {
@Test
public void testWaitUntilCondition() throws InterruptedException {
long timeBeforeWait = System.currentTimeMillis();
AtomicBoolean value = new AtomicBoolean(false);
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
value.set(true);
}).start();

TimeUtil.waitUntilCondition(AtomicBoolean::get, value, 200, 50);
long timeAfterWait = System.currentTimeMillis();
assertTrue(timeAfterWait - timeBeforeWait < 200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package org.eclipse.jkube.watcher.standard;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.util.Date;
import java.util.List;
import java.util.Set;
Expand All @@ -29,9 +32,12 @@
import io.fabric8.kubernetes.api.model.apps.ReplicaSetSpec;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.openshift.api.model.DeploymentConfig;
import io.fabric8.openshift.api.model.DeploymentConfigSpec;
import io.fabric8.openshift.client.OpenShiftClient;
import org.eclipse.jkube.kit.common.KitLogger;
import org.eclipse.jkube.kit.common.TimeUtil;
import org.eclipse.jkube.kit.config.image.ImageConfiguration;
import org.eclipse.jkube.kit.build.service.docker.ServiceHub;
import org.eclipse.jkube.kit.build.service.docker.WatchService;
Expand All @@ -45,6 +51,9 @@

public class DockerImageWatcher extends BaseWatcher {

private static final int WAIT_TIMEOUT_IN_SECONDS = 5000;
private static final int POLL_INTERVAL_IN_MILLISECONDS = 200;

public DockerImageWatcher(WatcherContext watcherContext) {
super(watcherContext, "docker-image");
}
Expand All @@ -61,7 +70,10 @@ public void watch(List<ImageConfiguration> configs, final Set<HasMetadata> resou

// add a image customizer
watchContext = watchContext.toBuilder()
.imageCustomizer(this::buildImage).containerRestarter(imageWatcher -> restartContainer(imageWatcher, resources))
.imageCustomizer(this::buildImage)
.containerRestarter(imageWatcher -> restartContainer(imageWatcher, resources))
.containerCommandExecutor(imageWatcher -> executeCommandInPod(imageWatcher, resources))
.containerCopyTask(f -> copyFileToPod(f, resources))
.build();

ServiceHub hub = getContext().getJKubeServiceHub().getDockerServiceHub();
Expand Down Expand Up @@ -161,6 +173,63 @@ private void updateImageName(KubernetesClient kubernetes, String namespace, HasM
}
}

private String executeCommandInPod(WatchService.ImageWatcher imageWatcher, Set<HasMetadata> resources) {
ClusterAccess clusterAccess = getContext().getJKubeServiceHub().getClusterAccess();
return executeCommandInPod(imageWatcher, resources, clusterAccess, this.log, WAIT_TIMEOUT_IN_SECONDS, POLL_INTERVAL_IN_MILLISECONDS);
}

private boolean copyFileToPod(File fileToUpload, Set<HasMetadata> resources) {
ClusterAccess clusterAccess = getContext().getJKubeServiceHub().getClusterAccess();
return copyFileToPod(fileToUpload, resources, clusterAccess, this.log, WAIT_TIMEOUT_IN_SECONDS, POLL_INTERVAL_IN_MILLISECONDS);
}


static String executeCommandInPod(WatchService.ImageWatcher imageWatcher, Set<HasMetadata> resources, ClusterAccess clusterAccess, KitLogger logger, int execWaitTimeoutInMillis, int pollIntervalInMillis) {
try (KubernetesClient client = clusterAccess.createDefaultClient()) {
String namespace = clusterAccess.getNamespace();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(KubernetesHelper.getNewestApplicationPodName(client, namespace, resources)).writingOutput(byteArrayOutputStream)
.exec(imageWatcher.getPostExec().split("[\\s']"));

// Wait for at most 5 seconds for Exec to complete
TimeUtil.waitUntilCondition(o -> o.size() > 0, byteArrayOutputStream, execWaitTimeoutInMillis, pollIntervalInMillis);
String commandOutput = byteArrayOutputStream.toString();
execWatch.close();
return commandOutput;
} catch (KubernetesClientException e) {
KubernetesHelper.handleKubernetesClientException(e, logger);
} catch (IllegalStateException e) {
throw e;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
return null;
}

static boolean copyFileToPod(File fileToUpload, Set<HasMetadata> resources, ClusterAccess clusterAccess, KitLogger logger, int execWaitTimeoutInSeconds, int pollIntervalInMillis) {
try (KubernetesClient client = clusterAccess.createDefaultClient()) {
String namespace = clusterAccess.getNamespace();
ByteArrayOutputStream out = new ByteArrayOutputStream();
ExecWatch execWatch = client.pods().inNamespace(namespace)
.withName(KubernetesHelper.getNewestApplicationPodName(client, namespace, resources))
.readingInput(new FileInputStream(fileToUpload))
.writingOutput(out)
.exec("tar", "-xf", "-", "-C", "/");

// Wait for at most 5 seconds for Exec to complete
TimeUtil.waitUntilCondition(o -> o.size() > 0, out, execWaitTimeoutInSeconds, pollIntervalInMillis);
execWatch.close();
return true;
} catch (KubernetesClientException e) {
KubernetesHelper.handleKubernetesClientException(e, logger);
} catch (IllegalStateException e) {
throw e;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
return false;
}

private boolean updateImageName(HasMetadata entity, PodTemplateSpec template, String imagePrefix, String imageName) {
boolean answer = false;
PodSpec spec = template.getSpec();
Expand Down
Loading

0 comments on commit 7c2d103

Please sign in to comment.