diff --git a/.gitignore b/.gitignore
index 2228db3e..26c780bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,4 @@ release.properties
# Documentation build
/docs/_site/
+.pr-train.yml
diff --git a/agent/pom.xml b/agent/pom.xml
index e953871c..fb4f6978 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -92,6 +92,11 @@
mockito-core
test
+
+ com.github.stefanbirkner
+ system-rules
+ test
+
@@ -119,6 +124,18 @@
shade
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
diff --git a/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java b/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java
index 7530124d..585ab191 100644
--- a/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java
+++ b/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java
@@ -29,7 +29,6 @@
import com.spotify.metrics.jvm.MemoryUsageGaugeSet;
import com.spotify.metrics.jvm.ThreadStatesMetricSet;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -52,13 +51,11 @@ public static void main(String[] argv) {
path = Optional.of(Paths.get(argv[0]));
}
- final FastForwardAgent agent = setup(path, Optional.empty());
+ final FastForwardAgent agent = setup(path);
run(agent);
}
- static FastForwardAgent setup(
- final Optional configPath, final Optional configStream
- ) {
+ static FastForwardAgent setup(final Optional configPath) {
// needed for HTTP content decompression in:
// com.spotify.ffwd.http.HttpModule
System.setProperty("io.netty.noJdkZlibDecoder", "false");
@@ -102,12 +99,9 @@ static FastForwardAgent setup(
final AgentCore.Builder builder = AgentCore.builder()
.modules(modules)
.statistics(statistics.statistics);
-
- configStream.map(builder::configStream);
configPath.map(builder::configPath);
final AgentCore core = builder.build();
-
return new FastForwardAgent(statistics, core);
}
diff --git a/agent/src/test/java/com/spotify/ffwd/FfwdConfigurationTest.java b/agent/src/test/java/com/spotify/ffwd/FfwdConfigurationTest.java
index c6c9d680..5be5fdf4 100644
--- a/agent/src/test/java/com/spotify/ffwd/FfwdConfigurationTest.java
+++ b/agent/src/test/java/com/spotify/ffwd/FfwdConfigurationTest.java
@@ -23,28 +23,28 @@
import static org.junit.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
-import com.google.inject.Injector;
import com.spotify.ffwd.output.BatchingPluginSink;
import com.spotify.ffwd.output.CoreOutputManager;
import com.spotify.ffwd.output.FilteringPluginSink;
import com.spotify.ffwd.output.OutputManager;
import com.spotify.ffwd.output.PluginSink;
-import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
-import java.util.function.Supplier;
-import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class FfwdConfigurationTest {
- @Before
- public void setup() {
- }
+ @Rule
+ public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
@Test
public void testConfAllPluginsEnabled() {
@@ -90,17 +90,43 @@ public void testConfMixedPluginsEnabled() {
"ffwd-mixed-plugins.yaml", expectedSinks);
}
+ @Test
+ public void testConfigFromEnvVars() {
+ environmentVariables.set("FFWD_TTL", "100");
+ CoreOutputManager outputManager = getOutputManager(null);
+ assertEquals(100, outputManager.getTtl());
+ }
+
+ @Test
+ public void testIgnoreUnknownFields() {
+ Path configPath = resource("invalid.yaml");
+
+ String host = getOutputManager(configPath).getHost();
+ assertEquals("jimjam", host);
+ }
+
+ @Test
+ public void testMergeOrder() {
+ environmentVariables.set("FFWD_TTL", "100");
+ Path configPath = resource("basic-settings.yaml");
+ CoreOutputManager outputManager = getOutputManager(configPath);
+
+ assertEquals(100, outputManager.getTtl());
+ assertEquals("jimjam", outputManager.getHost());
+ }
+
+ private CoreOutputManager getOutputManager(final Path configPath) {
+ final FastForwardAgent agent = FastForwardAgent.setup(Optional.ofNullable(configPath));
+ return (CoreOutputManager) agent.getCore().getPrimaryInjector().getInstance(OutputManager.class);
+ }
+
private void verifyLoadedSinksForConfig(
- final String expectationString, final String configName,
+ final String expectationString,
+ final String configName,
final List> expectedSinks
) {
- final InputStream configStream = stream(configName).get();
-
- final FastForwardAgent agent =
- FastForwardAgent.setup(Optional.empty(), Optional.of(configStream));
- final Injector primaryInjector = agent.getCore().getPrimaryInjector();
- final CoreOutputManager outputManager =
- (CoreOutputManager) primaryInjector.getInstance(OutputManager.class);
+ final Path configPath = resource(configName);
+ final CoreOutputManager outputManager = getOutputManager(configPath);
final List sinks = outputManager.getSinks();
final List> sinkChains = new ArrayList<>();
@@ -130,7 +156,8 @@ private List extractSinkChain(final PluginSink sink) {
return sinkChain;
}
- private Supplier stream(String name) {
- return () -> getClass().getClassLoader().getResourceAsStream(name);
+ private Path resource(String name) {
+ final String path = Objects.requireNonNull(getClass().getClassLoader().getResource(name)).getPath();
+ return Paths.get(path);
}
}
diff --git a/agent/src/test/resources/basic-settings.yaml b/agent/src/test/resources/basic-settings.yaml
new file mode 100644
index 00000000..583cfdaf
--- /dev/null
+++ b/agent/src/test/resources/basic-settings.yaml
@@ -0,0 +1,2 @@
+ttl: 50
+host: jimjam
\ No newline at end of file
diff --git a/agent/src/test/resources/invalid.yaml b/agent/src/test/resources/invalid.yaml
new file mode 100644
index 00000000..7b57fa6f
--- /dev/null
+++ b/agent/src/test/resources/invalid.yaml
@@ -0,0 +1,3 @@
+unknown_fake_property: true
+
+host: jimjam
\ No newline at end of file
diff --git a/core/pom.xml b/core/pom.xml
index f2e85f3e..eabd0594 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -30,6 +30,10 @@
org.jetbrains.kotlin
kotlin-stdlib-jdk8
+
+ com.uchuhimo
+ konf
+
com.fasterxml.jackson.module
diff --git a/core/src/main/java/com/spotify/ffwd/AgentConfig.kt b/core/src/main/java/com/spotify/ffwd/AgentConfig.kt
index edb16876..5b2d6f96 100644
--- a/core/src/main/java/com/spotify/ffwd/AgentConfig.kt
+++ b/core/src/main/java/com/spotify/ffwd/AgentConfig.kt
@@ -16,40 +16,78 @@
package com.spotify.ffwd
-import com.fasterxml.jackson.annotation.JsonCreator
-import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.module.SimpleModule
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module
import com.spotify.ffwd.domain.SearchDomainDiscovery
import com.spotify.ffwd.input.InputManagerModule
import com.spotify.ffwd.output.OutputManagerModule
+import com.uchuhimo.konf.Config
+import com.uchuhimo.konf.ConfigSpec
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.UnknownHostException
import java.nio.file.Path
-import java.nio.file.Paths
-data class AgentConfig(
- @JsonProperty("debug") var debug: Debug?,
- @JsonProperty("host") var host: String = buildDefaultHost(),
- @JsonProperty("tags") var tags: Map = emptyMap(),
- @JsonProperty("tagsToResource") var tagsToResource: Map = emptyMap(),
- @JsonProperty("riemannTags") var riemannTags: Set = emptySet(),
- @JsonProperty("skipTagsForKeys") var skipTagsForKeys: Set = emptySet(),
- @JsonProperty("automaticHostTag") var automaticHostTag: Boolean = true,
- var input: InputManagerModule =
- InputManagerModule.supplyDefault().get(),
- @JsonProperty("output") var output: OutputManagerModule =
- OutputManagerModule.supplyDefault().get(),
- @JsonProperty("searchDomain") var searchDomain: SearchDomainDiscovery =
- SearchDomainDiscovery.supplyDefault(),
- @JsonProperty("asyncThreads") var asyncThreads: Int = 4,
- @JsonProperty("schedulerThreads") var schedulerThreads: Int = 4,
- @JsonProperty("bossThreads") var bossThreads: Int = 2,
- @JsonProperty("workerThreads") var workerThreads: Int = 4,
- @JsonProperty("ttl") var ttl: Long = 0,
- // NB(hexedpackets): qlog is unused and can be removed once the config parser ignores unknown
- // properties.
- @JsonProperty("qlog") var qlog: String?
-)
+// Helper class to make interop with java easier. The configuration loading is done through the
+// static object.
+class AgentConfig(val config: Config) {
+ fun hasDebug(): Boolean = config.contains(Debug.host) or config.contains(Debug.port)
+
+ val debugLocalAddress = config[Debug.localAddress]
+ val host = config[AgentConfig.host]
+ val tags = config[AgentConfig.tags]
+ val tagsToResource = config[AgentConfig.tagsToResource]
+ val riemannTags = config[AgentConfig.riemannTags]
+ val skipTagsForKeys = config[AgentConfig.skipTagsForKeys]
+ val automaticHostTag = config[AgentConfig.automaticHostTag]
+ val input: InputManagerModule = config[AgentConfig.input]
+ val output: OutputManagerModule = config[AgentConfig.output]
+ val searchDomain = config[AgentConfig.searchDomain]
+ val asyncThreads = config[AgentConfig.asyncThreads]
+ val schedulerThreads = config[AgentConfig.schedulerThreads]
+ val bossThreads = config[AgentConfig.bossThreads]
+ val workerThreads = config[AgentConfig.workerThreads]
+ val ttl = config[AgentConfig.ttl]
+
+ companion object : ConfigSpec("") {
+ object Debug : ConfigSpec() {
+ val host by optional("localhost")
+ val port by optional(19001)
+ val localAddress by lazy { InetSocketAddress(it[host], it[port]) }
+ }
+
+ val host by lazy { buildDefaultHost() }
+ val tags by optional(emptyMap())
+ val tagsToResource by optional(emptyMap())
+ val riemannTags by optional(emptySet())
+ val skipTagsForKeys by optional(emptySet())
+ val automaticHostTag by optional(true)
+ val input by lazy { InputManagerModule.supplyDefault().get() }
+ val output by lazy { OutputManagerModule.supplyDefault().get() }
+
+ val searchDomain by lazy { SearchDomainDiscovery.supplyDefault() }
+ val asyncThreads by optional(4)
+ val schedulerThreads by optional(4)
+ val bossThreads by optional(2)
+ val workerThreads by optional(4)
+ val ttl by optional(0)
+
+ @JvmStatic
+ fun load(path: Path, extraModule: SimpleModule): Config {
+ val config = Config { addSpec(AgentConfig) }
+ config.mapper
+ .registerModule(Jdk8Module())
+ .registerModule(extraModule)
+
+ // Load yaml config files with no prefix, then set it to "ffwd" for other sources.
+ return config
+ .from.yaml.file(path.toFile())
+ .withPrefix("ffwd")
+ .from.env()
+ .from.systemProperties()
+ }
+ }
+}
private fun buildDefaultHost(): String {
try {
@@ -57,12 +95,4 @@ private fun buildDefaultHost(): String {
} catch (e: UnknownHostException) {
throw RuntimeException("unable to get local host", e)
}
-}
-
-data class Debug(
- val localAddress: InetSocketAddress
-) {
- @JsonCreator
- constructor(@JsonProperty("host") host: String?, @JsonProperty port: Int?)
- : this(InetSocketAddress(host ?: "localhost", port ?: 19001))
}
\ No newline at end of file
diff --git a/core/src/main/java/com/spotify/ffwd/AgentCore.java b/core/src/main/java/com/spotify/ffwd/AgentCore.java
index 82de54b3..c0b19eb0 100644
--- a/core/src/main/java/com/spotify/ffwd/AgentCore.java
+++ b/core/src/main/java/com/spotify/ffwd/AgentCore.java
@@ -20,13 +20,8 @@
package com.spotify.ffwd;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import com.fasterxml.jackson.module.kotlin.KotlinModule;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
@@ -65,6 +60,7 @@
import com.spotify.ffwd.serializer.ToStringSerializer;
import com.spotify.ffwd.statistics.CoreStatistics;
import com.spotify.ffwd.statistics.NoopCoreStatistics;
+import com.uchuhimo.konf.Config;
import eu.toolchain.async.AsyncCaller;
import eu.toolchain.async.AsyncFramework;
import eu.toolchain.async.AsyncFuture;
@@ -75,9 +71,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.io.IOException;
-import java.io.InputStream;
import java.lang.reflect.Constructor;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -99,18 +93,16 @@ public class AgentCore {
private static final Logger log = LoggerFactory.getLogger(AgentCore.class);
private final List> modules;
- private final Optional configStream;
private final Optional configPath;
private final CoreStatistics statistics;
private final Injector primaryInjector;
private AgentCore(
- final List> modules, Optional configStream,
+ final List> modules,
Optional configPath, CoreStatistics statistics
) {
this.modules = modules;
- this.configStream = configStream;
this.configPath = configPath;
this.statistics = statistics;
@@ -150,18 +142,15 @@ private void waitUntilStopped(final Injector primary) throws InterruptedExceptio
}
private Thread setupShutdownHook(final Injector primary, final CountDownLatch shutdown) {
- final Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- AgentCore.this.stop(primary);
- } catch (Exception e) {
- log.error("AgentCore#stop(Injector) failed", e);
- }
-
- shutdown.countDown();
+ final Thread thread = new Thread(() -> {
+ try {
+ AgentCore.this.stop(primary);
+ } catch (Exception e) {
+ log.error("AgentCore#stop(Injector) failed", e);
}
- };
+
+ shutdown.countDown();
+ });
thread.setName("ffwd-agent-core-shutdown-hook");
@@ -285,10 +274,9 @@ private Injector setupPrimaryInjector(
modules.add(new AbstractModule() {
@Override
protected void configure() {
- final Debug debug = config.getDebug();
- if (debug != null) {
+ if (config.hasDebug()) {
bind(DebugServer.class).toInstance(
- new NettyDebugServer(debug.getLocalAddress()));
+ new NettyDebugServer(config.getDebugLocalAddress()));
} else {
bind(DebugServer.class).toInstance(new NoopDebugServer());
}
@@ -386,32 +374,26 @@ protected void configure() {
return early.createChildInjector(modules);
}
- private AgentConfig readConfig(Injector early) throws IOException {
- final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ /**
+ * Reads the configuration of the agent from multiple possible sources. In order of precedence,
+ * the following sources are loaded:
+ * - System properties
+ * - Environment variables
+ * - YAML file in specified location
+ * - YAML file bundled with JAR
+ *
+ * Values are merged, those higher on this list override those that are lower.
+ *
+ * @param early
+ * @return Data class of parsed config.
+ * @throws IOException when the passed config path is not valid YAML.
+ */
+ private AgentConfig readConfig(Injector early) {
final SimpleModule module =
- early.getInstance(Key.get(SimpleModule.class, Names.named("config")));
-
- mapper.registerModule(new Jdk8Module());
- mapper.registerModule(module);
- mapper.registerModule(new KotlinModule());
+ early.getInstance(Key.get(SimpleModule.class, Names.named("config")));
- final InputStream stream = configStream.orElseGet(() -> configPath
- .map(this::getConfigStream)
- .orElseGet(() -> getConfigStream(DEFAULT_CONFIG_PATH)));
-
- try {
- return mapper.readValue(stream, AgentConfig.class);
- } catch (JsonParseException | JsonMappingException e) {
- throw new IOException("Failed to parse configuration", e);
- }
- }
-
- private InputStream getConfigStream(final Path path) {
- try {
- return Files.newInputStream(path);
- } catch (IOException e) {
- throw new RuntimeException("Failed to read configuration file '" + path + "'", e);
- }
+ final Config config = AgentConfig.load(configPath.orElse(DEFAULT_CONFIG_PATH), module);
+ return new AgentConfig(config);
}
private List loadModules(Injector injector) throws Exception {
@@ -451,19 +433,9 @@ public Injector getPrimaryInjector() {
public static final class Builder {
private List> modules = Lists.newArrayList();
- private Optional configStream = Optional.empty();
private Optional configPath = Optional.empty();
private CoreStatistics statistics = NoopCoreStatistics.get();
- public Builder configStream(final InputStream configStream) {
- if (configStream == null) {
- throw new NullPointerException("'configStream' must not be null");
- }
-
- this.configStream = Optional.of(configStream);
- return this;
- }
-
public Builder configPath(final Path configPath) {
if (configPath == null) {
throw new NullPointerException("'configPath' must not be null");
@@ -492,7 +464,7 @@ public Builder statistics(CoreStatistics statistics) {
}
public AgentCore build() {
- return new AgentCore(modules, configStream, configPath, statistics);
+ return new AgentCore(modules, configPath, statistics);
}
}
}
diff --git a/core/src/main/java/com/spotify/ffwd/output/CoreOutputManager.java b/core/src/main/java/com/spotify/ffwd/output/CoreOutputManager.java
index af3351aa..3b500393 100644
--- a/core/src/main/java/com/spotify/ffwd/output/CoreOutputManager.java
+++ b/core/src/main/java/com/spotify/ffwd/output/CoreOutputManager.java
@@ -2,7 +2,7 @@
* -\-\-
* FastForward Core
* --
- * Copyright (C) 2016 - 2018 Spotify AB
+ * Copyright (C) 2016 - 2019 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,12 +42,13 @@
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-@Slf4j
public class CoreOutputManager implements OutputManager {
private static final String DEBUG_ID = "core.output";
private static final String HOST = "host";
+ private static final Logger log = LoggerFactory.getLogger(CoreOutputManager.class);
@Inject
@Getter
@@ -84,10 +85,18 @@ public class CoreOutputManager implements OutputManager {
@Named("host")
private String host;
+ public String getHost() {
+ return host;
+ }
+
@Inject
@Named("ttl")
private long ttl;
+ public long getTtl() {
+ return ttl;
+ }
+
@Inject
private DebugServer debug;
diff --git a/modules/pubsub/pom.xml b/modules/pubsub/pom.xml
index d0002420..cddf2160 100644
--- a/modules/pubsub/pom.xml
+++ b/modules/pubsub/pom.xml
@@ -9,7 +9,7 @@
ffwd-module-pubsub
- 0.4.12-SNAPSHOT
+ 0.4.12-SNAPSHOT
jar
FastForward Pubsub Module
@@ -51,18 +51,18 @@
27.0.1-jre
-
-
- junit
- junit
- 4.11
- test
-
-
- org.mockito
- mockito-core
- test
-
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
@@ -91,6 +91,17 @@
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 114195e0..32de84aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,11 @@
kotlin-stdlib-jdk8
${kotlin.version}
+
+ com.uchuhimo
+ konf
+ 0.13.3
+
com.fasterxml.jackson.module
@@ -355,9 +360,23 @@
1.9.5
test
+
+ com.github.stefanbirkner
+ system-rules
+ 1.19.0
+ test
+
+
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib-jdk8
+
+
+