in client/migrationx/migrationx-domain/migrationx-domain-datago/src/main/java/com/aliyun/dataworks/migrationx/domain/dataworks/datago/model/OldDataX2DI.java [256:349]
private void toDIParameter(JsonObject parameter, String config) {
if (StringUtils.isNotBlank(config)) {
BaseModel baseModel = GsonUtils.defaultGson.fromJson(config, new TypeToken<BaseModel>() {}.getType());
CDPTypeEnum cdpTypeEnum = baseModel.getResourceType();
parameter.addProperty(DI_TYPE, cdpTypeEnum.getD2DataXType());
if (CollectionUtils.isNotEmpty(baseModel.getColumns())) {
JsonArray columnArr = new JsonArray();
if (TypeofEnum.OSS_READER.equals(baseModel.getTypedef()) || TypeofEnum.SFTP_READER.equals(
baseModel.getTypedef())) {
baseModel.getColumns().forEach(column -> {
JsonObject object = new JsonObject();
object.addProperty(INDEX, column.getIndex());
object.addProperty(TYPE, column.getType());
object.addProperty(DataXConstants.DI_COLUMN_NAME, column.getIndex());
columnArr.add(object);
});
} else {
baseModel.getColumns().forEach(column -> columnArr.add(column.getName()));
}
parameter.add(DataXConstants.DI_COLUMN, columnArr);
parameter.remove(COLUMNS);
}
parameter.addProperty(DataXConstants.DI_DATASOURCE, baseModel.getDatasourceName());
if (parameter.has(DataXConstants.TABLE_NAME)) {
String tableName = parameter.get(DataXConstants.TABLE_NAME).getAsString();
String[] parts = StringUtils.split(tableName, ".");
if (parts != null && parts.length > 1) {
tableName = parts[parts.length - 1];
}
parameter.addProperty(DataXConstants.DI_TABLE, tableName);
parameter.remove(DataXConstants.TABLE_NAME);
}
if (parameter.has(FILTER_SQL)) {
parameter.addProperty(DataXConstants.DI_WHERE, parameter.get(FILTER_SQL).getAsString());
parameter.remove(FILTER_SQL);
}
if (parameter.has(SPLIT_PK)) {
parameter.addProperty(DataXConstants.DI_SPLIT_PK, parameter.get(SPLIT_PK).getAsString());
parameter.remove(SPLIT_PK);
}
if (TypeofEnum.MYSQL_READER.equals(baseModel.getTypedef()) && parameter.has(DataXConstants.TABLE_NAME)) {
DITask.Connection connection = new DITask.Connection();
connection.setDatasource(baseModel.getDatasourceName());
connection.setTable(parameter.get(DataXConstants.TABLE_NAME).getAsString().split(","));
List<DITask.Connection> connectionList = new ArrayList<>();
connectionList.add(connection);
parameter.add(DataXConstants.DI_CONNECTION, GsonUtils.defaultGson.toJsonTree(connectionList));
}
if (parameter.has(PT)) {
String param = parameter.get(PT).getAsString();
if (param.contains("hour") && !param.contains("hour=")) {
param = param.replace("hour", "hour=@@[HH-1h]");
}
if (TypeofEnum.ODPS_READER.equals(baseModel.getTypedef()) && StringUtils.isNotBlank(param)) {
JsonArray partition = new JsonArray();
if (StringUtils.isNotBlank(param)) {
Arrays.stream(param.split(",")).forEach(s -> {
if (!s.contains("=") && s.contains("hour")) {
partition.add("hour=@@[HH-1h]");
} else {
partition.add(s);
}
});
}
parameter.add(DataXConstants.DI_PARTITION, partition);
} else {
parameter.addProperty(DataXConstants.DI_PARTITION, param);
}
parameter.remove(PT);
}
if (parameter.has(DataXConstants.WRITE_NODE)) {
String writeMode = parameter.get(DataXConstants.WRITE_NODE).getAsString();
if (DataXConstants.INSERT.equals(writeMode)) {
parameter.addProperty(DataXConstants.WRITE_NODE, DataXConstants.DI_INSERT);
} else if (DataXConstants.UPDATE.equals(writeMode)) {
parameter.addProperty(DataXConstants.WRITE_NODE, DataXConstants.DI_UPDATE);
} else if (DataXConstants.REPLACE.equals(writeMode)) {
parameter.addProperty(DataXConstants.WRITE_NODE, DataXConstants.DI_REPLACE);
}
}
if (parameter.has(FIELD_DELIMITER)) {
String fieldDelimiter = parameter.get(FIELD_DELIMITER).getAsString();
parameter.addProperty(DataXConstants.DI_FIELD_DELIMITER_ORIGIN, StringEscapeUtils.escapeJava(fieldDelimiter));
if (fieldDelimiter.length() > 1 && !parameter.has(DataXConstants.DI_FILE_FORMAT)) {
parameter.addProperty(DataXConstants.DI_FILE_FORMAT, "text");
}
}
if (parameter.has(MARK_DONE_FILE_NAME)) {
parameter.addProperty(DataXConstants.DI_MARK_DONE_FILE_NAME, parameter.get(MARK_DONE_FILE_NAME).getAsString());
}
}
}