public long writeAddRecord(T record)

in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/jdbc/copy/HologresJDBCCopyWriter.java [127:222]


    public long writeAddRecord(T record) throws IOException {
        CopyContext copyContext;
        Record jdbcRecord = recordConverter.convertFrom(record);
        LOG.debug("Hologres insert record in JDBC-COPY: {}", jdbcRecord);
        // The Hologres instance cannot report the details of dirty data, so we check for
        // dirty data on the holo-client side. Because the check costs some performance, we
        // only run it before the first flush and skip it once the first flush has succeeded.
        // If a write fails, the job's failover recovery re-registers the writer; at that
        // point "checkDirtyData" is true again, so the specific dirty rows can be reported.
        if (checkDirtyData) {
            try {
                RecordChecker.check(jdbcRecord);
            } catch (HoloClientException e) {
                throw new IOException(
                        String.format(
                                "failed to copy because dirty data, the error record is %s.",
                                jdbcRecord),
                        e);
            }
        }
        jdbcRecord.setType(Put.MutationType.INSERT);
        com.alibaba.hologres.client.model.TableSchema schema = jdbcRecord.getSchema();
        if (schema.isPartitionParentTable()) {
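            // Partition parent table: route this record to the child table matching its
            // partition value, caching one CopyContext per child table.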
            String partitionValue =
                    String.valueOf(jdbcRecord.getObject(schema.getPartitionIndex()));
            if (partitionValueToCopyContext.containsKey(partitionValue)) {
                copyContext = partitionValueToCopyContext.get(partitionValue);
            } else {
                com.alibaba.hologres.client.model.TableSchema childSchema =
                        checkChildTableExists(schema, partitionValue);
                partitionValueToTableSchema.put(partitionValue, childSchema);
                copyContext = new CopyContext();
                copyContext.init(param);
                copyContext.schema = childSchema;
                partitionValueToCopyContext.put(partitionValue, copyContext);
                if (partitionValueToCopyContext.size() > 5) {
                    throw new RuntimeException(
                            "Writing to more than 5 child tables at the same time is not supported yet.");
                }
            }
            jdbcRecord.changeToChildSchema(partitionValueToTableSchema.get(partitionValue));
        } else {
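            // Not a partition parent table: reuse a single CopyContext keyed by the
            // configured table name.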
            copyContext =
                    partitionValueToCopyContext.computeIfAbsent(
                            param.getTable(), k -> new CopyContext().init(param));
        }

        try {
            if (copyContext.os == null) {
                boolean binary = "binary".equalsIgnoreCase(param.getCopyWriteFormat());
                String sql =
                        buildCopyInSql(
                                jdbcRecord,
                                binary,
                                param.getJDBCWriteMode() == INSERT_OR_IGNORE
                                        ? INSERT_OR_IGNORE
                                        : INSERT_OR_UPDATE,
                                copyMode);
                LOG.info("copy sql :{}", sql);
                CopyIn in = copyContext.manager.copyIn(sql);
                copyContext.ios = new CopyInOutputStream(in);
                // Hologres bulk-load copy only supports the text format, not binary.
                if (copyMode != CopyMode.STREAM) {
                    copyContext.os =
                            new RecordTextOutputStream(
                                    copyContext.ios,
                                    schema,
                                    copyContext.pgConn.unwrap(BaseConnection.class),
                                    1024 * 1024 * 10);
                } else {
                    copyContext.os =
                            binary
                                    ? new RecordBinaryOutputStream(
                                            copyContext.ios,
                                            schema,
                                            copyContext.pgConn.unwrap(BaseConnection.class),
                                            1024 * 1024 * 10)
                                    : new RecordTextOutputStream(
                                            copyContext.ios,
                                            schema,
                                            copyContext.pgConn.unwrap(BaseConnection.class),
                                            1024 * 1024 * 10);
                }
            }
            copyContext.os.putRecord(jdbcRecord);
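            // In aggressive mode with stream copy, flush the copy stream after every record.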
            if (param.isEnableAggressive() && copyMode == CopyMode.STREAM) {
                copyContext.ios.flush();
            }
        } catch (SQLException e) {
            LOG.error("close copyContext", e);
            copyContext.close();
            throw new IOException(e);
        }
        return jdbcRecord.getByteSize();
    }
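
For context, the byte size returned above is what a caller typically accumulates to decide
when to flush. A minimal, illustrative sketch; the writer, buffer, and flush() names below
are assumptions for this example, not part of the connector:

    // Accumulate the byte size returned by writeAddRecord and flush once a
    // hypothetical threshold is reached.
    long pendingBytes = 0;
    final long flushThresholdBytes = 64L * 1024 * 1024; // assumed 64 MB threshold

    for (T record : buffer) { // "buffer" is an assumed source of records
        pendingBytes += writer.writeAddRecord(record);
        if (pendingBytes >= flushThresholdBytes) {
            writer.flush(); // assumed flush entry point on the writer
            pendingBytes = 0;
        }
    }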