protected void buildInsertStatement()

in holo-client/src/main/java/com/alibaba/hologres/client/impl/UnnestUpsertStatementBuilder.java [331:436]


	/**
	 * Builds the insert statement for {@code recordList} and appends it to {@code list}.
	 *
	 * <p>When the server version is supported and the cached SQL for this
	 * schema/table/mode/condition is flagged as "unnest", one {@link PreparedStatement} is created
	 * and every written column is bound as a single JDBC {@link Array} holding the values of all
	 * rows of a batch. The records are split into batches: the first batch takes
	 * {@code size % stepRows + stepRows} rows and every following batch takes {@code stepRows}
	 * rows, where {@code stepRows} is derived from {@code config.getMaxBytesPerSql()},
	 * {@code config.getMaxRowsPerSql()} and the average record byte size. If {@code stepRows}
	 * resolves to 0, all records go into one batch. {@code addBatch()} is only used when there is
	 * more than one batch.
	 *
	 * <p>In every other case the call is delegated unchanged to the superclass implementation.
	 *
	 * @param conn       connection used to prepare the statement and to create the SQL arrays
	 * @param version    server version; checked with {@code isVersionSupport}
	 * @param schema     table schema used to look up the column type of each written column
	 * @param tableName  target table; part of the SQL cache key
	 * @param columnSet  column bit sets; {@code columnSet.l} selects the columns bound here
	 * @param recordList records to write; must contain at least one element
	 * @param list       output list receiving the built statement together with its batch info
	 * @param mode       write mode; part of the SQL cache key
	 * @throws SQLException if preparing the statement, creating an array or binding a parameter
	 *                      fails; the statement prepared by this method is closed before rethrowing
	 */
	protected void buildInsertStatement(Connection conn, HoloVersion version, TableSchema schema, TableName tableName, Tuple<BitSet, BitSet> columnSet, List<Record> recordList, List<PreparedStatementWithBatchInfo> list, WriteMode mode) throws SQLException {
		// Unsupported versions go straight to the legacy path.
		if (!isVersionSupport(version)) {
			super.buildInsertStatement(conn, version, schema, tableName, columnSet, recordList, list, mode);
			return;
		}
		// The check-and-put condition of the first record is part of the SQL cache key.
		Record re = recordList.get(0);
		CheckAndPutCondition checkAndPutCondition = null;
		if (re instanceof CheckAndPutRecord) {
			checkAndPutCondition = ((CheckAndPutRecord) re).getCheckAndPutCondition();
		}
		InsertSql insertSql = insertCache.computeIfAbsent(new Tuple4<>(schema, tableName, mode, checkAndPutCondition), columnSet, this::buildInsertSql);
		PreparedStatement currentPs = null;
		try {
			// Only SQL that was built in unnest form may use the array-binding path. Decide this
			// BEFORE preparing a statement so the fallback does not leave an unused, unclosed
			// PreparedStatement behind.
			if (!insertSql.isUnnest) {
				super.buildInsertStatement(conn, version, schema, tableName, columnSet, recordList, list, mode);
				return;
			}
			currentPs = conn.prepareStatement(insertSql.sql);

			final BitSet columns = columnSet.l;
			final int recordCount = recordList.size();
			long totalBytes = recordList.stream().mapToLong(r -> r.getByteSize()).sum();

			// stepRows: rows per regular batch, limited by both the byte budget and the row budget.
			int stepRows = 0;
			if (config.getMaxBytesPerSql() > 0L) {
				long avg = Math.max(totalBytes / recordCount, 1);
				stepRows = (int) Math.min(config.getMaxBytesPerSql() / avg, config.getMaxRowsPerSql());
			}
			stepRows = Math.min(recordCount, Math.max(stepRows, 0));

			// rows: size of the batch currently being filled. The first batch also absorbs the
			// remainder so that all following batches have exactly stepRows rows.
			int rows = recordCount;
			if (stepRows != 0) {
				rows = recordCount % stepRows + stepRows;
			}
			// More than one batch exists exactly when the first batch does not hold every record.
			final boolean multiBatch = rows < recordCount;

			JdbcColumnValues[] columnValues = new JdbcColumnValues[columns.cardinality()];
			// Index of the next row inside the current batch.
			int row = 0;
			int batchCount = 0;
			boolean batchOpen = false;
			for (Record record : recordList) {
				// Start a new batch: allocate per-column storage for `rows` rows.
				if (!batchOpen) {
					if (batchCount > 0) {
						rows = stepRows;
					}
					prepareColumnValues(conn, rows, columns, schema, columnValues);
					++batchCount;
					batchOpen = true;
				}
				// Copy this record's value of every written column into the column storage.
				int arrayIndex = 0;
				for (int index = columns.nextSetBit(0); index >= 0; index = columns.nextSetBit(index + 1)) {
					columnValues[arrayIndex].set(row, record.getObject(index));
					++arrayIndex;
				}
				++row;
				// The batch is full: bind one SQL array per column.
				if (row == rows) {
					int parameterIndex = 0;
					for (int index = columns.nextSetBit(0); index >= 0; index = columns.nextSetBit(index + 1)) {
						Column column = schema.getColumn(index);
						Array array = conn.createArrayOf(getRealTypeName(column.getType(), column.getTypeName()), columnValues[parameterIndex].getArray());
						// JDBC parameter indexes are 1-based.
						++parameterIndex;
						currentPs.setArray(parameterIndex, array);
					}
					// A statement carrying more than one batch must use addBatch.
					if (multiBatch) {
						currentPs.addBatch();
					}
					batchOpen = false;
					row = 0;
				}
			}
			PreparedStatementWithBatchInfo preparedStatementWithBatchInfo = new PreparedStatementWithBatchInfo(currentPs, multiBatch, Put.MutationType.INSERT);
			preparedStatementWithBatchInfo.setByteSize(totalBytes);
			preparedStatementWithBatchInfo.setBatchCount(batchCount);
			list.add(preparedStatementWithBatchInfo);
		} catch (SQLException | RuntimeException e) {
			// The statement was not handed over to the caller; close it so it does not leak.
			if (null != currentPs) {
				try {
					currentPs.close();
				} catch (SQLException closeFailure) {
					// Keep the original failure as the primary error, but do not lose this one.
					e.addSuppressed(closeFailure);
				}
			}
			throw e;
		} finally {
			// Keep the SQL cache bounded.
			if (insertCache.getSize() > 500) {
				insertCache.clear();
			}
		}
	}