Skip to content
This repository has been archived by the owner on Jan 10, 2019. It is now read-only.

Commit

Permalink
Merge pull request #120 from dmitrypekar/multi-dc-seed-sync
Browse files Browse the repository at this point in the history
multi-dc-seed-sync
  • Loading branch information
BenWhitehead committed Jul 17, 2015
2 parents 8360968 + 662defa commit a6a94fb
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@

import java.net.URI;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.collect.Lists.newArrayList;
import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc;
import static io.mesosphere.mesos.util.ProtoUtils.frameworkId;

public final class Main {
Expand Down Expand Up @@ -133,6 +137,7 @@ private static int _main() throws UnknownHostException {
final String defaultRack = Env.option("CASSANDRA_DEFAULT_RACK").or("RACK0");
final String defaultDc = Env.option("CASSANDRA_DEFAULT_DC").or("DC0");

final List<ExternalDc> externalDcs = getExternalDcs(Env.filterStartsWith("CASSANDRA_EXTERNAL_DC_", true));
final Matcher matcher = validateZkUrl(zkUrl);

final State state = new ZooKeeperState(
Expand Down Expand Up @@ -163,7 +168,8 @@ private static int _main() throws UnknownHostException {
jmxLocal,
jmxNoAuthentication,
defaultRack,
defaultDc);
defaultDc,
externalDcs);


final FrameworkInfo.Builder frameworkBuilder =
Expand All @@ -184,14 +190,16 @@ private static int _main() throws UnknownHostException {
final Clock clock = new SystemClock();
final PersistedCassandraClusterHealthCheckHistory healthCheckHistory = new PersistedCassandraClusterHealthCheckHistory(state);
final PersistedCassandraClusterState clusterState = new PersistedCassandraClusterState(state);
final SeedManager seedManager = new SeedManager(configuration, new ObjectMapper(), new SystemClock());
final CassandraCluster cassandraCluster = new CassandraCluster(
clock,
httpServerBaseUri.toString(),
new ExecutorCounter(state, 0L),
clusterState,
healthCheckHistory,
new PersistedCassandraClusterJobs(state),
configuration
configuration,
seedManager
);
final HealthReportService healthReportService = new HealthReportService(
clusterState,
Expand All @@ -201,7 +209,8 @@ private static int _main() throws UnknownHostException {
);
final Scheduler scheduler = new CassandraScheduler(
configuration,
cassandraCluster
cassandraCluster,
clock
);

final JsonFactory factory = new JsonFactory();
Expand Down Expand Up @@ -239,6 +248,8 @@ private static int _main() throws UnknownHostException {
driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), mesosMasterZkUrl);
}

seedManager.startSyncingSeeds(60);

final int status;
switch (driver.run()) {
case DRIVER_STOPPED:
Expand All @@ -261,6 +272,20 @@ private static int _main() throws UnknownHostException {
return status;
}

static List<ExternalDc> getExternalDcs(Map<String, String> dcOpts) {
final List<ExternalDc> externalDcs = newArrayList();

for (final String key: dcOpts.keySet()) {
externalDcs.add(
ExternalDc.newBuilder()
.setName(key)
.setUrl(dcOpts.get(key))
.build()
);
}
return externalDcs;
}

static Matcher validateZkUrl(final String zkUrl) {
final Matcher matcher = zkURLPattern.matcher(zkUrl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
*/
package io.mesosphere.mesos.frameworks.cassandra.framework;

import com.google.common.collect.Maps;
import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;

import static io.mesosphere.mesos.frameworks.cassandra.framework.Main.*;
import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc;
import static io.mesosphere.mesos.frameworks.cassandra.framework.Main.SystemExitException;
import static io.mesosphere.mesos.frameworks.cassandra.framework.Main.validateZkUrl;
import static junit.framework.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

public class MainTest {
Expand Down Expand Up @@ -81,4 +87,17 @@ public void testTrailingSlash() throws Exception {
validateZkUrl("zk://host1:2181/cassandra-mesos/");
}

@Test
public void testGetExternalDcs() {
Map<String, String> opts = Maps.newLinkedHashMap();
opts.put("dc0", "http://dc0");
opts.put("dc1", "http://dc1");

List<ExternalDc> dcs = Main.getExternalDcs(opts);
assertEquals(2, dcs.size());

ExternalDc dc0 = dcs.get(0);
assertEquals("dc0", dc0.getName());
assertEquals("http://dc0", dc0.getUrl());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ message CassandraFrameworkConfiguration {
* target number of seed nodes.
*/
optional int32 targetNumberOfSeeds = 9;

/** List of external DCs running other rings of this cluster */
repeated ExternalDc externalDcs = 10;
}

/**
* External DC, running other ring of the same cluster
*/
message ExternalDc {
/** DC name */
optional string name = 1;

/** REST API url */
optional string url = 2;

/** Time of last seed fetching */
optional int64 seedFetchTime = 3;

/** List of fetched seeds */
repeated string seeds = 4;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public final class CassandraCluster {
private final PersistedCassandraFrameworkConfiguration configuration;
@NotNull
private final PersistedCassandraClusterJobs jobsState;
@NotNull
private final SeedManager seedManager;

@NotNull
private final Map<ClusterJobType, ClusterJobHandler> clusterJobHandlers;
Expand All @@ -138,7 +140,8 @@ public CassandraCluster(
@NotNull final PersistedCassandraClusterState clusterState,
@NotNull final PersistedCassandraClusterHealthCheckHistory healthCheckHistory,
@NotNull final PersistedCassandraClusterJobs jobsState,
@NotNull final PersistedCassandraFrameworkConfiguration configuration
@NotNull final PersistedCassandraFrameworkConfiguration configuration,
@NotNull final SeedManager seedManager
) {
this.clock = clock;
this.httpServerBaseUrl = httpServerBaseUrl;
Expand All @@ -147,6 +150,7 @@ public CassandraCluster(
this.healthCheckHistory = healthCheckHistory;
this.jobsState = jobsState;
this.configuration = configuration;
this.seedManager = seedManager;

clusterJobHandlers = new EnumMap<>(ClusterJobType.class);
clusterJobHandlers.put(ClusterJobType.CLEANUP, new NodeTaskClusterJobHandler(this, jobsState));
Expand All @@ -164,6 +168,11 @@ public PersistedCassandraFrameworkConfiguration getConfiguration() {
return configuration;
}

@NotNull
public SeedManager getSeedManager() {
return seedManager;
}

@Nullable
public ExecutorMetadata metadataForExecutor(@NotNull final String executorId) {
for (final ExecutorMetadata executorMetadata : clusterState.executorMetadata()) {
Expand Down Expand Up @@ -316,8 +325,12 @@ public void recordHealthCheck(@NotNull final String executorId, @NotNull final H
}

@NotNull
public List<String> getSeedNodeIps() {
return CassandraFrameworkProtosUtils.getSeedNodeIps(clusterState.nodes());
public List<String> getSeedNodeIps(final boolean addExternal) {
final List<String> seeds = CassandraFrameworkProtosUtils.getSeedNodeIps(clusterState.nodes());
if (addExternal) {
seeds.addAll(seedManager.getSeeds());
}
return seeds;
}

@NotNull
Expand Down Expand Up @@ -561,7 +574,7 @@ private CassandraServerConfig buildCassandraServerConfig(
CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("ssl_storage_port", getPortMapping(config, PORT_STORAGE_SSL)));
CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("native_transport_port", getPortMapping(config, PORT_NATIVE)));
CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("rpc_port", getPortMapping(config, PORT_RPC)));
CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("seeds", SEEDS_FORMAT_JOINER.join(getSeedNodeIps())));
CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("seeds", SEEDS_FORMAT_JOINER.join(getSeedNodeIps(true))));
CassandraFrameworkProtosUtils.setTaskConfig(taskConfig, configValue("endpoint_snitch", config.hasSnitch() ? config.getSnitch() : "GossipingPropertyFileSnitch"));
// data directory config
// TODO: Update the logic here for defining data directories when mesos persistent volumes are released
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.FluentIterable;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.mesosphere.mesos.util.Clock;
import org.apache.mesos.Protos.*;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
Expand Down Expand Up @@ -55,13 +56,17 @@ public CommandInfo.URI apply(final FileDownload input) {
private final PersistedCassandraFrameworkConfiguration configuration;
@NotNull
private final CassandraCluster cassandraCluster;
@NotNull
private final Clock clock;

public CassandraScheduler(
@NotNull final PersistedCassandraFrameworkConfiguration configuration,
@NotNull final CassandraCluster cassandraCluster
@NotNull final CassandraCluster cassandraCluster,
@NotNull final Clock clock
) {
this.configuration = configuration;
this.cassandraCluster = cassandraCluster;
this.clock = clock;
}

@Override
Expand All @@ -88,6 +93,7 @@ public void resourceOffers(final SchedulerDriver driver, final List<Offer> offer
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("> resourceOffers(driver : {}, offers : {})", driver, protoToString(offers));
}

for (final Offer offer : offers) {
final Marker marker = MarkerFactory.getMarker("offerId:" + offer.getId().getValue() + ",hostname:" + offer.getHostname());
final boolean offerUsed = evaluateOffer(driver, marker, offer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import org.jetbrains.annotations.NotNull;
import org.joda.time.Duration;

public final class PersistedCassandraFrameworkConfiguration extends StatePersistedObject<CassandraFrameworkConfiguration> {
import java.util.List;

import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.ExternalDc;

public final class PersistedCassandraFrameworkConfiguration extends StatePersistedObject<CassandraFrameworkConfiguration> {
public PersistedCassandraFrameworkConfiguration(
@NotNull final State state,
@NotNull final String frameworkName,
Expand All @@ -45,7 +48,8 @@ public PersistedCassandraFrameworkConfiguration(
final boolean jmxLocal,
final boolean jmxNoAuthentication,
@NotNull final String defaultRack,
@NotNull final String defaultDc
@NotNull final String defaultDc,
@NotNull final List<ExternalDc> externalDcs
) {
super(
"CassandraFrameworkConfiguration",
Expand Down Expand Up @@ -79,6 +83,7 @@ public CassandraFrameworkConfiguration get() {
.setBootstrapGraceTimeSeconds(bootstrapGraceTimeSec)
.setTargetNumberOfNodes(executorCount)
.setTargetNumberOfSeeds(seedCount)
.addAllExternalDcs(externalDcs)
.build();
}
},
Expand Down
Loading

0 comments on commit a6a94fb

Please sign in to comment.