in client/migrationx/migrationx-domain/migrationx-domain-caiyunjian/src/main/java/com/aliyun/dataworks/migrationx/domain/dataworks/caiyunjian/CaiyunjianTask.java [555:614]
private static void processResourceWriter(String type, JsonObject parameter, String category) {
if (!DataXConstants.DI_WRITER.equalsIgnoreCase(category)) {
return;
}
Set<CDPTypeEnum> resourceWriteTypes = new HashSet<>(Arrays.asList(CDPTypeEnum.OSS, CDPTypeEnum.SFTP));
if (resourceWriteTypes.stream().noneMatch(t -> t.getD2DataXType().equalsIgnoreCase(type))) {
return;
}
String resourceName = parameter.has(DataXConstants.RESOURCE_NAME) && parameter.get(DataXConstants.RESOURCE_NAME) != null ?
parameter.get(DataXConstants.RESOURCE_NAME).getAsString() : null;
parameter.addProperty(DataXConstants.DI_DATASOURCE, resourceName);
CDPTypeEnum resourceWriteType = resourceWriteTypes.stream()
.filter(t -> t.getD2DataXType().equalsIgnoreCase(type)).findFirst().orElse(null);
processFieldDelimiter(parameter);
if (parameter.has(DataXConstants.DI_HEADER)) {
String header = parameter.get(DataXConstants.DI_HEADER).getAsString();
JsonArray headerArr = new JsonArray();
if (StringUtils.isNotBlank(header)) {
String[] headers = StringUtils.split(header, ",");
for (String col : headers) {
headerArr.add(col);
}
}
parameter.add(DataXConstants.DI_HEADER, headerArr);
}
switch (resourceWriteType) {
case OSS: {
String objectName = parameter.has(DataXConstants.OBJECT_NAME) && parameter.get(DataXConstants.OBJECT_NAME) != null ?
parameter.get(DataXConstants.OBJECT_NAME).getAsString() : null;
String filePrefix = parameter.has(DataXConstants.FILE_PREFIX) && parameter.get(DataXConstants.FILE_PREFIX) != null ?
parameter.get(DataXConstants.FILE_PREFIX).getAsString() : null;
if (StringUtils.isNotBlank(objectName) && StringUtils.isNotBlank(filePrefix)) {
String ossObject = Joiner.on("/").join(objectName, filePrefix);
ossObject = Joiner.on("/").join(Arrays.stream(ossObject.split("/"))
.map(StringUtils::trim)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toList()));
parameter.addProperty(DataXConstants.DI_OSS_OBJECT, ossObject);
}
break;
}
case SFTP: {
String objectName = parameter.has(DataXConstants.OBJECT_NAME) && parameter.get(DataXConstants.OBJECT_NAME) != null ?
parameter.get(DataXConstants.OBJECT_NAME).getAsString() : null;
String filePrefix = parameter.has(DataXConstants.FILE_PREFIX) && parameter.get(DataXConstants.FILE_PREFIX) != null ?
parameter.get(DataXConstants.FILE_PREFIX).getAsString() : null;
parameter.addProperty(DataXConstants.DI_FILE_NAME, filePrefix);
parameter.addProperty(DataXConstants.DI_PATH, objectName);
break;
}
default:
}
}