in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/jdbc/copy/HologresJDBCCopyWriter.java [127:222]
public long writeAddRecord(T record) throws IOException {
CopyContext copyContext;
Record jdbcRecord = recordConverter.convertFrom(record);
LOG.debug("Hologres insert record in JDBC-COPY: {}", jdbcRecord);
// The hologres instance cannot throw out dirty data details, so we do dirty data checking
// on the holo-client. But there will be some performance loss, so we only do dirty data
// check before the first flush, and no longer check after the first flush is successful.
// When write fails, the job fail over recovery will re-register the writer.
// At this time, the "checkDirtyData" will be true, that specific dirty data rows can be
// thrown.
if (checkDirtyData) {
try {
RecordChecker.check(jdbcRecord);
} catch (HoloClientException e) {
throw new IOException(
String.format(
"failed to copy because dirty data, the error record is %s.",
jdbcRecord),
e);
}
}
jdbcRecord.setType(Put.MutationType.INSERT);
com.alibaba.hologres.client.model.TableSchema schema = jdbcRecord.getSchema();
if (schema.isPartitionParentTable()) {
String partitionValue =
String.valueOf(jdbcRecord.getObject(schema.getPartitionIndex()));
if (partitionValueToCopyContext.containsKey(partitionValue)) {
copyContext = partitionValueToCopyContext.get(partitionValue);
} else {
com.alibaba.hologres.client.model.TableSchema childSchema =
checkChildTableExists(schema, partitionValue);
partitionValueToTableSchema.put(partitionValue, childSchema);
copyContext = new CopyContext();
copyContext.init(param);
copyContext.schema = childSchema;
partitionValueToCopyContext.put(partitionValue, copyContext);
if (partitionValueToCopyContext.size() > 5) {
throw new RuntimeException(
"Only support to write less than 5 child table at the same time now.");
}
}
jdbcRecord.changeToChildSchema(partitionValueToTableSchema.get(partitionValue));
} else {
copyContext =
partitionValueToCopyContext.computeIfAbsent(
param.getTable(), k -> new CopyContext().init(param));
}
try {
if (copyContext.os == null) {
boolean binary = "binary".equalsIgnoreCase(param.getCopyWriteFormat());
String sql =
buildCopyInSql(
jdbcRecord,
binary,
param.getJDBCWriteMode() == INSERT_OR_IGNORE
? INSERT_OR_IGNORE
: INSERT_OR_UPDATE,
copyMode);
LOG.info("copy sql :{}", sql);
CopyIn in = copyContext.manager.copyIn(sql);
copyContext.ios = new CopyInOutputStream(in);
// holo bulk load copy just support text, not support binary
if (copyMode != CopyMode.STREAM) {
copyContext.os =
new RecordTextOutputStream(
copyContext.ios,
schema,
copyContext.pgConn.unwrap(BaseConnection.class),
1024 * 1024 * 10);
} else {
copyContext.os =
binary
? new RecordBinaryOutputStream(
copyContext.ios,
schema,
copyContext.pgConn.unwrap(BaseConnection.class),
1024 * 1024 * 10)
: new RecordTextOutputStream(
copyContext.ios,
schema,
copyContext.pgConn.unwrap(BaseConnection.class),
1024 * 1024 * 10);
}
}
copyContext.os.putRecord(jdbcRecord);
if (param.isEnableAggressive() && copyMode == CopyMode.STREAM) {
copyContext.ios.flush();
}
} catch (SQLException e) {
LOG.error("close copyContext", e);
copyContext.close();
throw new IOException(e);
}
return jdbcRecord.getByteSize();
}