private void sqlQueueExecute()

in adb2client/src/main/java/com/alibaba/cloud/analyticdb/adbclient/AdbClient.java [417:485]


    /**
     * Executes every SQL batch currently held in {@code sqlQueue}, then resets the
     * batch buffer and emits the periodic "Total record count" log line.
     *
     * <p>When {@code isSync} is set the queue is drained on the calling thread.
     * Otherwise {@code databaseConfig.getParallelNumber()} workers are submitted to
     * {@code executorService}; each worker drains the shared queue until it is
     * empty, and this method blocks until all of them have finished.
     *
     * @throws AdbClientException if, in the parallel path, {@code commitExceptionDataList}
     *         is non-empty after all workers have completed (code
     *         {@code COMMIT_ERROR_DATA_LIST}), or if one is raised directly while waiting
     * @throws RuntimeException if waiting for the workers fails for any other reason
     *         (worker failure, interruption); the original exception is kept as the cause
     */
    private void sqlQueueExecute() {
        if (isSync) {
            drainSqlQueue();
            finishBatchRound();
            return;
        }

        // Read the parallelism once so the latch count and the number of submitted
        // workers can never disagree (a mismatch would make await() hang).
        final int parallelNumber = databaseConfig.getParallelNumber();
        final CountDownLatch latch = new CountDownLatch(parallelNumber);
        List<Future<?>> futureList = new ArrayList<Future<?>>();
        for (int i = 0; i < parallelNumber; i++) {
            Future<?> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        drainSqlQueue();
                    } finally {
                        // Always release the latch, even if a batch throws.
                        latch.countDown();
                    }
                }
            });
            futureList.add(future);
        }

        try {
            latch.await();
            // get() surfaces any exception a worker threw (as ExecutionException).
            for (Future<?> f : futureList) {
                f.get();
            }
            finishBatchRound();
        } catch (AdbClientException e) {
            logger("error", "commit " + e.getMessage());
            throw e;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            logger("error", "commit " + e.getMessage());
            throw new RuntimeException(e.getMessage(), e);
        } catch (Exception e) {
            logger("error", "commit " + e.getMessage());
            // Keep the cause: the message alone loses the worker's stack trace.
            throw new RuntimeException(e.getMessage(), e);
        }

        if (commitExceptionDataList.size() > 0) {
            logger("error", "commit error data list " + commitExceptionDataList.toString());
            throw new AdbClientException(AdbClientException.COMMIT_ERROR_DATA_LIST, commitExceptionDataList, commitException);
        }
    }

    /** Polls {@code sqlQueue} until it returns {@code null}, executing each batch. */
    private void drainSqlQueue() {
        StringBuilder sb;
        while ((sb = sqlQueue.poll()) != null) {
            executeBatchSql(sb);
        }
    }

    /** Clears the active batch buffer and logs the total count once per {@code periodTime}. */
    private void finishBatchRound() {
        if (databaseConfig.isPartitionBatch()) {
            partitionBatch.clear();
        } else {
            batchBuffer.clear();
        }
        if (System.currentTimeMillis() - periodStartTime > periodTime) {
            periodStartTime = System.currentTimeMillis();
            logger("info", "Total record count " + totalCount);
        }
    }