in holo-client/src/main/java/com/alibaba/hologres/client/impl/UnnestUpsertStatementBuilder.java [331:436]
protected void buildInsertStatement(Connection conn, HoloVersion version, TableSchema schema, TableName tableName, Tuple<BitSet, BitSet> columnSet, List<Record> recordList, List<PreparedStatementWithBatchInfo> list, WriteMode mode) throws SQLException {
// If the server version does not support unnest, fall back to the legacy path.
if (!isVersionSupport(version)) {
super.buildInsertStatement(conn, version, schema, tableName, columnSet, recordList, list, mode);
return;
}
Record first = recordList.get(0);
CheckAndPutCondition checkAndPutCondition = null;
if (first instanceof CheckAndPutRecord) {
checkAndPutCondition = ((CheckAndPutRecord) first).getCheckAndPutCondition();
}
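// The check-and-put condition is taken from the first record only; records flushed together are expected to share it, since it is part of the cached-SQL key below.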
InsertSql insertSql = insertCache.computeIfAbsent(new Tuple4<>(schema, tableName, mode, checkAndPutCondition), columnSet, this::buildInsertSql);
PreparedStatement currentPs = null;
try {
currentPs = conn.prepareStatement(insertSql.sql);
// Only take the new path when the generated SQL actually uses unnest.
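// Illustrative shape of such a statement (the real text comes from buildInsertSql):
//   INSERT INTO t (c1, c2) SELECT unnest(?), unnest(?)
// i.e. one array parameter per projected column.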
if (insertSql.isUnnest) {
long totalBytes = recordList.stream().mapToLong(Record::getByteSize).sum();
int rows = recordList.size();
int stepRows = 0;
if (config.getMaxBytesPerSql() > 0L) {
long avg = Math.max(totalBytes / recordList.size(), 1);
stepRows = (int) Math.min(config.getMaxBytesPerSql() / avg, config.getMaxRowsPerSql());
}
stepRows = Math.min(recordList.size(), Math.max(stepRows, 0));
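// stepRows == 0 (maxBytesPerSql disabled) keeps rows == recordList.size(), i.e. the whole list is flushed as a single batch.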
if (stepRows != 0) {
rows = recordList.size() % stepRows + stepRows;
}
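// Worked example (illustrative numbers): 10_250 records totalling 1_025_000 bytes
// give avg = 100 bytes/row; with maxBytesPerSql = 50_000 and a larger maxRowsPerSql,
// stepRows = 500, so the first batch holds 10_250 % 500 + 500 = 750 rows and the
// remaining 9_500 rows form 19 full batches of 500.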
boolean isInit = false;
boolean isFirstBatch = true;
JdbcColumnValues[] columnBuffers = new JdbcColumnValues[columnSet.l.cardinality()];
// Prepare per-column buffers sized to the batch row count.
prepareColumnValues(conn, rows, columnSet.l, schema, columnBuffers);
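// Note: the loop below re-prepares these buffers at the start of every batch, including the first, so this initial call appears redundant.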
// Index of the current row within the current batch.
int row = 0;
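// Number of batches accumulated on this single PreparedStatement.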
int batchCount = 0;
for (Record record : recordList) {
// If no batch is in progress, initialize one.
if (!isInit) {
if (isFirstBatch) {
isFirstBatch = false;
} else {
rows = stepRows;
}
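// The first batch may be oversized (remainder + stepRows); every later batch holds exactly stepRows rows.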
// (Re)allocate per-column buffers for this batch.
prepareColumnValues(conn, rows, columnSet.l, schema, columnBuffers);
++batchCount;
isInit = true;
}
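// Values are staged column-major: columnBuffers[i] gathers the i-th projected column across all rows of the batch, matching the one-array-per-column binding below.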
// Index within the projected column set.
int arrayIndex = -1;
IntStream columnStream = columnSet.l.stream();
for (PrimitiveIterator.OfInt it = columnStream.iterator(); it.hasNext(); ) {
int index = it.next();
++arrayIndex;
columnBuffers[arrayIndex].set(row, record.getObject(index));
}
++row;
// The batch is full: flush it into the statement.
if (row == rows) {
// Finish the current batch: bind each column buffer as an SQL array parameter.
{
arrayIndex = -1;
columnStream = columnSet.l.stream();
for (PrimitiveIterator.OfInt it = columnStream.iterator(); it.hasNext(); ) {
++arrayIndex;
int index = it.next();
Column column = schema.getColumn(index);
Array array = conn.createArrayOf(getRealTypeName(column.getType(), column.getTypeName()), columnBuffers[arrayIndex].getArray());
currentPs.setArray(arrayIndex + 1, array);
}
isInit = false;
}
// If this PreparedStatement carries more than one batch, accumulate batches via addBatch.
if (rows < recordList.size()) {
currentPs.addBatch();
}
row = 0;
}
}
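// Because the first batch absorbed the remainder, every batch fills exactly, so no partial batch is left when the loop ends.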
PreparedStatementWithBatchInfo preparedStatementWithBatchInfo = new PreparedStatementWithBatchInfo(currentPs, rows < recordList.size(), Put.MutationType.INSERT);
preparedStatementWithBatchInfo.setByteSize(totalBytes);
preparedStatementWithBatchInfo.setBatchCount(batchCount);
list.add(preparedStatementWithBatchInfo);
} else {
super.buildInsertStatement(conn, version, schema, tableName, columnSet, recordList, list, mode);
}
} catch (SQLException e) {
if (null != currentPs) {
try {
currentPs.close();
} catch (SQLException e1) {
// Preserve the close failure on the original exception instead of swallowing it.
e.addSuppressed(e1);
}
}
throw e;
} finally {
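// Crude eviction: once the statement cache exceeds 500 entries, drop it wholesale rather than tracking per-entry usage.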
if (insertCache.getSize() > 500) {
insertCache.clear();
}
}
}