diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java index c85040356..b6c566d28 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java @@ -100,7 +100,7 @@ public MonitorImpl( this.properties = properties; this.monitorDisposalTimeMillis = monitorDisposalTimeMillis; this.monitorService = monitorService; - + this.contextLastUsedTimestampNano = this.getCurrentTimeNano(); this.contextsSizeGauge = telemetryFactory.createGauge("efm.activeContexts.queue.size", () -> (long) activeContexts.size()); @@ -113,6 +113,9 @@ public MonitorImpl( @Override public void startMonitoring(final MonitorConnectionContext context) { + if (this.stopped) { + LOGGER.warning(() -> Messages.get("MonitorImpl.monitorIsStopped", new Object[] {this.hostSpec.getHost()})); + } final long currentTimeNano = this.getCurrentTimeNano(); context.setStartMonitorTimeNano(currentTimeNano); this.contextLastUsedTimestampNano = currentTimeNano; @@ -137,125 +140,141 @@ public void clearContexts() { @Override public void run() { - this.telemetryContext = telemetryFactory.openTelemetryContext( - "monitoring thread", TelemetryTraceLevel.TOP_LEVEL); - telemetryContext.setAttribute("url", hostSpec.getUrl()); - try { - this.stopped = false; - while (true) { - - // process new contexts - MonitorConnectionContext newMonitorContext; - MonitorConnectionContext firstAddedNewMonitorContext = null; - final long currentTimeNano = this.getCurrentTimeNano(); - while ((newMonitorContext = this.newContexts.poll()) != null) { - if (firstAddedNewMonitorContext == newMonitorContext) { - // This context has already been processed. - // Add it back to the queue and process it in the next round. - this.newContexts.add(newMonitorContext); - break; - } - if (newMonitorContext.isActiveContext()) { - if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) { - // The context active monitoring time hasn't come. - // Add the context to the queue and check it later. + boolean continueMonitoring = true; + while (continueMonitoring) { + this.telemetryContext = telemetryFactory.openTelemetryContext( + "monitoring thread", TelemetryTraceLevel.TOP_LEVEL); + telemetryContext.setAttribute("url", hostSpec.getUrl()); + try { + this.stopped = false; + while (true) { + + // process new contexts + MonitorConnectionContext newMonitorContext; + MonitorConnectionContext firstAddedNewMonitorContext = null; + final long currentTimeNano = this.getCurrentTimeNano(); + while ((newMonitorContext = this.newContexts.poll()) != null) { + if (firstAddedNewMonitorContext == newMonitorContext) { + // This context has already been processed. + // Add it back to the queue and process it in the next round. this.newContexts.add(newMonitorContext); - if (firstAddedNewMonitorContext == null) { - firstAddedNewMonitorContext = newMonitorContext; + break; + } + if (newMonitorContext.isActiveContext()) { + if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) { + // The context active monitoring time hasn't come. + // Add the context to the queue and check it later. + this.newContexts.add(newMonitorContext); + if (firstAddedNewMonitorContext == null) { + firstAddedNewMonitorContext = newMonitorContext; + } + } else { + // It's time to start actively monitor this context. 
+ this.activeContexts.add(newMonitorContext); } - } else { - // It's time to start actively monitor this context. - this.activeContexts.add(newMonitorContext); } } - } - - if (!this.activeContexts.isEmpty()) { - final long statusCheckStartTimeNano = this.getCurrentTimeNano(); - this.contextLastUsedTimestampNano = statusCheckStartTimeNano; + if (!this.activeContexts.isEmpty()) { - final ConnectionStatus status = - checkConnectionStatus(this.nodeCheckTimeoutMillis); + final long statusCheckStartTimeNano = this.getCurrentTimeNano(); + this.contextLastUsedTimestampNano = statusCheckStartTimeNano; - long delayMillis = -1; - MonitorConnectionContext monitorContext; - MonitorConnectionContext firstAddedMonitorContext = null; + final ConnectionStatus status = + checkConnectionStatus(this.nodeCheckTimeoutMillis); - while ((monitorContext = this.activeContexts.poll()) != null) { + long delayMillis = -1; + MonitorConnectionContext monitorContext; + MonitorConnectionContext firstAddedMonitorContext = null; - synchronized (monitorContext) { - // If context is already invalid, just skip it - if (!monitorContext.isActiveContext()) { - continue; - } + while ((monitorContext = this.activeContexts.poll()) != null) { - if (firstAddedMonitorContext == monitorContext) { - // this context has already been processed by this loop - // add it to the queue and exit this loop - this.activeContexts.add(monitorContext); - break; - } + synchronized (monitorContext) { + // If context is already invalid, just skip it + if (!monitorContext.isActiveContext()) { + continue; + } - // otherwise, process this context - monitorContext.updateConnectionStatus( - this.hostSpec.getUrl(), - statusCheckStartTimeNano, - statusCheckStartTimeNano + status.elapsedTimeNano, - status.isValid); - - // If context is still valid and node is still healthy, it needs to continue updating this context - if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) { - this.activeContexts.add(monitorContext); - if (firstAddedMonitorContext == null) { - firstAddedMonitorContext = monitorContext; + if (firstAddedMonitorContext == monitorContext) { + // this context has already been processed by this loop + // add it to the queue and exit this loop + this.activeContexts.add(monitorContext); + break; } - if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) { - delayMillis = monitorContext.getFailureDetectionIntervalMillis(); + // otherwise, process this context + monitorContext.updateConnectionStatus( + this.hostSpec.getUrl(), + statusCheckStartTimeNano, + statusCheckStartTimeNano + status.elapsedTimeNano, + status.isValid); + + // If context is still valid and node is still healthy, it needs to continue updating this context + if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) { + this.activeContexts.add(monitorContext); + if (firstAddedMonitorContext == null) { + firstAddedMonitorContext = monitorContext; + } + + if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) { + delayMillis = monitorContext.getFailureDetectionIntervalMillis(); + } } } } - } - if (delayMillis == -1) { - // No active contexts - delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS; - } else { - delayMillis -= status.elapsedTimeNano; - // Check for min delay between node health check - if (delayMillis <= 0) { - delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS; + if (delayMillis == -1) { + // No active contexts + delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS; + } else { + 
delayMillis -= status.elapsedTimeNano;
+              // Check for min delay between node health check
+              if (delayMillis <= 0) {
+                delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
+              }
+              // Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
+              this.nodeCheckTimeoutMillis = delayMillis;
             }
-          TimeUnit.MILLISECONDS.sleep(delayMillis);
+            TimeUnit.MILLISECONDS.sleep(delayMillis);
 
-        } else {
-          if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
-              >= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
-            monitorService.notifyUnused(this);
-            break;
+          } else {
+            if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
+                >= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
+              monitorService.notifyUnused(this);
+              break;
+            }
+            TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
           }
-          TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
         }
-      }
-    } catch (final InterruptedException intEx) {
-      // do nothing; exit thread
-    } finally {
-      if (this.monitoringConn != null) {
-        try {
-          this.monitoringConn.close();
-        } catch (final SQLException ex) {
-          // ignore
+      } catch (final InterruptedException intEx) {
+        // Interruption is the shutdown signal: stop monitoring and exit the thread.
+        continueMonitoring = false;
+        LOGGER.warning(
+            () -> Messages.get(
+                "MonitorImpl.interruptedExceptionDuringMonitoring",
+                new Object[] {this.hostSpec.getHost(), intEx.getMessage()}));
+      } catch (final Exception ex) {
+        // Log the error and let the outer loop restart monitoring.
+        LOGGER.warning(
+            () -> Messages.get(
+                "MonitorImpl.exceptionDuringMonitoring",
+                new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
+      } finally {
+        if (!continueMonitoring) {
+          if (this.monitoringConn != null) {
+            try {
+              this.monitoringConn.close();
+            } catch (final SQLException ex) {
+              // ignore
+            }
+          }
+          if (telemetryContext != null) {
+            this.telemetryContext.closeContext();
+          }
+          this.stopped = true;
         }
       }
-      if (telemetryContext != null) {
-        this.telemetryContext.closeContext();
-      }
-      this.stopped = true;
     }
   }
diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties
index 335baad50..baadc20ec 100644
--- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties
+++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties
@@ -188,6 +188,9 @@ MonitorThreadContainer.emptyNodeKeys=Provided node keys are empty.
 
 # Monitor Impl
 MonitorImpl.contextNullWarning=Parameter 'context' should not be null.
+MonitorImpl.interruptedExceptionDuringMonitoring=Monitoring thread for node {0} was interrupted: {1}
+MonitorImpl.exceptionDuringMonitoring=Unhandled exception in monitoring thread for node {0}: {1}
+MonitorImpl.monitorIsStopped=Monitoring was already stopped for node {0}.
 
 # Monitor Service Impl
 MonitorServiceImpl.nullMonitorParam=Parameter 'monitor' should not be null.
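
Note on the new control flow: the change wraps the existing monitoring logic in an outer `while (continueMonitoring)` loop, so an unhandled exception is logged and monitoring resumes, while an `InterruptedException` ends the loop, closes the monitoring connection and telemetry context, and marks the monitor as stopped. Below is a minimal, self-contained sketch of that pattern; it is not the wrapper's actual `MonitorImpl`, and the class and method names (`RestartingMonitorSketch`, `checkNodeHealth`, `closeResources`) are illustrative placeholders.

```java
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Minimal sketch of the restart-on-exception pattern used in the diff above; this is not
// the wrapper's MonitorImpl, and checkNodeHealth()/closeResources() are placeholders.
public class RestartingMonitorSketch implements Runnable {

  private static final Logger LOGGER = Logger.getLogger(RestartingMonitorSketch.class.getName());

  private volatile boolean stopped = true;

  @Override
  public void run() {
    boolean continueMonitoring = true;
    while (continueMonitoring) {
      try {
        this.stopped = false;
        while (true) {
          checkNodeHealth();                // may throw an unexpected RuntimeException
          TimeUnit.MILLISECONDS.sleep(100); // throws InterruptedException when the thread is interrupted
        }
      } catch (final InterruptedException e) {
        // Interruption is the shutdown signal: leave the outer loop and clean up.
        continueMonitoring = false;
        LOGGER.warning("Monitoring interrupted: " + e.getMessage());
      } catch (final Exception e) {
        // Any other failure is logged; the outer loop starts a fresh monitoring pass.
        LOGGER.warning("Unhandled exception, restarting monitoring: " + e.getMessage());
      } finally {
        if (!continueMonitoring) {
          closeResources();
          this.stopped = true;
        }
      }
    }
  }

  public boolean isStopped() {
    return this.stopped;
  }

  private void checkNodeHealth() {
    // Placeholder for the real connection/health check.
  }

  private void closeResources() {
    // Placeholder for closing the monitoring connection and telemetry context.
  }
}
```

As in the diff, resources are released only when `continueMonitoring` is false, so a transient error during a health check no longer tears down the monitor permanently.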
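The new resource-bundle entries use positional placeholders, like the existing messages in `aws_advanced_jdbc_wrapper_messages.properties`. Assuming they are resolved as standard `java.text.MessageFormat` patterns (the wrapper's `Messages` helper is not shown in this diff), a minimal sketch of how the arguments fill in, using a made-up host name and error text:

```java
import java.text.MessageFormat;

public class MonitorMessageSketch {
  public static void main(String[] args) {
    // Pattern copied from the new MonitorImpl.exceptionDuringMonitoring property;
    // the host name and error text below are made-up sample arguments.
    String pattern = "Unhandled exception in monitoring thread for node {0}: {1}";
    String message = MessageFormat.format(pattern, "db-node-1.example.com", "Connection reset");
    System.out.println(message);
    // Unhandled exception in monitoring thread for node db-node-1.example.com: Connection reset
  }
}
```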