in client/migrationx/migrationx-domain/migrationx-domain-datago/src/main/java/com/aliyun/dataworks/migrationx/domain/dataworks/datago/DataGoTask.java [366:426]
private void rewriteParameters(DITask.Step step) {
if ("sftp".equalsIgnoreCase(step.getStepType())) {
step.setStepType("ftp");
}
JsonObject parameter = step.getParameter();
if (parameter != null) {
if (!parameter.has(DataXConstants.DI_FIELD_DELIMITER_ORIGIN) ||
StringUtils.isBlank(parameter.get(DataXConstants.DI_FIELD_DELIMITER_ORIGIN).getAsString())) {
String fieldDelimiter = parameter.has(DataXConstants.DI_FIELD_DELIMITER) ?
parameter.get(DataXConstants.DI_FIELD_DELIMITER).getAsString() : null;
if (StringUtils.isNotBlank(fieldDelimiter)) {
parameter.addProperty(DataXConstants.DI_FIELD_DELIMITER_ORIGIN, StringEscapeUtils.escapeJava(fieldDelimiter));
}
}
if (!parameter.has(DataXConstants.DI_PATH)) {
String filePath = parameter.has(DataXConstants.DI_FILE_PATH) ? parameter.get(DataXConstants.DI_FILE_PATH).getAsString() : "";
if (StringUtils.isNotBlank(filePath)) {
JsonArray path = new JsonArray();
path.add(filePath);
parameter.add(DataXConstants.DI_PATH, path);
}
}
if (!parameter.has(DataXConstants.DI_FILE_FORMAT)) {
parameter.addProperty(DataXConstants.DI_FILE_FORMAT, "text");
}
if (parameter.has(DataXConstants.DI_COMPRESS)) {
String compress = parameter.get(DataXConstants.DI_COMPRESS).getAsString();
if ("NONE".equalsIgnoreCase(compress)) {
parameter.remove(DataXConstants.DI_COMPRESS);
}
}
/**
* odps reader 的partition必须为字符串数组
* "partition": ["pt=xxxx"]
*/
if ("odps".equalsIgnoreCase(step.getStepType())
&& "reader".equalsIgnoreCase(step.getCategory())
&& parameter.has(DataXConstants.DI_PARTITION)
) {
JsonElement partition = parameter.get(DataXConstants.DI_PARTITION);
if (partition.isJsonPrimitive()) {
String partitionStr = partition.getAsString();
JsonArray partitionArray = new JsonArray();
partitionArray.add(partitionStr);
parameter.add(DataXConstants.DI_PARTITION, partitionArray);
}
}
/**
* oceanbase_new -> apsaradb_for_oceanbase
*/
if (CDPTypeEnum.OCEAN_BASE_NEW.getCDPDataXType().equalsIgnoreCase(step.getStepType())) {
step.setStepType(CDPTypeEnum.OCEAN_BASE_NEW.getD2DataXType());
}
}
}