From 0514e0926202b50054643f677510c7bfe16c01f1 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 23 Dec 2024 11:33:05 -0800 Subject: [PATCH] RATIS-2225. RaftClientRequest leak in RaftServerImpl. (#1198) --- .../ratis/server/impl/RaftServerImpl.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 5a7e372333..0ec73d5f48 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -631,6 +631,15 @@ synchronized void changeToLeader() { @Override public Collection getCommitInfos() { + try { + return getCommitInfosImpl(); + } catch (Throwable t) { + LOG.warn("{} Failed to getCommitInfos", getMemberId(), t); + return Collections.emptyList(); + } + } + + private Collection getCommitInfosImpl() { final List infos = new ArrayList<>(); // add the commit info of this server final long commitIndex = updateCommitInfoCache(); @@ -922,17 +931,10 @@ CompletableFuture executeSubmitClientRequestAsync( public CompletableFuture submitClientRequestAsync( ReferenceCountedObject requestRef) { final RaftClientRequest request = requestRef.retain(); - LOG.debug("{}: receive client request({})", getMemberId(), request); - try { + LOG.debug("{}: receive client request({})", getMemberId(), request); assertLifeCycleState(LifeCycle.States.RUNNING); - } catch (ServerNotReadyException e) { - final RaftClientReply reply = newExceptionReply(request, e); - requestRef.release(); - return CompletableFuture.completedFuture(reply); - } - try { RaftClientRequest.Type type = request.getType(); final Timekeeper timer = raftServerMetrics.getClientRequestTimer(type); final Optional timerContext = Optional.ofNullable(timer).map(Timekeeper::time); @@ -942,6 +944,11 @@ public CompletableFuture submitClientRequestAsync( raftServerMetrics.incFailedRequestCount(type); } }); + } catch (RaftException e) { + return CompletableFuture.completedFuture(newExceptionReply(request, e)); + } catch (Throwable t) { + LOG.error("{} Failed to submitClientRequestAsync for {}", getMemberId(), request, t); + return CompletableFuture.completedFuture(newExceptionReply(request, new RaftException(t))); } finally { requestRef.release(); }