Skip to content

Commit

Permalink
strong order guarantee
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Sep 27, 2022
1 parent be12e67 commit 747086f
Showing 1 changed file with 117 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
package org.apache.pulsar.io.jdbc;

import com.google.common.collect.Lists;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -58,7 +61,6 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private PreparedStatement upsertStatement;
private PreparedStatement deleteStatement;


protected static final String ACTION_PROPERTY = "ACTION";

protected JdbcUtils.TableDefinition tableDefinition;
Expand All @@ -68,7 +70,8 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private List<Record<T>> swapList;
private AtomicBoolean isFlushing;
private int batchSize;
private ScheduledExecutorService flushExecutor;

private Flusher flusher;

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
Expand Down Expand Up @@ -106,8 +109,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
swapList = Lists.newArrayList();
isFlushing = new AtomicBoolean(false);

flushExecutor = Executors.newScheduledThreadPool(1);
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
flusher = new Flusher(timeoutMs);
}

private void initStatement() throws Exception {
Expand Down Expand Up @@ -137,9 +139,13 @@ private void initStatement() throws Exception {

@Override
public void close() throws Exception {
if (flushExecutor != null) {
flushExecutor.shutdown();
flushExecutor = null;
if (flusher != null) {
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
flusher.close();
if (!flusher.latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
log.warn("Flusher cannot be closed in time.");
}
flusher = null;
}
if (insertStatement != null) {
insertStatement.close();
Expand Down Expand Up @@ -171,7 +177,7 @@ public void write(Record<T> record) throws Exception {
number = incomingList.size();
}
if (number == batchSize) {
flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
flusher.scheduleNow();
}
}

Expand Down Expand Up @@ -209,89 +215,122 @@ protected enum MutationType {
DELETE
}

private final class Flusher implements Runnable, Closeable {

private void flush() {
// if not in flushing state, do flush, else return;
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
if (!swapList.isEmpty()) {
throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
+ swapList.size());
}
synchronized (this) {
List<Record<T>> tmpList;
swapList.clear();
private volatile boolean closed = false;

tmpList = swapList;
swapList = incomingList;
incomingList = tmpList;
private final ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);
private final CountDownLatch latch = new CountDownLatch(1);
private final Phaser phaser = new Phaser(1) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
latch.countDown();
return true;
}
};

int count = 0;
try {
// bind each record value
for (Record<T> record : swapList) {
final Mutation mutation = createMutation(record);
switch (mutation.getType()) {
case DELETE:
bindValue(deleteStatement, mutation);
count += 1;
deleteStatement.execute();
break;
case UPDATE:
bindValue(updateStatement, mutation);
count += 1;
updateStatement.execute();
break;
case INSERT:
bindValue(insertStatement, mutation);
count += 1;
insertStatement.execute();
break;
case UPSERT:
bindValue(upsertStatement, mutation);
count += 1;
upsertStatement.execute();
break;
default:
String msg = String.format(
"Unsupported action %s, can be one of %s, or not set which indicate %s",
mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
}
public Flusher(int timeoutMs) {
flushExecutor.scheduleAtFixedRate(this, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
}

public void scheduleNow() {
flushExecutor.schedule(this, 0, TimeUnit.MILLISECONDS);
}

@Override
public void run() {
phaser.register();
if (closed) {
phaser.arriveAndDeregister();
return;
}
// if not in flushing state, do flush, else return;
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
if (!swapList.isEmpty()) {
phaser.arriveAndDeregister();
throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
+ swapList.size());
}
if (jdbcSinkConfig.isUseTransactions()) {
connection.commit();
synchronized (this) {
List<Record<T>> tmpList;
swapList.clear();

tmpList = swapList;
swapList = incomingList;
incomingList = tmpList;
}
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception ", e.getMessage(), e);
swapList.forEach(Record::fail);

int count = 0;
try {
// bind each record value
for (Record<T> record : swapList) {
final Mutation mutation = createMutation(record);
switch (mutation.getType()) {
case DELETE -> {
bindValue(deleteStatement, mutation);
count += 1;
deleteStatement.execute();
}
case UPDATE -> {
bindValue(updateStatement, mutation);
count += 1;
updateStatement.execute();
}
case INSERT -> {
bindValue(insertStatement, mutation);
count += 1;
insertStatement.execute();
}
case UPSERT -> {
bindValue(upsertStatement, mutation);
count += 1;
upsertStatement.execute();
}
}
}
if (jdbcSinkConfig.isUseTransactions()) {
connection.rollback();
connection.commit();
}
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception {}", e.getMessage(), e);
swapList.forEach(Record::fail);
try {
if (jdbcSinkConfig.isUseTransactions()) {
connection.rollback();
}
} catch (Exception ex) {
phaser.arriveAndDeregister();
throw new RuntimeException(ex);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

if (swapList.size() != count) {
log.error("Update count {} not match total number of records {}", count, swapList.size());
}
if (swapList.size() != count) {
log.error("Update count {} not match total number of records {}", count, swapList.size());
}

// finish flush
if (log.isDebugEnabled()) {
log.debug("Finish flush, queue size: {}", swapList.size());
}
swapList.clear();
isFlushing.set(false);
} else {
if (log.isDebugEnabled()) {
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
// finish flush
if (log.isDebugEnabled()) {
log.debug("Finish flush, queue size: {}", swapList.size());
}
swapList.clear();
isFlushing.set(false);
} else {
if (log.isDebugEnabled()) {
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
}
}
phaser.arriveAndDeregister();
}

@Override
public void close() {
flushExecutor.shutdown();
closed = true;
phaser.arriveAndDeregister();
}
}

Expand Down

0 comments on commit 747086f

Please sign in to comment.