in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [362:468]
private void batchWrite(List<Row> rows) {
if (null == rows || rows.size() == 0){
return ;
}
long start = System.currentTimeMillis();
try {
if (writeMode == 1) {
StringBuilder stringBuilder = new StringBuilder();
for (Row row : rows) {
String[] fields = writeCopyFormat(schema, row, timeZone, reserveMs);
for(int i = 0; i < fields.length; i++) {
stringBuilder.append(fields[i]);
stringBuilder.append(i == fields.length - 1 ? "\r\n" : "\t");
}
}
byte[] data = stringBuilder.toString().getBytes(Charsets.UTF_8);
InputStream inputStream = new ByteArrayInputStream(data);
executeCopy(inputStream);
}
else if (writeMode == 2){
List<String> valueList = new ArrayList<String>();
String[] fields;
for (Row row : rows) {
fields = writeFormat(schema, row, timeZone, reserveMs);
valueList.add("(" + StringUtils.join(fields, ",") + ")");
}
StringBuilder sb = new StringBuilder();
if (caseSensitive) {
sb.append(insertClause).append("\"").append(targetSchema).append("\"").append(".").append("\"").append(tableName).append("\"").append(" (" + fieldNamesCaseSensitive + " ) values ");
}
else {
sb.append(insertClause).append(targetSchema).append(".").append(tableName).append(" (" + fieldNames + " ) values ");
}
sb.append(StringUtils.join(valueList, ","));
if (caseSensitive) {
sb.append(" on conflict(").append(primaryFieldNamesCaseSensitive).append(") ").append(" do update set (").append(nonPrimaryFieldNamesCaseSensitive).append(")=(").append(excludedNonPrimaryFieldNamesCaseSensitive).append(")");
}
else {
sb.append(" on conflict(").append(primaryFieldNames).append(") ").append(" do update set (").append(nonPrimaryFieldNames).append(")=(").append(excludedNonPrimaryFieldNames).append(")");
}
executeSql(sb.toString());
}
else {
List<String> valueList = new ArrayList<String>();
String[] fields;
for (Row row : rows) {
fields = writeFormat(schema, row, timeZone, reserveMs);
valueList.add("(" + StringUtils.join(fields, ",") + ")");
}
StringBuilder sb = new StringBuilder();
if (caseSensitive) {
sb.append(insertClause).append("\"").append(targetSchema).append("\"").append(".").append("\"").append(tableName).append("\"").append(" (" + fieldNamesCaseSensitive + " ) values ");
}
else {
sb.append(insertClause).append(targetSchema).append(".").append(tableName).append(" (" + fieldNames + " ) values ");
}
String sql = sb.toString() + StringUtils.join(valueList, ",");
executeSql(sql);
}
} catch (Exception e) {
LOG.warn("execute sql error:", e);
if (existsPrimaryKeys
&& e.getMessage() != null
&& e.getMessage().indexOf("duplicate key") != -1
&& e.getMessage().indexOf("violates unique constraint") != -1
&& "upsert".equalsIgnoreCase(conflictMode)){
LOG.warn("batch insert failed in upsert mode, will try to upsert msgs one by one", e);
for (Row row : rows) {
upsertRow(row);
}
}
else {
LOG.warn("batch insert failed, will try to insert msgs one by one", e);
for (Row row : rows) {
String insertSQL = getInsertSQL(row);
try {
executeSql(insertSQL);
} catch (SQLException insertException) {
//insertException.printStackTrace();
LOG.warn("Exception in insert sql: " + insertSQL, insertException);
if (existsPrimaryKeys
&& insertException.getMessage() != null
&& insertException.getMessage().indexOf("duplicate key") != -1
&& insertException.getMessage().indexOf("violates unique constraint") != -1) {
if ("strict".equalsIgnoreCase(conflictMode)) {
throw new RuntimeException("duplicate key value violates unique constraint");
} else if ("update".equalsIgnoreCase(conflictMode)) {
updateRow(row);
}
else if ("upsert".equalsIgnoreCase(conflictMode) || (2 == writeMode)) {
upsertRow(row);
}
}
else {
if ("strict".equalsIgnoreCase(exceptionMode)) {
throw new RuntimeException(insertException);
}
}
}
}
}
}
long end = System.currentTimeMillis();
}