Skip to content
This repository has been archived by the owner on Jan 10, 2019. It is now read-only.

multi-dc-basic #115

Merged
merged 6 commits into from
Jul 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private static void writeCassandraServerConfig(

modifyCassandraYaml(taskIdMarker, version, serverConfig);
modifyCassandraEnvSh(taskIdMarker, version, serverConfig);
modifyCassandraRackdc(taskIdMarker, version);
modifyCassandraRackdc(taskIdMarker, version, serverConfig);
}

@NotNull
Expand All @@ -110,15 +110,17 @@ private static String processBuilderToString(@NotNull final ProcessBuilder build
}

private static void modifyCassandraRackdc(
@NotNull final Marker taskIdMarker,
@NotNull final String version
@NotNull final Marker taskIdMarker,
@NotNull final String version,
@NotNull final CassandraServerConfig serverConfig
) throws IOException {

LOGGER.info(taskIdMarker, "Building cassandra-rackdc.properties");

final Properties props = new Properties();
props.put("dc", "DC1");
props.put("rack", "RAC1");
final RackDc rackDc = serverConfig.getRackDc();
props.put("dc", rackDc.getDc());
props.put("rack", rackDc.getRack());

// Add a suffix to a datacenter name. Used by the Ec2Snitch and Ec2MultiRegionSnitch to append a string to the EC2 region name.
//props.put("dc_suffix", "");
// Uncomment the following line to make this snitch prefer the internal ip when possible, as the Ec2MultiRegionSnitch does.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ private static int _main() throws UnknownHostException {
final String dataDirectory = Env.option("CASSANDRA_DATA_DIRECTORY").or(DEFAULT_DATA_DIRECTORY); // TODO: Temporary. Will be removed when MESOS-1554 is released
final boolean jmxLocal = Boolean.parseBoolean( Env.option("CASSANDRA_JMX_LOCAL").or("true"));
final boolean jmxNoAuthentication = Boolean.parseBoolean( Env.option("CASSANDRA_JMX_NO_AUTHENTICATION").or("false"));
final String defaultRack = Env.option("CASSANDRA_DEFAULT_RACK").or("RACK0");
final String defaultDc = Env.option("CASSANDRA_DEFAULT_DC").or("DC0");

final Matcher matcher = validateZkUrl(zkUrl);

Expand Down Expand Up @@ -159,7 +161,9 @@ private static int _main() throws UnknownHostException {
mesosRole,
dataDirectory,
jmxLocal,
jmxNoAuthentication);
jmxNoAuthentication,
defaultRack,
defaultDc);


final FrameworkInfo.Builder frameworkBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ message CassandraConfigRole {
*/
optional string mesosRole = 8;

/** Rack/dc info */
optional RackDc rackDc = 9;

/**
* A pre-defined data directory specifying where cassandra should write it's data.
* NOTE:
Expand Down Expand Up @@ -345,6 +348,17 @@ message ClusterJobKeyspaceStatus {
required int64 duration = 3;
}

/**
* Describes node rack and dc.
*/
message RackDc {
/** Rack identifier */
optional string rack = 1 [default = "RACK0"];

/** DataCenter identifier */
optional string dc = 2 [default = "DC0"];
}

/**
* Describes a node.
*/
Expand Down Expand Up @@ -435,6 +449,9 @@ message CassandraNode {
* need to be restarted.
*/
optional bool needsConfigUpdate = 14;

/** Rack/DC information */
optional RackDc rackDc = 15;
}
/**
* Describes a data volume for a node.
Expand Down Expand Up @@ -686,6 +703,9 @@ message CassandraServerConfig {
* Cassandra.yaml configuration details.
*/
required TaskConfig cassandraYamlConfig = 3;

/** Rack/DC information */
optional RackDc rackDc = 4;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ private CassandraNode buildCassandraNode(@NotNull final Protos.Offer offer, fina
builder.setReplacementForIp(replacementForIp);
}

builder.setRackDc(configuration.getDefaultRackDc());

try {
final InetAddress ia = InetAddress.getByName(offer.getHostname());

Expand Down Expand Up @@ -463,12 +465,13 @@ private String getUrlForResource(@NotNull final String resourceName) {
@NotNull
private CassandraNodeTask getConfigUpdateTask(
@NotNull final String taskId,
@NotNull final RackDc rackDc,
@NotNull final ExecutorMetadata metadata
) {
final CassandraFrameworkConfiguration config = configuration.get();
final CassandraConfigRole configRole = config.getDefaultConfigRole();

final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, TaskEnv.getDefaultInstance());
final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, rackDc, TaskEnv.getDefaultInstance());

final TaskDetails taskDetails = TaskDetails.newBuilder()
.setType(TaskDetails.TaskDetailsType.UPDATE_CONFIG)
Expand Down Expand Up @@ -515,7 +518,7 @@ private CassandraNodeTask getServerTask(
command.add("-Dcassandra.replace_address=" + node.getReplacementForIp());
}

final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, taskEnv.build());
final CassandraServerConfig cassandraServerConfig = buildCassandraServerConfig(metadata, config, configRole, node.getRackDc(), taskEnv.build());

final TaskDetails taskDetails = TaskDetails.newBuilder()
.setType(TaskDetails.TaskDetailsType.CASSANDRA_SERVER_RUN)
Expand Down Expand Up @@ -546,6 +549,7 @@ private CassandraServerConfig buildCassandraServerConfig(
@NotNull final ExecutorMetadata metadata,
@NotNull final CassandraFrameworkConfiguration config,
@NotNull final CassandraConfigRole configRole,
@NotNull final RackDc rackDc,
@NotNull final TaskEnv taskEnv
) {
final TaskConfig.Builder taskConfig = TaskConfig.newBuilder(configRole.getCassandraYamlConfig());
Expand All @@ -568,6 +572,7 @@ private CassandraServerConfig buildCassandraServerConfig(
return CassandraServerConfig.newBuilder()
.setCassandraYamlConfig(taskConfig)
.setTaskEnv(taskEnv)
.setRackDc(rackDc)
.build();
}

Expand Down Expand Up @@ -1217,7 +1222,7 @@ TasksForOffer _getTasksForOffer(@NotNull final Marker marker, final @NotNull Pro
LOGGER.debug(marker, "Server task for node already running.");
if (node.getNeedsConfigUpdate()) {
LOGGER.info(marker, "Launching config update tasks for executor: {}", executorId);
final CassandraNodeTask task = getConfigUpdateTask(configUpdateTaskId(node), maybeMetadata.get());
final CassandraNodeTask task = getConfigUpdateTask(configUpdateTaskId(node), node.getRackDc(), maybeMetadata.get());
node.addTasks(task)
.setNeedsConfigUpdate(false);
result.getLaunchTasks().add(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public PersistedCassandraFrameworkConfiguration(
@NotNull final String mesosRole,
@NotNull final String dataDirectory,
final boolean jmxLocal,
final boolean jmxNoAuthentication
final boolean jmxNoAuthentication,
@NotNull final String defaultRack,
@NotNull final String defaultDc
) {
super(
"CassandraFrameworkConfiguration",
Expand All @@ -65,7 +67,8 @@ public CassandraFrameworkConfiguration get() {
.setValue(jmxLocal ? "yes" : "no"))
.addVariables(CassandraFrameworkProtos.TaskEnv.Entry.newBuilder()
.setName("CASSANDRA_JMX_NO_AUTHENTICATION")
.setValue(jmxNoAuthentication ? "yes" : "no")));
.setValue(jmxNoAuthentication ? "yes" : "no")))
.setRackDc(CassandraFrameworkProtos.RackDc.newBuilder().setRack(defaultRack).setDc(defaultDc));
if (javeHeapMb > 0) {
configRole.setMemJavaHeapMb(javeHeapMb);
}
Expand Down Expand Up @@ -144,6 +147,14 @@ public CassandraFrameworkProtos.CassandraConfigRole getDefaultConfigRole() {
return get().getDefaultConfigRole();
}

// TODO: Persistence Schema Update
@NotNull
public CassandraFrameworkProtos.RackDc getDefaultRackDc() {
CassandraFrameworkProtos.RackDc rackDc = getDefaultConfigRole().getRackDc();
if (rackDc == null) rackDc = CassandraFrameworkProtos.RackDc.newBuilder().setRack("RACK0").setDc("DC0").build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add // TODO: Persistence Schema Update so that we can more appropriately address the code needed to update object schemas in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

return rackDc;
}

@NotNull
public Duration healthCheckInterval() {
return Duration.standardSeconds(get().getHealthCheckIntervalSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public List<ApiEndpoint> indexPage(@Context final UriInfo uriInfo) {
new ApiEndpoint("POST", "node/{node}/restart/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/terminate/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/replace/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/rackdc", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-seed/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-non-seed/", newArrayList("application/json")),
new ApiEndpoint("GET", "live-nodes", newArrayList("application/json")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ public void write(final JsonGenerator json) throws IOException {
json.writeStringField("targetRunState", cassandraNode.getTargetRunState().name());
json.writeNumberField("jmxPort", cassandraNode.getJmxConnect().getJmxPort());
json.writeBooleanField("seedNode", cassandraNode.getSeed());

CassandraFrameworkProtos.RackDc rackDc = cassandraNode.getRackDc();
json.writeObjectFieldStart("rackDc");
json.writeStringField("rack", rackDc.getRack());
json.writeStringField("dc", rackDc.getDc());
json.writeEndObject();

if (!cassandraNode.hasCassandraDaemonPid()) {
json.writeNullField("cassandraDaemonPid");
} else {
Expand Down Expand Up @@ -479,6 +486,42 @@ public void write(final JsonGenerator json) throws IOException {
});
}

public static class RackDcParams {
public String rack;
public String dc;
}

/**
* Update node with specified parameters. Note: node should be restarted for changes to take effect.
*/
@POST
@Path("/{node}/rackdc")
public Response nodeRackDc(
@PathParam("node") String id,
RackDcParams params
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change rack and dc to be a json object passed in the body rather than query params?

Something like this:

{
  "dc": "DC_1",
  "rack": "rack_5"
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

CassandraFrameworkProtos.CassandraNode node = cluster.findNode(id);
if (node == null) return Response.status(404).build();

final CassandraFrameworkProtos.CassandraNode.Builder copy = CassandraFrameworkProtos.CassandraNode.newBuilder(node);
CassandraFrameworkProtos.RackDc.Builder rackDc = CassandraFrameworkProtos.RackDc.newBuilder(node.getRackDc());

if (params.rack != null) rackDc.setRack(params.rack);
if (params.dc != null) rackDc.setDc(params.dc);

copy.setRackDc(rackDc);
cluster.getClusterState().addOrSetNode(copy.build());

return JaxRsUtils.buildStreamingResponse(factory, new StreamingJsonResponse() {
@Override
public void write(final JsonGenerator json) throws IOException {
json.writeBooleanField("success", true);
json.writeStringField("rack", copy.getRackDc().getRack());
json.writeStringField("dc", copy.getRackDc().getDc());
}
});
}

private Response nodeStatusUpdate(final CassandraFrameworkProtos.CassandraNode cassandraNode) {
if (cassandraNode == null) {
return Response.status(Response.Status.NOT_FOUND).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ public static void writeConfigRole(final JsonGenerator json, final CassandraConf
}
json.writeEndObject();
}

RackDc rackDc = configRole.getRackDc();
if (rackDc != null) {
json.writeObjectFieldStart("rackDc");
json.writeStringField("rack", rackDc.getRack());
json.writeStringField("dc", rackDc.getDc());
json.writeEndObject();
}
}

public static void writeTask(final JsonGenerator json, final CassandraNodeTask task) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ protected void cleanState() {
"*",
".",
true,
false);
false,
"RACK0",
"DC0");

healthCheckHistory = new PersistedCassandraClusterHealthCheckHistory(state);
cluster = new CassandraCluster(new SystemClock(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.mesosphere.mesos.frameworks.cassandra.scheduler;

import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos;
import org.apache.mesos.state.InMemoryState;
import org.junit.Test;

import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.CassandraConfigRole;
import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.CassandraFrameworkConfiguration;
import static junit.framework.Assert.assertEquals;

public class PersistedCassandraFrameworkConfigurationTest {
@Test
public void testGetDefaultRackDc() {
InMemoryState state = new InMemoryState();

PersistedCassandraFrameworkConfiguration config = new PersistedCassandraFrameworkConfiguration(
state,
"name",
60,
30,
"2.1",
0.5,
1024,
1024,
512,
1,
1,
"role",
"",
false,
true,
"RACK1",
"DC1"
);

CassandraFrameworkProtos.RackDc rackDc = config.getDefaultRackDc();
assertEquals("RACK1", rackDc.getRack());
assertEquals("DC1", rackDc.getDc());

// backward compatibility: if rackDc is not defined - use defaults
CassandraFrameworkConfiguration.Builder builder = CassandraFrameworkConfiguration.newBuilder(config.get());
builder.setDefaultConfigRole(CassandraConfigRole.newBuilder(builder.getDefaultConfigRole()).clearRackDc());
config.setValue(builder.build());

rackDc = config.getDefaultRackDc();
assertEquals("RACK0", rackDc.getRack());
assertEquals("DC0", rackDc.getDc());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void testRoot() throws Exception {
new ApiEndpoint("POST", "node/{node}/restart/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/terminate/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/replace/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/rackdc", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-seed/", newArrayList("application/json")),
new ApiEndpoint("POST", "node/{node}/make-non-seed/", newArrayList("application/json")),
new ApiEndpoint("GET", "live-nodes", newArrayList("application/json")),
Expand Down
26 changes: 26 additions & 0 deletions docs/docs/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ The `/` endpoint returns a simple JSON object that lists all URLs the method to
"method": "POST",
"url": "http://localhost:18080/node/{node}/replace/"
},
{
"contentType": [
"application/json"
],
"method": "POST",
"url": "http://localhost:18080/node/{node}/rackdc"
},
{
"contentType": [
"application/json"
Expand Down Expand Up @@ -270,6 +277,7 @@ Endpoint | HTTP method | Content-Types| Description
`/node/{node}/restart` | `POST` | `application/json` | Sets the run-status of the `node` (either IP, hostname, or executor ID) to `RESTART`, which is effectively a sequence of `STOP` followed by `RUN`.
`/node/{node}/terminate` | `POST` | `application/json` | Sets the requested run-status of the `node` (either IP, hostname, or executor ID) to `TERMINATE`, which ensures that the Cassandra node can be replaced. There's no way to bring a `terminated` node back.
`/node/{node}/replace` | `POST` | `application/json` | Allocates a new Cassandra node that is configured to replace the given _node_ (either IP, hostname, or executor ID).
`/node/{node}/rackdc` | `POST` | `application/json` | Updates node with specified rack and dc passed as JSON object.
`/node/{node}/make-seed` | `POST` | `application/json` | Converts a non-seed node to a seed node. Implicitly forces a rollout of the Cassandra configuration to all nodes.
`/node/{node}/make-non-seed` | `POST` | `application/json` | Converts a seed node to a non-seed node. Implicitly forces a rollout of the Cassandra configuration to all nodes.
`/live-nodes` | `GET` | `application/json` | Retrieve multiple live nodes, limited to 3 nodes by default. The limit can be changed with the query parameter `limit`.
Expand Down Expand Up @@ -731,6 +739,24 @@ IP: 127.0.0.2
}
```

## `/node/{node}/rackdc`
request:
```json
{
"rack": "RACK2",
"dc": "DC2"
}
```

response:
```json
{
"success": true,
"rack": "RACK2",
"dc": "DC2"
}
```

## `/node/{node}/make-seed`

```json
Expand Down