From 662defa88c2756a4c4e1caf1ee932f7641eeb39b Mon Sep 17 00:00:00 2001 From: yohan Date: Mon, 13 Jul 2015 16:30:49 +0300 Subject: [PATCH] added periodical syncing of seeds from external DCs --- .../frameworks/cassandra/framework/Main.java | 31 +++- .../cassandra/framework/MainTest.java | 21 ++- .../mesos/frameworks/cassandra/model.proto | 20 +++ .../cassandra/scheduler/CassandraCluster.java | 21 ++- .../scheduler/CassandraScheduler.java | 8 +- ...sistedCassandraFrameworkConfiguration.java | 9 +- .../cassandra/scheduler/SeedManager.java | 152 ++++++++++++++++++ .../cassandra/scheduler/util/Env.java | 18 +++ .../cassandra/scheduler/util/JaxRsUtils.java | 2 +- .../AbstractCassandraSchedulerTest.java | 5 +- .../scheduler/AbstractSchedulerTest.java | 9 +- .../scheduler/CassandraClusterStateTest.java | 2 +- .../scheduler/CassandraSchedulerTest.java | 7 +- ...edCassandraFrameworkConfigurationTest.java | 4 +- .../cassandra/scheduler/SeedManagerTest.java | 81 ++++++++++ 15 files changed, 371 insertions(+), 19 deletions(-) create mode 100644 cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManager.java create mode 100644 cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManagerTest.java diff --git a/cassandra-mesos-framework/src/main/java/io/mesosphere/mesos/frameworks/cassandra/framework/Main.java b/cassandra-mesos-framework/src/main/java/io/mesosphere/mesos/frameworks/cassandra/framework/Main.java index fcafe83..d6cc1cb 100644 --- a/cassandra-mesos-framework/src/main/java/io/mesosphere/mesos/frameworks/cassandra/framework/Main.java +++ b/cassandra-mesos-framework/src/main/java/io/mesosphere/mesos/frameworks/cassandra/framework/Main.java @@ -45,6 +45,8 @@ import java.net.URI; import java.net.UnknownHostException; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Handler; import java.util.logging.Level; @@ -52,6 +54,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.google.common.collect.Lists.newArrayList; +import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc; import static io.mesosphere.mesos.util.ProtoUtils.frameworkId; public final class Main { @@ -133,6 +137,7 @@ private static int _main() throws UnknownHostException { final String defaultRack = Env.option("CASSANDRA_DEFAULT_RACK").or("RACK0"); final String defaultDc = Env.option("CASSANDRA_DEFAULT_DC").or("DC0"); + final List externalDcs = getExternalDcs(Env.filterStartsWith("CASSANDRA_EXTERNAL_DC_", true)); final Matcher matcher = validateZkUrl(zkUrl); final State state = new ZooKeeperState( @@ -163,7 +168,8 @@ private static int _main() throws UnknownHostException { jmxLocal, jmxNoAuthentication, defaultRack, - defaultDc); + defaultDc, + externalDcs); final FrameworkInfo.Builder frameworkBuilder = @@ -184,6 +190,7 @@ private static int _main() throws UnknownHostException { final Clock clock = new SystemClock(); final PersistedCassandraClusterHealthCheckHistory healthCheckHistory = new PersistedCassandraClusterHealthCheckHistory(state); final PersistedCassandraClusterState clusterState = new PersistedCassandraClusterState(state); + final SeedManager seedManager = new SeedManager(configuration, new ObjectMapper(), new SystemClock()); final CassandraCluster cassandraCluster = new CassandraCluster( clock, httpServerBaseUri.toString(), @@ -191,7 +198,8 @@ private static int _main() throws UnknownHostException { clusterState, healthCheckHistory, new PersistedCassandraClusterJobs(state), - configuration + configuration, + seedManager ); final HealthReportService healthReportService = new HealthReportService( clusterState, @@ -201,7 +209,8 @@ private static int _main() throws UnknownHostException { ); final Scheduler scheduler = new CassandraScheduler( configuration, - cassandraCluster + cassandraCluster, + clock ); final JsonFactory factory = new JsonFactory(); @@ -239,6 +248,8 @@ private static int _main() throws UnknownHostException { driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), mesosMasterZkUrl); } + seedManager.startSyncingSeeds(60); + final int status; switch (driver.run()) { case DRIVER_STOPPED: @@ -261,6 +272,20 @@ private static int _main() throws UnknownHostException { return status; } + static List getExternalDcs(Map dcOpts) { + final List externalDcs = newArrayList(); + + for (final String key: dcOpts.keySet()) { + externalDcs.add( + ExternalDc.newBuilder() + .setName(key) + .setUrl(dcOpts.get(key)) + .build() + ); + } + return externalDcs; + } + static Matcher validateZkUrl(final String zkUrl) { final Matcher matcher = zkURLPattern.matcher(zkUrl); diff --git a/cassandra-mesos-framework/src/test/java/io/mesosphere/mesos/frameworks/cassandra/framework/MainTest.java b/cassandra-mesos-framework/src/test/java/io/mesosphere/mesos/frameworks/cassandra/framework/MainTest.java index c54a74f..3b6d9fe 100644 --- a/cassandra-mesos-framework/src/test/java/io/mesosphere/mesos/frameworks/cassandra/framework/MainTest.java +++ b/cassandra-mesos-framework/src/test/java/io/mesosphere/mesos/frameworks/cassandra/framework/MainTest.java @@ -15,11 +15,17 @@ */ package io.mesosphere.mesos.frameworks.cassandra.framework; +import com.google.common.collect.Maps; import org.junit.Test; +import java.util.List; +import java.util.Map; import java.util.regex.Matcher; -import static io.mesosphere.mesos.frameworks.cassandra.framework.Main.*; +import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc; +import static io.mesosphere.mesos.frameworks.cassandra.framework.Main.SystemExitException; +import static io.mesosphere.mesos.frameworks.cassandra.framework.Main.validateZkUrl; +import static junit.framework.Assert.assertEquals; import static org.assertj.core.api.Assertions.assertThat; public class MainTest { @@ -81,4 +87,17 @@ public void testTrailingSlash() throws Exception { validateZkUrl("zk://host1:2181/cassandra-mesos/"); } + @Test + public void testGetExternalDcs() { + Map opts = Maps.newLinkedHashMap(); + opts.put("dc0", "http://dc0"); + opts.put("dc1", "http://dc1"); + + List dcs = Main.getExternalDcs(opts); + assertEquals(2, dcs.size()); + + ExternalDc dc0 = dcs.get(0); + assertEquals("dc0", dc0.getName()); + assertEquals("http://dc0", dc0.getUrl()); + } } diff --git a/cassandra-mesos-model/src/main/proto/io/mesosphere/mesos/frameworks/cassandra/model.proto b/cassandra-mesos-model/src/main/proto/io/mesosphere/mesos/frameworks/cassandra/model.proto index 04657ad..2c7de4c 100644 --- a/cassandra-mesos-model/src/main/proto/io/mesosphere/mesos/frameworks/cassandra/model.proto +++ b/cassandra-mesos-model/src/main/proto/io/mesosphere/mesos/frameworks/cassandra/model.proto @@ -67,6 +67,26 @@ message CassandraFrameworkConfiguration { * target number of seed nodes. */ optional int32 targetNumberOfSeeds = 9; + + /** List of external DCs running other rings of this cluster */ + repeated ExternalDc externalDcs = 10; +} + +/** + * External DC, running other ring of the same cluster + */ +message ExternalDc { + /** DC name */ + optional string name = 1; + + /** REST API url */ + optional string url = 2; + + /** Time of last seed fetching */ + optional int64 seedFetchTime = 3; + + /** List of fetched seeds */ + repeated string seeds = 4; } /** diff --git a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraCluster.java b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraCluster.java index c004943..fce02b7 100644 --- a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraCluster.java +++ b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraCluster.java @@ -114,6 +114,8 @@ public final class CassandraCluster { private final PersistedCassandraFrameworkConfiguration configuration; @NotNull private final PersistedCassandraClusterJobs jobsState; + @NotNull + private final SeedManager seedManager; @NotNull private final Map clusterJobHandlers; @@ -138,7 +140,8 @@ public CassandraCluster( @NotNull final PersistedCassandraClusterState clusterState, @NotNull final PersistedCassandraClusterHealthCheckHistory healthCheckHistory, @NotNull final PersistedCassandraClusterJobs jobsState, - @NotNull final PersistedCassandraFrameworkConfiguration configuration + @NotNull final PersistedCassandraFrameworkConfiguration configuration, + @NotNull final SeedManager seedManager ) { this.clock = clock; this.httpServerBaseUrl = httpServerBaseUrl; @@ -147,6 +150,7 @@ public CassandraCluster( this.healthCheckHistory = healthCheckHistory; this.jobsState = jobsState; this.configuration = configuration; + this.seedManager = seedManager; clusterJobHandlers = new EnumMap<>(ClusterJobType.class); clusterJobHandlers.put(ClusterJobType.CLEANUP, new NodeTaskClusterJobHandler(this, jobsState)); @@ -164,6 +168,11 @@ public PersistedCassandraFrameworkConfiguration getConfiguration() { return configuration; } + @NotNull + public SeedManager getSeedManager() { + return seedManager; + } + @Nullable public ExecutorMetadata metadataForExecutor(@NotNull final String executorId) { for (final ExecutorMetadata executorMetadata : clusterState.executorMetadata()) { @@ -316,8 +325,12 @@ public void recordHealthCheck(@NotNull final String executorId, @NotNull final H } @NotNull - public List getSeedNodeIps() { - return CassandraFrameworkProtosUtils.getSeedNodeIps(clusterState.nodes()); + public List getSeedNodeIps(final boolean addExternal) { + final List seeds = CassandraFrameworkProtosUtils.getSeedNodeIps(clusterState.nodes()); + if (addExternal) { + seeds.addAll(seedManager.getSeeds()); + } + return seeds; } @NotNull @@ -561,7 +574,7 @@ private CassandraServerConfig buildCassandraServerConfig( CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("ssl_storage_port", getPortMapping(config, PORT_STORAGE_SSL))); CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("native_transport_port", getPortMapping(config, PORT_NATIVE))); CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("rpc_port", getPortMapping(config, PORT_RPC))); - CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("seeds", SEEDS_FORMAT_JOINER.join(getSeedNodeIps()))); + CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("seeds", SEEDS_FORMAT_JOINER.join(getSeedNodeIps(true)))); CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("endpoint_snitch", config.hasSnitch() ? config.getSnitch() : "GossipingPropertyFileSnitch")); // data directory config // TODO: Update the logic here for defining data directories when mesos persistent volumes are released diff --git a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraScheduler.java b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraScheduler.java index 7a560cf..8e121df 100644 --- a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraScheduler.java +++ b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraScheduler.java @@ -22,6 +22,7 @@ import com.google.common.collect.FluentIterable; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import io.mesosphere.mesos.util.Clock; import org.apache.mesos.Protos.*; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; @@ -55,13 +56,17 @@ public CommandInfo.URI apply(final FileDownload input) { private final PersistedCassandraFrameworkConfiguration configuration; @NotNull private final CassandraCluster cassandraCluster; + @NotNull + private final Clock clock; public CassandraScheduler( @NotNull final PersistedCassandraFrameworkConfiguration configuration, - @NotNull final CassandraCluster cassandraCluster + @NotNull final CassandraCluster cassandraCluster, + @NotNull final Clock clock ) { this.configuration = configuration; this.cassandraCluster = cassandraCluster; + this.clock = clock; } @Override @@ -88,6 +93,7 @@ public void resourceOffers(final SchedulerDriver driver, final List offer if (LOGGER.isDebugEnabled()) { LOGGER.debug("> resourceOffers(driver : {}, offers : {})", driver, protoToString(offers)); } + for (final Offer offer : offers) { final Marker marker = MarkerFactory.getMarker("offerId:" + offer.getId().getValue() + ",hostname:" + offer.getHostname()); final boolean offerUsed = evaluateOffer(driver, marker, offer); diff --git a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfiguration.java b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfiguration.java index d65e47d..acf960a 100644 --- a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfiguration.java +++ b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfiguration.java @@ -26,8 +26,11 @@ import org.jetbrains.annotations.NotNull; import org.joda.time.Duration; -public final class PersistedCassandraFrameworkConfiguration extends StatePersistedObject { +import java.util.List; + +import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc; +public final class PersistedCassandraFrameworkConfiguration extends StatePersistedObject { public PersistedCassandraFrameworkConfiguration( @NotNull final State state, @NotNull final String frameworkName, @@ -45,7 +48,8 @@ public PersistedCassandraFrameworkConfiguration( final boolean jmxLocal, final boolean jmxNoAuthentication, @NotNull final String defaultRack, - @NotNull final String defaultDc + @NotNull final String defaultDc, + @NotNull final List externalDcs ) { super( "CassandraFrameworkConfiguration", @@ -79,6 +83,7 @@ public CassandraFrameworkConfiguration get() { .setBootstrapGraceTimeSeconds(bootstrapGraceTimeSec) .setTargetNumberOfNodes(executorCount) .setTargetNumberOfSeeds(seedCount) + .addAllExternalDcs(externalDcs) .build(); } }, diff --git a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManager.java b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManager.java new file mode 100644 index 0000000..a4b28b9 --- /dev/null +++ b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManager.java @@ -0,0 +1,152 @@ +package io.mesosphere.mesos.frameworks.cassandra.scheduler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import io.mesosphere.mesos.util.Clock; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static com.google.common.collect.Lists.newArrayList; +import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.CassandraFrameworkConfiguration; +import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc; + +public class SeedManager { + private static final Logger LOGGER = LoggerFactory.getLogger(SeedManager.class); + + @NotNull + private final PersistedCassandraFrameworkConfiguration configuration; + + @NotNull + private final ObjectMapper objectMapper; + + @NotNull + private final ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture syncingTask; + + @NotNull + private final Clock clock; + + public SeedManager(@NotNull final PersistedCassandraFrameworkConfiguration configuration, + @NotNull final ObjectMapper objectMapper, + @NotNull final Clock clock) { + this.configuration = configuration; + this.objectMapper = objectMapper; + this.clock = clock; + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + } + + @NotNull + public List getSeeds() { + final List seeds = newArrayList(); + + for (final ExternalDc dc : configuration.get().getExternalDcsList()) { + seeds.addAll(dc.getSeedsList()); + } + + return seeds; + } + + public void startSyncingSeeds(final long periodSeconds) { + stopSyncingSeeds(); + LOGGER.debug("Scheduling background syncing task to run every {} seconds", periodSeconds); + syncingTask = scheduledExecutorService.scheduleAtFixedRate( + new SyncingTask(), + 0, + periodSeconds, + TimeUnit.SECONDS + ); + } + + private void stopSyncingSeeds() { + if (syncingTask != null) { + LOGGER.debug("Stopping scheduled background syncing task"); + syncingTask.cancel(true); + syncingTask = null; + } + } + + public void syncSeeds() { + LOGGER.info("Syncing seeds ..."); + for (ExternalDc dc : configuration.get().getExternalDcsList()) { + fetchSeeds(dc); + } + } + + private void fetchSeeds(@NotNull final ExternalDc dc) { + String url = dc.getUrl(); + if (url.endsWith("/")) url = url.substring(0, url.length() - 1); + url += "/node/seed/all"; + + JsonNode node = fetchJson(url); + if (node == null) return; + + JsonNode seedsArr = node.get("seeds"); + List seeds = newArrayList(); + for (JsonNode ip : seedsArr) { + seeds.add(ip.asText()); + } + + LOGGER.info("Fetched seeds for dc \"" + dc.getName() + "\": " + Joiner.on(", ").join(seeds)); + updateDcSeeds(dc, seeds); + } + + @Nullable + protected JsonNode fetchJson(@NotNull final String url) { + HttpURLConnection c = null; + try { + c = (HttpURLConnection)new URL(url).openConnection(); + return objectMapper.readTree(c.getInputStream()); + } catch (IOException e) { + LOGGER.warn("", e); + } finally { + if (c != null) c.disconnect(); + } + + return null; + } + + private int externalDcIdx(@NotNull final String name) { + final List dcs = configuration.get().getExternalDcsList(); + + for (int i = 0; i < dcs.size(); i++) { + ExternalDc dc = dcs.get(i); + if (dc.getName().equals(name)) { + return i; + } + } + return -1; + } + + private void updateDcSeeds(@NotNull final ExternalDc dc, @NotNull final List seeds) { + int idx = externalDcIdx(dc.getName()); + + ExternalDc newDc = ExternalDc.newBuilder(dc) + .clearSeeds().addAllSeeds(seeds) + .setSeedFetchTime(clock.now().getMillis()).build(); + + CassandraFrameworkConfiguration config = CassandraFrameworkConfiguration.newBuilder(configuration.get()) + .setExternalDcs(idx, newDc) + .build(); + + configuration.setValue(config); + } + + private class SyncingTask implements Runnable { + @Override + public void run() { + syncSeeds(); + } + } +} diff --git a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/Env.java b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/Env.java index 6e4b3f1..9e0ca85 100644 --- a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/Env.java +++ b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/Env.java @@ -18,6 +18,10 @@ import com.google.common.base.Optional; import org.jetbrains.annotations.NotNull; +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMap; + public final class Env { @NotNull @@ -35,6 +39,20 @@ public static Optional option(@NotNull final String key) { return Optional.fromNullable(System.getenv(key)); } + @NotNull + public static Map filterStartsWith(@NotNull final String prefix, final boolean trimPrefix) { + final Map result = newHashMap(); + + for (final Map.Entry entry : System.getenv().entrySet()) { + final String key = entry.getKey(); + if (key.startsWith(prefix)) { + result.put(trimPrefix ? key.substring(prefix.length()) : key, entry.getValue()); + } + } + + return result; + } + @NotNull public static String workingDir(final String defaultFileName) { return System.getProperty("user.dir") + defaultFileName; diff --git a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/JaxRsUtils.java b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/JaxRsUtils.java index 2530972..490e699 100644 --- a/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/JaxRsUtils.java +++ b/cassandra-mesos-scheduler/src/main/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/util/JaxRsUtils.java @@ -28,7 +28,7 @@ private JaxRsUtils() {} public static void writeSeedIps(@NotNull final CassandraCluster cluster, @NotNull final JsonGenerator json) throws IOException { json.writeArrayFieldStart("seeds"); - for (final String seed : cluster.getSeedNodeIps()) { + for (final String seed : cluster.getSeedNodeIps(false)) { json.writeString(seed); } json.writeEndArray(); diff --git a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractCassandraSchedulerTest.java b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractCassandraSchedulerTest.java index 72b8895..43a3916 100644 --- a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractCassandraSchedulerTest.java +++ b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractCassandraSchedulerTest.java @@ -17,7 +17,9 @@ import com.google.protobuf.InvalidProtocolBufferException; import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos; +import io.mesosphere.mesos.util.Clock; import io.mesosphere.mesos.util.ProtoUtils; +import io.mesosphere.mesos.util.SystemClock; import io.mesosphere.mesos.util.Tuple2; import org.apache.mesos.Protos; @@ -27,6 +29,7 @@ import static org.junit.Assert.*; public abstract class AbstractCassandraSchedulerTest extends AbstractSchedulerTest { + protected Clock clock = new SystemClock(); protected CassandraScheduler scheduler; protected MockSchedulerDriver driver; @@ -1150,7 +1153,7 @@ protected void noopOnOfferAll() { protected void cleanState() { super.cleanState(); - scheduler = new CassandraScheduler(configuration, cluster); + scheduler = new CassandraScheduler(configuration, cluster, clock); driver = new MockSchedulerDriver(scheduler); driver.callRegistered(Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID().toString()).build()); diff --git a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractSchedulerTest.java b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractSchedulerTest.java index 6802bea..9761835 100644 --- a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractSchedulerTest.java +++ b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/AbstractSchedulerTest.java @@ -15,6 +15,8 @@ */ package io.mesosphere.mesos.frameworks.cassandra.scheduler; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos; import io.mesosphere.mesos.util.SystemClock; import io.mesosphere.mesos.util.Tuple2; @@ -62,7 +64,9 @@ protected void cleanState() { true, false, "RACK0", - "DC0"); + "DC0", + Lists.newArrayList() + ); healthCheckHistory = new PersistedCassandraClusterHealthCheckHistory(state); cluster = new CassandraCluster(new SystemClock(), @@ -71,7 +75,8 @@ protected void cleanState() { new PersistedCassandraClusterState(state), healthCheckHistory, new PersistedCassandraClusterJobs(state), - configuration); + configuration, + new SeedManager(configuration, new ObjectMapper(), new SystemClock())); clusterState = cluster.getClusterState(); } diff --git a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraClusterStateTest.java b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraClusterStateTest.java index 752801f..d8bdcbd 100644 --- a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraClusterStateTest.java +++ b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraClusterStateTest.java @@ -54,7 +54,7 @@ public void testLaunchNewCluster() { cluster.addExecutorMetadata(executorMetadata1); noopOnOffer(cluster, slaves[0], 1); - assertEquals(Collections.singletonList("127.1.1.1"), cluster.getSeedNodeIps()); + assertEquals(Collections.singletonList("127.1.1.1"), cluster.getSeedNodeIps(false)); assertThat(clusterState.nodeCounts()).isEqualTo(new NodeCounts(1, 1)); // rollout slave #2 diff --git a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraSchedulerTest.java b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraSchedulerTest.java index c37c571..cccb15f 100644 --- a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraSchedulerTest.java +++ b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/CassandraSchedulerTest.java @@ -15,6 +15,7 @@ */ package io.mesosphere.mesos.frameworks.cassandra.scheduler; +import com.fasterxml.jackson.databind.ObjectMapper; import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos; import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.TaskResources; import io.mesosphere.mesos.util.CassandraFrameworkProtosUtils; @@ -55,9 +56,11 @@ public void testReregistration() throws Exception { new PersistedCassandraClusterState(state), new PersistedCassandraClusterHealthCheckHistory(state), new PersistedCassandraClusterJobs(state), - configuration); + configuration, + new SeedManager(configuration, new ObjectMapper(), new SystemClock()) + ); clusterState = cluster.getClusterState(); - scheduler = new CassandraScheduler(configuration, cluster); + scheduler = new CassandraScheduler(configuration, cluster, clock); driver = new MockSchedulerDriver(scheduler); driver.callReRegistered(); diff --git a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfigurationTest.java b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfigurationTest.java index fd2c471..b5918b2 100644 --- a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfigurationTest.java +++ b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/PersistedCassandraFrameworkConfigurationTest.java @@ -1,5 +1,6 @@ package io.mesosphere.mesos.frameworks.cassandra.scheduler; +import com.google.common.collect.Lists; import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos; import org.apache.mesos.state.InMemoryState; import org.junit.Test; @@ -30,7 +31,8 @@ public void testGetDefaultRackDc() { false, true, "RACK1", - "DC1" + "DC1", + Lists.newArrayList() ); CassandraFrameworkProtos.RackDc rackDc = config.getDefaultRackDc(); diff --git a/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManagerTest.java b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManagerTest.java new file mode 100644 index 0000000..285fb06 --- /dev/null +++ b/cassandra-mesos-scheduler/src/test/java/io/mesosphere/mesos/frameworks/cassandra/scheduler/SeedManagerTest.java @@ -0,0 +1,81 @@ +package io.mesosphere.mesos.frameworks.cassandra.scheduler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import io.mesosphere.mesos.util.SystemClock; +import org.apache.mesos.state.InMemoryState; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SeedManagerTest { + private SeedManager seedManager; + + private String jsonResponse = seedsJson(Collections.emptyList()); + + @Before + public void before() { + InMemoryState state = new InMemoryState(); + + PersistedCassandraFrameworkConfiguration config = new PersistedCassandraFrameworkConfiguration( + state, + "name", + 60, + 30, + "2.1", + 0.5, + 1024, + 1024, + 512, + 1, + 1, + "role", + "", + false, + true, + "RACK1", + "DC1", + Arrays.asList(ExternalDc.newBuilder() + .setName("dc") + .setUrl("http://dc") + .build()) + ); + + seedManager = new SeedManager(config, new ObjectMapper(), new SystemClock()) { + @Override + @Nullable + protected JsonNode fetchJson(@NotNull final String url) { + try { + return new ObjectMapper().readTree(jsonResponse); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + }; + } + + @Test + public void getSeeds_syncSeeds() { + assertTrue(seedManager.getSeeds().isEmpty()); + + jsonResponse = seedsJson(Arrays.asList("1", "2")); + seedManager.syncSeeds(); + assertEquals(Arrays.asList("1", "2"), seedManager.getSeeds()); + } + + private static String seedsJson(List seeds) { + return "{\"seeds\": [" + (seeds.isEmpty() ? "" : "\"" + Joiner.on("\", \"").join(seeds) + "\"") + "]}"; + } +}