Skip to content
This repository has been archived by the owner on Jan 10, 2019. It is now read-only.

Commit

Permalink
Merge pull request #115 from dmitrypekar/multi-dc-basic
Browse files Browse the repository at this point in the history
multi-dc-basic
  • Loading branch information
BenWhitehead committed Jul 8, 2015
2 parents 7515ebc + 950acbe commit 9912b5b
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private static void writeCassandraServerConfig(

modifyCassandraYaml(taskIdMarker, version, serverConfig);
modifyCassandraEnvSh(taskIdMarker, version, serverConfig);
modifyCassandraRackdc(taskIdMarker, version);
modifyCassandraRackdc(taskIdMarker, version, serverConfig);
}

@NotNull
Expand All @@ -110,15 +110,17 @@ private static String processBuilderToString(@NotNull final ProcessBuilder build
}

private static void modifyCassandraRackdc(
@NotNull final Marker taskIdMarker,
@NotNull final String version
@NotNull final Marker taskIdMarker,
@NotNull final String version,
@NotNull final CassandraServerConfig serverConfig
) throws IOException {

LOGGER.info(taskIdMarker, "Building cassandra-rackdc.properties");

final Properties props = new Properties();
props.put("dc", "DC1");
props.put("rack", "RAC1");
final RackDc rackDc = serverConfig.getRackDc();
props.put("dc", rackDc.getDc());
props.put("rack", rackDc.getRack());

// Add a suffix to a datacenter name. Used by the Ec2Snitch and Ec2MultiRegionSnitch to append a string to the EC2 region name.
//props.put("dc_suffix", "");
// Uncomment the following line to make this snitch prefer the internal ip when possible, as the Ec2MultiRegionSnitch does.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ private static int _main() throws UnknownHostException {
final String dataDirectory = Env.option("CASSANDRA_DATA_DIRECTORY").or(DEFAULT_DATA_DIRECTORY); // TODO: Temporary. Will be removed when MESOS-1554 is released
final boolean jmxLocal = Boolean.parseBoolean( Env.option("CASSANDRA_JMX_LOCAL").or("true"));
final boolean jmxNoAuthentication = Boolean.parseBoolean( Env.option("CASSANDRA_JMX_NO_AUTHENTICATION").or("false"));
final String defaultRack = Env.option("CASSANDRA_DEFAULT_RACK").or("RACK0");
final String defaultDc = Env.option("CASSANDRA_DEFAULT_DC").or("DC0");

final Matcher matcher = validateZkUrl(zkUrl);

Expand Down Expand Up @@ -159,7 +161,9 @@ private static int _main() throws UnknownHostException {
mesosRole,
dataDirectory,
jmxLocal,
jmxNoAuthentication);
jmxNoAuthentication,
defaultRack,
defaultDc);


final FrameworkInfo.Builder frameworkBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ message CassandraConfigRole {
*/
optional string mesosRole = 8;

/** Rack/dc info */
optional RackDc rackDc = 9;

/**
* A pre-defined data directory specifying where cassandra should write it's data.
* NOTE:
Expand Down Expand Up @@ -345,6 +348,17 @@ message ClusterJobKeyspaceStatus {
required int64 duration = 3;
}

/**
* Describes node rack and dc.
*/
message RackDc {
/** Rack identifier */
optional string rack = 1 [default = "RACK0"];

/** DataCenter identifier */
optional string dc = 2 [default = "DC0"];
}

/**
* Describes a node.
*/
Expand Down Expand Up @@ -435,6 +449,9 @@ message CassandraNode {
* need to be restarted.
*/
optional bool needsConfigUpdate = 14;

/** Rack/DC information */
optional RackDc rackDc = 15;
}
/**
* Describes a data volume for a node.
Expand Down Expand Up @@ -686,6 +703,9 @@ message CassandraServerConfig {
* Cassandra.yaml configuration details.
*/
required TaskConfig cassandraYamlConfig = 3;

/** Rack/DC information */
optional RackDc rackDc = 4;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ private CassandraNode buildCassandraNode(@NotNull final Protos.Offer offer, fina
builder.setReplacementForIp(replacementForIp);
}

builder.setRackDc(configuration.getDefaultRackDc());

try {
final InetAddress ia = InetAddress.getByName(offer.getHostname());

Expand Down Expand Up @@ -463,12 +465,13 @@ private String getUrlForResource(@NotNull final String resourceName) {
@NotNull
private CassandraNodeTask getConfigUpdateTask(
@NotNull final String taskId,
@NotNull final RackDc rackDc,
@NotNull final ExecutorMetadata metadata
) {
final CassandraFrameworkConfiguration config = configuration.get();
final CassandraConfigRole configRole = config.getDefaultConfigRole();

final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, TaskEnv.getDefaultInstance());
final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, rackDc, TaskEnv.getDefaultInstance());

final TaskDetails taskDetails = TaskDetails.newBuilder()
.setType(TaskDetails.TaskDetailsType.UPDATE_CONFIG)
Expand Down Expand Up @@ -515,7 +518,7 @@ private CassandraNodeTask getServerTask(
command.add("-Dcassandra.replace_address=" + node.getReplacementForIp());
}

final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, taskEnv.build());
final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, node.getRackDc(), taskEnv.build());

final TaskDetails taskDetails = TaskDetails.newBuilder()
.setType(TaskDetails.TaskDetailsType.CASSANDRA_SERVER_RUN)
Expand Down Expand Up @@ -546,6 +549,7 @@ private CassandraServerConfig buildCassandraServerConfig(
@NotNull final ExecutorMetadata metadata,
@NotNull final CassandraFrameworkConfiguration config,
@NotNull final CassandraConfigRole configRole,
@NotNull final RackDc rackDc,
@NotNull final TaskEnv taskEnv
) {
final TaskConfig.Builder taskConfig = TaskConfig.newBuilder(configRole.getCassandraYamlConfig());
Expand All @@ -568,6 +572,7 @@ private CassandraServerConfig buildCassandraServerConfig(
return CassandraServerConfig.newBuilder()
.setCassandraYamlConfig(taskConfig)
.setTaskEnv(taskEnv)
.setRackDc(rackDc)
.build();
}

Expand Down Expand Up @@ -1217,7 +1222,7 @@ TasksForOffer _getTasksForOffer(@NotNull final Marker marker, final @NotNull Pro
LOGGER.debug(marker, "Server task for node already running.");
if (node.getNeedsConfigUpdate()) {
LOGGER.info(marker, "Launching config update tasks for executor: {}", executorId);
final CassandraNodeTask task = getConfigUpdateTask(configUpdateTaskId(node), maybeMetadata.get());
final CassandraNodeTask task = getConfigUpdateTask(configUpdateTaskId(node), node.getRackDc(), maybeMetadata.get());
node.addTasks(task)
.setNeedsConfigUpdate(false);
result.getLaunchTasks().add(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public PersistedCassandraFrameworkConfiguration(
@NotNull final String mesosRole,
@NotNull final String dataDirectory,
final boolean jmxLocal,
final boolean jmxNoAuthentication
final boolean jmxNoAuthentication,
@NotNull final String defaultRack,
@NotNull final String defaultDc
) {
super(
"CassandraFrameworkConfiguration",
Expand All @@ -65,7 +67,8 @@ public CassandraFrameworkConfiguration get() {
.setValue(jmxLocal ? "yes" : "no"))
.addVariables(CassandraFrameworkProtos.TaskEnv.Entry.newBuilder()
.setName("CASSANDRA_JMX_NO_AUTHENTICATION")
.setValue(jmxNoAuthentication ? "yes" : "no")));
.setValue(jmxNoAuthentication ? "yes" : "no")))
.setRackDc(CassandraFrameworkProtos.RackDc.newBuilder().setRack(defaultRack).setDc(defaultDc));
if (javeHeapMb > 0) {
configRole.setMemJavaHeapMb(javeHeapMb);
}
Expand Down Expand Up @@ -144,6 +147,14 @@ public CassandraFrameworkProtos.CassandraConfigRole getDefaultConfigRole() {
return get().getDefaultConfigRole();
}

// TODO: Persistence Schema Update
@NotNull
public CassandraFrameworkProtos.RackDc getDefaultRackDc() {
CassandraFrameworkProtos.RackDc rackDc = getDefaultConfigRole().getRackDc();
if (rackDc == null) rackDc = CassandraFrameworkProtos.RackDc.newBuilder().setRack("RACK0").setDc("DC0").build();
return rackDc;
}

@NotNull
public Duration healthCheckInterval() {
return Duration.standardSeconds(get().getHealthCheckIntervalSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public List<ApiEndpoint> indexPage(@Context final UriInfo uriInfo) {
new ApiEndpoint("POST", "node/{node}/restart/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/terminate/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/replace/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/rackdc", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-seed/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-non-seed/", newArrayList("application/json")),
new ApiEndpoint("GET", "live-nodes", newArrayList("application/json")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ public void write(final JsonGenerator json) throws IOException {
json.writeStringField("targetRunState", cassandraNode.getTargetRunState().name());
json.writeNumberField("jmxPort", cassandraNode.getJmxConnect().getJmxPort());
json.writeBooleanField("seedNode", cassandraNode.getSeed());

CassandraFrameworkProtos.RackDc rackDc = cassandraNode.getRackDc();
json.writeObjectFieldStart("rackDc");
json.writeStringField("rack", rackDc.getRack());
json.writeStringField("dc", rackDc.getDc());
json.writeEndObject();

if (!cassandraNode.hasCassandraDaemonPid()) {
json.writeNullField("cassandraDaemonPid");
} else {
Expand Down Expand Up @@ -479,6 +486,42 @@ public void write(final JsonGenerator json) throws IOException {
});
}

public static class RackDcParams {
public String rack;
public String dc;
}

/**
* Update node with specified parameters. Note: node should be restarted for changes to take effect.
*/
@POST
@Path("/{node}/rackdc")
public Response nodeRackDc(
@PathParam("node") String id,
RackDcParams params
) {
CassandraFrameworkProtos.CassandraNode node = cluster.findNode(id);
if (node == null) return Response.status(404).build();

final CassandraFrameworkProtos.CassandraNode.Builder copy = CassandraFrameworkProtos.CassandraNode.newBuilder(node);
CassandraFrameworkProtos.RackDc.Builder rackDc = CassandraFrameworkProtos.RackDc.newBuilder(node.getRackDc());

if (params.rack != null) rackDc.setRack(params.rack);
if (params.dc != null) rackDc.setDc(params.dc);

copy.setRackDc(rackDc);
cluster.getClusterState().addOrSetNode(copy.build());

return JaxRsUtils.buildStreamingResponse(factory, new StreamingJsonResponse() {
@Override
public void write(final JsonGenerator json) throws IOException {
json.writeBooleanField("success", true);
json.writeStringField("rack", copy.getRackDc().getRack());
json.writeStringField("dc", copy.getRackDc().getDc());
}
});
}

private Response nodeStatusUpdate(final CassandraFrameworkProtos.CassandraNode cassandraNode) {
if (cassandraNode == null) {
return Response.status(Response.Status.NOT_FOUND).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ public static void writeConfigRole(final JsonGenerator json, final CassandraConf
}
json.writeEndObject();
}

RackDc rackDc = configRole.getRackDc();
if (rackDc != null) {
json.writeObjectFieldStart("rackDc");
json.writeStringField("rack", rackDc.getRack());
json.writeStringField("dc", rackDc.getDc());
json.writeEndObject();
}
}

public static void writeTask(final JsonGenerator json, final CassandraNodeTask task) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ protected void cleanState() {
"*",
".",
true,
false);
false,
"RACK0",
"DC0");

healthCheckHistory = new PersistedCassandraClusterHealthCheckHistory(state);
cluster = new CassandraCluster(new SystemClock(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.mesosphere.mesos.frameworks.cassandra.scheduler;

import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos;
import org.apache.mesos.state.InMemoryState;
import org.junit.Test;

import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.CassandraConfigRole;
import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.CassandraFrameworkConfiguration;
import static junit.framework.Assert.assertEquals;

public class PersistedCassandraFrameworkConfigurationTest {
@Test
public void testGetDefaultRackDc() {
InMemoryState state = new InMemoryState();

PersistedCassandraFrameworkConfiguration config = new PersistedCassandraFrameworkConfiguration(
state,
"name",
60,
30,
"2.1",
0.5,
1024,
1024,
512,
1,
1,
"role",
"",
false,
true,
"RACK1",
"DC1"
);

CassandraFrameworkProtos.RackDc rackDc = config.getDefaultRackDc();
assertEquals("RACK1", rackDc.getRack());
assertEquals("DC1", rackDc.getDc());

// backward compatibility: if rackDc is not defined - use defaults
CassandraFrameworkConfiguration.Builder builder = CassandraFrameworkConfiguration.newBuilder(config.get());
builder.setDefaultConfigRole(CassandraConfigRole.newBuilder(builder.getDefaultConfigRole()).clearRackDc());
config.setValue(builder.build());

rackDc = config.getDefaultRackDc();
assertEquals("RACK0", rackDc.getRack());
assertEquals("DC0", rackDc.getDc());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void testRoot() throws Exception {
new ApiEndpoint("POST", "node/{node}/restart/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/terminate/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/replace/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/rackdc", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-seed/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-non-seed/", newArrayList("application/json")),
new ApiEndpoint("GET", "live-nodes", newArrayList("application/json")),
Expand Down
26 changes: 26 additions & 0 deletions docs/docs/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ The `/` endpoint returns a simple JSON object that lists all URLs the method to
"method": "POST",
"url": "http://localhost:18080/node/{node}/replace/"
},
{
"contentType": [
"application/json"
],
"method": "POST",
"url": "http://localhost:18080/node/{node}/rackdc"
},
{
"contentType": [
"application/json"
Expand Down Expand Up @@ -270,6 +277,7 @@ Endpoint | HTTP method | Content-Types| Description
`/node/{node}/restart` | `POST` | `application/json` | Sets the run-status of the `node` (either IP, hostname, or executor ID) to `RESTART`, which is effectively a sequence of `STOP` followed by `RUN`.
`/node/{node}/terminate` | `POST` | `application/json` | Sets the requested run-status of the `node` (either IP, hostname, or executor ID) to `TERMINATE`, which ensures that the Cassandra node can be replaced. There's no way to bring a `terminated` node back.
`/node/{node}/replace` | `POST` | `application/json` | Allocates a new Cassandra node that is configured to replace the given _node_ (either IP, hostname, or executor ID).
`/node/{node}/rackdc` | `POST` | `application/json` | Updates node with specified rack and dc passed as JSON object.
`/node/{node}/make-seed` | `POST` | `application/json` | Converts a non-seed node to a seed node. Implicitly forces a rollout of the Cassandra configuration to all nodes.
`/node/{node}/make-non-seed` | `POST` | `application/json` | Converts a seed node to a non-seed node. Implicitly forces a rollout of the Cassandra configuration to all nodes.
`/live-nodes` | `GET` | `application/json` | Retrieve multiple live nodes, limited to 3 nodes by default. The limit can be changed with the query parameter `limit`.
Expand Down Expand Up @@ -731,6 +739,24 @@ IP: 127.0.0.2
}
```

## `/node/{node}/rackdc`
request:
```json
{
"rack": "RACK2",
"dc": "DC2"
}
```

response:
```json
{
"success": true,
"rack": "RACK2",
"dc": "DC2"
}
```

## `/node/{node}/make-seed`

```json
Expand Down

0 comments on commit 9912b5b

Please sign in to comment.