in odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java [521:693]
/**
 * Drains every record from the reader and uploads it to ODPS through tunnel upload proxies.
 *
 * <p>Without dynamic partitioning, a single proxy built on {@code this.workerUpload} is used and
 * the written block ids are collected into {@code blocks}. With dynamic partitioning, the target
 * partition is derived from each record's content, and one proxy / block-id list pair per
 * partition is cached in {@code partitionUploadSessionHashMap}. Whenever
 * {@code checkIfNeedFlush()} returns true, cached proxies are flushed and released by
 * {@link #releaseProxiesUnderMemoryPressure()}. After the reader is exhausted, every still-cached
 * proxy writes out its remaining records.
 *
 * @param recordReceiver source of records produced by the reader side
 * @throws DataXException with {@code OdpsWriterErrorCode.WRITER_RECORD_FAIL} wrapping any failure
 */
public void startWrite(RecordReceiver recordReceiver) {
    blocks = new ArrayList<Long>();
    List<Long> currentWriteBlocks;
    AtomicLong blockId = new AtomicLong(0);
    List<Integer> columnPositions = this.sliceConfig.getList(Constant.COLUMN_POSITION,
            Integer.class);
    try {
        TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();
        OdpsWriterProxy proxy;
        // Made configurable as a safety valve; defaults to true.
        boolean checkWithGetSize = this.sliceConfig.getBool("checkWithGetSize", true);
        if (!supportDynamicPartition) {
            if (this.consistencyCommit) {
                proxy = new OdpsWriterProxy(this.workerUpload, this.blockSizeInMB, blockId, taskId, taskCount,
                        columnPositions, taskPluginCollector, this.emptyAsNull, this.isCompress, checkWithGetSize,
                        this.allColumns, this.writeTimeOutInMs, this.sliceConfig, this.overLengthRule,
                        this.maxFieldLength, this.enableOverLengthOutput);
            } else {
                proxy = new OdpsWriterProxy(this.workerUpload, this.blockSizeInMB, blockId,
                        columnPositions, taskPluginCollector, this.emptyAsNull, this.isCompress, checkWithGetSize,
                        this.allColumns, false, this.writeTimeOutInMs, this.sliceConfig, this.overLengthRule,
                        this.maxFieldLength, this.enableOverLengthOutput);
            }
            currentWriteBlocks = blocks;
        } else {
            // Chosen per record inside the loop below.
            proxy = null;
            currentWriteBlocks = null;
        }

        // Dynamic-partition configuration is loop-invariant: read and parse it once instead of
        // once per record.
        boolean customPartitionFormat = false;
        List<PartitionInfo> customPartitions = null;
        List<UserDefinedFunction> customFunctions = null;
        List<String> writerColumns = null;
        boolean truncate = false;
        if (supportDynamicPartition) {
            String partitionFormatType = sliceConfig.getString("partitionFormatType");
            customPartitionFormat = "custom".equalsIgnoreCase(partitionFormatType);
            if (customPartitionFormat) {
                customPartitions = getListWithJson(sliceConfig, "customPartitionColumns", PartitionInfo.class);
                customFunctions = getListWithJson(sliceConfig, "customPartitionFunctions", UserDefinedFunction.class);
            }
            writerColumns = this.sliceConfig.getList(Key.COLUMN, String.class);
            // A missing key means "do not truncate" rather than an unboxing NPE.
            truncate = this.sliceConfig.getBool(Key.TRUNCATE, false);
        }

        com.alibaba.datax.common.element.Record dataXRecord = null;
        PerfRecord blockClose = new PerfRecord(super.getTaskGroupId(), super.getTaskId(), PerfRecord.PHASE.ODPS_BLOCK_CLOSE);
        blockClose.start();
        long blockCloseUsedTime = 0;
        boolean columnCntChecked = false;
        while ((dataXRecord = recordReceiver.getFromReader()) != null) {
            if (supportDynamicPartition) {
                if (!columnCntChecked) {
                    // In dynamic partition mode the reader and the writer must have the same column count.
                    if (dataXRecord.getColumnNumber() != writerColumns.size()) {
                        throw DataXException.asDataXException(OdpsWriterErrorCode.ILLEGAL_VALUE,
                                "In dynamic partition write mode you must make sure reader and writer has same column count.");
                    }
                    columnCntChecked = true;
                }
                // In dynamic partition mode the proxy is selected by the partition this record maps to.
                String partition;
                if (customPartitionFormat) {
                    partition = CustomPartitionUtils.generate(dataXRecord, customFunctions,
                            customPartitions, writerColumns);
                } else {
                    partition = OdpsUtil.getPartColValFromDataXRecord(dataXRecord, columnPositions,
                            writerColumns, this.dateTransFormMap);
                    partition = OdpsUtil.formatPartition(partition, false);
                }
                Pair<OdpsWriterProxy, List<Long>> proxyBlocksPair = this.partitionUploadSessionHashMap.get(partition);
                if (null != proxyBlocksPair) {
                    proxy = proxyBlocksPair.getLeft();
                    currentWriteBlocks = proxyBlocksPair.getRight();
                    if (null == proxy || null == currentWriteBlocks) {
                        throw DataXException.asDataXException("Get OdpsWriterProxy failed.");
                    }
                } else {
                    /*
                     * No cached proxy for this partition: either the first write to it, or its proxy
                     * was released earlier. If truncate is enabled and the partition has not been
                     * truncated yet, truncate it while holding lockForPartitionDealedTruncate.
                     */
                    if (truncate && !partitionsDealedTruncate.contains(partition)) {
                        synchronized (lockForPartitionDealedTruncate) {
                            if (!partitionsDealedTruncate.contains(partition)) {
                                LOG.info("Start to truncate partition {}", partition);
                                OdpsUtil.dealTruncate(this.odps, this.table, partition, truncate);
                                partitionsDealedTruncate.add(partition);
                            }
                            /*
                             * Fail if too many partitions are being created.
                             * NOTE(review): this limit is only enforced when truncate is enabled.
                             */
                            if (partitionCnt.addAndGet(1) > maxPartitionCnt) {
                                throw new DataXException("Create too many partitions. Please make sure you config the right partition column");
                            }
                        }
                    }
                    TableTunnel.UploadSession uploadSession = OdpsUtil.createMasterTunnelUpload(tableTunnel, this.projectName,
                            this.tableName, partition);
                    proxy = new OdpsWriterProxy(uploadSession, this.blockSizeInMB, blockId,
                            columnPositions, taskPluginCollector, this.emptyAsNull, this.isCompress, checkWithGetSize,
                            this.allColumns, true, this.writeTimeOutInMs, this.sliceConfig, this.overLengthRule,
                            this.maxFieldLength, this.enableOverLengthOutput);
                    currentWriteBlocks = new ArrayList<>();
                    partitionUploadSessionHashMap.put(partition, new MutablePair<>(proxy, currentWriteBlocks));
                }
            }
            blockCloseUsedTime += proxy.writeOneRecord(dataXRecord, currentWriteBlocks);
            // In dynamic partition mode, release cached proxies when checkIfNeedFlush() reports
            // memory pressure (the helper's log message refers to an 80% threshold).
            if (supportDynamicPartition && checkIfNeedFlush()) {
                blockCloseUsedTime += releaseProxiesUnderMemoryPressure();
            }
        }

        // Write out the records still buffered by every proxy.
        if (supportDynamicPartition) {
            for (String partition : partitionUploadSessionHashMap.keySet()) {
                Pair<OdpsWriterProxy, List<Long>> proxyBlocksPair = partitionUploadSessionHashMap.get(partition);
                if (proxyBlocksPair == null) {
                    // Already flushed and released by releaseProxiesUnderMemoryPressure().
                    continue;
                }
                blockCloseUsedTime += proxyBlocksPair.getLeft().writeRemainingRecord(proxyBlocksPair.getRight());
            }
        } else {
            blockCloseUsedTime += proxy.writeRemainingRecord(blocks);
        }
        // End the perf record exactly once, with the total accumulated over all proxies.
        // Previously it was ended once per partition with a cumulative value.
        blockClose.end(blockCloseUsedTime);
    } catch (Exception e) {
        throw DataXException.asDataXException(OdpsWriterErrorCode.WRITER_RECORD_FAIL, MESSAGE_SOURCE.message("odpswriter.4"), e);
    }
}

/**
 * Flushes and releases cached per-partition upload proxies in dynamic partition mode.
 *
 * <p>First pass: each cached proxy is released if either of these holds:
 * <ul>
 *   <li>it has been idle for longer than {@code Constant.PROXY_MAX_IDLE_TIME_MS};</li>
 *   <li>it buffers more than half a block ({@code blockSizeInMB} MiB / 2).</li>
 * </ul>
 * Releasing means its remaining records are written and its map entry is set to {@code null}.
 *
 * <p>Second pass: if fewer proxies were released than kept, more are released in map iteration
 * order until the released count reaches the kept count.
 *
 * <p>Finally {@code latestFlushTime} is set to the current time.
 *
 * @return the sum of the values returned by {@code writeRemainingRecord} for the released proxies
 * @throws Exception whatever {@code writeRemainingRecord} throws
 */
private long releaseProxiesUnderMemoryPressure() throws Exception {
    LOG.info("====The memory used exceed 80%, start to clear...===");
    long usedTime = 0;
    int releaseCnt = 0;
    int remainCnt = 0;
    // Half of one block in bytes; long arithmetic avoids int overflow (was mistyped as 1014 * 1024).
    long halfBlockBytes = this.blockSizeInMB * 1024L * 1024L / 2;
    for (String onePartition : partitionUploadSessionHashMap.keySet()) {
        Pair<OdpsWriterProxy, List<Long>> pair = partitionUploadSessionHashMap.get(onePartition);
        if (pair == null || pair.getLeft() == null) {
            continue;
        }
        OdpsWriterProxy oneIdleProxy = pair.getLeft();
        long idleTime = System.currentTimeMillis() - oneIdleProxy.getLastActiveTime();
        boolean idleTooLong = idleTime > Constant.PROXY_MAX_IDLE_TIME_MS;
        if (idleTooLong || oneIdleProxy.getCurrentTotalBytes() > halfBlockBytes) {
            if (idleTooLong) {
                LOG.info("{} partition has no data last {} seconds, so release its uploadSession", onePartition, Constant.PROXY_MAX_IDLE_TIME_MS / 1000);
            } else {
                LOG.info("{} partition buffers more than {} bytes, so release its uploadSession", onePartition, halfBlockBytes);
            }
            // Write out the buffered data first, then drop the cached proxy.
            usedTime += oneIdleProxy.writeRemainingRecord(pair.getRight());
            // Overwriting an existing key is not a structural modification, so iteration stays valid.
            partitionUploadSessionHashMap.put(onePartition, null);
            releaseCnt++;
        } else {
            remainCnt++;
        }
    }
    // Not enough released: release more, in iteration order, until at least half are released.
    for (String onePartition : partitionUploadSessionHashMap.keySet()) {
        if (releaseCnt >= remainCnt) {
            break;
        }
        Pair<OdpsWriterProxy, List<Long>> pair = partitionUploadSessionHashMap.get(onePartition);
        if (pair != null) {
            usedTime += pair.getLeft().writeRemainingRecord(pair.getRight());
            partitionUploadSessionHashMap.put(onePartition, null);
            releaseCnt++;
            remainCnt--;
        }
    }
    this.latestFlushTime = System.currentTimeMillis();
    LOG.info("===complete===");
    return usedTime;
}