in osswriter/src/main/java/com/alibaba/datax/plugin/writer/osswriter/OssWriter.java [789:906]
/**
 * Writes binary records to OSS, using one multipart upload per target object.
 *
 * <p>Every record must carry a {@code BytesColumn} at index 0 and a meta map that
 * contains {@code META_KEY_FILE_PATH}. The target object key is {@code objectDir}
 * followed by the file path with its first {@code parentPathLength} characters
 * removed. Consecutive records that resolve to the same key are appended to the
 * same multipart upload; when the key changes, the current upload is completed and
 * a new one is initiated. Bytes are buffered in memory and uploaded as one part
 * each time the buffer reaches {@code blockSizeInByte}.
 *
 * <p>If the receiver yields no record at all, nothing is uploaded: no object key is
 * known in that case, so there is nothing that could be committed.
 *
 * @param lineReceiver source of the records; read until it returns {@code null}
 * @throws DataXException with {@code Write_OBJECT_ERROR} when a record is malformed
 *         or any upload step fails
 */
private void startWriteBinaryFile(RecordReceiver lineReceiver) {
    String currentObject = null;
    InitiateMultipartUploadResult currentInitiateMultipartUploadResult = null;
    List<PartETag> currentPartETags = null;
    String lastUploadId = null;
    int currentPartNumber = 1;
    // Bytes received so far for the object currently being uploaded (used for logging only).
    long currentObjectSize = 0;
    boolean gotData = false;
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    try {
        Record record;
        while ((record = lineReceiver.getFromReader()) != null) {
            gotData = true;
            Column column = record.getColumn(0);

            // Validate explicitly: `assert` is disabled unless the JVM runs with -ea.
            Map<String, String> meta = record.getMeta();
            if (meta == null) {
                throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR,
                        "the meta of record must not be null in binary mode!");
            }
            String objectNameTmp = meta
                    .get(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.META_KEY_FILE_PATH);
            if (objectNameTmp == null) {
                throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR,
                        "the meta of record does not contain the source file path!");
            }
            if (objectNameTmp.length() < this.parentPathLength) {
                throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR,
                        String.format("the source file path [%s] is shorter than the parent path length [%s]!",
                                objectNameTmp, this.parentPathLength));
            }
            String fullObjectNameTmp = String.format("%s%s", this.objectDir,
                    objectNameTmp.substring(this.parentPathLength));

            // A new multipart upload starts on the first record (currentObject is still null)
            // and whenever the target object key changes.
            if (!StringUtils.equals(currentObject, fullObjectNameTmp)) {
                // Complete the previous object's upload before switching.
                if (null != currentInitiateMultipartUploadResult) {
                    // Flush whatever is still buffered for the previous object.
                    if (byteArrayOutputStream.size() > 0) {
                        this.ossWriterProxy.uploadOnePart(byteArrayOutputStream.toByteArray(), currentPartNumber,
                                currentInitiateMultipartUploadResult, currentPartETags, currentObject);
                        byteArrayOutputStream.reset();
                    }
                    // TODO: the previous object may have received zero bytes (no part uploaded).
                    String commitKey = currentInitiateMultipartUploadResult.getKey();
                    String commitUploadId = currentInitiateMultipartUploadResult.getUploadId();
                    LOG.info(String.format(
                            "current object [%s] size %s, complete current multipart upload %s and begin new one",
                            commitKey, currentObjectSize, commitUploadId));
                    CompleteMultipartUploadRequest currentCompleteMultipartUploadRequest =
                            new CompleteMultipartUploadRequest(this.bucket, commitKey, commitUploadId,
                                    currentPartETags);
                    CompleteMultipartUploadResult currentCompleteMultipartUploadResult =
                            this.ossWriterProxy.completeMultipartUpload(currentCompleteMultipartUploadRequest);
                    lastUploadId = commitUploadId;
                    LOG.info(String.format("final object [%s] etag is:[%s]", commitKey,
                            currentCompleteMultipartUploadResult.getETag()));
                }

                // Begin the multipart upload for the newly seen object.
                currentObject = fullObjectNameTmp;
                InitiateMultipartUploadRequest currentInitiateMultipartUploadRequest =
                        this.ossWriterProxy.getInitiateMultipartUploadRequest(currentObject);
                currentInitiateMultipartUploadResult = this.ossWriterProxy.initiateMultipartUpload(
                        currentInitiateMultipartUploadRequest);
                currentPartETags = new ArrayList<PartETag>();
                LOG.info(String.format("write to bucket: [%s] object: [%s] with oss uploadId: [%s]",
                        this.bucket, currentObject, currentInitiateMultipartUploadResult.getUploadId()));
                currentPartNumber = 1;
                currentObjectSize = 0;
            }

            // Append this record's bytes to the buffer of the current object.
            if (!(column instanceof BytesColumn)) {
                String message = "the type of column must be BytesColumn!";
                throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR, message);
            }
            byte[] data = column.asBytes();
            byteArrayOutputStream.write(data);
            currentObjectSize += data.length;

            // Upload one part as soon as a full block has been buffered.
            if (byteArrayOutputStream.size() >= this.blockSizeInByte) {
                this.ossWriterProxy.uploadOnePart(byteArrayOutputStream.toByteArray(), currentPartNumber,
                        currentInitiateMultipartUploadResult, currentPartETags, currentObject);
                currentPartNumber++;
                byteArrayOutputStream.reset();
            }
        }

        if (!gotData) {
            // currentObject is only assigned inside the loop, so it is still null here;
            // there is no object key to upload to and nothing to complete.
            // TODO: an empty source file in binary mode is still not materialized as an object.
            LOG.info("Receive no data from the source.");
        } else {
            // Some data may still be sitting in the buffer for the last object.
            if (byteArrayOutputStream.size() > 0) {
                this.ossWriterProxy.uploadOnePart(byteArrayOutputStream.toByteArray(), currentPartNumber,
                        currentInitiateMultipartUploadResult, currentPartETags, currentObject);
            }
            // Guard against completing an upload that was already completed in the loop.
            if (!StringUtils.equals(lastUploadId, currentInitiateMultipartUploadResult.getUploadId())) {
                CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
                        this.bucket, currentObject, currentInitiateMultipartUploadResult.getUploadId(),
                        currentPartETags);
                CompleteMultipartUploadResult completeMultipartUploadResult =
                        this.ossWriterProxy.completeMultipartUpload(completeMultipartUploadRequest);
                LOG.info(String.format("final object etag is:[%s]", completeMultipartUploadResult.getETag()));
            }
        }
    } catch (Exception e) {
        // Covers I/O failures while buffering as well as any upload failure.
        throw DataXException.asDataXException(OssWriterErrorCode.Write_OBJECT_ERROR, e.getMessage(), e);
    }
    LOG.info("end do write");
}