diff --git a/wrapper/build.gradle.kts b/wrapper/build.gradle.kts index 88ec862c4..b4b7a2426 100644 --- a/wrapper/build.gradle.kts +++ b/wrapper/build.gradle.kts @@ -117,6 +117,10 @@ tasks.named("jacocoTestCoverageVerification") { dependsOn("processResources") } +tasks.withType(){ + options.compilerArgs.addAll(listOf("-Xlint:unchecked", "-Xlint:deprecation")) +} + checkstyle { // Checkstyle versions 7.x, 8.x, and 9.x are supported by JRE version 8 and above. toolVersion = "9.3" diff --git a/wrapper/src/main/java/software/amazon/jdbc/RoundRobinHostSelector.java b/wrapper/src/main/java/software/amazon/jdbc/RoundRobinHostSelector.java index c1f244ea7..4338af5b0 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/RoundRobinHostSelector.java +++ b/wrapper/src/main/java/software/amazon/jdbc/RoundRobinHostSelector.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -48,6 +49,8 @@ public class RoundRobinHostSelector implements HostSelector { "((?[^:/?#]*):(?[0-9]*))"); protected static final CacheMap roundRobinCache = new CacheMap<>(); + protected static final ReentrantLock lock = new ReentrantLock(); + static { PropertyDefinition.registerPluginProperties(RoundRobinHostSelector.class); } @@ -69,56 +72,63 @@ public static void setRoundRobinHostWeightPairsProperty(final @NonNull Propertie } @Override - public synchronized HostSpec getHost( + public HostSpec getHost( final @NonNull List hosts, final @NonNull HostRole role, final @Nullable Properties props) throws SQLException { - final List eligibleHosts = hosts.stream() - .filter(hostSpec -> - role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE)) - .sorted(Comparator.comparing(HostSpec::getHost)) - .collect(Collectors.toList()); - if (eligibleHosts.isEmpty()) { - throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role})); - } + lock.lock(); + try { + final List eligibleHosts = hosts.stream() + .filter(hostSpec -> + role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE)) + .sorted(Comparator.comparing(HostSpec::getHost)) + .collect(Collectors.toList()); + + if (eligibleHosts.isEmpty()) { + throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role})); + } - // Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info. - createCacheEntryForHosts(eligibleHosts, props); - final String currentClusterInfoKey = eligibleHosts.get(0).getHost(); - final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey); + // Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info. + createCacheEntryForHosts(eligibleHosts, props); + final String currentClusterInfoKey = eligibleHosts.get(0).getHost(); + final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey); - final HostSpec lastHost = clusterInfo.lastHost; - int lastHostIndex = -1; + final HostSpec lastHost = clusterInfo.lastHost; + int lastHostIndex = -1; - // Check if lastHost is in list of eligible hosts. Update lastHostIndex. - if (lastHost != null) { - for (int i = 0; i < eligibleHosts.size(); i++) { - if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) { - lastHostIndex = i; + // Check if lastHost is in list of eligible hosts. Update lastHostIndex. + if (lastHost != null) { + for (int i = 0; i < eligibleHosts.size(); i++) { + if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) { + lastHostIndex = i; + } } } - } - final int targetHostIndex; - // If the host is weighted and the lastHost is in the eligibleHosts list. - if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) { - targetHostIndex = lastHostIndex; - } else { - if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) { - targetHostIndex = lastHostIndex + 1; + final int targetHostIndex; + // If the host is weighted and the lastHost is in the eligibleHosts list. + if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) { + targetHostIndex = lastHostIndex; } else { - targetHostIndex = 0; + if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) { + targetHostIndex = lastHostIndex + 1; + } else { + targetHostIndex = 0; + } + + final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHost()); + clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight; } - final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHost()); - clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight; - } + clusterInfo.weightCounter--; + clusterInfo.lastHost = eligibleHosts.get(targetHostIndex); - clusterInfo.weightCounter--; - clusterInfo.lastHost = eligibleHosts.get(targetHostIndex); + return eligibleHosts.get(targetHostIndex); - return eligibleHosts.get(targetHostIndex); + } finally { + lock.unlock(); + } } private void createCacheEntryForHosts( diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPlugin.java index 7305250f0..55de2859c 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPlugin.java @@ -40,7 +40,6 @@ import software.amazon.jdbc.util.RdsUrlType; import software.amazon.jdbc.util.RdsUtils; import software.amazon.jdbc.util.SubscribedMethodHelper; -import software.amazon.jdbc.util.telemetry.TelemetryFactory; /** * Monitor the server while the connection is executing methods for more sophisticated failure @@ -181,7 +180,8 @@ public T execute( } finally { if (monitorContext != null) { - synchronized (monitorContext) { + monitorContext.getLock().lock(); + try { this.monitorService.stopMonitoring(monitorContext); if (monitorContext.isNodeUnhealthy()) { @@ -206,6 +206,8 @@ public T execute( new Object[] {this.pluginService.getCurrentHostSpec().asAlias()}))); } } + } finally { + monitorContext.getLock().unlock(); } LOGGER.finest( diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorConnectionContext.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorConnectionContext.java index 3fcf03a28..3f538f838 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorConnectionContext.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorConnectionContext.java @@ -21,6 +21,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.telemetry.TelemetryCounter; @@ -49,6 +50,8 @@ public class MonitorConnectionContext { private long invalidNodeStartTimeNano; // Only accessed by monitor thread private long failureCount; // Only accessed by monitor thread + private final ReentrantLock lock = new ReentrantLock(); + /** * Constructor. * @@ -238,4 +241,8 @@ void setConnectionValid( () -> Messages.get("MonitorConnectionContext.hostAlive", new Object[] {hostName})); } + + public ReentrantLock getLock() { + return this.lock; + } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java index 2a2d9d013..fc2b992dd 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java @@ -190,7 +190,8 @@ public void run() { while ((monitorContext = this.activeContexts.poll()) != null) { - synchronized (monitorContext) { + monitorContext.getLock().lock(); + try { // If context is already invalid, just skip it if (!monitorContext.isActiveContext()) { continue; @@ -221,6 +222,8 @@ public void run() { delayMillis = monitorContext.getFailureDetectionIntervalMillis(); } } + } finally { + monitorContext.getLock().unlock(); } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java index 15fb51ca8..672f676cc 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java @@ -558,7 +558,7 @@ protected void dealWithIllegalStateException( * @param failedHost The host with network errors. * @throws SQLException if an error occurs */ - protected synchronized void failover(final HostSpec failedHost) throws SQLException { + protected void failover(final HostSpec failedHost) throws SQLException { this.pluginService.setAvailability(failedHost.asAliases(), HostAvailability.NOT_AVAILABLE); if (this.failoverMode == FailoverMode.STRICT_WRITER) { @@ -720,7 +720,7 @@ protected void invalidateCurrentConnection() { } } - protected synchronized void pickNewConnection() throws SQLException { + protected void pickNewConnection() throws SQLException { if (this.isClosed && this.closedExplicitly) { LOGGER.fine(() -> Messages.get("Failover.transactionResolutionUnknownError")); return; diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java index 14b4a2436..2b18fe46e 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.qual.NonNull; @@ -56,7 +57,7 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable { private static final String MONITORING_PROPERTY_PREFIX = "limitless-router-monitor-"; private final int intervalMs; - private @NonNull HostSpec hostSpec; + private final @NonNull HostSpec hostSpec; private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicReference> limitlessRouters = new AtomicReference<>( Collections.unmodifiableList(new ArrayList<>())); @@ -73,6 +74,8 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable { return monitoringThread; }); + private final ReentrantLock lock = new ReentrantLock(); + public LimitlessRouterMonitor( final @NonNull PluginService pluginService, final @NonNull HostSpec hostSpec, @@ -164,17 +167,24 @@ public void run() { } } - public synchronized List forceGetLimitlessRouters() throws SQLException { + public List forceGetLimitlessRouters() throws SQLException { LOGGER.finest(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRouters")); - this.openConnection(); - if (this.monitoringConn == null || this.monitoringConn.isClosed()) { - throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed")); + + lock.lock(); + try { + this.openConnection(); + if (this.monitoringConn == null || this.monitoringConn.isClosed()) { + throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed")); + } + List newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn); + this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters)); + RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters); + LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]")); + return newLimitlessRouters; + + } finally { + lock.unlock(); } - List newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn); - this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters)); - RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters); - LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]")); - return newLimitlessRouters; } private void openConnection() throws SQLException { diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java index 1bcbbbce7..3461cabef 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java @@ -98,7 +98,7 @@ public List forceGetLimitlessRouters(final String clusterId, final Pro } @Override - public synchronized void startMonitoring(final @NonNull PluginService pluginService, + public void startMonitoring(final @NonNull PluginService pluginService, final @NonNull HostSpec hostSpec, final @NonNull Properties props, final int intervalMs) { diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java index ead24e32e..857436430 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java @@ -377,7 +377,7 @@ private void logAndThrowException( throw new ReadWriteSplittingSQLException(logMessage, sqlState.getState(), cause); } - private synchronized void switchToWriterConnection( + private void switchToWriterConnection( final List hosts) throws SQLException { final Connection currentConnection = this.pluginService.getCurrentConnection(); @@ -418,7 +418,7 @@ private void switchCurrentConnectionTo( newConnectionHost.getUrl()})); } - private synchronized void switchToReaderConnection(final List hosts) + private void switchToReaderConnection(final List hosts) throws SQLException { final Connection currentConnection = this.pluginService.getCurrentConnection(); final HostSpec currentHost = this.pluginService.getCurrentHostSpec(); diff --git a/wrapper/src/main/java/software/amazon/jdbc/wrapper/ConnectionWrapper.java b/wrapper/src/main/java/software/amazon/jdbc/wrapper/ConnectionWrapper.java index 9c700b50e..2d94b397d 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/wrapper/ConnectionWrapper.java +++ b/wrapper/src/main/java/software/amazon/jdbc/wrapper/ConnectionWrapper.java @@ -192,7 +192,7 @@ public void abort(final Executor executor) throws SQLException { } @Override - public synchronized void clearWarnings() throws SQLException { + public void clearWarnings() throws SQLException { WrapperUtils.runWithPlugins( SQLException.class, this.pluginManager, @@ -499,7 +499,7 @@ public Map> getTypeMap() throws SQLException { } @Override - public synchronized SQLWarning getWarnings() throws SQLException { + public SQLWarning getWarnings() throws SQLException { return WrapperUtils.executeWithPlugins( SQLWarning.class, SQLException.class,