public void startWriteWithConnection()

in adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsInsertProxy.java [119:218]


    /**
     * Drains all records from {@code recordReceiver} and writes them to ADS in buffered batches
     * through {@code doBatchRecordWithPartitionSort}.
     *
     * <p>When the write mode equals {@code Constant.INSERTMODE} (case-insensitive), every record
     * must carry exactly {@code columnNumber} columns and is queued for insert. Otherwise every
     * record must carry {@code columnNumber + 1} columns; the column at {@code opColumnIndex}
     * selects whether the record is queued for insert or delete, and records with an unsupported
     * operation type are reported to the task plugin collector as dirty records. A change between
     * insert and delete flushes the buffer of the previous operation kind.
     *
     * <p>The connection is always released via {@code DBUtil.closeDBResources} once the
     * try block has been entered, whether or not writing succeeds.
     *
     * @param recordReceiver source of records; reading stops when it returns {@code null}
     * @param connection     connection used for writing; stored in {@code currentConnection}
     *                       and closed before this method returns
     * @param columnNumber   number of columns written to the destination table
     * @throws DataXException for a column-count mismatch, or wrapping any other failure with
     *                        {@code DBUtilErrorCode.WRITE_DATA_ERROR}
     */
    public void startWriteWithConnection(RecordReceiver recordReceiver,
                                         Connection connection,
                                         int columnNumber) {
        this.currentConnection = connection;
        int batchSize = this.configuration.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
        // By default bufferSize is the same as batchSize.
        int bufferSize = this.configuration.getInt(Key.BUFFER_SIZE, batchSize);
        // Insert buffer: inserts for multiple partitions are sorted, merged and sent to ADS.
        List<Record> writeBuffer = new ArrayList<Record>(bufferSize);
        List<Record> deleteBuffer = null;
        if (this.writeMode.equalsIgnoreCase(Constant.STREAMMODE)) {
            // Delete buffer: deletes for multiple partitions are sorted, merged and sent to ADS.
            deleteBuffer = new ArrayList<Record>(bufferSize);
        }
        // Evaluated once instead of once per record.
        // NOTE(review): assumes writeMode is not changed while records are being written.
        final boolean insertOnly = this.writeMode.equalsIgnoreCase(Constant.INSERTMODE);
        try {
            Record record;
            while ((record = recordReceiver.getFromReader()) != null) {
                if (insertOnly) {
                    if (record.getColumnNumber() != columnNumber) {
                        // Source column count differs from the destination column count: fail fast.
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                                record.getColumnNumber(),
                                                columnNumber));
                    }
                    writeBuffer.add(record);
                    if (writeBuffer.size() >= bufferSize) {
                        this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
                        writeBuffer.clear();
                    }
                } else {
                    if (record.getColumnNumber() != columnNumber + 1) {
                        // The source must carry one extra column (the operation type): fail fast.
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不满足源头多1列操作类型列. 请检查您的配置并作出修改.",
                                                record.getColumnNumber(),
                                                columnNumber));
                    }
                    String optionColumnValue = record.getColumn(this.opColumnIndex).asString();
                    OperationType operationType = OperationType.asOperationType(optionColumnValue);
                    if (operationType.isInsertTemplate()) {
                        writeBuffer.add(record);
                        // Compare by value: reference equality on strings only works for
                        // identical interned instances.
                        if (this.lastDmlMode == null || Constant.INSERTMODE.equals(this.lastDmlMode)) {
                            this.lastDmlMode = Constant.INSERTMODE;
                            if (writeBuffer.size() >= bufferSize) {
                                this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
                                writeBuffer.clear();
                            }
                        } else {
                            this.lastDmlMode = Constant.INSERTMODE;
                            // Mode change: submit the pending deletes, then continue in insert mode.
                            this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
                            deleteBuffer.clear();
                        }
                    } else if (operationType.isDeleteTemplate()) {
                        deleteBuffer.add(record);
                        if (this.lastDmlMode == null || Constant.DELETEMODE.equals(this.lastDmlMode)) {
                            this.lastDmlMode = Constant.DELETEMODE;
                            if (deleteBuffer.size() >= bufferSize) {
                                this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
                                deleteBuffer.clear();
                            }
                        } else {
                            this.lastDmlMode = Constant.DELETEMODE;
                            // Mode change: submit the pending inserts, then continue in delete mode.
                            this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
                            writeBuffer.clear();
                        }
                    } else {
                        // Unsupported operation type: report as dirty data, no retry needed here.
                        this.taskPluginCollector.collectDirtyRecord(record, String.format("不支持您的更新类型:%s", optionColumnValue));
                    }
                }
            }

            // Flush whatever is still buffered once the reader is exhausted.
            if (!writeBuffer.isEmpty()) {
                this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
                writeBuffer.clear();
            }
            // At most one of the two buffers is non-empty at the same time.
            if (null != deleteBuffer && !deleteBuffer.isEmpty()) {
                this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
                deleteBuffer.clear();
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    DBUtilErrorCode.WRITE_DATA_ERROR, e);
        } finally {
            // Drop buffered records from both buffers and release the connection on every exit path.
            writeBuffer.clear();
            if (deleteBuffer != null) {
                deleteBuffer.clear();
            }
            DBUtil.closeDBResources(null, null, connection);
        }
    }