Skip to content

Commit

Permalink
[FLINK-22678][state][changelog] Configurations and user APIs for Chan…
Browse files Browse the repository at this point in the history
…gelogStateBackend

This fix apache#16153.
  • Loading branch information
Zakelly authored Jun 22, 2021
1 parent 8ff4c1e commit 641c31e
Show file tree
Hide file tree
Showing 24 changed files with 473 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.backend.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
<td><h5>state.backend.incremental</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<td>String</td>
<td>The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).</td>
</tr>
<tr>
<td><h5>state.backend.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
<td><h5>state.backend.incremental</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,17 @@ public class CheckpointingOptions {

/** Whether to enable state change log. */
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
@Documentation.ExcludeFromDocumentation("Hidden for now")
public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
ConfigOptions.key("state.backend.changelog.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable state backend to write state changes to StateChangelog.");
"Whether to enable state backend to write state changes to StateChangelog. "
+ "If this config is not set explicitly, it means no preference "
+ "for enabling the change log, and the value in lower config "
+ "level will take effect. The default value 'false' here means "
+ "if no value set (job or cluster), the change log will not be "
+ "enabled.");

/** The maximum number of completed checkpoints to retain. */
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.TernaryBoolean;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -214,6 +215,8 @@ StreamConfig getConfig(
config.setOperatorName(operatorID.toHexString());
config.setOperatorID(operatorID);
config.setStateBackend(stateBackend);
// This means leaving this stateBackend unwrapped.
config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0);
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def excluded_methods(cls):
'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', "fromSequence",
'setDefaultSavepointDirectory', 'getDefaultSavepointDirectory'}
'setDefaultSavepointDirectory', 'getDefaultSavepointDirectory',
'enableChangelogStateBackend', 'isChangelogStateBackendEnabled'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ public static DefaultExecutionGraph buildGraph(
try {
rootBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(
applicationConfiguredBackend, jobManagerConfig, classLoader, log);
applicationConfiguredBackend,
snapshotSettings.isChangelogStateBackendEnabled(),
jobManagerConfig,
classLoader,
log);
} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
throw new JobExecutionException(
jobId, "Could not instantiate configured state backend", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;

import javax.annotation.Nullable;

Expand All @@ -42,6 +43,9 @@ public class JobCheckpointingSettings implements Serializable {
/** The default state backend, if configured by the user in the job */
@Nullable private final SerializedValue<StateBackend> defaultStateBackend;

/** The enable flag for change log state backend, if configured by the user in the job */
private final TernaryBoolean changelogStateBackendEnabled;

/** The default checkpoint storage, if configured by the user in the job */
@Nullable private final SerializedValue<CheckpointStorage> defaultCheckpointStorage;

Expand All @@ -52,18 +56,23 @@ public JobCheckpointingSettings(
CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration,
@Nullable SerializedValue<StateBackend> defaultStateBackend) {

this(checkpointCoordinatorConfiguration, defaultStateBackend, null, null);
this(checkpointCoordinatorConfiguration, defaultStateBackend, null, null, null);
}

public JobCheckpointingSettings(
CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration,
@Nullable SerializedValue<StateBackend> defaultStateBackend,
@Nullable TernaryBoolean changelogStateBackendEnabled,
@Nullable SerializedValue<CheckpointStorage> defaultCheckpointStorage,
@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks) {

this.checkpointCoordinatorConfiguration =
Preconditions.checkNotNull(checkpointCoordinatorConfiguration);
this.defaultStateBackend = defaultStateBackend;
this.changelogStateBackendEnabled =
changelogStateBackendEnabled == null
? TernaryBoolean.UNDEFINED
: changelogStateBackendEnabled;
this.defaultCheckpointStorage = defaultCheckpointStorage;
this.masterHooks = masterHooks;
}
Expand All @@ -79,6 +88,10 @@ public SerializedValue<StateBackend> getDefaultStateBackend() {
return defaultStateBackend;
}

public TernaryBoolean isChangelogStateBackendEnabled() {
return changelogStateBackendEnabled;
}

@Nullable
public SerializedValue<CheckpointStorage> getDefaultCheckpointStorage() {
return defaultCheckpointStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.TernaryBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,7 +41,6 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** This class contains utility methods to load state backends from configurations. */
Expand Down Expand Up @@ -103,7 +103,7 @@ public class StateBackendLoader {
* @throws IOException May be thrown by the StateBackendFactory when instantiating the state
* backend
*/
private static StateBackend loadUnwrappedStateBackendFromConfig(
public static StateBackend loadStateBackendFromConfig(
ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

Expand Down Expand Up @@ -187,49 +187,6 @@ private static StateBackend loadUnwrappedStateBackendFromConfig(
}
}

/**
* Loads the state backend from the configuration. It returns a {@code ChangelogStateBackend} if
* '{@code CheckpointingOptions.ENABLE_STATE_CHANGE_LOG}' is enabled; otherwise returns an
* unwrapped state backend created through {@link
* StateBackendLoader#loadUnwrappedStateBackendFromConfig}.
*
* <p>Refer to {@link StateBackendLoader#loadUnwrappedStateBackendFromConfig} for details on how
* an unwrapped state backend is loaded from the configuration.
*
* @param config The configuration to load the state backend from
* @param classLoader The class loader that should be used to load the state backend
* @param logger Optionally, a logger to log actions to (may be null)
* @return The instantiated {@code ChangelogStateBackend} if '{@code
* CheckpointingOptions.ENABLE_STATE_CHANGE_LOG}' is enabled; An unwrapped state backend
* otherwise
* @throws DynamicCodeLoadingException Thrown if a state backend factory is configured and the
* factory class was not found or the factory could not be instantiated
* @throws IllegalConfigurationException May be thrown by the StateBackendFactory when creating
* / configuring the state backend in the factory
* @throws IOException May be thrown by the StateBackendFactory when instantiating the state
* backend
*/
public static StateBackend loadStateBackendFromConfig(
ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

checkNotNull(config, "config");
checkNotNull(classLoader, "classLoader");

final StateBackend backend =
loadUnwrappedStateBackendFromConfig(config, classLoader, logger);

checkArgument(
!(backend instanceof DelegatingStateBackend),
"expecting non-delegating state backend");

if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG) && (backend != null)) {
return loadChangelogStateBackend(backend, classLoader);
} else {
return backend;
}
}

/**
* Checks if an application-defined state backend is given, and if not, loads the state backend
* from the configuration, from the parameter 'state.backend', as defined in {@link
Expand All @@ -240,8 +197,8 @@ public static StateBackend loadStateBackendFromConfig(
* ConfigurableStateBackend}, this methods calls {@link
* ConfigurableStateBackend#configure(ReadableConfig, ClassLoader)} on the state backend.
*
* <p>Refer to {@link #loadUnwrappedStateBackendFromConfig(ReadableConfig, ClassLoader, Logger)}
* for details on how the state backend is loaded from the configuration.
* <p>Refer to {@link #loadStateBackendFromConfig(ReadableConfig, ClassLoader, Logger)} for
* details on how the state backend is loaded from the configuration.
*
* @param config The configuration to load the state backend from
* @param classLoader The class loader that should be used to load the state backend
Expand Down Expand Up @@ -289,8 +246,7 @@ private static StateBackend loadFromApplicationOrConfigOrDefaultInternal(
}
} else {
// (2) check if the config defines a state backend
final StateBackend fromConfig =
loadUnwrappedStateBackendFromConfig(config, classLoader, logger);
final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
if (fromConfig != null) {
backend = fromConfig;
} else {
Expand All @@ -314,6 +270,8 @@ private static StateBackend loadFromApplicationOrConfigOrDefaultInternal(
* If delegation is not enabled, the underlying wrapped state backend is returned instead.
*
* @param fromApplication StateBackend defined from application
* @param isChangelogStateBackendEnableFromApplication whether to enable the
* ChangelogStateBackend from application
* @param config The configuration to load the state backend from
* @param classLoader The class loader that should be used to load the state backend
* @param logger Optionally, a logger to log actions to (may be null)
Expand All @@ -327,21 +285,37 @@ private static StateBackend loadFromApplicationOrConfigOrDefaultInternal(
*/
public static StateBackend fromApplicationOrConfigOrDefault(
@Nullable StateBackend fromApplication,
TernaryBoolean isChangelogStateBackendEnableFromApplication,
Configuration config,
ClassLoader classLoader,
@Nullable Logger logger)
throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

final StateBackend backend =
StateBackend rootBackend =
loadFromApplicationOrConfigOrDefaultInternal(
fromApplication, config, classLoader, logger);

if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG)
&& !(fromApplication instanceof DelegatingStateBackend)) {
return loadChangelogStateBackend(backend, classLoader);
// Configuration from application will override the one from env.
boolean enableChangeLog =
TernaryBoolean.TRUE.equals(isChangelogStateBackendEnableFromApplication)
|| (TernaryBoolean.UNDEFINED.equals(
isChangelogStateBackendEnableFromApplication)
&& config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG));

StateBackend backend;
if (enableChangeLog) {
backend = loadChangelogStateBackend(rootBackend, classLoader);
LOG.info(
"State backend loader loads {} to delegate {}",
backend.getClass().getSimpleName(),
rootBackend.getClass().getSimpleName());
} else {
return backend;
backend = rootBackend;
LOG.info(
"State backend loader loads the state backend as {}",
backend.getClass().getSimpleName());
}
return backend;
}

/**
Expand All @@ -368,8 +342,7 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo

// (2) check if the config defines a state backend
try {
final StateBackend fromConfig =
loadUnwrappedStateBackendFromConfig(config, classLoader, LOG);
final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, LOG);
if (fromConfig != null) {
return fromConfig.useManagedMemory();
}
Expand All @@ -392,7 +365,8 @@ private static StateBackend loadChangelogStateBackend(
Constructor<? extends DelegatingStateBackend> constructor =
Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
.asSubclass(DelegatingStateBackend.class)
.getConstructor(StateBackend.class);
.getDeclaredConstructor(StateBackend.class);
constructor.setAccessible(true);
return constructor.newInstance(backend);
} catch (ClassNotFoundException e) {
throw new DynamicCodeLoadingException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -89,6 +90,7 @@ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception
0,
0),
new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
TernaryBoolean.UNDEFINED,
new SerializedValue<CheckpointStorage>(
new CustomCheckpointStorage(outOfClassPath)),
serHooks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -185,7 +186,11 @@ public static void enableCheckpointing(

jobGraph.setSnapshotSettings(
new JobCheckpointingSettings(
config, serializedStateBackend, serializedCheckpointStorage, null));
config,
serializedStateBackend,
TernaryBoolean.UNDEFINED,
serializedCheckpointStorage,
null));
}

public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(
Expand Down
Loading

0 comments on commit 641c31e

Please sign in to comment.