Skip to content
This repository has been archived by the owner on Jan 10, 2019. It is now read-only.

accept offers for default role * in addition to the specified mesosRole #124

Merged
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ CASSANDRA_BOOTSTRAP_GRACE_TIME_SECONDS=120
# The number of seconds that should be used as the mesos framework timeout (default 604800 seconds / 7 days)
CASSANDRA_FAILOVER_TIMEOUT_SECONDS=604800

# The mesos role to used to reserve resources (default *). If this is set, the framework only accepts offers that have resources for that role.
# The mesos role to used to reserve resources (default *). If this is set, the framework accepts offers that have resources for that role or the default role *
CASSANDRA_FRAMEWORK_MESOS_ROLE=*

# A pre-defined data directory specifying where cassandra should write it's data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
package io.mesosphere.mesos.util;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos;
import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.*;

import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Resource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

import static com.google.common.collect.FluentIterable.from;
import static com.google.common.collect.Lists.newArrayList;
import static io.mesosphere.mesos.util.ProtoUtils.*;

public final class CassandraFrameworkProtosUtils {

Expand Down Expand Up @@ -172,6 +180,83 @@ public static CassandraNode.Builder removeTask(@NotNull final CassandraNode cass
return builder;
}

@NotNull
public static Function<Resource, TreeSet<Long>> resourceToPortSet() {
return new Function<Resource, TreeSet<Long>>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make a private static final inner class of this function with a singleton instance returned by this method.

See https://github.com/mesosphere/cassandra-mesos/blob/master/cassandra-mesos-model/src/main/java/io/mesosphere/mesos/util/CassandraFrameworkProtosUtils.java#L176 for an example of how this had been done previously.

@Override
@NotNull
public TreeSet<Long> apply(@Nullable Resource resource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource parameter as final.

return resourceValueRange(Optional.fromNullable(resource));
}
};
}

public static Predicate<Resource> containsPorts(final Iterable<Long> ports) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please annotate ports with @NotNull.

return new Predicate<Resource>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make a private static final inner class of this predicate with a single constructor argument of the passed in ports.

See https://github.com/mesosphere/cassandra-mesos/blob/master/cassandra-mesos-model/src/main/java/io/mesosphere/mesos/util/CassandraFrameworkProtosUtils.java#L236-L248 for an example of how this had been done previously.

@Override
public boolean apply(Resource resource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource parameter as final.

TreeSet<Long> portsInResource = resourceValueRange(Optional.fromNullable(resource));
return portsInResource.containsAll(newArrayList(ports));
}
};
}

public static ImmutableListMultimap<String, Resource> resourcesForRoleAndOffer(String role, Protos.Offer offer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource and offer parameters as final and annotate with @NotNull.

return from(offer.getResourcesList())
.filter(resourceHasExpectedRole(role))
.index(resourceToName());
}

public static Predicate<Resource> scalarValueAtLeast(final long v) {
return new Predicate<Resource>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make a private static final inner class of this predicate with a single constructor argument of the passed in v.

See https://github.com/mesosphere/cassandra-mesos/blob/master/cassandra-mesos-model/src/main/java/io/mesosphere/mesos/util/CassandraFrameworkProtosUtils.java#L236-L248 for an example of how this had been done previously.

@Override
public boolean apply(Resource resource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource parameter as final.

return resource.getType() == Protos.Value.Type.SCALAR &&
resource.getScalar().getValue() > v;
}
};
}

public static Function<Resource, Double> toDoubleResourceValue() {
return new Function<Resource, Double>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make a private static final inner class of this function with a singleton instance returned by this method.

See https://github.com/mesosphere/cassandra-mesos/blob/master/cassandra-mesos-model/src/main/java/io/mesosphere/mesos/util/CassandraFrameworkProtosUtils.java#L176 for an example of how this had been done previously.

@Override
public Double apply(@Nullable Resource resource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource parameter as final.

return resourceValueDouble(Optional.fromNullable(resource)).or(0.0);
}
};
}

public static Function<Resource, Long> toLongResourceValue() {
return new Function<Resource, Long>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make a private static final inner class of this function with a singleton instance returned by this method.

See https://github.com/mesosphere/cassandra-mesos/blob/master/cassandra-mesos-model/src/main/java/io/mesosphere/mesos/util/CassandraFrameworkProtosUtils.java#L176 for an example of how this had been done previously.

@Override
public Long apply(@Nullable Resource resource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource parameter as final.

return resourceValueLong(Optional.fromNullable(resource)).or(0l);
}
};
}

public static Optional<Double> maxResourceValueDouble(List<Resource> resource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource parameter as final and annotate with @NotNull.

ImmutableList<Double> values = from(resource)
.transform(toDoubleResourceValue())
.toList();
if (values.isEmpty()) {
return Optional.absent();
} else {
return Optional.of(Collections.max(values));
}
}

public static Optional<Long> maxResourceValueLong(List<Resource> resource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark resource parameter as final and annotate with @NotNull.

ImmutableList<Long> values = from(resource)
.transform(toLongResourceValue())
.toList();
if (values.isEmpty()) {
return Optional.absent();
} else {
return Optional.of(Collections.max(values));
}
}

private static final class CassandraNodeToIp implements Function<CassandraNode, String> {
private static final CassandraNodeToIp INSTANCE = new CassandraNodeToIp();

Expand Down Expand Up @@ -299,7 +384,8 @@ public ResourceHasExpectedRole(@NotNull final String role) {

@Override
public boolean apply(final Resource item) {
return item.getRole().equals(role);
String givenRole = item.getRole();
return givenRole.equals(role) || givenRole.equals("*");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ public static Optional<Long> resourceValueLong(@NotNull final Optional<Resource>
}
}

@NotNull
public static Optional<String> resourceValueRole(@NotNull final Optional<Resource> resource) {
if (resource.isPresent()) {
return Optional.of(resource.get().getRole());
} else {
return Optional.absent();
}
}

@NotNull
public static TreeSet<Long> resourceValueRange(@NotNull final Optional<Resource> resource) {
if (resource.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos;
import io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.*;
Expand Down Expand Up @@ -649,31 +650,31 @@ private static CassandraNodeTask getMetadataTask(@NotNull final String executorI
@NotNull
static List<String> hasResources(
@NotNull final Protos.Offer offer,
@NotNull final TaskResources resources,
@NotNull final TaskResources taskResources,
@NotNull final Map<String, Long> portMapping,
@NotNull final String mesosRole
) {
final List<String> errors = newArrayList();
final ListMultimap<String, Protos.Resource> index = from(offer.getResourcesList())
.filter(resourceHasExpectedRole(mesosRole))
.index(resourceToName());
final ListMultimap<String, Protos.Resource> availableResources = resourcesForRoleAndOffer(mesosRole,offer);

final double availableCpus = maxResourceValueDouble(availableResources.get("cpus")).or(0d);
final long availableMem = maxResourceValueLong(availableResources.get("mem")).or(0l);
final long availableDisk = maxResourceValueLong(availableResources.get("disk")).or(0l);

final double availableCpus = resourceValueDouble(headOption(index.get("cpus"))).or(0.0);
final long availableMem = resourceValueLong(headOption(index.get("mem"))).or(0L);
final long availableDisk = resourceValueLong(headOption(index.get("disk"))).or(0L);

if (availableCpus < resources.getCpuCores()) {
errors.add(String.format("Not enough cpu resources for role %s. Required %s only %s available", mesosRole, String.valueOf(resources.getCpuCores()), String.valueOf(availableCpus)));
if (availableCpus < taskResources.getCpuCores()) {
errors.add(String.format("Not enough cpu resources for role %s. Required %s only %s available", mesosRole, String.valueOf(taskResources.getCpuCores()), String.valueOf(availableCpus)));
}
if (availableMem < resources.getMemMb()) {
errors.add(String.format("Not enough mem resources for role %s. Required %d only %d available", mesosRole, resources.getMemMb(), availableMem));
if (availableMem < taskResources.getMemMb()) {
errors.add(String.format("Not enough mem resources for role %s. Required %d only %d available", mesosRole, taskResources.getMemMb(), availableMem));
}
if (availableDisk < resources.getDiskMb()) {
errors.add(String.format("Not enough disk resources for role %s. Required %d only %d available", mesosRole, resources.getDiskMb(), availableDisk));
if (availableDisk < taskResources.getDiskMb()) {
errors.add(String.format("Not enough disk resources for role %s. Required %d only %d available", mesosRole, taskResources.getDiskMb(), availableDisk));
}

final TreeSet<Long> ports = resourceValueRange(headOption(index.get("ports")));
ImmutableSet<Long> ports = from(availableResources.get("ports"))
.transformAndConcat(resourceToPortSet())
.toSet();

for (final Map.Entry<String, Long> entry : portMapping.entrySet()) {
final String key = entry.getKey();
final Long value = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.mesosphere.mesos.util.Clock;
import io.mesosphere.mesos.util.ProtoUtils;
import org.apache.mesos.Protos.*;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
Expand All @@ -36,8 +38,11 @@
import java.util.Collections;
import java.util.List;

import static com.google.common.collect.FluentIterable.from;
import static com.google.common.collect.Lists.newArrayList;
import static io.mesosphere.mesos.frameworks.cassandra.CassandraFrameworkProtos.*;
import static io.mesosphere.mesos.util.CassandraFrameworkProtosUtils.*;
import static io.mesosphere.mesos.util.Functions.headOption;
import static io.mesosphere.mesos.util.ProtoUtils.*;

public final class CassandraScheduler implements Scheduler {
Expand Down Expand Up @@ -311,7 +316,6 @@ private boolean evaluateOffer(@NotNull final SchedulerDriver driver, @NotNull fi
final List<TaskInfo> taskInfos = newArrayList();

for (final CassandraNodeTask cassandraNodeTask : tasksForOffer.getLaunchTasks()) {

final TaskDetails taskDetails = cassandraNodeTask.getTaskDetails();

final ExecutorInfo info = executorInfo(
Expand All @@ -323,13 +327,13 @@ private boolean evaluateOffer(@NotNull final SchedulerDriver driver, @NotNull fi
environmentFromTaskEnv(executor.getTaskEnv()),
newArrayList(FluentIterable.from(executor.getDownloadList()).transform(uriToCommandInfoUri))
),
resourceList(executor.getResources(), configuration.mesosRole())
resourceList(executor.getResources(), configuration.mesosRole(), offer)
);

final TaskID taskId = taskId(cassandraNodeTask.getTaskId());
final List<Resource> resources = resourceList(cassandraNodeTask.getResources(), configuration.mesosRole());
final List<Resource> resources = resourceList(cassandraNodeTask.getResources(), configuration.mesosRole(), offer);
if (!cassandraNodeTask.getResources().getPortsList().isEmpty()) {
resources.add(ports(cassandraNodeTask.getResources().getPortsList(), configuration.mesosRole()));
resources.add(ports(cassandraNodeTask.getResources().getPortsList(), configuration.mesosRole(), offer));
}

final TaskInfo task = TaskInfo.newBuilder()
Expand Down Expand Up @@ -367,17 +371,45 @@ public static String getTaskName(@Nullable final String taskName, @NotNull final

@NotNull
@VisibleForTesting
static List<Resource> resourceList(@NotNull final TaskResources resources, @NotNull final String role) {
static List<Resource> resourceList(@NotNull final TaskResources taskResources, @NotNull final String role, Offer offer) {
final ListMultimap<String, Resource> index = resourcesForRoleAndOffer(role, offer);

final String cpuRole = resourceValueRole(headOption(index.get("cpus"))).or("*");
Optional<Resource> memResource = from(index.get("mem"))
.filter(scalarValueAtLeast(taskResources.getMemMb()))
.first();
final String memRole = resourceValueRole(memResource).or("*");
final String diskRole = resourceValueRole(headOption(index.get("disk"))).or("*");

final List<Resource> retVal = newArrayList(
cpu(resources.getCpuCores(), role),
mem(resources.getMemMb(), role)
cpu(taskResources.getCpuCores(), cpuRole),
mem(taskResources.getMemMb(), memRole)
);
if (resources.hasDiskMb() && resources.getDiskMb() > 0) {
retVal.add(disk(resources.getDiskMb(), role));
if (taskResources.hasDiskMb() && taskResources.getDiskMb() > 0) {
retVal.add(disk(taskResources.getDiskMb(), diskRole));
}
return retVal;
}

@NotNull
@VisibleForTesting
static Resource ports(@NotNull final Iterable<Long> ports, @NotNull final String mesosRole, Offer offer) {
final ListMultimap<String, Resource> index = resourcesForRoleAndOffer(mesosRole, offer);

Optional<Resource> offeredPorts = from(index.get("ports"))
.filter(containsPorts(ports))
.first();

String roleToUse;
if (offeredPorts.isPresent()) {
roleToUse = offeredPorts.get().getRole();
} else {
roleToUse = "*";
}
return ProtoUtils.ports(ports, roleToUse);
}


@NotNull
private static Environment environmentFromTaskEnv(@NotNull final TaskEnv taskEnv) {
final Environment.Builder builder = Environment.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ protected Protos.TaskInfo[] threeNodeClusterPost() throws InvalidProtocolBufferE
return executorMetadata;
}


protected void executorTaskError(final Protos.TaskInfo taskInfo) {
scheduler.statusUpdate(driver, Protos.TaskStatus.newBuilder()
.setExecutorId(executorId(taskInfo))
Expand Down Expand Up @@ -1150,15 +1151,19 @@ protected void noopOnOfferAll() {
}
}

protected void cleanState() {
super.cleanState();
protected void cleanState(String mesosRole) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark mesosRole as final.

super.cleanState(mesosRole);

scheduler = new CassandraScheduler(configuration, cluster, clock);

driver = new MockSchedulerDriver(scheduler);
driver.callRegistered(Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID().toString()).build());
}

protected void cleanState() {
cleanState("*");
}

protected static String executorIdValue(final Protos.TaskInfo executorMetadata) {
return executorId(executorMetadata).getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract class AbstractSchedulerTest {
};
protected PersistedCassandraClusterHealthCheckHistory healthCheckHistory;

protected void cleanState() {
protected void cleanState(String mesosRole) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark mesosRole as final.

// start with clean state
state = new InMemoryState();

Expand All @@ -59,7 +59,7 @@ protected void cleanState() {
"2.1.4",
2, 4096, 4096, 0,
3, 2,
"*",
mesosRole,
".",
true,
false,
Expand All @@ -81,7 +81,12 @@ protected void cleanState() {
clusterState = cluster.getClusterState();
}

protected Protos.Offer createOffer(final Tuple2<Protos.SlaveID, String> slave) {
protected void cleanState() {
cleanState("*");
}


protected Protos.Offer createOffer(final Tuple2<Protos.SlaveID, String> slave) {
final Protos.Offer.Builder builder = Protos.Offer.newBuilder()
.setFrameworkId(frameworkId)
.setHostname(slave._2)
Expand Down
Loading