in osswriter/src/main/java/com/alibaba/datax/plugin/writer/osswriter/OssWriter.java [163:280]
public void prepare() {
    /*
     * Prepares the destination before any data is written.
     *
     * - When the HDFS writer proxy is enabled, the whole phase is delegated to it.
     * - In peer-to-peer copy mode only writeMode "truncate" is accepted; stale
     *   destination objects are removed either by prefix ("truncateMode" = "prefix")
     *   or by matching the source file names (the default, "objectMatch").
     * - Otherwise the bucket must already exist (it is never created here) and the
     *   single-object / multi-object prepare routine is invoked.
     *
     * Throws DataXException when the configuration is illegal, the bucket does not
     * exist, the prefix cleanup cannot delete some objects, or (in the non-copy
     * branch) the OSS client reports an error.
     */
    LOG.info("begin do prepare...");
    if (useHdfsWriterProxy) {
        this.hdfsWriterJob.prepare();
        return;
    }

    this.bucket = this.writerSliceConfig.getString(Key.BUCKET);
    this.object = this.writerSliceConfig.getString(Key.OBJECT);
    String writeMode = this.writerSliceConfig
            .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.WRITE_MODE);
    List<String> sourceFileName = this.peerPluginJobConf.getList(SOURCE_FILE_NAME, new ArrayList<String>(),
            String.class);
    this.objectDir = this.getObjectDir(object);

    // Peer-to-peer copy mode: remove on the destination the files obtained from the source.
    if (this.isPeer2PeerCopyMode()) {
        String truncateMode = this.writerSliceConfig.getString("truncateMode", "objectMatch");
        if ("prefix".equalsIgnoreCase(truncateMode)) {
            // Prefix deletion mode. The duplicate-name check runs before the writeMode check.
            BinaryFileWriterUtil.checkFileNameIfRepeatedThrowException(sourceFileName);
            if (!TRUNCATE.equals(writeMode)) {
                throw DataXException.asDataXException(OssWriterErrorCode.ILLEGAL_VALUE,
                        "only support truncate writeMode in copy sync mode.");
            }
            truncateObjectsByPrefix();
        } else {
            if (!TRUNCATE.equals(writeMode)) {
                throw DataXException.asDataXException(OssWriterErrorCode.ILLEGAL_VALUE,
                        "only support truncate writeMode in copy sync mode.");
            }
            truncateObjectsBySourceFileName();
        }
        return;
    }

    // The source is not semi-structured, or this is not peer-to-peer copy mode:
    // fall back to the regular single-object / multi-object prepare strategy.
    try {
        // The bucket is never created on behalf of the user; a missing bucket is a configuration error.
        if (!this.ossClient.doesBucketExist(bucket)) {
            String errorMessage = String.format("The [bucket]: %s you configured does not exist. Please confirm your configuration items. ", bucket);
            LOG.error(errorMessage);
            throw DataXException.asDataXException(
                    OssWriterErrorCode.ILLEGAL_VALUE, errorMessage);
        }
        LOG.info(String.format("access control details [%s].",
                this.ossClient.getBucketAcl(bucket).toString()));
        if (writeSingleObject) {
            doPrepareForSingleObject(bucket, object, writeMode);
        } else {
            doPrepareForMutliObject(bucket, object, writeMode);
        }
    } catch (OSSException e) {
        throw DataXException.asDataXException(
                OssWriterErrorCode.OSS_COMM_ERROR, e.getMessage(), e);
    } catch (ClientException e) {
        throw DataXException.asDataXException(
                OssWriterErrorCode.OSS_COMM_ERROR, e.getMessage(), e);
    }
}

/**
 * Deletes every object listed under the configured object prefix of the configured bucket.
 *
 * The listing is repeated until it comes back empty, because a single listing
 * returns only a limited page of objects (100 by default according to the
 * original note in this code).
 *
 * @throws DataXException if a batch reports keys that could not be deleted;
 *         re-listing would return those keys again and the loop would never end
 */
private void truncateObjectsByPrefix() {
    LOG.info("You have configured [writeMode] [truncate], so the system will start to clear the objects starting with [{}] under [{}]. ", object, bucket);
    while (true) {
        LOG.info("list objects with listObject(bucket, object)");
        ObjectListing listing = this.ossClient.listObjects(bucket, object);
        List<OSSObjectSummary> objectSummarys = listing.getObjectSummaries();
        if (objectSummarys.isEmpty()) {
            break;
        }
        List<String> objects2Delete = new ArrayList<String>(objectSummarys.size());
        for (OSSObjectSummary objectSummary : objectSummarys) {
            objects2Delete.add(objectSummary.getKey());
        }
        if (!deleteObjectsInQuietMode("prefix truncate mode", objects2Delete)) {
            throw DataXException.asDataXException(OssWriterErrorCode.OSS_COMM_ERROR,
                    String.format("Failed to delete some objects starting with [%s] under bucket [%s], abort truncating.",
                            object, bucket));
        }
    }
}

/**
 * Deletes the destination objects whose names correspond to the source files
 * reported by the peer (reader) plugin.
 *
 * Each destination key is {@code objectDir} followed by the source file name with
 * its first {@code parentPathLength} characters removed. The computed parent path
 * length is also stored in the writer configuration under
 * {@code "__parentPathLength"} (presumably for later use by the tasks; confirm
 * against the consumers of that key).
 *
 * Failed deletions are logged as warnings and do not abort the job.
 */
private void truncateObjectsBySourceFileName() {
    List<String> sourceFileName = this.peerPluginJobConf.getList(com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.SOURCE_FILE, new ArrayList<String>(),
            String.class);
    List<String> readerPath = this.peerPluginJobConf.getList(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.PATH, new ArrayList<String>(),
            String.class);
    int parentPathLength = OssWriter.parseParentPathLength(readerPath);
    this.writerSliceConfig.set("__parentPathLength", parentPathLength);
    BinaryFileWriterUtil.checkFileNameIfRepeatedThrowException(sourceFileName);

    // Original-file-name deletion mode: delete in batches, sized so that each
    // batch is expected to hold at most 1000 keys.
    int splitCount = sourceFileName.size() / 1000 + 1;
    List<List<String>> splitResult = RangeSplitUtil.doListSplit(sourceFileName, splitCount);
    for (List<String> eachSlice : splitResult) {
        assert eachSlice.size() <= 1000;
        if (eachSlice.isEmpty()) {
            continue;
        }
        List<String> ossObjFullPath = new ArrayList<String>(eachSlice.size());
        for (String eachObj : eachSlice) {
            ossObjFullPath.add(String.format("%s%s", objectDir,
                    eachObj.substring(parentPathLength, eachObj.length())));
        }
        deleteObjectsInQuietMode("origin object name truncate mode", ossObjFullPath);
    }
}

/**
 * Sends one batch delete request in quiet mode and reports whether it fully succeeded.
 *
 * In quiet mode the result is expected to carry only the keys that could NOT be
 * deleted (see the OSS SDK documentation), so a non-empty result list is treated
 * as a failure and logged as a warning.
 *
 * @param modeLabel label of the truncate mode, used only in the log line
 * @param keys      object keys to delete; callers pass a non-empty list
 * @return {@code true} if no key was reported as failed, {@code false} otherwise
 */
private boolean deleteObjectsInQuietMode(String modeLabel, List<String> keys) {
    LOG.info(String.format("[%s]delete oss object [%s].", modeLabel, JSON.toJSONString(keys)));
    DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
    deleteRequest.setKeys(keys);
    deleteRequest.setQuiet(true); // quiet (simple) mode
    DeleteObjectsResult deleteResult = this.ossClient.deleteObjects(deleteRequest);
    List<?> failedKeys = deleteResult.getDeletedObjects();
    if (failedKeys == null || failedKeys.isEmpty()) {
        return true;
    }
    LOG.warn("OSS request id:{}, objects delete failed:{}", deleteResult.getRequestId(),
            JSON.toJSONString(failedKeys));
    return false;
}