in inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java [430:555]
/**
 * Converts a {@link DataConfig} into a {@link TaskProfile}.
 *
 * <p>The common task attributes are copied from {@code dataConfig}, the sink is chosen from the
 * data report type, and the type-specific task section (SQL, file, Kafka, ...) is attached
 * according to the task type. The assembled DTO is serialized to JSON and parsed back into a
 * {@link TaskProfile}.
 *
 * <p>Defaults applied here: the cycle unit is {@code CycleUnitType.REAL_TIME} and retry is
 * {@code false}; SQL, FILE and COS tasks override both with the values of their own task section.
 *
 * @param dataConfig the configuration to convert; must not be {@code null}
 * @return the task profile parsed from the JSON form of the assembled DTO
 * @throws IllegalArgumentException if {@code dataConfig} is not valid, if a direct-to-MQ report
 *         type has no MQ cluster configured, or if the MQ type is neither Pulsar nor Kafka
 * @throws NullPointerException if the task type of {@code dataConfig} cannot be resolved to a
 *         {@code TaskTypeEnum} (the lookup result is passed through {@code requireNonNull})
 */
public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
    if (!dataConfig.isValid()) {
        throw new IllegalArgumentException("input dataConfig " + dataConfig + " is invalid please check");
    }
    TaskProfileDto profileDto = new TaskProfileDto();
    Proxy proxy = getProxy(dataConfig);
    profileDto.setProxy(proxy);
    Task task = new Task();
    // common attribute
    task.setId(String.valueOf(dataConfig.getTaskId()));
    task.setTaskType(dataConfig.getTaskType());
    task.setGroupId(dataConfig.getInlongGroupId());
    task.setStreamId(dataConfig.getInlongStreamId());
    task.setChannel(DEFAULT_CHANNEL);
    task.setIp(dataConfig.getIp());
    task.setOp(dataConfig.getOp());
    task.setDeliveryTime(dataConfig.getDeliveryTime());
    task.setUuid(dataConfig.getUuid());
    task.setVersion(dataConfig.getVersion());
    task.setState(dataConfig.getState());
    task.setPredefinedFields(dataConfig.getPredefinedFields());
    // default cycle unit; SQL, FILE and COS tasks replace it below
    task.setCycleUnit(CycleUnitType.REAL_TIME);
    task.setTimeZone(dataConfig.getTimeZone());
    if (dataConfig.getAuditVersion() == null) {
        task.setAuditVersion(DEFAULT_AUDIT_VERSION);
    } else {
        task.setAuditVersion(dataConfig.getAuditVersion());
    }
    // set sink type
    applySinkSettings(task, dataConfig);
    // default retry flag; SQL, FILE and COS tasks replace it below
    task.setRetry(false);
    TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfig.getTaskType());
    // Each known type attaches the task to the DTO itself; the default branch deliberately
    // does not, so an unrecognized type yields a profile without a task section.
    switch (requireNonNull(taskType)) {
        case SQL:
            SQLTask sqlTask = getSQLTask(dataConfig);
            task.setCycleUnit(sqlTask.getCycleUnit());
            task.setSqlTask(sqlTask);
            task.setRetry(sqlTask.getRetry());
            profileDto.setTask(task);
            break;
        case BINLOG:
            BinlogTask binlogTask = getBinlogTask(dataConfig);
            task.setBinlogTask(binlogTask);
            profileDto.setTask(task);
            break;
        case FILE:
            FileTask fileTask = getFileTask(dataConfig);
            task.setCycleUnit(fileTask.getCycleUnit());
            task.setFileTask(fileTask);
            task.setRetry(fileTask.getRetry());
            profileDto.setTask(task);
            break;
        case KAFKA:
            KafkaTask kafkaTask = getKafkaTask(dataConfig);
            task.setKafkaTask(kafkaTask);
            profileDto.setTask(task);
            break;
        case PULSAR:
            PulsarTask pulsarTask = getPulsarTask(dataConfig);
            task.setPulsarTask(pulsarTask);
            profileDto.setTask(task);
            break;
        case POSTGRES:
            PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig);
            task.setPostgreSQLTask(postgreSQLTask);
            profileDto.setTask(task);
            break;
        case ORACLE:
            OracleTask oracleTask = getOracleTask(dataConfig);
            task.setOracleTask(oracleTask);
            profileDto.setTask(task);
            break;
        case SQLSERVER:
            SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
            task.setSqlserverTask(sqlserverTask);
            profileDto.setTask(task);
            break;
        case MONGODB:
            MongoTask mongoTask = getMongoTask(dataConfig);
            task.setMongoTask(mongoTask);
            profileDto.setTask(task);
            break;
        case REDIS:
            RedisTask redisTask = getRedisTask(dataConfig);
            task.setRedisTask(redisTask);
            profileDto.setTask(task);
            break;
        case MQTT:
            MqttTask mqttTask = getMqttTask(dataConfig);
            task.setMqttTask(mqttTask);
            profileDto.setTask(task);
            break;
        case MOCK:
            // no type-specific section; only the common attributes are used
            profileDto.setTask(task);
            break;
        case COS:
            COSTask cosTask = getCOSTask(dataConfig);
            task.setCycleUnit(cosTask.getCycleUnit());
            task.setCosTask(cosTask);
            task.setRetry(cosTask.getRetry());
            profileDto.setTask(task);
            break;
        default:
            logger.error("invalid task type {}", taskType);
    }
    return TaskProfile.parseJsonStr(GSON.toJson(profileDto));
}

/**
 * Sets the sink-related fields of {@code task} according to the data report type of
 * {@code dataConfig}.
 *
 * <p>The first two report types use the data proxy sink and differ only in the proxy-send flag.
 * Any other report type is treated as sending directly to the MQ: the MQ clusters and topic info
 * are stored on the task as JSON and the sink is chosen from the MQ type of the first cluster.
 *
 * @throws IllegalArgumentException if no MQ cluster is configured for a direct-to-MQ report type,
 *         or if the MQ type of the first cluster is neither Pulsar nor Kafka
 */
private static void applySinkSettings(Task task, DataConfig dataConfig) {
    if (dataConfig.getDataReportType() == NORMAL_SEND_TO_DATAPROXY.ordinal()) {
        task.setSink(DEFAULT_DATA_PROXY_SINK);
        task.setProxySend(false);
        return;
    }
    // NOTE(review): 1 is presumably the "proxy send" report type; confirm against the report
    // type enum and replace the literal with the named constant.
    if (dataConfig.getDataReportType() == 1) {
        task.setSink(DEFAULT_DATA_PROXY_SINK);
        task.setProxySend(true);
        return;
    }
    // Fail with a descriptive error instead of a bare NPE / IndexOutOfBoundsException from get(0).
    if (dataConfig.getMqClusters() == null || dataConfig.getMqClusters().isEmpty()) {
        throw new IllegalArgumentException(
                "mq clusters is empty for task " + dataConfig.getTaskId() + " please check");
    }
    String mqType = dataConfig.getMqClusters().get(0).getMqType();
    task.setMqClusters(GSON.toJson(dataConfig.getMqClusters()));
    task.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
    // Constant-first equals so a null mqType reaches the "invalid mq type" error below.
    if (MQType.PULSAR.equals(mqType)) {
        task.setSink(PULSAR_SINK);
    } else if (MQType.KAFKA.equals(mqType)) {
        task.setSink(KAFKA_SINK);
    } else {
        throw new IllegalArgumentException("invalid mq type " + mqType + " please check");
    }
}