Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable #23634

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

yyj8
Copy link
Contributor

@yyj8 yyj8 commented Nov 24, 2024

Fixes #23635

Main Issue: #xyz

PIP: #xyz

Motivation

In some special scenarios, when the broker service has a deadlock, it needs to be able to automatically recover instead of requiring manual intervention. For example, when the service is deployed in a customer environment, we cannot directly manage it. If the service has a deadlock, the k8s probe should return a failure because the service may be unavailable. The probe failure triggers a broker pod restart to resolve the deadlock.

Modifications

Add deadlock detection in the probe. If a deadlock exists, print the thread stack and return a service unavailable exception.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:
yyj8#10

…return a failure because the service may be unavailable
Copy link

@yyj8 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Nov 24, 2024
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's already a deadlock check in the health check:

@GET
@Path("/health")
@ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion) {
validateSuperUserAccessAsync()
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
private void checkDeadlockedThreads() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
long[] threadIds = threadBean.findDeadlockedThreads();
if (threadIds != null && threadIds.length > 0) {
ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, false);
String threadNames = Arrays.stream(threadInfos)
.map(threadInfo -> threadInfo.getThreadName() + "(tid=" + threadInfo.getThreadId() + ")").collect(
Collectors.joining(", "));
if (System.currentTimeMillis() - threadDumpLoggedTimestamp
> LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) {
threadDumpLoggedTimestamp = System.currentTimeMillis();
LOG.error("Deadlocked threads detected. {}\n{}", threadNames,
ThreadDumpUtil.buildThreadDiagnosticString());
} else {
LOG.error("Deadlocked threads detected. {}", threadNames);
}
throw new IllegalStateException("Deadlocked threads detected. " + threadNames);
}
}

It also contains an example of how to check deadlocks.

…return a failure because the service may be unavailable
@yyj8 yyj8 requested a review from lhotari November 25, 2024 14:38
Comment on lines 67 to 84
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
long[] threadIds = threadBean.findDeadlockedThreads();
if (threadIds != null && threadIds.length > 0) {
ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false,
false);
String threadNames = Arrays.stream(threadInfos)
.map(threadInfo -> threadInfo.getThreadName()
+ "(tid=" + threadInfo.getThreadId() + ")")
.collect(Collectors.joining(", "));
if (System.currentTimeMillis() - threadDumpLoggedTimestamp
> LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) {
String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString();
log.error("Deadlock detected, service may be unavailable, "
+ "thread stack details are as follows: {}.", diagnosticResult);
threadDumpLoggedTimestamp = System.currentTimeMillis();
} else {
log.error("Deadlocked threads detected. {}", threadNames);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the status endpoint doesn't have authentication, it will be necessary to have a solution to prevent introducing a new DoS vulnerability where calling the status endpoint in a tight loop could introduce significant load to the system. One way would be to check that the deadlock check is executed only when there's more than 1 seconds from the previous check. If it's less than that, the previous result of the deadlock check would be reused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the status endpoint doesn't have authentication, it will be necessary to have a solution to prevent introducing a new DoS vulnerability where calling the status endpoint in a tight loop could introduce significant load to the system. One way would be to check that the deadlock check is executed only when there's more than 1 seconds from the previous check. If it's less than that, the previous result of the deadlock check would be reused.

This is a very good proposal, and there is indeed a risk of being attacked by DoS. The code logic has been adjusted accordingly. Please help review the code again.

@lhotari lhotari changed the title [fix][broker]If there is a deadlock in the service, the probe should return a failure because the service may be unavailable [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable Nov 26, 2024
…e should return a failure because the service may be unavailable
…e should return a failure because the service may be unavailable
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work @yyj8. Some suggestions for field naming and simplifying the code comment.

…e should return a failure because the service may be unavailable
…e should return a failure because the service may be unavailable.
@yyj8 yyj8 requested a review from lhotari November 27, 2024 07:41
// Locking classes to avoid deadlock detection in multi-thread concurrent requests.
synchronized (VipStatus.class) {
if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) {
lastCheckStatusTimestamp = System.currentTimeMillis();
Copy link
Member

@lhotari lhotari Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this line should be removed since the purpose of the field value is to record the timestamp when an actual check was made and rate limit that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this line should be removed since the purpose of the field value is to record the timestamp when an actual check was made and rate limit that.

Your suggestion is not to need synchronization, right? If there is no code synchronization, will there be a massive number of DoS attacks that execute deadlock detection logic simultaneously, causing a surge in node load.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggestion is not to need synchronization, right? If there is no code synchronization, will there be a massive number of DoS attacks that execute deadlock detection logic simultaneously, causing a surge in node load.

I'm not talking about synchronization. If lastCheckStatusTimestamp gets updated here, it breaks the logic.
The previous DoS concern was about calling the deadlock detection since that has a higher cost than before. Jetty has separate DoSFilter for adding basic DoS protection. In general Pulsar isn't protected against malicious DoS attacks and it's not meant to be exposed on the public internet. In this case, we just need to avoid adding a lot of extra cost compared to the previous version.

The original suggestion was to rate limit the deadlock check, however it's fine to also rate limit the file existence check. By removing this line, I believe that the logic would be fine. However, adding tests would be helpful to ensure this. There could be unit tests with mocking to ensure it. That might require further refactoring so that mocks could be properly injected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
}
}
lastCheckStatusTimestamp = System.currentTimeMillis();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the only location where lastCheckStatusTimestamp is updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final global code logic content is as follows:

public class VipStatus {

    public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
    public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";

    // log a full thread dump when a deadlock is detected in status check once every 10 minutes
    // to prevent excessive logging
    private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
    private static volatile long lastCheckStatusTimestamp;

    // Rate limit status checks to every 500ms to prevent DoS
    private static final long CHECK_STATUS_INTERVAL = 500L;
    private static volatile boolean lastCheckStatusResult;

    @Context
    protected ServletContext servletContext;

    @GET
    public String checkStatus() {
        // Locking classes to avoid deadlock detection in multi-thread concurrent requests.
        synchronized (VipStatus.class) {
            if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) {
                if (lastCheckStatusResult) {
                    return "OK";
                } else {
                    throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
                }
            }
            lastCheckStatusTimestamp = System.currentTimeMillis();

            String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
            @SuppressWarnings("unchecked")
            Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);

            boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;

            if (statusFilePath != null) {
                File statusFile = new File(statusFilePath);
                if (isReady && statusFile.exists() && statusFile.isFile()) {
                    // check deadlock
                    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
                    long[] threadIds = threadBean.findDeadlockedThreads();
                    if (threadIds != null && threadIds.length > 0) {
                        ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false,
                                false);
                        String threadNames = Arrays.stream(threadInfos)
                                .map(threadInfo -> threadInfo.getThreadName()
                                        + "(tid=" + threadInfo.getThreadId() + ")")
                                .collect(Collectors.joining(", "));
                        if (System.currentTimeMillis() - lastCheckStatusTimestamp
                                > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) {
                            String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString();
                            log.error("Deadlock detected, service may be unavailable, "
                                    + "thread stack details are as follows: {}.", diagnosticResult);
                        } else {
                            log.error("Deadlocked threads detected. {}", threadNames);
                        }
                        lastCheckStatusResult = false;
                        throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
                    } else {
                        lastCheckStatusResult = true;
                        return "OK";
                    }
                }
            }
            lastCheckStatusResult = false;
            log.warn("Failed to access \"status.html\". The service is not ready");
            throw new WebApplicationException(Status.NOT_FOUND);
        }
    }

}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding tests will be useful since validating logic just by looking at the code isn't a good practice. You might have to modify some details to inject mocks. For example, there could be a separate constructor for tests where the ThreadTXBean is injected. In the default constructor used in production code, ManagementFactory,getThreadMXBean() method call could be used to pass the value. In test code, a mock could be passed. It's also useful to refactor the code to use a java.time.Clock.millis() instead of System.currentTimeMillis() and also have the Clock instance in the constructor so that test code could pass a mock implementation where it's possible to make time jump forward when validating the logic. Clock.systemUTC() could be used in the default constructor for production code.

…e should return a failure because the service may be unavailable.
@lhotari
Copy link
Member

lhotari commented Nov 27, 2024

@yyj8 btw. when you add commits to the PR, it's useful to make the commit title about the change and not copy the PR title into the follow up commits. When the PR is merged, all commits are squashed so they won't end up in the final merged commit. The benefit of the commit messages in the PR commits is that the reviewer will be able to follow the changes.

@lhotari lhotari added this to the 4.1.0 milestone Nov 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
2 participants