Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: virtual threads support #1120

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### :magic_wand: Added
- Logic and a connection property to enable driver failover when network exceptions occur in the connect pipeline (PR #1099)[https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1099]
- A new reworked and re-architected failover plugin (PR #1089)[https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1089]
- Virtual Threading support (PR #1120)[https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1120]

## [2.3.9] - 2024-08-09

Expand Down Expand Up @@ -308,7 +310,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Lock initialization of `AuroraHostListProvider` ([PR #347](https://github.com/awslabs/aws-advanced-jdbc-wrapper/pull/347)).
- Optimized thread locks and expiring cache for the Enhanced Monitoring Plugin ([PR #365](https://github.com/awslabs/aws-advanced-jdbc-wrapper/pull/365)).
- Updated Hibernate sample code to reflect changes in the wrapper source code ([PR #368](https://github.com/awslabs/aws-advanced-jdbc-wrapper/pull/368)).
- Updated KnownLimitations.md to reflect that Amazon RDS Blue/Green Deployments are not supported. See [Amazon RDS Blue/Green Deployments](./docs/KnownLimitations.md#amazon-rds-blue-green-deployments).
- Updated KnownLimitations.md to reflect that Amazon RDS Blue/Green Deployments are not supported. See [Amazon RDS Blue/Green Deployments](./docs/README.md#amazon-rds-bluegreen-deployments).

## [1.0.1] - 2023-01-30
### :magic_wand: Added
Expand Down
71 changes: 34 additions & 37 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -48,6 +49,8 @@ public class RoundRobinHostSelector implements HostSelector {
"((?<host>[^:/?#]*):(?<weight>[0-9]*))");
protected static final CacheMap<String, RoundRobinClusterInfo> roundRobinCache = new CacheMap<>();

protected static final ReentrantLock lock = new ReentrantLock();

static {
PropertyDefinition.registerPluginProperties(RoundRobinHostSelector.class);
}
Expand All @@ -69,56 +72,63 @@ public static void setRoundRobinHostWeightPairsProperty(final @NonNull Propertie
}

@Override
public synchronized HostSpec getHost(
public HostSpec getHost(
final @NonNull List<HostSpec> hosts,
final @NonNull HostRole role,
final @Nullable Properties props) throws SQLException {
final List<HostSpec> eligibleHosts = hosts.stream()
.filter(hostSpec ->
role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE))
.sorted(Comparator.comparing(HostSpec::getHost))
.collect(Collectors.toList());

if (eligibleHosts.isEmpty()) {
throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role}));
}
lock.lock();
try {
final List<HostSpec> eligibleHosts = hosts.stream()
.filter(hostSpec ->
role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE))
.sorted(Comparator.comparing(HostSpec::getHost))
.collect(Collectors.toList());

if (eligibleHosts.isEmpty()) {
throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role}));
}

// Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info.
createCacheEntryForHosts(eligibleHosts, props);
final String currentClusterInfoKey = eligibleHosts.get(0).getHost();
final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey);
// Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info.
createCacheEntryForHosts(eligibleHosts, props);
final String currentClusterInfoKey = eligibleHosts.get(0).getHost();
final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey);

final HostSpec lastHost = clusterInfo.lastHost;
int lastHostIndex = -1;
final HostSpec lastHost = clusterInfo.lastHost;
int lastHostIndex = -1;

// Check if lastHost is in list of eligible hosts. Update lastHostIndex.
if (lastHost != null) {
for (int i = 0; i < eligibleHosts.size(); i++) {
if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) {
lastHostIndex = i;
// Check if lastHost is in list of eligible hosts. Update lastHostIndex.
if (lastHost != null) {
for (int i = 0; i < eligibleHosts.size(); i++) {
if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) {
lastHostIndex = i;
}
}
}
}

final int targetHostIndex;
// If the host is weighted and the lastHost is in the eligibleHosts list.
if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) {
targetHostIndex = lastHostIndex;
} else {
if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) {
targetHostIndex = lastHostIndex + 1;
final int targetHostIndex;
// If the host is weighted and the lastHost is in the eligibleHosts list.
if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) {
targetHostIndex = lastHostIndex;
} else {
targetHostIndex = 0;
if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) {
targetHostIndex = lastHostIndex + 1;
} else {
targetHostIndex = 0;
}

final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHostId());
clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight;
}

final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHostId());
clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight;
}
clusterInfo.weightCounter--;
clusterInfo.lastHost = eligibleHosts.get(targetHostIndex);

clusterInfo.weightCounter--;
clusterInfo.lastHost = eligibleHosts.get(targetHostIndex);
return eligibleHosts.get(targetHostIndex);

return eligibleHosts.get(targetHostIndex);
} finally {
lock.unlock();
}
}

private void createCacheEntryForHosts(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
import software.amazon.jdbc.util.SubscribedMethodHelper;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;

/**
* Monitor the server while the connection is executing methods for more sophisticated failure
Expand Down Expand Up @@ -181,7 +180,8 @@ public <T, E extends Exception> T execute(

} finally {
if (monitorContext != null) {
synchronized (monitorContext) {
monitorContext.getLock().lock();
try {
this.monitorService.stopMonitoring(monitorContext);

if (monitorContext.isNodeUnhealthy()) {
Expand All @@ -206,6 +206,8 @@ public <T, E extends Exception> T execute(
new Object[] {this.pluginService.getCurrentHostSpec().asAlias()})));
}
}
} finally {
monitorContext.getLock().unlock();
}

LOGGER.finest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.telemetry.TelemetryCounter;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class MonitorConnectionContext {
private long invalidNodeStartTimeNano; // Only accessed by monitor thread
private long failureCount; // Only accessed by monitor thread

private final ReentrantLock lock = new ReentrantLock();

/**
* Constructor.
*
Expand Down Expand Up @@ -238,4 +241,8 @@ void setConnectionValid(
() -> Messages.get("MonitorConnectionContext.hostAlive",
new Object[] {hostName}));
}

public ReentrantLock getLock() {
return this.lock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ public void run() {

while ((monitorContext = this.activeContexts.poll()) != null) {

synchronized (monitorContext) {
monitorContext.getLock().lock();
try {
// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
Expand Down Expand Up @@ -223,6 +224,8 @@ public void run() {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
}
}
} finally {
monitorContext.getLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ protected <E extends Exception> void dealWithIllegalStateException(
* @param failedHost The host with network errors.
* @throws SQLException if an error occurs
*/
protected synchronized void failover(final HostSpec failedHost) throws SQLException {
protected void failover(final HostSpec failedHost) throws SQLException {
this.pluginService.setAvailability(failedHost.asAliases(), HostAvailability.NOT_AVAILABLE);

if (this.failoverMode == FailoverMode.STRICT_WRITER) {
Expand Down Expand Up @@ -720,7 +720,7 @@ protected void invalidateCurrentConnection() {
}
}

protected synchronized void pickNewConnection() throws SQLException {
protected void pickNewConnection() throws SQLException {
if (this.isClosed && this.closedExplicitly) {
LOGGER.fine(() -> Messages.get("Failover.transactionResolutionUnknownError"));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
Expand All @@ -56,7 +57,7 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable {

private static final String MONITORING_PROPERTY_PREFIX = "limitless-router-monitor-";
private final int intervalMs;
private @NonNull HostSpec hostSpec;
private final @NonNull HostSpec hostSpec;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicReference<List<HostSpec>> limitlessRouters = new AtomicReference<>(
Collections.unmodifiableList(new ArrayList<>()));
Expand All @@ -73,6 +74,8 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable {
return monitoringThread;
});

private final ReentrantLock lock = new ReentrantLock();

public LimitlessRouterMonitor(
final @NonNull PluginService pluginService,
final @NonNull HostSpec hostSpec,
Expand Down Expand Up @@ -164,17 +167,24 @@ public void run() {
}
}

public synchronized List<HostSpec> forceGetLimitlessRouters() throws SQLException {
public List<HostSpec> forceGetLimitlessRouters() throws SQLException {
LOGGER.finest(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRouters"));
this.openConnection();
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed"));

lock.lock();
try {
this.openConnection();
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed"));
}
List<HostSpec> newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn);
this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters));
RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters);
LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]"));
return newLimitlessRouters;

} finally {
lock.unlock();
}
List<HostSpec> newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn);
this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters));
RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters);
LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]"));
return newLimitlessRouters;
}

private void openConnection() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public List<HostSpec> forceGetLimitlessRouters(final String clusterId, final Pro
}

@Override
public synchronized void startMonitoring(final @NonNull PluginService pluginService,
public void startMonitoring(final @NonNull PluginService pluginService,
final @NonNull HostSpec hostSpec,
final @NonNull Properties props,
final int intervalMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ private void logAndThrowException(
throw new ReadWriteSplittingSQLException(logMessage, sqlState.getState(), cause);
}

private synchronized void switchToWriterConnection(
private void switchToWriterConnection(
final List<HostSpec> hosts)
throws SQLException {
final Connection currentConnection = this.pluginService.getCurrentConnection();
Expand Down Expand Up @@ -418,7 +418,7 @@ private void switchCurrentConnectionTo(
newConnectionHost.getUrl()}));
}

private synchronized void switchToReaderConnection(final List<HostSpec> hosts)
private void switchToReaderConnection(final List<HostSpec> hosts)
throws SQLException {
final Connection currentConnection = this.pluginService.getCurrentConnection();
final HostSpec currentHost = this.pluginService.getCurrentHostSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void abort(final Executor executor) throws SQLException {
}

@Override
public synchronized void clearWarnings() throws SQLException {
public void clearWarnings() throws SQLException {
WrapperUtils.runWithPlugins(
SQLException.class,
this.pluginManager,
Expand Down Expand Up @@ -500,7 +500,7 @@ public Map<String, Class<?>> getTypeMap() throws SQLException {
}

@Override
public synchronized SQLWarning getWarnings() throws SQLException {
public SQLWarning getWarnings() throws SQLException {
return WrapperUtils.executeWithPlugins(
SQLWarning.class,
SQLException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -85,6 +86,7 @@ class HostMonitoringConnectionPluginTest {
@Mock Supplier<MonitorService> supplier;
@Mock RdsUtils rdsUtils;
@Mock MonitorConnectionContext context;
@Mock ReentrantLock mockReentrantLock;
@Mock MonitorService monitorService;
@Mock JdbcCallable<ResultSet, SQLException> sqlFunction;
private HostMonitoringConnectionPlugin plugin;
Expand Down Expand Up @@ -130,6 +132,7 @@ void initDefaultMockReturns() throws Exception {
anyInt(),
anyInt()))
.thenReturn(context);
when(context.getLock()).thenReturn(mockReentrantLock);

when(pluginService.getCurrentConnection()).thenReturn(connection);
when(pluginService.getCurrentHostSpec()).thenReturn(hostSpec);
Expand Down
Loading