private void batchWrite()

in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [362:468]


	private void batchWrite(List<Row> rows) {
		if (null == rows || rows.size() == 0){
			return ;
		}
		long start = System.currentTimeMillis();
		try {
			if (writeMode == 1) {
				StringBuilder stringBuilder = new StringBuilder();
				for (Row row : rows) {
					String[] fields = writeCopyFormat(schema, row, timeZone, reserveMs);
					for(int i = 0; i < fields.length; i++) {
						stringBuilder.append(fields[i]);
						stringBuilder.append(i == fields.length - 1 ? "\r\n" : "\t");
					}
				}
				byte[] data = stringBuilder.toString().getBytes(Charsets.UTF_8);
				InputStream inputStream = new ByteArrayInputStream(data);
				executeCopy(inputStream);
			}
			else if (writeMode == 2){
				List<String> valueList = new ArrayList<String>();
				String[] fields;
				for (Row row : rows) {
					fields = writeFormat(schema, row, timeZone, reserveMs);
					valueList.add("(" + StringUtils.join(fields, ",") + ")");
				}

				StringBuilder sb = new StringBuilder();
				if (caseSensitive) {
					sb.append(insertClause).append("\"").append(targetSchema).append("\"").append(".").append("\"").append(tableName).append("\"").append(" (" + fieldNamesCaseSensitive + " ) values ");
				}
				else {
					sb.append(insertClause).append(targetSchema).append(".").append(tableName).append(" (" + fieldNames + " ) values ");
				}

				sb.append(StringUtils.join(valueList, ","));
				if (caseSensitive) {
					sb.append(" on conflict(").append(primaryFieldNamesCaseSensitive).append(") ").append(" do update set (").append(nonPrimaryFieldNamesCaseSensitive).append(")=(").append(excludedNonPrimaryFieldNamesCaseSensitive).append(")");
				}
				else {
					sb.append(" on conflict(").append(primaryFieldNames).append(") ").append(" do update set (").append(nonPrimaryFieldNames).append(")=(").append(excludedNonPrimaryFieldNames).append(")");
				}
				executeSql(sb.toString());
			}
			else {
				List<String> valueList = new ArrayList<String>();
				String[] fields;
				for (Row row : rows) {
					fields = writeFormat(schema, row, timeZone, reserveMs);
					valueList.add("(" + StringUtils.join(fields, ",") + ")");
				}

				StringBuilder sb = new StringBuilder();
				if (caseSensitive) {
					sb.append(insertClause).append("\"").append(targetSchema).append("\"").append(".").append("\"").append(tableName).append("\"").append(" (" + fieldNamesCaseSensitive + " ) values ");
				}
				else {
					sb.append(insertClause).append(targetSchema).append(".").append(tableName).append(" (" + fieldNames + " ) values ");
				}
				String sql = sb.toString() + StringUtils.join(valueList, ",");
				executeSql(sql);
			}
		} catch (Exception e) {
			LOG.warn("execute sql error:", e);
			if (existsPrimaryKeys
					&& e.getMessage() != null
					&& e.getMessage().indexOf("duplicate key") != -1
					&& e.getMessage().indexOf("violates unique constraint") != -1
					&& "upsert".equalsIgnoreCase(conflictMode)){
				LOG.warn("batch insert failed in upsert mode, will try to upsert msgs one by one", e);
				for (Row row : rows) {
					upsertRow(row);
				}
			}
			else {
				LOG.warn("batch insert failed, will try to insert msgs one by one", e);
				for (Row row : rows) {
					String insertSQL = getInsertSQL(row);
					try {
						executeSql(insertSQL);
					} catch (SQLException insertException) {
						//insertException.printStackTrace();
						LOG.warn("Exception in insert sql: " + insertSQL, insertException);
						if (existsPrimaryKeys
								&& insertException.getMessage() != null
								&& insertException.getMessage().indexOf("duplicate key") != -1
								&& insertException.getMessage().indexOf("violates unique constraint") != -1) {
							if ("strict".equalsIgnoreCase(conflictMode)) {
								throw new RuntimeException("duplicate key value violates unique constraint");
							} else if ("update".equalsIgnoreCase(conflictMode)) {
								updateRow(row);
							}
							else if ("upsert".equalsIgnoreCase(conflictMode) || (2 == writeMode)) {
								upsertRow(row);
							}
						}
						else {
							if ("strict".equalsIgnoreCase(exceptionMode)) {
								throw new RuntimeException(insertException);
							}
						}
					}
				}
			}
		}
		long end = System.currentTimeMillis();
	}