Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track session states #729

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.exceptions.ExceptionHandler;
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;

/**
Expand All @@ -47,6 +48,18 @@ EnumSet<NodeChangeOptions> setCurrentConnection(
@Nullable ConnectionPlugin skipNotificationForThisPlugin)
throws SQLException;

EnumSet<SessionDirtyFlag> getCurrentConnectionState();

void setCurrentConnectionState(SessionDirtyFlag flag);

void resetCurrentConnectionState(SessionDirtyFlag flag);

void resetCurrentConnectionStates();

boolean getAutoCommit();

void setAutoCommit(final boolean autoCommit);

List<HostSpec> getHosts();

HostSpec getInitialConnectionHostSpec();
Expand Down
26 changes: 26 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.hostavailability.HostAvailabilityStrategyFactory;
import software.amazon.jdbc.hostlistprovider.StaticHostListProvider;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.util.CacheMap;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
Expand All @@ -68,6 +69,8 @@ public class PluginServiceImpl implements PluginService, CanReleaseResources,
private final ExceptionManager exceptionManager;
protected final DialectProvider dialectProvider;
protected Dialect dialect;
protected EnumSet<SessionDirtyFlag> currentConnectionSessionState = EnumSet.noneOf(SessionDirtyFlag.class);
protected boolean isAutoCommit = false;

public PluginServiceImpl(
@NonNull final ConnectionPluginManager pluginManager,
Expand Down Expand Up @@ -568,4 +571,27 @@ public String getTargetName() {
return this.pluginManager.getDefaultConnProvider().getTargetName();
}

public EnumSet<SessionDirtyFlag> getCurrentConnectionState() {
return this.currentConnectionSessionState.clone();
}

public void setCurrentConnectionState(SessionDirtyFlag flag) {
this.currentConnectionSessionState.add(flag);
}

public void resetCurrentConnectionState(SessionDirtyFlag flag) {
this.currentConnectionSessionState.remove(flag);
}

public void resetCurrentConnectionStates() {
this.currentConnectionSessionState.clear();
}

public boolean getAutoCommit() {
return this.isAutoCommit;
}

public void setAutoCommit(final boolean autoCommit) {
this.isAutoCommit = autoCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsHelper;
import software.amazon.jdbc.states.RestoreSessionStateCallable;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.states.SessionStateHelper;
import software.amazon.jdbc.states.SessionStateTransferCallable;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
Expand Down Expand Up @@ -85,6 +89,10 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin {
static final String METHOD_ABORT = "Connection.abort";
static final String METHOD_CLOSE = "Connection.close";
static final String METHOD_IS_CLOSED = "Connection.isClosed";

protected static SessionStateTransferCallable sessionStateTransferCallable;
protected static RestoreSessionStateCallable restoreSessionStateCallable;

private final PluginService pluginService;
protected final Properties properties;
protected boolean enableFailoverSetting;
Expand Down Expand Up @@ -199,6 +207,22 @@ public FailoverConnectionPlugin(final PluginService pluginService, final Propert
this.failoverReaderFailedCounter = telemetryFactory.createCounter("readerFailover.completed.failed.count");
}

public static void setSessionStateTransferFunc(SessionStateTransferCallable callable) {
sessionStateTransferCallable = callable;
}

public static void resetSessionStateTransferFunc() {
sessionStateTransferCallable = null;
}

public static void setRestoreSessionStateFunc(RestoreSessionStateCallable callable) {
restoreSessionStateCallable = callable;
}

public static void resetRestoreSessionStateFunc() {
restoreSessionStateCallable = null;
}

@Override
public Set<String> getSubscribedMethods() {
return subscribedMethods;
Expand Down Expand Up @@ -521,9 +545,10 @@ private boolean shouldAttemptReaderConnection() {
*/
private void switchCurrentConnectionTo(final HostSpec host, final Connection connection) throws SQLException {
Connection currentConnection = this.pluginService.getCurrentConnection();
HostSpec currentHostSpec = this.pluginService.getCurrentHostSpec();

if (currentConnection != connection) {
transferSessionState(currentConnection, connection);
transferSessionState(currentConnection, currentHostSpec, connection, host);
invalidateCurrentConnection();
}

Expand All @@ -535,46 +560,73 @@ private void switchCurrentConnectionTo(final HostSpec host, final Connection con
}

/**
* Transfers basic session state from one connection to another.
* Transfers session state from one connection to another.
*
* @param from The connection to transfer state from
* @param to The connection to transfer state to
* @param src The connection to transfer state from
* @param srcHostSpec The connection {@link HostSpec} to transfer state from
* @param dest The connection to transfer state to
* @param destHostSpec The connection {@link HostSpec} to transfer state to
* @throws SQLException if a database access error occurs, this method is called on a closed connection, this
* method is called during a distributed transaction, or this method is called during a
* transaction
*/
protected void transferSessionState(
final Connection from,
final Connection to) throws SQLException {
final Connection src,
final HostSpec srcHostSpec,
final Connection dest,
final HostSpec destHostSpec) throws SQLException {

if (from == null || to == null) {
if (src == null || dest == null) {
return;
}

to.setReadOnly(from.isReadOnly());
to.setAutoCommit(from.getAutoCommit());
to.setTransactionIsolation(from.getTransactionIsolation());
EnumSet<SessionDirtyFlag> sessionState = this.pluginService.getCurrentConnectionState();

SessionStateTransferCallable callableCopy = sessionStateTransferCallable;
if (callableCopy != null) {
final boolean isHandled = callableCopy.transferSessionState(sessionState, src, srcHostSpec, dest, destHostSpec);
if (isHandled) {
// Custom function has handled session transfer
return;
}
}

// Otherwise, lets run default logic.
sessionState = this.pluginService.getCurrentConnectionState();
final SessionStateHelper helper = new SessionStateHelper();
helper.transferSessionState(sessionState, src, dest);
}

/**
* Restores partial session state from saved values to a connection.
*
* @param to The connection to transfer state to
* @param dest The connection to transfer state to
* @throws SQLException if a database access error occurs, this method is called on a closed connection, this
* method is called during a distributed transaction, or this method is called during a
* transaction
*/
protected void restoreSessionState(final Connection to) throws SQLException {
if (to == null) {
protected void restoreSessionState(final Connection dest) throws SQLException {
if (dest == null) {
return;
}

if (savedReadOnlyStatus != null) {
to.setReadOnly(savedReadOnlyStatus);
}
if (savedAutoCommitStatus != null) {
to.setAutoCommit(savedAutoCommitStatus);
final RestoreSessionStateCallable callableCopy = restoreSessionStateCallable;
if (callableCopy != null) {
final boolean isHandled = callableCopy.restoreSessionState(
this.pluginService.getCurrentConnectionState(),
dest,
this.savedReadOnlyStatus,
this.savedAutoCommitStatus
);
if (isHandled) {
// Custom function has handled everything.
return;
}
}

// Otherwise, lets run default logic.
final SessionStateHelper helper = new SessionStateHelper();
helper.restoreSessionState(dest, this.savedReadOnlyStatus, this.savedAutoCommitStatus);
}

private <E extends Exception> void dealWithOriginalException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import software.amazon.jdbc.cleanup.CanReleaseResources;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.states.SessionStateHelper;
import software.amazon.jdbc.states.SessionStateTransferCallable;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SqlState;
import software.amazon.jdbc.util.WrapperUtils;
Expand All @@ -61,6 +64,9 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
static final String METHOD_SET_READ_ONLY = "Connection.setReadOnly";
static final String METHOD_CLEAR_WARNINGS = "Connection.clearWarnings";

protected static SessionStateTransferCallable sessionStateTransferCallable;


private final PluginService pluginService;
private final Properties properties;
private final String readerSelectorStrategy;
Expand Down Expand Up @@ -105,6 +111,14 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
this.readerConnection = readerConnection;
}

public static void setSessionStateTransferFunc(SessionStateTransferCallable callable) {
sessionStateTransferCallable = callable;
}

public static void resetSessionStateTransferFunc() {
sessionStateTransferCallable = null;
}

@Override
public Set<String> getSubscribedMethods() {
return subscribedMethods;
Expand Down Expand Up @@ -408,7 +422,7 @@ private void switchCurrentConnectionTo(
return;
}

transferSessionStateOnReadWriteSplit(newConnection);
transferSessionStateOnReadWriteSplit(newConnection, newConnectionHost);
this.pluginService.setCurrentConnection(newConnection, newConnectionHost);
LOGGER.finest(() -> Messages.get(
"ReadWriteSplittingPlugin.settingCurrentConnection",
Expand All @@ -421,19 +435,41 @@ private void switchCurrentConnectionTo(
* status. This method is only called when setReadOnly is being called; the read-only status
* will be updated when the setReadOnly call continues down the plugin chain
*
* @param to The connection to transfer state to
* @param dest The destination connection to transfer state to
* @param destHostSpec The destination connection {@link HostSpec}
* @throws SQLException if a database access error occurs, this method is called on a closed
* connection, or this method is called during a distributed transaction
*/
protected void transferSessionStateOnReadWriteSplit(
final Connection to) throws SQLException {
final Connection from = this.pluginService.getCurrentConnection();
if (from == null || to == null) {
final Connection dest,
final HostSpec destHostSpec)
throws SQLException {

final Connection src = this.pluginService.getCurrentConnection();
if (src == null || dest == null) {
return;
}

to.setAutoCommit(from.getAutoCommit());
to.setTransactionIsolation(from.getTransactionIsolation());
EnumSet<SessionDirtyFlag> sessionState = this.pluginService.getCurrentConnectionState();

SessionStateTransferCallable callableCopy = sessionStateTransferCallable;
if (callableCopy != null) {
final boolean isHandled = callableCopy.transferSessionState(
sessionState,
src,
this.pluginService.getCurrentHostSpec(),
dest,
destHostSpec);
if (isHandled) {
// Custom function has handled session transfer
return;
}
}

sessionState = this.pluginService.getCurrentConnectionState();
sessionState.remove(SessionDirtyFlag.READONLY); // We don't want to change READONLY flag of the connection
final SessionStateHelper helper = new SessionStateHelper();
helper.transferSessionState(sessionState, src, dest);
}

private synchronized void switchToReaderConnection(final List<HostSpec> hosts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.states;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.EnumSet;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public interface RestoreSessionStateCallable {
/**
* Restores partial session state from saved values to a connection.
*
* @param sessionState Session state flags for from-connection
* @param dest The destination connection to transfer state to
* @param readOnly ReadOnly flag to set to
* @param autoCommit AutoCommit flag to set to
* @return true, if session state is restored successful and no default logic should be executed after.
* False, if default logic should be executed.
*/
boolean restoreSessionState(
final @NonNull EnumSet<SessionDirtyFlag> sessionState,
final @NonNull Connection dest,
final @Nullable Boolean readOnly,
final @Nullable Boolean autoCommit)
throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.states;


import java.util.EnumSet;

public enum SessionDirtyFlag {
READONLY,
AUTO_COMMIT,
TRANSACTION_ISOLATION,
CATALOG,
NETWORK_TIMEOUT,
SCHEMA,
TYPE_MAP,
HOLDABILITY;

public static final EnumSet<SessionDirtyFlag> ALL = EnumSet.allOf(SessionDirtyFlag.class);
}
Loading
Loading