Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-15126 Fixes cluster hanging if exception occurred during activation. #11694

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME;
Expand Down Expand Up @@ -1718,10 +1717,6 @@ private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedExcep
if (cfg.getConnectorConfiguration() != null)
add(ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange());

// Whether rollback of dynamic cache start is supported or not.
// This property is added because of backward compatibility.
add(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE);

// Save data storage configuration.
addDataStorageConfigurationAttributes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ public final class IgniteNodeAttributes {
/** Rebalance thread pool size. */
public static final String ATTR_REBALANCE_POOL_SIZE = ATTR_PREFIX + ".rebalance.pool.size";

/** Internal attribute name constant. */
public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = ATTR_PREFIX + ".dynamic.cache.start.rollback.supported";

/** Supported features. */
public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNo
* @param failMsg Dynamic change request fail message.
* @param topVer Current topology version.
*/
public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
public void onCacheChangeRequested(ExchangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
AffinityTopologyVersion actualTopVer = failMsg.exchangeId().topologyVersion();

ExchangeActions exchangeActions = new ExchangeActions();
Expand Down Expand Up @@ -603,7 +603,7 @@ public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, Aff
processStopCacheRequest(exchangeActions, req, res, req.cacheName(), cacheDesc, actualTopVer, true);
}

failMsg.exchangeActions(exchangeActions);
failMsg.exchangeRollbackActions(exchangeActions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
Expand All @@ -28,55 +30,55 @@
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

/**
* This class represents discovery message that is used to provide information about dynamic cache start failure.
*/
public class DynamicCacheChangeFailureMessage implements DiscoveryCustomMessage {
public class ExchangeFailureMessage implements DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 0L;

/** Cache names. */
@GridToStringInclude
private Collection<String> cacheNames;
private final Collection<String> cacheNames;

/** Custom message ID. */
private IgniteUuid id;
private final IgniteUuid id;

/** */
private GridDhtPartitionExchangeId exchId;
private final GridDhtPartitionExchangeId exchId;

/** */
@GridToStringInclude
private IgniteCheckedException cause;
private final Map<UUID, Exception> exchangeErrors;

/** Cache updates to be executed on exchange. */
private transient ExchangeActions exchangeActions;
/** Actions to be done to rollback changes done before the exchange failure. */
private transient ExchangeActions exchangeRollbackActions;

/**
* Creates new DynamicCacheChangeFailureMessage instance.
*
* @param locNode Local node.
* @param exchId Exchange Id.
* @param cause Cache start error.
* @param cacheNames Cache names.
* @param exchangeErrors Errors that caused PME to fail.
*/
public DynamicCacheChangeFailureMessage(
public ExchangeFailureMessage(
ClusterNode locNode,
GridDhtPartitionExchangeId exchId,
IgniteCheckedException cause,
Map<UUID, Exception> exchangeErrors,
Collection<String> cacheNames
) {
assert exchId != null;
assert cause != null;
assert !F.isEmpty(exchangeErrors);
assert !F.isEmpty(cacheNames) : cacheNames;

this.id = IgniteUuid.fromUuid(locNode.id());
this.exchId = exchId;
this.cause = cause;
this.cacheNames = cacheNames;
this.exchangeErrors = exchangeErrors;
}

/** {@inheritDoc} */
Expand All @@ -91,27 +93,40 @@ public Collection<String> cacheNames() {
return cacheNames;
}

/** */
public Map<UUID, Exception> exchangeErrors() {
return exchangeErrors;
}

/**
* @return Cache start error.
* @return Cache updates to be executed on exchange.
*/
public IgniteCheckedException error() {
return cause;
public ExchangeActions exchangeRollbackActions() {
return exchangeRollbackActions;
}

/**
* @return Cache updates to be executed on exchange.
* @param exchangeRollbackActions Cache updates to be executed on exchange.
*/
public ExchangeActions exchangeActions() {
return exchangeActions;
public void exchangeRollbackActions(ExchangeActions exchangeRollbackActions) {
assert exchangeRollbackActions != null && !exchangeRollbackActions.empty() : exchangeRollbackActions;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!F.isEmpty ?


this.exchangeRollbackActions = exchangeRollbackActions;
}

/**
* @param exchangeActions Cache updates to be executed on exchange.
* Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. This method
* aggregates all the exceptions provided from all participating nodes.
*
* @return Exception that represents a cause of the exchange initialization failure.
*/
public void exchangeActions(ExchangeActions exchangeActions) {
assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
public IgniteCheckedException createFailureCompoundException() {
IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process.");

for (Map.Entry<UUID, Exception> entry : exchangeErrors.entrySet())
U.addSuppressed(ex, entry.getValue());

this.exchangeActions = exchangeActions;
return ex;
}

/**
Expand Down Expand Up @@ -141,6 +156,6 @@ public void exchangeActions(ExchangeActions exchangeActions) {

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeFailureMessage.class, this);
return S.toString(ExchangeFailureMessage.class, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,12 @@
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.BaselineChangedEvent;
import org.apache.ignite.events.ClusterActivationEvent;
import org.apache.ignite.events.ClusterStateChangeEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
Expand All @@ -82,7 +74,6 @@
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
Expand Down Expand Up @@ -114,7 +105,6 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
Expand All @@ -139,7 +129,6 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -159,9 +148,6 @@
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_ACTIVATED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_DEACTIVATED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_STATE_CHANGED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
Expand Down Expand Up @@ -599,23 +585,6 @@ private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);

exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);

boolean baselineChanging;
if (stateChangeMsg.forceChangeBaselineTopology())
baselineChanging = true;
else {
DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState();

assert state.transition() : state;

baselineChanging = exchActions.changedBaseline()
// Or it is the first activation.
|| state.state() != ClusterState.INACTIVE
&& !state.previouslyActive()
&& state.previousBaselineTopology() == null;
}

exchFut.listen(f -> onClusterStateChangeFinish(exchActions, baselineChanging));
}
}
else if (customMsg instanceof DynamicCacheChangeBatch) {
Expand Down Expand Up @@ -643,13 +612,14 @@ else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery(
exchangeFuture(msg.exchangeId(), null, null, null, null)
.onAffinityChangeMessage(evt.eventNode(), msg);
}
else if (customMsg instanceof DynamicCacheChangeFailureMessage) {
DynamicCacheChangeFailureMessage msg = (DynamicCacheChangeFailureMessage)customMsg;
else if (customMsg instanceof ExchangeFailureMessage) {
ExchangeFailureMessage msg = (ExchangeFailureMessage)customMsg;

if (msg.exchangeId().topologyVersion().topologyVersion() >=
affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion())
exchangeFuture(msg.exchangeId(), null, null, null, null)
.onDynamicCacheChangeFail(evt.eventNode(), msg);
long exchangeTopVer = msg.exchangeId().topologyVersion().topologyVersion();
long locNodeJoinTopVer = affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion();

if (exchangeTopVer >= locNodeJoinTopVer)
exchangeFuture(msg.exchangeId(), null, null, null, null).onExchangeFailureMessage(evt.eventNode(), msg);
}
else if (customMsg instanceof SnapshotDiscoveryMessage
&& ((SnapshotDiscoveryMessage)customMsg).needExchange()) {
Expand Down Expand Up @@ -720,73 +690,6 @@ else if (customMsg instanceof WalStateAbstractMessage
}
}

/** */
private void onClusterStateChangeFinish(ExchangeActions exchActions, boolean baselineChanging) {
A.notNull(exchActions, "exchActions");

GridEventStorageManager evtMngr = cctx.kernalContext().event();

if (exchActions.activate() && evtMngr.isRecordable(EVT_CLUSTER_ACTIVATED) ||
exchActions.deactivate() && evtMngr.isRecordable(EVT_CLUSTER_DEACTIVATED) ||
exchActions.changedClusterState() && evtMngr.isRecordable(EVT_CLUSTER_STATE_CHANGED)
) {
List<Event> evts = new ArrayList<>(2);

ClusterNode locNode = cctx.kernalContext().discovery().localNode();

Collection<BaselineNode> bltNodes = cctx.kernalContext().cluster().get().currentBaselineTopology();

boolean collectionUsed = false;

if (exchActions.activate() && evtMngr.isRecordable(EVT_CLUSTER_ACTIVATED)) {
assert !exchActions.deactivate() : exchActions;

collectionUsed = true;

evts.add(new ClusterActivationEvent(locNode, "Cluster activated.", EVT_CLUSTER_ACTIVATED, bltNodes));
}

if (exchActions.deactivate() && evtMngr.isRecordable(EVT_CLUSTER_DEACTIVATED)) {
assert !exchActions.activate() : exchActions;

collectionUsed = true;

evts.add(new ClusterActivationEvent(locNode, "Cluster deactivated.", EVT_CLUSTER_DEACTIVATED, bltNodes));
}

if (exchActions.changedClusterState() && evtMngr.isRecordable(EVT_CLUSTER_STATE_CHANGED)) {
StateChangeRequest req = exchActions.stateChangeRequest();

if (collectionUsed && bltNodes != null)
bltNodes = new ArrayList<>(bltNodes);

evts.add(new ClusterStateChangeEvent(req.prevState(), req.state(), bltNodes, locNode, "Cluster state changed."));
}

A.notEmpty(evts, "events " + exchActions);

cctx.kernalContext().pools().getSystemExecutorService()
.submit(() -> evts.forEach(e -> cctx.kernalContext().event().record(e)));
}

GridKernalContext ctx = cctx.kernalContext();

if (baselineChanging) {
ctx.pools().getStripedExecutorService().execute(new Runnable() {
@Override public void run() {
if (ctx.event().isRecordable(EventType.EVT_BASELINE_CHANGED)) {
ctx.event().record(new BaselineChangedEvent(
ctx.discovery().localNode(),
"Baseline changed.",
EventType.EVT_BASELINE_CHANGED,
ctx.cluster().get().currentBaselineTopology()
));
}
}
});
}
}

/**
* @param task Task to run in exchange worker thread.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4263,8 +4263,8 @@ else if (msg0 instanceof WalStateFinishMessage)
return changeRequested;
}

if (msg instanceof DynamicCacheChangeFailureMessage)
cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, topVer);
if (msg instanceof ExchangeFailureMessage)
cachesInfo.onCacheChangeRequested((ExchangeFailureMessage)msg, topVer);

if (msg instanceof ClientCacheChangeDiscoveryMessage)
cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node);
Expand Down
Loading