From f7f4634f37da783e90c64d94a2fb0b5cb947ef33 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 27 Sep 2022 15:15:17 +0800 Subject: [PATCH] factor out defer logic to avoid further bugs Signed-off-by: tison --- .../org/apache/pulsar/io/jdbc/JdbcAbstractSink.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 4401f1f01ea4c..c5d5d63f93b9c 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -240,8 +240,15 @@ public void scheduleNow() { @Override public void run() { phaser.register(); - if (closed) { + try { + internalRun(); + } finally { phaser.arriveAndDeregister(); + } + } + + private void internalRun() { + if (closed) { return; } // if not in flushing state, do flush, else return; @@ -250,7 +257,6 @@ public void run() { log.debug("Starting flush, queue size: {}", incomingList.size()); } if (!swapList.isEmpty()) { - phaser.arriveAndDeregister(); throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size()); } @@ -303,7 +309,6 @@ public void run() { connection.rollback(); } } catch (Exception ex) { - phaser.arriveAndDeregister(); throw new RuntimeException(ex); } } @@ -323,7 +328,6 @@ public void run() { log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size()); } } - phaser.arriveAndDeregister(); } @Override