in pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java [159:233]
public abstract void bindValue(
PreparedStatement statement,
Record<T> message, String action) throws Exception;
private void flush() {
// if not in flushing state, do flush, else return;
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
if (!swapList.isEmpty()) {
throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size());
}
synchronized (this) {
List<Record<T>> tmpList;
swapList.clear();
tmpList = swapList;
swapList = incomingList;
incomingList = tmpList;
}
int count = 0;
try {
// bind each record value
for (Record<T> record : swapList) {
String action = record.getProperties().get(ACTION);
if (action == null) {
action = INSERT;
}
switch (action) {
case DELETE:
bindValue(deleteStatement, record, action);
count += 1;
deleteStatement.execute();
break;
case UPDATE:
bindValue(updateStatement, record, action);
count += 1;
updateStatement.execute();
break;
case INSERT:
bindValue(insertStatement, record, action);
count += 1;
insertStatement.execute();
break;
default:
String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s",
action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
throw new IllegalArgumentException(msg);
}
}
connection.commit();
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception ", e);
swapList.forEach(Record::fail);
}
if (swapList.size() != count) {
log.error("Update count {} not match total number of records {}", count, swapList.size());
}
// finish flush
if (log.isDebugEnabled()) {
log.debug("Finish flush, queue size: {}", swapList.size());
}
swapList.clear();
isFlushing.set(false);
} else {
if (log.isDebugEnabled()) {
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
}
}
}