private void startWriteUnstructedStorageFile()

in osswriter/src/main/java/com/alibaba/datax/plugin/writer/osswriter/OssWriter.java [913:1074]


        /**
         * Streams records from {@code lineReceiver} into one or more OSS objects via multipart upload.
         *
         * <p>Records are serialized through an {@link UnstructuredWriter} into an in-memory buffer; each
         * time the buffer length reaches {@code blockSizeInByte} it is uploaded as one part. A new object
         * (and a new multipart upload) is started when:
         * <ul>
         *   <li>peer-to-peer copy mode: the source file path carried in the record meta changes;</li>
         *   <li>otherwise: on the first record, and whenever the current object already holds
         *       {@code maxPartNumber} parts (rolling naming via {@code getCurrentObject}).</li>
         * </ul>
         * Before a new upload is started, the previous upload's buffered remainder is uploaded under the
         * previous upload's key and that upload is completed.
         *
         * @param lineReceiver      source of records; iteration stops when it returns {@code null}
         * @param generateEmptyFile when no record is received, whether an object (containing only the
         *                          configured header, if any) should still be created
         * @throws DataXException with {@code OssWriterErrorCode.Write_OBJECT_ERROR} wrapping any failure
         */
        private void startWriteUnstructedStorageFile(RecordReceiver lineReceiver, boolean generateEmptyFile){
            // Parts per object: maxFileSize is in MB and each part is ~blockSizeInByte; at least one part.
            // Widen to long before multiplying to avoid int overflow.
            long numberCacul = (this.maxFileSize * 1024L * 1024L) / this.blockSizeInByte;
            final long maxPartNumber = numberCacul >= 1 ? numberCacul : 1;
            int objectRollingNumber = 0;
            Record record;
            // Peer-to-peer mode derives object names from each record's meta; otherwise start from the
            // configured object name with its suffix appended.
            String currentObject = this.isPeer2PeerCopyMode() ? null : appedSuffixTo(this.object);
            InitiateMultipartUploadResult currentInitiateMultipartUploadResult = null;
            String lastUploadId = null;
            boolean gotData = false;
            List<PartETag> currentPartETags = null;
            // TODO: part-level retry could be keyed on currentPartNumber; uploading the same part number
            // again within one multipart upload overwrites the earlier part.
            int currentPartNumber = 1;

            // NOTE(review): StringWriter is backed by a (synchronized) StringBuffer; a StringBuilder
            // would suffice for this single-threaded use.
            StringWriter sw = new StringWriter();
            StringBuffer sb = sw.getBuffer();
            UnstructuredWriter unstructuredWriter = UnstructuredStorageWriterUtil.
                    produceUnstructuredWriter(this.fileFormat, this.writerSliceConfig, sw);
            LOG.info(String.format(
                    "begin do write, each object maxFileSize: [%s]MB...",
                    this.maxFileSize));
            try {
                // The source (e.g. MySQL) may carry no meta at all, so this first-record flag is needed.
                boolean needInitMultipartTransform = true;
                while ((record = lineReceiver.getFromReader()) != null) {
                    gotData = true;
                    // A new multipart upload is required when either:
                    //  - peer-to-peer copy mode, and the record meta switches to another file name; or
                    //  - not peer-to-peer, and this is the first record or the current object already
                    //    holds maxPartNumber parts (log4j-style rolling).
                    boolean realyNeedInitUploadRequest = false;
                    if (this.isPeer2PeerCopyMode()) {
                        Map<String, String> meta = record.getMeta();
                        String objectNameTmp = null == meta ? null : meta
                                .get(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.META_KEY_FILE_PATH);
                        if (null == objectNameTmp) {
                            // Fail with a clear message rather than an NPE (asserts are usually disabled).
                            throw new IllegalStateException(
                                    "peer2peer copy mode requires the source file path in the record meta");
                        }
                        String fullObjectNameTmp = String.format("%s%s", this.objectDir,
                                objectNameTmp.substring(this.parentPathLength));
                        if (!StringUtils.equals(currentObject, fullObjectNameTmp)) {
                            currentObject = fullObjectNameTmp;
                            realyNeedInitUploadRequest = true;
                        }
                    } else if (needInitMultipartTransform || currentPartNumber > maxPartNumber) {
                        currentObject = getCurrentObject(objectRollingNumber, record);
                        objectRollingNumber++;
                        realyNeedInitUploadRequest = true;
                    }

                    if (realyNeedInitUploadRequest) {
                        // Finish the previous upload first. Its buffered remainder belongs to the
                        // PREVIOUS object, so it is uploaded under that upload's own key; currentObject
                        // already names the next object at this point.
                        if (null != currentInitiateMultipartUploadResult) {
                            String commitKey = currentInitiateMultipartUploadResult.getKey();
                            if (sb.length() > 0) {
                                this.uploadOnePart(sw, currentPartNumber, currentInitiateMultipartUploadResult,
                                        currentPartETags, commitKey);
                                currentPartNumber++;
                                sb.setLength(0);
                            }
                            // TODO: handle the case where the previous object is empty.
                            // Size is approximate: number of uploaded parts times the block size.
                            LOG.info(String.format(
                                    "current object [%s] size %s, complete current multipart upload %s and begin new one",
                                    commitKey, (long) (currentPartNumber - 1) * this.blockSizeInByte,
                                    currentInitiateMultipartUploadResult.getUploadId()));
                            CompleteMultipartUploadRequest currentCompleteMultipartUploadRequest = new CompleteMultipartUploadRequest(
                                    this.bucket, commitKey, currentInitiateMultipartUploadResult.getUploadId(),
                                    currentPartETags);
                            CompleteMultipartUploadResult currentCompleteMultipartUploadResult = this.ossWriterProxy.completeMultipartUpload(
                                    currentCompleteMultipartUploadRequest);
                            lastUploadId = currentInitiateMultipartUploadResult.getUploadId();
                            LOG.info(String.format("final object [%s] etag is:[%s]", commitKey,
                                    currentCompleteMultipartUploadResult.getETag()));
                        }

                        InitiateMultipartUploadRequest currentInitiateMultipartUploadRequest =
                                this.ossWriterProxy.getInitiateMultipartUploadRequest(currentObject);
                        currentInitiateMultipartUploadResult = this.ossWriterProxy.initiateMultipartUpload(currentInitiateMultipartUploadRequest);
                        currentPartETags = new ArrayList<PartETag>();
                        LOG.info(String
                                .format("write to bucket: [%s] object: [%s] with oss uploadId: [%s]",
                                        this.bucket, currentObject,
                                        currentInitiateMultipartUploadResult
                                                .getUploadId()));

                        // Every object starts with the configured header, if any.
                        if (null != this.header && !this.header.isEmpty()) {
                            unstructuredWriter.writeOneRecord(this.header);
                        }
                        needInitMultipartTransform = false;
                        currentPartNumber = 1;
                    }

                    // Serialize the record into the buffer of the current object.
                    UnstructuredStorageWriterUtil.transportOneRecord(record,
                            this.nullFormat, this.dateParse,
                            this.getTaskPluginCollector(), unstructuredWriter, this.byteEncoding);

                    if (sb.length() >= this.blockSizeInByte) {
                        this.uploadOnePart(sw, currentPartNumber,
                                currentInitiateMultipartUploadResult,
                                currentPartETags, currentObject);
                        currentPartNumber++;
                        sb.setLength(0);
                    }
                }

                if (gotData) {
                    // Some data may still be buffered; send it as the last part of the current object.
                    if (0 < sb.length()) {
                        this.uploadOnePart(sw, currentPartNumber,
                                currentInitiateMultipartUploadResult,
                                currentPartETags, currentObject);
                    }
                    // Avoid completing the same upload twice.
                    if (!StringUtils.equals(lastUploadId, currentInitiateMultipartUploadResult.getUploadId())) {
                        CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
                                this.bucket, currentObject,
                                currentInitiateMultipartUploadResult.getUploadId(),
                                currentPartETags);
                        completeUpload(completeMultipartUploadRequest);
                    }
                } else {
                    LOG.info("Receive no data from the source.");
                    if (!generateEmptyFile) {
                        // Do not initiate an upload at all: it would never be completed and would be
                        // left behind as an incomplete multipart upload.
                        LOG.info("The generateEmptyFile is false, datax will not generate empty file");
                    } else if (null == currentObject) {
                        // Peer-to-peer mode takes object names from record meta; with no records there
                        // is no name for an empty object.
                        LOG.warn("Receive no data in peer2peer copy mode, no object name available, skip generating empty file");
                    } else {
                        InitiateMultipartUploadRequest emptyObjectRequest =
                                this.ossWriterProxy.getInitiateMultipartUploadRequest(currentObject);
                        currentInitiateMultipartUploadResult = this.ossWriterProxy.initiateMultipartUpload(emptyObjectRequest);
                        currentPartETags = new ArrayList<PartETag>();
                        // The empty object still carries the configured header, if any.
                        if (null != this.header && !this.header.isEmpty()) {
                            unstructuredWriter.writeOneRecord(this.header);
                        }
                        if (0 < sb.length()) {
                            this.uploadOnePart(sw, currentPartNumber,
                                    currentInitiateMultipartUploadResult,
                                    currentPartETags, currentObject);
                        }
                        LOG.info("Due to without data, oss will generate empty file, " +
                                "the generateEmptyFile is {}, you can set it false to avoid this",generateEmptyFile);
                        CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
                                this.bucket, currentObject,
                                currentInitiateMultipartUploadResult.getUploadId(),
                                currentPartETags);
                        completeUpload(completeMultipartUploadRequest);
                    }
                }
            } catch (Exception e) {
                // Dirty records are already reported by UnstructuredStorageWriterUtil.transportOneRecord;
                // headers are plain strings and are never treated as dirty data.
                // NOTE(review): an in-progress multipart upload is not aborted here and may remain as an
                // incomplete upload on the bucket.
                throw DataXException.asDataXException(
                        OssWriterErrorCode.Write_OBJECT_ERROR, e.getMessage(), e);
            }
            LOG.info("end do write");
        }