Skip to content

Commit

Permalink
fix: continue monitoring unless InterruptedException is thrown
Browse files Browse the repository at this point in the history
chore: add additional logging for efm
  • Loading branch information
crystall-bitquill committed Oct 17, 2023
1 parent b598d74 commit eabc10f
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 96 deletions.
211 changes: 115 additions & 96 deletions wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public MonitorImpl(
this.properties = properties;
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
this.monitorService = monitorService;

this.contextLastUsedTimestampNano = this.getCurrentTimeNano();
this.contextsSizeGauge = telemetryFactory.createGauge("efm.activeContexts.queue.size",
() -> (long) activeContexts.size());
Expand All @@ -113,6 +113,9 @@ public MonitorImpl(

@Override
public void startMonitoring(final MonitorConnectionContext context) {
if (this.stopped) {
LOGGER.warning(() -> Messages.get("MonitorImpl.monitorIsStopped", new Object[] {this.hostSpec.getHost()}));
}
final long currentTimeNano = this.getCurrentTimeNano();
context.setStartMonitorTimeNano(currentTimeNano);
this.contextLastUsedTimestampNano = currentTimeNano;
Expand All @@ -137,125 +140,141 @@ public void clearContexts() {

@Override
public void run() {
this.telemetryContext = telemetryFactory.openTelemetryContext(
"monitoring thread", TelemetryTraceLevel.TOP_LEVEL);
telemetryContext.setAttribute("url", hostSpec.getUrl());
try {
this.stopped = false;
while (true) {

// process new contexts
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();
while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
// Add it back to the queue and process it in the next round.
this.newContexts.add(newMonitorContext);
break;
}
if (newMonitorContext.isActiveContext()) {
if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
// The context active monitoring time hasn't come.
// Add the context to the queue and check it later.
boolean continueMonitoring = true;
while (continueMonitoring) {
this.telemetryContext = telemetryFactory.openTelemetryContext(
"monitoring thread", TelemetryTraceLevel.TOP_LEVEL);
telemetryContext.setAttribute("url", hostSpec.getUrl());
try {
this.stopped = false;
while (true) {

// process new contexts
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();
while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
// Add it back to the queue and process it in the next round.
this.newContexts.add(newMonitorContext);
if (firstAddedNewMonitorContext == null) {
firstAddedNewMonitorContext = newMonitorContext;
break;
}
if (newMonitorContext.isActiveContext()) {
if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
// The context active monitoring time hasn't come.
// Add the context to the queue and check it later.
this.newContexts.add(newMonitorContext);
if (firstAddedNewMonitorContext == null) {
firstAddedNewMonitorContext = newMonitorContext;
}
} else {
// It's time to start actively monitor this context.
this.activeContexts.add(newMonitorContext);
}
} else {
// It's time to start actively monitor this context.
this.activeContexts.add(newMonitorContext);
}
}
}

if (!this.activeContexts.isEmpty()) {

final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;
if (!this.activeContexts.isEmpty()) {

final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);
final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

long delayMillis = -1;
MonitorConnectionContext monitorContext;
MonitorConnectionContext firstAddedMonitorContext = null;
final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);

while ((monitorContext = this.activeContexts.poll()) != null) {
long delayMillis = -1;
MonitorConnectionContext monitorContext;
MonitorConnectionContext firstAddedMonitorContext = null;

synchronized (monitorContext) {
// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
}
while ((monitorContext = this.activeContexts.poll()) != null) {

if (firstAddedMonitorContext == monitorContext) {
// this context has already been processed by this loop
// add it to the queue and exit this loop
this.activeContexts.add(monitorContext);
break;
}
synchronized (monitorContext) {

Check warning on line 192 in wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java

View workflow job for this annotation

GitHub Actions / qodana

Synchronization on local variable or method parameter

Synchronization on local variable `monitorContext`
// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
}

// otherwise, process this context
monitorContext.updateConnectionStatus(
this.hostSpec.getUrl(),
statusCheckStartTimeNano,
statusCheckStartTimeNano + status.elapsedTimeNano,
status.isValid);

// If context is still valid and node is still healthy, it needs to continue updating this context
if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
this.activeContexts.add(monitorContext);
if (firstAddedMonitorContext == null) {
firstAddedMonitorContext = monitorContext;
if (firstAddedMonitorContext == monitorContext) {
// this context has already been processed by this loop
// add it to the queue and exit this loop
this.activeContexts.add(monitorContext);
break;
}

if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
// otherwise, process this context
monitorContext.updateConnectionStatus(
this.hostSpec.getUrl(),
statusCheckStartTimeNano,
statusCheckStartTimeNano + status.elapsedTimeNano,
status.isValid);

// If context is still valid and node is still healthy, it needs to continue updating this context
if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
this.activeContexts.add(monitorContext);
if (firstAddedMonitorContext == null) {
firstAddedMonitorContext = monitorContext;
}

if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
}
}
}
}
}

if (delayMillis == -1) {
// No active contexts
delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
} else {
delayMillis -= status.elapsedTimeNano;
// Check for min delay between node health check
if (delayMillis <= 0) {
delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
if (delayMillis == -1) {
// No active contexts
delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
} else {
delayMillis -= status.elapsedTimeNano;
// Check for min delay between node health check
if (delayMillis <= 0) {
delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
}
// Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
this.nodeCheckTimeoutMillis = delayMillis;
}
// Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
this.nodeCheckTimeoutMillis = delayMillis;
}

TimeUnit.MILLISECONDS.sleep(delayMillis);
TimeUnit.MILLISECONDS.sleep(delayMillis);

} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
break;
} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
break;
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
}
}
} catch (final InterruptedException intEx) {
// do nothing; exit thread
} finally {
if (this.monitoringConn != null) {
try {
this.monitoringConn.close();
} catch (final SQLException ex) {
// ignore
} catch (final InterruptedException intEx) {
// set continueMonitoring to false so monitoring will be stopped; exit thread
continueMonitoring = false;
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost(), intEx.getMessage()}));
} catch (final Exception ex) {
// do nothing; exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
} finally {
if (!continueMonitoring) {
if (this.monitoringConn != null) {
try {
this.monitoringConn.close();
} catch (final SQLException ex) {
// ignore
}
}
if (telemetryContext != null) {
this.telemetryContext.closeContext();
}
this.stopped = true;
}
}
if (telemetryContext != null) {
this.telemetryContext.closeContext();
}
this.stopped = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ MonitorThreadContainer.emptyNodeKeys=Provided node keys are empty.

# Monitor Impl
MonitorImpl.contextNullWarning=Parameter 'context' should not be null.
MonitorImpl.interruptedExceptionDuringMonitoring=Monitoring thread for node {0} was interrupted: {1}
MonitorImpl.exceptionDuringMonitoring=Unhandled exception in monitoring thread for node {0}: {1}
MonitorImpl.monitorIsStopped=Monitoring was already stopped for node {0}.

# Monitor Service Impl
MonitorServiceImpl.nullMonitorParam=Parameter 'monitor' should not be null.
Expand Down

0 comments on commit eabc10f

Please sign in to comment.