in adb2client/src/main/java/com/alibaba/cloud/analyticdb/adbclient/AdbClient.java [207:294]
/**
 * Appends one row to the pending multi-row INSERT statement of a table.
 *
 * <p>Rows are accumulated either per hash partition (when
 * {@code databaseConfig.isPartitionBatch()} is true) or in a single buffer per
 * table. Alongside each buffer a running byte count is kept in
 * {@code partitionBatch}; when adding the new row would reach
 * {@code databaseConfig.getCommitSize()}, the pending statement is executed
 * first and the row starts a fresh statement.
 *
 * @param table name of the target table; it is lower-cased before lookup
 * @param row   the row to add; its column count must equal the number of
 *              columns configured for the table
 * @throws AdbClientException with {@code CONFIG_ERROR} if the table is not in
 *         {@code tableInfo}; with {@code ADD_DATA_ERROR} if the column count
 *         does not match or building/appending the row fails; with
 *         {@code COMMIT_ERROR_DATA_LIST} if the automatic commit recorded
 *         failed data; with {@code SQL_LENGTH_LIMIT} if the automatic commit
 *         failed with any other exception
 */
public void addRow(String table, Row row) {
    String tableName = table.toLowerCase();
    if (this.tableInfo.get(tableName) == null) {
        throw new AdbClientException(AdbClientException.CONFIG_ERROR, "The table " + tableName + " do not exist", null);
    }
    if (row.getColumnValues().size() != databaseConfig.getColumns(tableName).size()) {
        throw new AdbClientException(AdbClientException.ADD_DATA_ERROR, "Add row data is illegal, column size is not equal as config", null);
    }

    Connection conn = this.getConnection();
    // Read the mode once so buffer selection and flushing always agree.
    boolean partitionMode = databaseConfig.isPartitionBatch();
    StringBuilder sqlSb = null;
    int partitionId = 0;
    boolean buffersReady = false;
    try {
        if (partitionMode) {
            partitionId = getHashPartition(tableName, row);
        }
        // The byte counter lives in partitionBatch in BOTH modes (partition 0 when
        // not batching by partition), so make sure the entry exists either way.
        // An entry that already exists is left untouched.
        if (partitionBatch.get(tableName) == null) {
            partitionBatch.put(tableName, new HashMap<Integer, MutablePair<StringBuilder, Integer>>());
        }
        if (partitionBatch.get(tableName).get(partitionId) == null) {
            partitionBatch.get(tableName).put(partitionId, new MutablePair<StringBuilder, Integer>(new StringBuilder(), 0));
        }
        if (partitionMode) {
            // Write by partition
            sqlSb = partitionBatch.get(tableName).get(partitionId).getLeft();
        } else {
            // Single combined buffer per table
            if (batchBuffer.get(tableName) == null) {
                batchBuffer.put(tableName, new StringBuilder());
            }
            sqlSb = batchBuffer.get(tableName);
        }
        buffersReady = true;
    } finally {
        // If anything above threw, the exception propagates unchanged, but the
        // connection acquired earlier must still be released.
        if (!buffersReady) {
            closeDBResources(null, null, conn);
        }
    }

    try {
        String sqlResult = generateInsertSql(tableName, conn, row);
        // Release the connection before a possible flush below.
        closeDBResources(null, null, conn);

        // Everything after the shared "INSERT ..." prefix is this row's value tuple.
        String subSql = sqlResult.substring(insertSqlPrefix.get(tableName).length());
        // Byte length in the platform default charset (String.getBytes()).
        int subSqlLen = subSql.getBytes().length;
        MutablePair<StringBuilder, Integer> sizeTracker = partitionBatch.get(tableName).get(partitionId);

        if (sqlSb.length() > 0) {
            int pendingLen = sizeTracker.getRight() + subSqlLen;
            if (pendingLen + SQL_SPLIT_CHARACTER_LEN >= databaseConfig.getCommitSize()) {
                try {
                    // Drop error state left over from a previous commit.
                    if (commitExceptionDataList.size() > 0 || commitException != null) {
                        commitExceptionDataList.clear();
                        commitException = null;
                    }
                    // The automatic commit only submits this single statement.
                    executeBatchSql(sqlSb);

                    // Start over with an empty buffer and a zeroed byte counter.
                    if (partitionBatch.get(tableName) == null) {
                        partitionBatch.put(tableName, new HashMap<Integer, MutablePair<StringBuilder, Integer>>());
                    }
                    sizeTracker = new MutablePair<StringBuilder, Integer>(new StringBuilder(), 0);
                    partitionBatch.get(tableName).put(partitionId, sizeTracker);
                    if (partitionMode) {
                        sqlSb = sizeTracker.getLeft();
                    } else {
                        sqlSb = new StringBuilder();
                        batchBuffer.put(tableName, sqlSb);
                    }

                    if (commitExceptionDataList.size() > 0) {
                        logger("error", "Auto commit error data list " + commitExceptionDataList.toString());
                        throw new AdbClientException(AdbClientException.COMMIT_ERROR_DATA_LIST, commitExceptionDataList, commitException);
                    }
                } catch (AdbClientException e) {
                    throw e;
                } catch (Exception e) {
                    throw new AdbClientException(AdbClientException.SQL_LENGTH_LIMIT, "Sql length is too large, auto commit failed. Please commit first. exception:" + e.getMessage(), e);
                }
            }
        }

        if (sqlSb.length() == 0) {
            // First row of a statement: keep the INSERT prefix and count its bytes.
            sqlSb.append(sqlResult.replace(SQL_SPLIT_CHARACTER, " , "));
            subSqlLen += insertSqlPrefix.get(tableName).getBytes().length;
        } else {
            // Subsequent rows: separator followed by the value tuple only.
            sqlSb.append(SQL_SPLIT_CHARACTER);
            sqlSb.append(subSql.replace(SQL_SPLIT_CHARACTER, " , "));
            subSqlLen += SQL_SPLIT_CHARACTER_LEN;
        }
        sizeTracker.setRight(sizeTracker.getRight() + subSqlLen);
        totalCount.incrementAndGet();
    } catch (AdbClientException e) {
        logger("error", "addRow " + e.getMessage());
        throw e;
    } catch (Exception e) {
        throw new AdbClientException(AdbClientException.ADD_DATA_ERROR, String.format("Add row data (%s) error: %s", row.getColumnValues().toString(), e.getMessage()), e);
    } finally {
        // Covers the case where generateInsertSql threw before the early release.
        // On the normal path this repeats the release, as the original code did;
        // assumes closeDBResources tolerates an already-closed connection.
        closeDBResources(null, null, conn);
    }
}