Skip to content

Commit

Permalink
fix: SqliteJdbcSinkTest close in order (#17849)
Browse files Browse the repository at this point in the history
* fix: delete sqlite files after jdbc connection closed

This closes #17713.

Signed-off-by: tison <[email protected]>

* uses isolated db file

Signed-off-by: tison <[email protected]>

* Revert "uses isolated db file"

This reverts commit 295db3c.

* close in order

Signed-off-by: tison <[email protected]>

* strong order guarantee

Signed-off-by: tison <[email protected]>

* factor out defer logic to avoid further bugs

Signed-off-by: tison <[email protected]>

* Revert "factor out defer logic to avoid further bugs"

This reverts commit f7f4634.

* Revert "strong order guarantee"

This reverts commit 747086f.

* use awaitTermination

Signed-off-by: tison <[email protected]>

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Sep 28, 2022
1 parent 7e4c746 commit c1ce3dd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,11 @@ private void initStatement() throws Exception {

@Override
public void close() throws Exception {
if (connection != null && jdbcSinkConfig.isUseTransactions()) {
connection.commit();
if (flushExecutor != null) {
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
flushExecutor.shutdown();
flushExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
flushExecutor = null;
}
if (insertStatement != null) {
insertStatement.close();
Expand All @@ -152,9 +155,8 @@ public void close() throws Exception {
if (deleteStatement != null) {
deleteStatement.close();
}
if (flushExecutor != null) {
flushExecutor.shutdown();
flushExecutor = null;
if (connection != null && jdbcSinkConfig.isUseTransactions()) {
connection.commit();
}
if (connection != null) {
connection.close();
Expand Down Expand Up @@ -267,7 +269,7 @@ private void flush() {
}
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception ", e.getMessage(), e);
log.error("Got exception {}", e.getMessage(), e);
swapList.forEach(Record::fail);
try {
if (jdbcSinkConfig.isUseTransactions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,14 @@ public void setUp() throws Exception {

jdbcSink = new SqliteJdbcAutoSchemaSink();

// open should success
// open should succeed
jdbcSink.open(conf, null);


}

@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
sqliteUtils.tearDown();
jdbcSink.close();
sqliteUtils.tearDown();
}

private void testOpenAndWriteSinkNullValue(Map<String, String> actionProperties) throws Exception {
Expand Down

0 comments on commit c1ce3dd

Please sign in to comment.