in adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsInsertProxy.java [119:218]
/**
 * Drains every record from {@code recordReceiver} and writes it to ADS over {@code connection}.
 *
 * <p>When {@code writeMode} is {@code Constant.INSERTMODE}, every record must have exactly
 * {@code columnNumber} columns. Records are buffered and flushed as an insert batch once
 * {@code bufferSize} records have accumulated.
 *
 * <p>In every other write mode (stream mode), each record must carry one extra column: the
 * operation type, read from {@code this.opColumnIndex}.
 * <ul>
 *   <li>Insert-type records go to the insert buffer.</li>
 *   <li>Delete-type records go to the delete buffer.</li>
 *   <li>Records with an unsupported operation type are reported as dirty data and are not
 *       retried.</li>
 * </ul>
 * When the operation type switches between insert and delete, the pending buffer of the
 * previous type is flushed first. Earlier operations are therefore sent before the ones that
 * follow them.
 *
 * <p>Remaining buffered records are flushed at the end of the input. The connection is always
 * closed before this method returns.
 *
 * @param recordReceiver source of records; reading stops when it returns {@code null}
 * @param connection     connection used for writing; stored as {@code currentConnection} and
 *                       closed in {@code finally}
 * @param columnNumber   number of columns written to the target table
 * @throws DataXException on a column-count mismatch or any write failure. All failures are
 *                        passed through
 *                        {@code DataXException.asDataXException(WRITE_DATA_ERROR, e)}.
 */
public void startWriteWithConnection(RecordReceiver recordReceiver,
                                     Connection connection,
                                     int columnNumber) {
    this.currentConnection = connection;
    int batchSize = this.configuration.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
    // By default the buffer size is the same as the batch size.
    int bufferSize = this.configuration.getInt(Key.BUFFER_SIZE, batchSize);
    // writeMode does not change while this method runs, so resolve it once, not per record.
    boolean insertMode = this.writeMode.equalsIgnoreCase(Constant.INSERTMODE);
    // Insert buffer: records from several partitions are sorted, merged and sent to ADS as inserts.
    List<Record> writeBuffer = new ArrayList<Record>(bufferSize);
    // Delete buffer: same idea for deletes. It is allocated for every write mode that takes the
    // op-column (stream) path below, so a delete record can never meet a null buffer.
    List<Record> deleteBuffer = insertMode ? null : new ArrayList<Record>(bufferSize);
    try {
        Record record;
        while ((record = recordReceiver.getFromReader()) != null) {
            if (insertMode) {
                if (record.getColumnNumber() != columnNumber) {
                    // Source column count differs from the target column count: fail immediately.
                    throw DataXException
                            .asDataXException(
                                    DBUtilErrorCode.CONF_ERROR,
                                    String.format(
                                            "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                            record.getColumnNumber(),
                                            columnNumber));
                }
                writeBuffer.add(record);
                if (writeBuffer.size() >= bufferSize) {
                    this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
                    writeBuffer.clear();
                }
            } else {
                if (record.getColumnNumber() != columnNumber + 1) {
                    // Stream-mode sources must carry exactly one extra column (the OP column).
                    throw DataXException
                            .asDataXException(
                                    DBUtilErrorCode.CONF_ERROR,
                                    String.format(
                                            "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不满足源头多1列操作类型列. 请检查您的配置并作出修改.",
                                            record.getColumnNumber(),
                                            columnNumber));
                }
                String optionColumnValue = record.getColumn(this.opColumnIndex).asString();
                OperationType operationType = OperationType.asOperationType(optionColumnValue);
                if (operationType.isInsertTemplate()) {
                    writeBuffer.add(record);
                    // Compare by value: lastDmlMode is a String field, so == is a reference check.
                    if (this.lastDmlMode == null || Constant.INSERTMODE.equals(this.lastDmlMode)) {
                        this.lastDmlMode = Constant.INSERTMODE;
                        if (writeBuffer.size() >= bufferSize) {
                            this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
                            writeBuffer.clear();
                        }
                    } else {
                        // Switching delete -> insert: send the pending deletes first, so they
                        // reach ADS before the inserts that follow them.
                        this.lastDmlMode = Constant.INSERTMODE;
                        // lastDmlMode outlives this call, so the buffer may be empty here.
                        if (!deleteBuffer.isEmpty()) {
                            this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
                            deleteBuffer.clear();
                        }
                    }
                } else if (operationType.isDeleteTemplate()) {
                    deleteBuffer.add(record);
                    if (this.lastDmlMode == null || Constant.DELETEMODE.equals(this.lastDmlMode)) {
                        this.lastDmlMode = Constant.DELETEMODE;
                        if (deleteBuffer.size() >= bufferSize) {
                            this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
                            deleteBuffer.clear();
                        }
                    } else {
                        // Switching insert -> delete: send the pending inserts first, so they
                        // reach ADS before the deletes that follow them.
                        this.lastDmlMode = Constant.DELETEMODE;
                        if (!writeBuffer.isEmpty()) {
                            this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
                            writeBuffer.clear();
                        }
                    }
                } else {
                    // Unsupported OP value: dirty data, deliberately not retried.
                    this.taskPluginCollector.collectDirtyRecord(record, String.format("不支持您的更新类型:%s", optionColumnValue));
                }
            }
        }
        // Flush whatever is left. The mode-switch flushes above mean that at most one of the
        // two buffers is non-empty at this point.
        if (!writeBuffer.isEmpty()) {
            this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
            writeBuffer.clear();
        }
        if (null != deleteBuffer && !deleteBuffer.isEmpty()) {
            this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
            deleteBuffer.clear();
        }
    } catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.WRITE_DATA_ERROR, e);
    } finally {
        writeBuffer.clear();
        if (null != deleteBuffer) {
            deleteBuffer.clear();
        }
        DBUtil.closeDBResources(null, null, connection);
    }
}