private void startWriteBinaryFile()

in osswriter/src/main/java/com/alibaba/datax/plugin/writer/osswriter/OssWriter.java [789:906]


        /**
         * Writes binary records to OSS using multipart uploads, one upload per source file.
         *
         * <p>Each record carries its raw bytes in column 0 (which must be a {@code BytesColumn}) and
         * the source file path in its meta map under {@code META_KEY_FILE_PATH}. The target object
         * name is {@code objectDir} followed by that path with its first {@code parentPathLength}
         * characters removed. Consecutive records with the same target object are appended to the
         * same upload. When the target changes, any buffered bytes are uploaded as a final part, the
         * previous upload is completed, and a new upload is initiated. Bytes are buffered in memory
         * and sent as a part whenever the buffer reaches {@code blockSizeInByte}.
         *
         * <p>If the reader yields no records, nothing is uploaded.
         *
         * @param lineReceiver source of records; read until it returns {@code null}
         * @throws DataXException with {@code OssWriterErrorCode.Write_OBJECT_ERROR} if a record is
         *     malformed or any OSS call fails
         */
        private void startWriteBinaryFile(RecordReceiver lineReceiver) {
            String currentObject = null;
            InitiateMultipartUploadResult currentInitiateMultipartUploadResult = null;
            List<PartETag> currentPartETags = null;
            int currentPartNumber = 1;
            // Total bytes appended to the current object so far; used for logging only.
            long currentObjectSize = 0;

            // Bytes of the current object that have not been uploaded as a part yet.
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                Record record;
                while ((record = lineReceiver.getFromReader()) != null) {
                    Column column = record.getColumn(0);
                    Map<String, String> meta = record.getMeta();
                    // Explicit check rather than `assert`, which is disabled by default at runtime.
                    if (meta == null) {
                        throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR,
                                "binary record has no meta, cannot determine the target object");
                    }
                    String objectNameTmp = meta
                            .get(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.META_KEY_FILE_PATH);
                    if (objectNameTmp == null) {
                        throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR,
                                "binary record meta has no file path, cannot determine the target object");
                    }
                    String fullObjectNameTmp = String.format("%s%s", this.objectDir,
                            objectNameTmp.substring(this.parentPathLength));

                    // A new target object begins (always true for the first record, since
                    // currentObject starts as null): finish the previous upload, then start a new one.
                    if (!StringUtils.equals(currentObject, fullObjectNameTmp)) {
                        if (currentInitiateMultipartUploadResult != null) {
                            // Upload whatever is still buffered for the previous object.
                            if (byteArrayOutputStream.size() > 0) {
                                this.ossWriterProxy.uploadOnePart(byteArrayOutputStream.toByteArray(), currentPartNumber,
                                        currentInitiateMultipartUploadResult, currentPartETags, currentObject);
                                currentPartNumber++;
                                byteArrayOutputStream.reset();
                            }
                            // NOTE(review): if the previous object received zero bytes, currentPartETags
                            // is empty here; confirm OSS accepts completing an upload with no parts.
                            String commitKey = currentInitiateMultipartUploadResult.getKey();
                            LOG.info(String.format(
                                    "current object [%s] size %s, complete current multipart upload %s and begin new one",
                                    commitKey, currentObjectSize,
                                    currentInitiateMultipartUploadResult.getUploadId()));
                            CompleteMultipartUploadRequest currentCompleteMultipartUploadRequest = new CompleteMultipartUploadRequest(
                                    this.bucket, commitKey, currentInitiateMultipartUploadResult.getUploadId(),
                                    currentPartETags);
                            CompleteMultipartUploadResult currentCompleteMultipartUploadResult = this.ossWriterProxy.completeMultipartUpload(
                                    currentCompleteMultipartUploadRequest);
                            LOG.info(String.format("final object [%s] etag is:[%s]", commitKey,
                                    currentCompleteMultipartUploadResult.getETag()));
                        }

                        currentObject = fullObjectNameTmp;
                        InitiateMultipartUploadRequest currentInitiateMultipartUploadRequest =
                                this.ossWriterProxy.getInitiateMultipartUploadRequest(currentObject);
                        currentInitiateMultipartUploadResult = this.ossWriterProxy.initiateMultipartUpload(
                                currentInitiateMultipartUploadRequest);
                        currentPartETags = new ArrayList<PartETag>();
                        currentPartNumber = 1;
                        currentObjectSize = 0;
                        LOG.info(String.format("write to bucket: [%s] object: [%s] with oss uploadId: [%s]",
                                this.bucket, currentObject, currentInitiateMultipartUploadResult.getUploadId()));
                    }

                    // Append this record's bytes to the current object.
                    if (!(column instanceof BytesColumn)) {
                        String message = "the type of column must be BytesColumn!";
                        throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR, message);
                    }
                    byte[] data = column.asBytes();
                    byteArrayOutputStream.write(data);
                    currentObjectSize += data.length;

                    // Ship a part once a full block has been buffered.
                    if (byteArrayOutputStream.size() >= this.blockSizeInByte) {
                        this.ossWriterProxy.uploadOnePart(byteArrayOutputStream.toByteArray(), currentPartNumber,
                                currentInitiateMultipartUploadResult, currentPartETags, currentObject);
                        currentPartNumber++;
                        byteArrayOutputStream.reset();
                    }
                }

                if (currentInitiateMultipartUploadResult == null) {
                    // No record was received, so no target object name can be derived and there is
                    // nothing to upload. (Previously this path initiated an upload with a null key.)
                    LOG.info("Receive no data from the source.");
                } else {
                    // Upload the tail of the last object that is still sitting in the buffer.
                    if (byteArrayOutputStream.size() > 0) {
                        this.ossWriterProxy.uploadOnePart(byteArrayOutputStream.toByteArray(), currentPartNumber,
                                currentInitiateMultipartUploadResult, currentPartETags, currentObject);
                        byteArrayOutputStream.reset();
                    }
                    // The last upload is never completed inside the loop, so complete it here.
                    CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
                            this.bucket, currentObject, currentInitiateMultipartUploadResult.getUploadId(),
                            currentPartETags);
                    CompleteMultipartUploadResult completeMultipartUploadResult = this.ossWriterProxy.completeMultipartUpload(
                            completeMultipartUploadRequest);
                    LOG.info(String.format("final object etag is:[%s]", completeMultipartUploadResult.getETag()));
                }
            } catch (Exception e) {
                // NOTE(review): the in-progress multipart upload is not aborted on failure, so
                // already-uploaded parts may remain in OSS; consider aborting it here.
                throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR, e.getMessage(), e);
            }
            LOG.info("end do write");
        }