public void prepare()

in osswriter/src/main/java/com/alibaba/datax/plugin/writer/osswriter/OssWriter.java [163:280]


        /**
         * Prepares the OSS destination before any data is written.
         *
         * <ul>
         *   <li>When the HDFS writer proxy is enabled, preparation is delegated to it.</li>
         *   <li>In peer-to-peer copy mode, destination objects are removed either by prefix
         *       ({@code truncateMode = "prefix"}) or by the exact names derived from the source
         *       file list (any other {@code truncateMode}, default {@code "objectMatch"}).
         *       Only the truncate write mode is accepted in this mode.</li>
         *   <li>Otherwise the bucket must already exist (it is never created here) and the
         *       single-object / multi-object preparation routine is invoked.</li>
         * </ul>
         *
         * @throws DataXException if the write mode is not supported in copy mode, if the bucket
         *                        does not exist, if a prefix truncate cannot delete every listed
         *                        object, or if an OSS call fails on the non-copy path
         */
        public void prepare() {
            LOG.info("begin do prepare...");
            if (useHdfsWriterProxy) {
                this.hdfsWriterJob.prepare();
                return;
            }
            this.bucket = this.writerSliceConfig.getString(Key.BUCKET);
            this.object = this.writerSliceConfig.getString(Key.OBJECT);
            String writeMode = this.writerSliceConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.WRITE_MODE);
            this.objectDir = this.getObjectDir(object);

            // Peer-to-peer copy mode: clear the destination objects that the copy will rewrite.
            if (this.isPeer2PeerCopyMode()) {
                String truncateMode = this.writerSliceConfig.getString("truncateMode", "objectMatch");
                if ("prefix".equalsIgnoreCase(truncateMode)) {
                    truncatePeerCopyTargetByPrefix(writeMode);
                } else {
                    truncatePeerCopyTargetBySourceFiles(writeMode);
                }
                return;
            }

            // Not a peer-to-peer copy: fall back to the regular single/multi object preparation.
            try {
                // The bucket is deliberately NOT created on behalf of the user.
                if (!this.ossClient.doesBucketExist(bucket)) {
                    String errorMessage = String.format("The [bucket]: %s you configured does not exist. Please confirm your configuration items. ", bucket);
                    LOG.error(errorMessage);
                    throw DataXException.asDataXException(
                            OssWriterErrorCode.ILLEGAL_VALUE, errorMessage);
                }
                LOG.info(String.format("access control details [%s].",
                        this.ossClient.getBucketAcl(bucket).toString()));

                if (writeSingleObject) {
                    doPrepareForSingleObject(bucket, object, writeMode);
                } else {
                    doPrepareForMutliObject(bucket, object, writeMode);
                }
            } catch (OSSException e) {
                throw DataXException.asDataXException(
                        OssWriterErrorCode.OSS_COMM_ERROR, e.getMessage(), e);
            } catch (ClientException e) {
                throw DataXException.asDataXException(
                        OssWriterErrorCode.OSS_COMM_ERROR, e.getMessage(), e);
            }
        }

        /**
         * Prefix truncate for peer-to-peer copy: deletes every object whose key starts with the
         * configured object prefix, one listing page at a time.
         */
        private void truncatePeerCopyTargetByPrefix(String writeMode) {
            List<String> sourceFileName = this.peerPluginJobConf.getList(SOURCE_FILE_NAME, new ArrayList<String>(),
                    String.class);
            BinaryFileWriterUtil.checkFileNameIfRepeatedThrowException(sourceFileName);
            if (!TRUNCATE.equals(writeMode)) {
                throw DataXException.asDataXException(OssWriterErrorCode.ILLEGAL_VALUE,
                        "only support truncate writeMode in copy sync mode.");
            }
            LOG.info("You have configured [writeMode] [truncate], so the system will start to clear the objects starting with [{}] under [{}]. ", object, bucket);

            // A single listObjects call returns only one page (100 objects by default), so keep
            // listing and deleting until nothing is left under the prefix.
            while (true) {
                LOG.info("list objects with listObject(bucket, object)");
                ObjectListing listing = this.ossClient.listObjects(bucket, object);
                List<OSSObjectSummary> objectSummarys = listing.getObjectSummaries();
                if (objectSummarys.isEmpty()) {
                    break;
                }
                List<String> objects2Delete = new ArrayList<String>(objectSummarys.size());
                for (OSSObjectSummary objectSummary : objectSummarys) {
                    objects2Delete.add(objectSummary.getKey());
                }
                LOG.info("[prefix truncate mode]delete oss object [{}].", JSON.toJSONString(objects2Delete));
                if (!deleteObjectsAndReportFailures(objects2Delete)) {
                    // Objects that cannot be deleted would be listed again on the next pass,
                    // so continuing would loop forever; fail fast instead.
                    throw DataXException.asDataXException(OssWriterErrorCode.OSS_COMM_ERROR,
                            String.format("Failed to delete some objects starting with [%s] under bucket [%s], abort truncate.", object, bucket));
                }
            }
        }

        /**
         * Exact-name truncate for peer-to-peer copy: deletes the destination objects that
         * correspond to the source file list, in batches.
         */
        private void truncatePeerCopyTargetBySourceFiles(String writeMode) {
            if (!TRUNCATE.equals(writeMode)) {
                throw DataXException.asDataXException(OssWriterErrorCode.ILLEGAL_VALUE,
                        "only support truncate writeMode in copy sync mode.");
            }
            List<String> sourceFileName = this.peerPluginJobConf.getList(com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.SOURCE_FILE, new ArrayList<String>(),
                    String.class);
            List<String> readerPath = this.peerPluginJobConf.getList(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.PATH, new ArrayList<String>(),
                    String.class);
            int parentPathLength = OssWriter.parseParentPathLength(readerPath);
            this.writerSliceConfig.set("__parentPathLength", parentPathLength);
            BinaryFileWriterUtil.checkFileNameIfRepeatedThrowException(sourceFileName);

            // Split the names so that each delete request carries at most 1000 keys.
            int splitCount = sourceFileName.size() / 1000 + 1;
            List<List<String>> splitResult = RangeSplitUtil.doListSplit(sourceFileName, splitCount);
            for (List<String> eachSlice : splitResult) {
                assert eachSlice.size() <= 1000;
                if (eachSlice.isEmpty()) {
                    continue;
                }
                List<String> ossObjFullPath = new ArrayList<String>(eachSlice.size());
                for (String eachObj : eachSlice) {
                    // Destination key = object directory + source path with the parent path stripped.
                    ossObjFullPath.add(String.format("%s%s", objectDir, eachObj.substring(parentPathLength)));
                }
                LOG.info("[origin object name truncate mode]delete oss object [{}].", JSON.toJSONString(ossObjFullPath));
                // Failures are reported by the helper; this mode keeps going with the next batch.
                deleteObjectsAndReportFailures(ossObjFullPath);
            }
        }

        /**
         * Deletes the given keys from the configured bucket with a quiet-mode request and logs a
         * warning only when the result lists objects that could not be deleted.
         *
         * @return {@code true} when the result reports no failed objects
         */
        private boolean deleteObjectsAndReportFailures(List<String> keys) {
            DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
            deleteRequest.setKeys(keys);
            // Quiet mode: the result carries only the objects whose deletion failed.
            deleteRequest.setQuiet(true);
            DeleteObjectsResult deleteResult = this.ossClient.deleteObjects(deleteRequest);
            List<String> failedObjects = deleteResult.getDeletedObjects();
            if (failedObjects == null || failedObjects.isEmpty()) {
                return true;
            }
            LOG.warn("OSS request id:{}, objects delete failed:{}", deleteResult.getRequestId(),
                    JSON.toJSONString(failedObjects));
            return false;
        }