in osswriter/src/main/java/com/alibaba/datax/plugin/writer/osswriter/OssWriter.java [913:1074]
/**
 * Drains all records from {@code lineReceiver} and writes them to OSS using multipart uploads.
 *
 * <p>Each record is serialized by an {@link UnstructuredWriter} into an in-memory
 * {@link StringWriter}. Whenever the buffer reaches {@code blockSizeInByte} characters it is
 * uploaded as one part of the current object. A new object (and a new multipart upload) starts when:
 * <ul>
 *   <li>peer-to-peer copy mode is on and the source file path in the record meta changes; or</li>
 *   <li>peer-to-peer copy mode is off and this is the first record, or the current object already
 *       holds {@code maxFileSize * 1MB / blockSizeInByte} parts (rolling by
 *       {@code getCurrentObject}).</li>
 * </ul>
 * Before a new upload is initiated, any buffered data is uploaded to the previous object and that
 * upload is completed. The configured header, if any, is written at the start of every object.
 *
 * <p>If no record is received, a single object containing only the header (or nothing) is created
 * when {@code generateEmptyFile} is true. Otherwise no upload is initiated at all.
 *
 * @param lineReceiver      source of records
 * @param generateEmptyFile whether to still create an object when the source produced no data
 * @throws DataXException with {@code OssWriterErrorCode.Write_OBJECT_ERROR} on any write failure
 */
private void startWriteUnstructedStorageFile(RecordReceiver lineReceiver, boolean generateEmptyFile) {
    // Parts per object (at least 1). Long literals keep the MB -> byte conversion out of int math.
    long partsPerObject = (this.maxFileSize * 1024L * 1024L) / this.blockSizeInByte;
    final long maxPartNumber = partsPerObject >= 1 ? partsPerObject : 1;
    int objectRollingNumber = 0;
    // Peer-to-peer copy mode takes object names from each record's meta; otherwise the configured
    // object name plus suffix is the starting point.
    String currentObject = this.isPeer2PeerCopyMode() ? null : appedSuffixTo(this.object);
    InitiateMultipartUploadResult currentInitiateMultipartUploadResult = null;
    List<PartETag> currentPartETags = null;
    // TODO: part-level retry could be keyed on currentPartNumber; re-uploading the same part
    // number within one multipart upload overwrites the earlier part.
    int currentPartNumber = 1;
    boolean gotData = false;
    // NOTE(review): StringWriter is backed by a (synchronized) StringBuffer; kept because
    // uploadOnePart consumes the StringWriter itself.
    StringWriter sw = new StringWriter();
    StringBuffer sb = sw.getBuffer();
    UnstructuredWriter unstructuredWriter = UnstructuredStorageWriterUtil.
            produceUnstructuredWriter(this.fileFormat, this.writerSliceConfig, sw);
    LOG.info(String.format(
            "begin do write, each object maxFileSize: [%s]MB...",
            this.maxFileSize));
    try {
        Record record;
        while ((record = lineReceiver.getFromReader()) != null) {
            gotData = true;
            // Decide whether this record must go to a new object.
            boolean needNewUpload;
            if (this.isPeer2PeerCopyMode()) {
                // Roll whenever the source file path carried in the record meta changes.
                // An explicit check instead of `assert`, which is usually disabled at runtime.
                Map<String, String> meta = record.getMeta();
                String sourcePath = meta == null ? null
                        : meta.get(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.META_KEY_FILE_PATH);
                if (sourcePath == null) {
                    throw new IllegalStateException(
                            "peer-to-peer copy mode requires the source file path in the record meta");
                }
                String targetObject = this.objectDir + sourcePath.substring(this.parentPathLength);
                needNewUpload = !StringUtils.equals(currentObject, targetObject);
                if (needNewUpload) {
                    currentObject = targetObject;
                }
            } else {
                // First record, or the current object is full: roll to the next object name.
                needNewUpload = currentInitiateMultipartUploadResult == null
                        || currentPartNumber > maxPartNumber;
                if (needNewUpload) {
                    currentObject = getCurrentObject(objectRollingNumber, record);
                    objectRollingNumber++;
                }
            }

            if (needNewUpload) {
                if (currentInitiateMultipartUploadResult != null) {
                    // Finish the previous object. currentObject already names the NEW object,
                    // so the previous upload must be addressed by its own key.
                    String previousKey = currentInitiateMultipartUploadResult.getKey();
                    if (sb.length() > 0) {
                        this.uploadOnePart(sw, currentPartNumber, currentInitiateMultipartUploadResult,
                                currentPartETags, previousKey);
                        currentPartNumber++;
                        sb.setLength(0);
                    }
                    // (currentPartNumber - 1) parts were uploaded; the size is an upper bound.
                    LOG.info(String.format(
                            "current object [%s] size %s, complete current multipart upload %s and begin new one",
                            previousKey, (long) (currentPartNumber - 1) * this.blockSizeInByte,
                            currentInitiateMultipartUploadResult.getUploadId()));
                    CompleteMultipartUploadResult completeResult = this.ossWriterProxy.completeMultipartUpload(
                            new CompleteMultipartUploadRequest(this.bucket, previousKey,
                                    currentInitiateMultipartUploadResult.getUploadId(), currentPartETags));
                    LOG.info(String.format("final object [%s] etag is:[%s]", previousKey,
                            completeResult.getETag()));
                }
                currentInitiateMultipartUploadResult = this.ossWriterProxy.initiateMultipartUpload(
                        this.ossWriterProxy.getInitiateMultipartUploadRequest(currentObject));
                currentPartETags = new ArrayList<PartETag>();
                currentPartNumber = 1;
                LOG.info(String
                        .format("write to bucket: [%s] object: [%s] with oss uploadId: [%s]",
                                this.bucket, currentObject,
                                currentInitiateMultipartUploadResult
                                        .getUploadId()));
                // Every object starts with the header.
                if (null != this.header && !this.header.isEmpty()) {
                    unstructuredWriter.writeOneRecord(this.header);
                }
            }

            // Serialize the record; upload a part once the buffer reaches the block size.
            UnstructuredStorageWriterUtil.transportOneRecord(record,
                    this.nullFormat, this.dateParse,
                    this.getTaskPluginCollector(), unstructuredWriter, this.byteEncoding);
            if (sb.length() >= this.blockSizeInByte) {
                this.uploadOnePart(sw, currentPartNumber,
                        currentInitiateMultipartUploadResult,
                        currentPartETags, currentObject);
                currentPartNumber++;
                sb.setLength(0);
            }
        }

        // Whether a still-open upload must be flushed and completed below.
        boolean completeLastUpload = gotData;
        if (!gotData) {
            LOG.info("Receive no data from the source.");
            if (!generateEmptyFile) {
                // Do not initiate anything: an initiated but never-completed multipart upload
                // would otherwise be left behind on the bucket.
                LOG.info("The generateEmptyFile is false, datax will not generate empty file");
            } else if (currentObject == null) {
                // Peer-to-peer copy mode names objects after source files; with no record there
                // is no name to create an empty object under.
                LOG.warn("No record received in peer-to-peer copy mode, no target object name is known, "
                        + "skip generating empty file");
            } else {
                currentInitiateMultipartUploadResult = this.ossWriterProxy.initiateMultipartUpload(
                        this.ossWriterProxy.getInitiateMultipartUploadRequest(currentObject));
                currentPartETags = new ArrayList<PartETag>();
                if (null != this.header && !this.header.isEmpty()) {
                    unstructuredWriter.writeOneRecord(this.header);
                }
                LOG.info("Due to without data, oss will generate empty file, " +
                        "the generateEmptyFile is {}, you can set it false to avoid this", generateEmptyFile);
                completeLastUpload = true;
            }
        }

        if (completeLastUpload) {
            // Data (or the header) may still be sitting in the buffer: upload it as the last part.
            if (sb.length() > 0) {
                this.uploadOnePart(sw, currentPartNumber,
                        currentInitiateMultipartUploadResult,
                        currentPartETags, currentObject);
            }
            completeUpload(new CompleteMultipartUploadRequest(
                    this.bucket, currentObject,
                    currentInitiateMultipartUploadResult.getUploadId(),
                    currentPartETags));
        }
    } catch (Exception e) {
        // Dirty records are already reported by UnstructuredStorageWriterUtil.transportOneRecord;
        // headers are plain strings and are not treated as dirty data.
        // NOTE(review): an upload that fails midway is not aborted here and may remain incomplete.
        throw DataXException.asDataXException(
                OssWriterErrorCode.Write_OBJECT_ERROR, e.getMessage(), e);
    }
    LOG.info("end do write");
}