Skip to content

Commit

Permalink
chore: virtual threads support
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiyvamz committed Sep 17, 2024
1 parent 4c8e24d commit 0bf53ac
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 55 deletions.
4 changes: 4 additions & 0 deletions wrapper/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ tasks.named("jacocoTestCoverageVerification") {
dependsOn("processResources")
}

tasks.withType<JavaCompile>(){
options.compilerArgs.addAll(listOf("-Xlint:unchecked", "-Xlint:deprecation"))
}

checkstyle {
// Checkstyle versions 7.x, 8.x, and 9.x are supported by JRE version 8 and above.
toolVersion = "9.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -48,6 +49,8 @@ public class RoundRobinHostSelector implements HostSelector {
"((?<host>[^:/?#]*):(?<weight>[0-9]*))");
protected static final CacheMap<String, RoundRobinClusterInfo> roundRobinCache = new CacheMap<>();

protected static final ReentrantLock lock = new ReentrantLock();

static {
PropertyDefinition.registerPluginProperties(RoundRobinHostSelector.class);
}
Expand All @@ -69,56 +72,63 @@ public static void setRoundRobinHostWeightPairsProperty(final @NonNull Propertie
}

@Override
public synchronized HostSpec getHost(
public HostSpec getHost(
final @NonNull List<HostSpec> hosts,
final @NonNull HostRole role,
final @Nullable Properties props) throws SQLException {
final List<HostSpec> eligibleHosts = hosts.stream()
.filter(hostSpec ->
role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE))
.sorted(Comparator.comparing(HostSpec::getHost))
.collect(Collectors.toList());

if (eligibleHosts.isEmpty()) {
throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role}));
}
lock.lock();
try {
final List<HostSpec> eligibleHosts = hosts.stream()
.filter(hostSpec ->
role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE))
.sorted(Comparator.comparing(HostSpec::getHost))
.collect(Collectors.toList());

if (eligibleHosts.isEmpty()) {
throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role}));
}

// Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info.
createCacheEntryForHosts(eligibleHosts, props);
final String currentClusterInfoKey = eligibleHosts.get(0).getHost();
final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey);
// Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info.
createCacheEntryForHosts(eligibleHosts, props);
final String currentClusterInfoKey = eligibleHosts.get(0).getHost();
final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey);

final HostSpec lastHost = clusterInfo.lastHost;
int lastHostIndex = -1;
final HostSpec lastHost = clusterInfo.lastHost;
int lastHostIndex = -1;

// Check if lastHost is in list of eligible hosts. Update lastHostIndex.
if (lastHost != null) {
for (int i = 0; i < eligibleHosts.size(); i++) {
if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) {
lastHostIndex = i;
// Check if lastHost is in list of eligible hosts. Update lastHostIndex.
if (lastHost != null) {
for (int i = 0; i < eligibleHosts.size(); i++) {
if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) {
lastHostIndex = i;
}
}
}
}

final int targetHostIndex;
// If the host is weighted and the lastHost is in the eligibleHosts list.
if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) {
targetHostIndex = lastHostIndex;
} else {
if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) {
targetHostIndex = lastHostIndex + 1;
final int targetHostIndex;
// If the host is weighted and the lastHost is in the eligibleHosts list.
if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) {
targetHostIndex = lastHostIndex;
} else {
targetHostIndex = 0;
if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) {
targetHostIndex = lastHostIndex + 1;
} else {
targetHostIndex = 0;
}

final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHost());
clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight;
}

final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHost());
clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight;
}
clusterInfo.weightCounter--;
clusterInfo.lastHost = eligibleHosts.get(targetHostIndex);

clusterInfo.weightCounter--;
clusterInfo.lastHost = eligibleHosts.get(targetHostIndex);
return eligibleHosts.get(targetHostIndex);

return eligibleHosts.get(targetHostIndex);
} finally {
lock.unlock();
}
}

private void createCacheEntryForHosts(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
import software.amazon.jdbc.util.SubscribedMethodHelper;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;

/**
* Monitor the server while the connection is executing methods for more sophisticated failure
Expand Down Expand Up @@ -181,7 +180,8 @@ public <T, E extends Exception> T execute(

} finally {
if (monitorContext != null) {
synchronized (monitorContext) {
monitorContext.getLock().lock();
try {
this.monitorService.stopMonitoring(monitorContext);

if (monitorContext.isNodeUnhealthy()) {
Expand All @@ -206,6 +206,8 @@ public <T, E extends Exception> T execute(
new Object[] {this.pluginService.getCurrentHostSpec().asAlias()})));
}
}
} finally {
monitorContext.getLock().unlock();
}

LOGGER.finest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.telemetry.TelemetryCounter;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class MonitorConnectionContext {
private long invalidNodeStartTimeNano; // Only accessed by monitor thread
private long failureCount; // Only accessed by monitor thread

private final ReentrantLock lock = new ReentrantLock();

/**
* Constructor.
*
Expand Down Expand Up @@ -238,4 +241,8 @@ void setConnectionValid(
() -> Messages.get("MonitorConnectionContext.hostAlive",
new Object[] {hostName}));
}

public ReentrantLock getLock() {
return this.lock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void run() {

while ((monitorContext = this.activeContexts.poll()) != null) {

synchronized (monitorContext) {
monitorContext.getLock().lock();
try {
// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
Expand Down Expand Up @@ -221,6 +222,8 @@ public void run() {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
}
}
} finally {
monitorContext.getLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ protected <E extends Exception> void dealWithIllegalStateException(
* @param failedHost The host with network errors.
* @throws SQLException if an error occurs
*/
protected synchronized void failover(final HostSpec failedHost) throws SQLException {
protected void failover(final HostSpec failedHost) throws SQLException {
this.pluginService.setAvailability(failedHost.asAliases(), HostAvailability.NOT_AVAILABLE);

if (this.failoverMode == FailoverMode.STRICT_WRITER) {
Expand Down Expand Up @@ -720,7 +720,7 @@ protected void invalidateCurrentConnection() {
}
}

protected synchronized void pickNewConnection() throws SQLException {
protected void pickNewConnection() throws SQLException {
if (this.isClosed && this.closedExplicitly) {
LOGGER.fine(() -> Messages.get("Failover.transactionResolutionUnknownError"));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
Expand All @@ -56,7 +57,7 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable {

private static final String MONITORING_PROPERTY_PREFIX = "limitless-router-monitor-";
private final int intervalMs;
private @NonNull HostSpec hostSpec;
private final @NonNull HostSpec hostSpec;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicReference<List<HostSpec>> limitlessRouters = new AtomicReference<>(
Collections.unmodifiableList(new ArrayList<>()));
Expand All @@ -73,6 +74,8 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable {
return monitoringThread;
});

private final ReentrantLock lock = new ReentrantLock();

public LimitlessRouterMonitor(
final @NonNull PluginService pluginService,
final @NonNull HostSpec hostSpec,
Expand Down Expand Up @@ -164,17 +167,24 @@ public void run() {
}
}

public synchronized List<HostSpec> forceGetLimitlessRouters() throws SQLException {
public List<HostSpec> forceGetLimitlessRouters() throws SQLException {
LOGGER.finest(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRouters"));
this.openConnection();
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed"));

lock.lock();
try {
this.openConnection();
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed"));
}
List<HostSpec> newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn);
this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters));
RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters);
LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]"));
return newLimitlessRouters;

} finally {
lock.unlock();
}
List<HostSpec> newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn);
this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters));
RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters);
LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]"));
return newLimitlessRouters;
}

private void openConnection() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public List<HostSpec> forceGetLimitlessRouters(final String clusterId, final Pro
}

@Override
public synchronized void startMonitoring(final @NonNull PluginService pluginService,
public void startMonitoring(final @NonNull PluginService pluginService,
final @NonNull HostSpec hostSpec,
final @NonNull Properties props,
final int intervalMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ private void logAndThrowException(
throw new ReadWriteSplittingSQLException(logMessage, sqlState.getState(), cause);
}

private synchronized void switchToWriterConnection(
private void switchToWriterConnection(
final List<HostSpec> hosts)
throws SQLException {
final Connection currentConnection = this.pluginService.getCurrentConnection();
Expand Down Expand Up @@ -418,7 +418,7 @@ private void switchCurrentConnectionTo(
newConnectionHost.getUrl()}));
}

private synchronized void switchToReaderConnection(final List<HostSpec> hosts)
private void switchToReaderConnection(final List<HostSpec> hosts)
throws SQLException {
final Connection currentConnection = this.pluginService.getCurrentConnection();
final HostSpec currentHost = this.pluginService.getCurrentHostSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void abort(final Executor executor) throws SQLException {
}

@Override
public synchronized void clearWarnings() throws SQLException {
public void clearWarnings() throws SQLException {
WrapperUtils.runWithPlugins(
SQLException.class,
this.pluginManager,
Expand Down Expand Up @@ -499,7 +499,7 @@ public Map<String, Class<?>> getTypeMap() throws SQLException {
}

@Override
public synchronized SQLWarning getWarnings() throws SQLException {
public SQLWarning getWarnings() throws SQLException {
return WrapperUtils.executeWithPlugins(
SQLWarning.class,
SQLException.class,
Expand Down

0 comments on commit 0bf53ac

Please sign in to comment.