public void startWrite()

in odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java [521:693]


        /**
         * Drains every record from {@code recordReceiver} and uploads it through an {@code OdpsWriterProxy}.
         *
         * <p>Without dynamic partitioning a single proxy (built on {@code this.workerUpload}) receives all
         * records and the written block ids are collected into {@code blocks}. With dynamic partitioning the
         * target partition is derived from each record, and one proxy/upload session plus its own block-id
         * list is cached per partition in {@code partitionUploadSessionHashMap}. When {@code checkIfNeedFlush()}
         * reports that a flush is needed, cached proxies that are idle too long or buffer more than half a
         * block are flushed and dropped (their map value is set to {@code null}). After that, further proxies
         * are dropped until at least as many have been released as remain.
         *
         * <p>Time spent closing blocks is accumulated and reported once through an
         * {@code ODPS_BLOCK_CLOSE} {@link PerfRecord}.
         *
         * @param recordReceiver source of records; the write loop ends when {@code getFromReader()} returns
         *                       {@code null}
         * @throws DataXException any exception raised while writing is rethrown through
         *                        {@code DataXException.asDataXException(WRITER_RECORD_FAIL, ...)}
         */
        public void startWrite(RecordReceiver recordReceiver) {
            blocks = new ArrayList<Long>();

            AtomicLong blockId = new AtomicLong(0);

            List<Integer> columnPositions = this.sliceConfig.getList(Constant.COLUMN_POSITION,
                    Integer.class);

            try {
                TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();

                // Kept configurable as a safety valve.
                boolean checkWithGetSize = this.sliceConfig.getBool("checkWithGetSize", true);

                OdpsWriterProxy proxy;
                List<Long> currentWriteBlocks;
                if (!supportDynamicPartition) {
                    if (this.consistencyCommit) {
                        proxy = new OdpsWriterProxy(this.workerUpload, this.blockSizeInMB, blockId, taskId, taskCount,
                                columnPositions, taskPluginCollector, this.emptyAsNull, this.isCompress, checkWithGetSize, this.allColumns, this.writeTimeOutInMs, this.sliceConfig, this.overLengthRule, this.maxFieldLength, this.enableOverLengthOutput);
                    } else {
                        proxy = new OdpsWriterProxy(this.workerUpload, this.blockSizeInMB, blockId,
                                columnPositions, taskPluginCollector, this.emptyAsNull, this.isCompress, checkWithGetSize, this.allColumns, false, this.writeTimeOutInMs, this.sliceConfig, this.overLengthRule, this.maxFieldLength, this.enableOverLengthOutput);
                    }
                    currentWriteBlocks = blocks;
                } else {
                    // In dynamic-partition mode the proxy is chosen per record from partitionUploadSessionHashMap.
                    proxy = null;
                    currentWriteBlocks = null;
                }

                // Loop-invariant dynamic-partition settings, read once here instead of once per record.
                List<String> columnNames = null;
                boolean customPartitionFormat = false;
                List<PartitionInfo> customPartitions = null;
                List<UserDefinedFunction> customFunctions = null;
                if (supportDynamicPartition) {
                    columnNames = this.sliceConfig.getList(Key.COLUMN, String.class);
                    customPartitionFormat = "custom".equalsIgnoreCase(sliceConfig.getString("partitionFormatType"));
                    if (customPartitionFormat) {
                        customPartitions = getListWithJson(sliceConfig, "customPartitionColumns", PartitionInfo.class);
                        customFunctions = getListWithJson(sliceConfig, "customPartitionFunctions", UserDefinedFunction.class);
                    }
                }

                // Under memory pressure, a proxy buffering more than half a block is a release candidate.
                // Computed in long arithmetic (1 MB = 1024 * 1024 bytes) so large block sizes cannot overflow.
                long releaseThresholdBytes = this.blockSizeInMB * 1024L * 1024L / 2;

                com.alibaba.datax.common.element.Record dataXRecord;

                PerfRecord blockClose = new PerfRecord(super.getTaskGroupId(), super.getTaskId(), PerfRecord.PHASE.ODPS_BLOCK_CLOSE);
                blockClose.start();
                long blockCloseUsedTime = 0;
                boolean columnCntChecked = false;
                while ((dataXRecord = recordReceiver.getFromReader()) != null) {
                    if (supportDynamicPartition) {
                        if (!columnCntChecked) {
                            // In dynamic-partition mode the reader and writer must have the same column count.
                            if (dataXRecord.getColumnNumber() != columnNames.size()) {
                                throw DataXException.asDataXException(OdpsWriterErrorCode.ILLEGAL_VALUE,
                                        "In dynamic partition write mode you must make sure reader and writer has same column count.");
                            }
                            columnCntChecked = true;
                        }

                        // Select the proxy from the partition value computed for this record.
                        String partition;
                        if (customPartitionFormat) {
                            partition = CustomPartitionUtils.generate(dataXRecord, customFunctions,
                                    customPartitions, columnNames);
                        } else {
                            partition = OdpsUtil.getPartColValFromDataXRecord(dataXRecord, columnPositions,
                                    columnNames, this.dateTransFormMap);
                            partition = OdpsUtil.formatPartition(partition, false);
                        }

                        Pair<OdpsWriterProxy, List<Long>> proxyBlocksPair = this.partitionUploadSessionHashMap.get(partition);
                        if (null != proxyBlocksPair) {
                            proxy = proxyBlocksPair.getLeft();
                            currentWriteBlocks = proxyBlocksPair.getRight();
                            if (null == proxy || null == currentWriteBlocks) {
                                throw DataXException.asDataXException("Get OdpsWriterProxy failed.");
                            }
                        } else {
                            /*
                             * No live proxy for this partition (never seen, or released by a memory flush).
                             * If truncate is enabled and the partition has not been truncated yet, truncate it
                             * under the shared lock. The inner re-check ensures only one caller truncates it.
                             */
                            Boolean truncate = this.sliceConfig.getBool(Key.TRUNCATE);
                            if (truncate && !partitionsDealedTruncate.contains(partition)) {
                                synchronized (lockForPartitionDealedTruncate) {
                                    if (!partitionsDealedTruncate.contains(partition)) {
                                        LOG.info("Start to truncate partition {}", partition);
                                        OdpsUtil.dealTruncate(this.odps, this.table, partition, truncate);
                                        partitionsDealedTruncate.add(partition);
                                        // Fail if too many partitions are being created. Counted only by the
                                        // caller that actually registers the partition, so losing the lock
                                        // race does not count the same partition twice.
                                        if (partitionCnt.addAndGet(1) > maxPartitionCnt) {
                                            throw new DataXException("Create too many partitions. Please make sure you config the right partition column");
                                        }
                                    }
                                }
                            }
                            TableTunnel.UploadSession uploadSession = OdpsUtil.createMasterTunnelUpload(tableTunnel, this.projectName,
                                    this.tableName, partition);
                            proxy = new OdpsWriterProxy(uploadSession, this.blockSizeInMB, blockId,
                                    columnPositions, taskPluginCollector, this.emptyAsNull, this.isCompress, checkWithGetSize, this.allColumns, true, this.writeTimeOutInMs, this.sliceConfig, this.overLengthRule, this.maxFieldLength, this.enableOverLengthOutput);
                            currentWriteBlocks = new ArrayList<>();
                            partitionUploadSessionHashMap.put(partition, new MutablePair<>(proxy, currentWriteBlocks));
                        }
                    }
                    blockCloseUsedTime += proxy.writeOneRecord(dataXRecord, currentWriteBlocks);

                    // Dynamic-partition mode: when checkIfNeedFlush() says so, flush and release cached proxies.
                    if (supportDynamicPartition && checkIfNeedFlush()) {
                        LOG.info("====The memory used exceed 80%, start to clear...===");
                        int releaseCnt = 0;
                        int remainCnt = 0;

                        // Pass 1: release proxies that have been idle too long or buffer more than half a block.
                        for (String onePartition : partitionUploadSessionHashMap.keySet()) {
                            Pair<OdpsWriterProxy, List<Long>> onePair = partitionUploadSessionHashMap.get(onePartition);
                            OdpsWriterProxy oneIdleProxy = onePair == null ? null : onePair.getLeft();
                            if (oneIdleProxy == null) {
                                continue;
                            }

                            long idleTime = System.currentTimeMillis() - oneIdleProxy.getLastActiveTime();
                            if (idleTime > Constant.PROXY_MAX_IDLE_TIME_MS || oneIdleProxy.getCurrentTotalBytes() > releaseThresholdBytes) {
                                LOG.info("Release uploadSession of partition {}: idle for {} ms (limit {} ms) or buffered more than {} bytes",
                                        onePartition, idleTime, Constant.PROXY_MAX_IDLE_TIME_MS, releaseThresholdBytes);
                                // Write out the buffered data first ...
                                blockCloseUsedTime += oneIdleProxy.writeRemainingRecord(onePair.getRight());
                                // ... then drop the proxy. The key is kept with a null value, so the next record
                                // for this partition opens a new upload session.
                                partitionUploadSessionHashMap.put(onePartition, null);
                                releaseCnt++;
                            } else {
                                remainCnt++;
                            }
                        }

                        // Pass 2: if too few were released, release more (in map iteration order) until the
                        // released count reaches the remaining count.
                        for (String onePartition : partitionUploadSessionHashMap.keySet()) {
                            if (releaseCnt >= remainCnt) {
                                break;
                            }

                            Pair<OdpsWriterProxy, List<Long>> onePair = partitionUploadSessionHashMap.get(onePartition);
                            if (onePair != null) {
                                blockCloseUsedTime += onePair.getLeft().writeRemainingRecord(onePair.getRight());
                                partitionUploadSessionHashMap.put(onePartition, null);

                                releaseCnt++;
                                remainCnt--;
                            }
                        }

                        this.latestFlushTime = System.currentTimeMillis();
                        LOG.info("===complete===");
                    }
                }

                // Write out whatever each live proxy still buffers.
                if (supportDynamicPartition) {
                    for (String partition : partitionUploadSessionHashMap.keySet()) {
                        Pair<OdpsWriterProxy, List<Long>> onePair = partitionUploadSessionHashMap.get(partition);
                        if (onePair == null) {
                            continue;
                        }
                        blockCloseUsedTime += onePair.getLeft().writeRemainingRecord(onePair.getRight());
                    }
                } else {
                    blockCloseUsedTime += proxy.writeRemainingRecord(blocks);
                }
                // Report the accumulated block-close time exactly once, whatever the number of partitions.
                blockClose.end(blockCloseUsedTime);
            } catch (Exception e) {
                throw DataXException.asDataXException(OdpsWriterErrorCode.WRITER_RECORD_FAIL, MESSAGE_SOURCE.message("odpswriter.4"), e);
            }
        }