in adb2client/src/main/java/com/alibaba/cloud/analyticdb/adbclient/AdbClient.java [417:485]
/**
 * Drains {@code sqlQueue}, passing every queued statement to
 * {@link #executeBatchSql}, then clears the active buffer and emits the
 * periodic "Total record count" log line.
 *
 * <p>When {@code isSync} is set, the queue is drained on the calling thread.
 * Otherwise {@code databaseConfig.getParallelNumber()} tasks are submitted to
 * {@code executorService}; each task polls {@code sqlQueue} until it is empty,
 * and the caller blocks until all tasks have finished.
 * NOTE(review): the parallel path polls {@code sqlQueue} from several threads,
 * so it presumably is a thread-safe queue; confirm against its declaration.
 *
 * <p>After the drain, {@code partitionBatch} is cleared when
 * {@code databaseConfig.isPartitionBatch()} is true, otherwise
 * {@code batchBuffer} is cleared. The record count is logged only if more than
 * {@code periodTime} milliseconds have passed since {@code periodStartTime}.
 *
 * @throws AdbClientException in the parallel path, when
 *         {@code commitExceptionDataList} is non-empty after all tasks finish
 *         (code {@code COMMIT_ERROR_DATA_LIST}), or when one is raised directly
 *         by the post-processing inside the try block
 * @throws RuntimeException in the parallel path, when waiting for the tasks
 *         fails (task failure, interruption); the original exception is
 *         attached as the cause
 */
private void sqlQueueExecute() {
    if (isSync) {
        // Synchronous mode: run every queued statement on the calling thread.
        while (true) {
            StringBuilder sb = sqlQueue.poll();
            if (sb == null) {
                break;
            }
            executeBatchSql(sb);
        }
        if (databaseConfig.isPartitionBatch()) {
            partitionBatch.clear();
        } else {
            batchBuffer.clear();
        }
        if (System.currentTimeMillis() - periodStartTime > periodTime) {
            periodStartTime = System.currentTimeMillis();
            logger("info", "Total record count " + totalCount);
        }
    } else {
        // Read the parallelism once so the latch count and the number of
        // submitted tasks can never disagree.
        final int parallelNumber = databaseConfig.getParallelNumber();
        List<Future<?>> futureList = new ArrayList<Future<?>>();
        final CountDownLatch latch = new CountDownLatch(parallelNumber);
        for (int i = 0; i < parallelNumber; i++) {
            Future<?> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            StringBuilder sb = sqlQueue.poll();
                            if (sb == null) {
                                break;
                            }
                            executeBatchSql(sb);
                        }
                    } finally {
                        // Always release the latch, even if a statement fails,
                        // so the waiting caller is never blocked forever.
                        latch.countDown();
                    }
                }
            });
            futureList.add(future);
        }
        try {
            latch.await();
            // get() surfaces any exception a task threw, wrapped in an
            // ExecutionException (handled by the generic catch below).
            for (Future<?> f : futureList) {
                f.get();
            }
            if (databaseConfig.isPartitionBatch()) {
                partitionBatch.clear();
            } else {
                batchBuffer.clear();
            }
            if (System.currentTimeMillis() - periodStartTime > periodTime) {
                periodStartTime = System.currentTimeMillis();
                logger("info", "Total record count " + totalCount);
            }
        } catch (AdbClientException e) {
            logger("error", "commit " + e.getMessage());
            throw e;
        } catch (InterruptedException e) {
            logger("error", "commit " + e.getMessage());
            // Restore the interrupt flag so code further up the stack can
            // still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage(), e);
        } catch (Exception e) {
            logger("error", "commit " + e.getMessage());
            // Keep the original exception as the cause so the stack trace of
            // the failing task is not lost.
            throw new RuntimeException(e.getMessage(), e);
        }
        if (commitExceptionDataList.size() > 0) {
            logger("error", "commit error data list " + commitExceptionDataList.toString());
            throw new AdbClientException(AdbClientException.COMMIT_ERROR_DATA_LIST, commitExceptionDataList, commitException);
        }
    }
}