in adb3client/src/main/java/com/alibaba/cloud/analyticdb/adb3client/impl/collector/ActionCollector.java [85:125]
/**
 * Flushes every {@code TableCollector} held in {@code map}, repeating full passes
 * until a single pass sees all collectors report completion.
 *
 * <p>The write lock of {@code flushLock} is held for the whole call. Exceptions thrown
 * by individual collectors do not abort the pass; they are merged and accumulated into
 * {@code lastException}.
 *
 * @param internal when {@code true}, failures are only accumulated into
 *     {@code lastException}; when {@code false}, the accumulated exception (if any) is
 *     taken out of {@code lastException} and thrown
 * @throws AdbClientException the accumulated failure, only when {@code internal} is
 *     {@code false} and {@code lastException} holds one
 */
public void flush(boolean internal) throws AdbClientException {
    flushLock.writeLock().lock();
    try {
        AdbClientException mergedFailure = null;
        AtomicInteger pendingActions = new AtomicInteger(0);
        // NOTE(review): the exact meaning of the leading `true` argument and of the
        // async flag is defined by TableCollector.flush, which is not visible here.
        boolean asyncMode = true;
        boolean everyCollectorDone = false;

        while (!everyCollectorDone) {
            int finished = 0;
            // The counter is reset before each pass and handed to every collector.
            pendingActions.set(0);

            for (TableCollector collector : map.values()) {
                try {
                    if (collector.flush(true, asyncMode, pendingActions)) {
                        finished++;
                    }
                } catch (AdbClientException e) {
                    // Keep going with the remaining collectors; report everything at the end.
                    mergedFailure = ExceptionUtil.merge(mergedFailure, e);
                }
            }

            everyCollectorDone = (finished == map.size());
            // Once a pass leaves the counter at zero without all collectors being done,
            // every later pass runs with async switched off.
            if (!everyCollectorDone && pendingActions.get() == 0) {
                asyncMode = false;
            }
        }

        if (mergedFailure != null) {
            lastException.accumulateAndGet(
                    mergedFailure, (previous, current) -> ExceptionUtil.merge(previous, current));
        }

        if (internal) {
            return;
        }

        // External callers take ownership of (and clear) whatever failure has accumulated.
        AdbClientException accumulated = lastException.getAndSet(null);
        if (accumulated != null) {
            throw accumulated;
        }
    } finally {
        flushLock.writeLock().unlock();
    }
}