in dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts [30:522]
/** Sqoop source/target type names that `formatParams` treats as RDBMS endpoints. */
const RDBMS_SOURCE_TYPES: readonly string[] = [
  'MYSQL',
  'ORACLE',
  'SQLSERVER',
  'HANA'
]

/** Returns true when `type` is one of the RDBMS Sqoop endpoint type names. */
function isRdbmsSqoopType(type: unknown): boolean {
  return RDBMS_SOURCE_TYPES.some((candidate) => candidate === type)
}

/**
 * Collects the Sqoop target fields of the form model that belong to
 * `data.targetType`. Returns an empty object for any other target type.
 */
function buildSqoopTargetParams(data: INodeData): ISqoopTargetParams {
  if (data.targetType === 'HIVE') {
    return {
      hiveDatabase: data.targetHiveDatabase,
      hiveTable: data.targetHiveTable,
      createHiveTable: data.targetHiveCreateTable,
      dropDelimiter: data.targetHiveDropDelimiter,
      hiveOverWrite: data.targetHiveOverWrite,
      hiveTargetDir: data.targetHiveTargetDir,
      replaceDelimiter: data.targetHiveReplaceDelimiter,
      hivePartitionKey: data.targetHivePartitionKey,
      hivePartitionValue: data.targetHivePartitionValue
    }
  }
  if (data.targetType === 'HDFS') {
    return {
      targetPath: data.targetHdfsTargetPath,
      deleteTargetDir: data.targetHdfsDeleteTargetDir,
      compressionCodec: data.targetHdfsCompressionCodec,
      fileType: data.targetHdfsFileType,
      fieldsTerminated: data.targetHdfsFieldsTerminated,
      linesTerminated: data.targetHdfsLinesTerminated
    }
  }
  if (isRdbmsSqoopType(data.targetType)) {
    return {
      targetType: data.targetMysqlType,
      targetDatasource: data.targetMysqlDatasource,
      targetTable: data.targetMysqlTable,
      targetColumns: data.targetMysqlColumns,
      fieldsTerminated: data.targetMysqlFieldsTerminated,
      linesTerminated: data.targetMysqlLinesTerminated,
      isUpdate: data.targetMysqlIsUpdate,
      targetUpdateKey: data.targetMysqlTargetUpdateKey,
      targetUpdateMode: data.targetMysqlUpdateMode
    }
  }
  return {}
}

/**
 * Collects the Sqoop source fields of the form model that belong to
 * `data.sourceType`. Returns an empty object for any other source type.
 */
function buildSqoopSourceParams(data: INodeData): ISqoopSourceParams {
  if (isRdbmsSqoopType(data.sourceType)) {
    // srcQueryType '1' blanks the table/column fields and keeps the SQL;
    // srcQueryType '0' blanks the SQL instead.
    const isSqlQuery = data.srcQueryType === '1'
    return {
      srcTable: isSqlQuery ? '' : data.srcTable,
      srcColumnType: isSqlQuery ? '0' : data.srcColumnType,
      srcColumns:
        isSqlQuery || data.srcColumnType === '0' ? '' : data.srcColumns,
      srcQuerySql:
        data.srcQueryType === '0' ? '' : data.sourceMysqlSrcQuerySql,
      srcQueryType: data.srcQueryType,
      srcType: data.sourceMysqlType,
      srcDatasource: data.sourceMysqlDatasource,
      mapColumnHive: data.mapColumnHive,
      mapColumnJava: data.mapColumnJava
    }
  }
  if (data.sourceType === 'HDFS') {
    return {
      exportDir: data.sourceHdfsExportDir
    }
  }
  if (data.sourceType === 'HIVE') {
    return {
      hiveDatabase: data.sourceHiveDatabase,
      hiveTable: data.sourceHiveTable,
      hivePartitionKey: data.sourceHivePartitionKey,
      hivePartitionValue: data.sourceHivePartitionValue
    }
  }
  return {}
}

/**
 * Converts the task form model into the payload shape used to save a task
 * definition.
 *
 * Only the fields that belong to `data.taskType` are copied into
 * `taskParams`; fields shared by every task type (`localParams`,
 * `initScript`, `rawScript`, `resourceList`) are always present and may be
 * overridden by the type-specific ones.
 *
 * The input is not modified: `localParams` entries are copied before their
 * `value` is defaulted to `''` (and, for DATAX with a custom config, before
 * `direct`/`type` are forced to `'IN'`/`'VARCHAR'`).
 *
 * @param data - Form model of the task node being edited.
 * @returns An object holding
 *   - `workflowDefinitionCode`: `data.workflowDefinitionName` as a string, or
 *     `''` when it is falsy;
 *   - `upstreamCodes`: `data.preTasks` joined with `,` (left `undefined` at
 *     runtime when `preTasks` is absent);
 *   - `taskDefinitionJsonObj`: the task definition including `taskParams`.
 */
export function formatParams(data: INodeData): {
  workflowDefinitionCode: string
  upstreamCodes: string
  taskDefinitionJsonObj: object
} {
  const taskParams: ITaskParams = {}
  // Becomes true for DATAX with a custom JSON config, where every local
  // parameter is emitted as an IN / VARCHAR parameter.
  let forceInVarcharParams = false

  if (data.taskType === 'SUB_WORKFLOW') {
    taskParams.workflowDefinitionCode = data.workflowDefinitionCode
  }

  if (data.taskType === 'JAVA') {
    taskParams.runType = data.runType
    taskParams.mainArgs = data.mainArgs
    taskParams.jvmArgs = data.jvmArgs
    taskParams.isModulePath = data.isModulePath
    taskParams.mainClass = data.mainClass
    if (
      (data.runType === 'FAT_JAR' || data.runType === 'NORMAL_JAR') &&
      data.mainJar
    ) {
      taskParams.mainJar = { resourceName: data.mainJar }
    }
  }

  // Fields shared by the jar-submitting engines; engine-specific fields follow.
  if (
    data.taskType &&
    ['SPARK', 'MR', 'FLINK', 'FLINK_STREAM'].includes(data.taskType)
  ) {
    taskParams.programType = data.programType
    taskParams.mainClass = data.mainClass
    if (data.mainJar) {
      taskParams.mainJar = { resourceName: data.mainJar }
    }
    taskParams.deployMode = data.deployMode
    taskParams.appName = data.appName
    taskParams.mainArgs = data.mainArgs
    taskParams.others = data.others
    if (data.namespace) {
      taskParams.namespace = data.namespace
    }
    taskParams.yarnQueue = data.yarnQueue
  }

  if (data.taskType === 'SPARK') {
    taskParams.master = data.master
    taskParams.driverCores = data.driverCores
    taskParams.driverMemory = data.driverMemory
    taskParams.numExecutors = data.numExecutors
    taskParams.executorMemory = data.executorMemory
    taskParams.executorCores = data.executorCores
    taskParams.sqlExecutionType = data.sqlExecutionType
  }

  if (data.taskType === 'FLINK' || data.taskType === 'FLINK_STREAM') {
    taskParams.flinkVersion = data.flinkVersion
    taskParams.jobManagerMemory = data.jobManagerMemory
    taskParams.taskManagerMemory = data.taskManagerMemory
    taskParams.slot = data.slot
    taskParams.taskManager = data.taskManager
    taskParams.parallelism = data.parallelism
  }

  if (data.taskType === 'HTTP') {
    taskParams.httpMethod = data.httpMethod
    taskParams.httpBody = data.httpBody
    taskParams.httpCheckCondition = data.httpCheckCondition
    taskParams.httpParams = data.httpParams
    taskParams.url = data.url
    taskParams.condition = data.condition
    taskParams.connectTimeout = data.connectTimeout
    taskParams.socketTimeout = data.socketTimeout
  }

  if (data.taskType === 'SQOOP') {
    // localParams needs no SQOOP-specific handling: the shared, normalized
    // copy built below is emitted for every task type.
    taskParams.jobType = data.isCustomTask ? 'CUSTOM' : 'TEMPLATE'
    if (data.isCustomTask) {
      taskParams.customShell = data.customShell
    } else {
      taskParams.jobName = data.jobName
      taskParams.hadoopCustomParams = data.hadoopCustomParams
      taskParams.sqoopAdvancedParams = data.sqoopAdvancedParams
      taskParams.concurrency = data.concurrency
      taskParams.splitBy = data.splitBy
      taskParams.modelType = data.modelType
      taskParams.sourceType = data.sourceType
      taskParams.targetType = data.targetType
      // Source/target settings are stored as JSON strings.
      taskParams.targetParams = JSON.stringify(buildSqoopTargetParams(data))
      taskParams.sourceParams = JSON.stringify(buildSqoopSourceParams(data))
    }
  }

  if (data.taskType === 'SQL') {
    taskParams.type = data.type
    taskParams.datasource = data.datasource
    taskParams.sql = data.sql
    taskParams.sqlType = data.sqlType
    taskParams.preStatements = data.preStatements
    taskParams.postStatements = data.postStatements
    taskParams.sendEmail = data.sendEmail
    taskParams.displayRows = data.displayRows
    if (data.sqlType === '0' && data.sendEmail) {
      taskParams.title = data.title
      taskParams.groupId = data.groupId
    }
  }

  if (data.taskType === 'PROCEDURE') {
    taskParams.type = data.type
    taskParams.datasource = data.datasource
    taskParams.method = data.method
  }

  if (data.taskType === 'SEATUNNEL') {
    taskParams.startupScript = data.startupScript
    taskParams.useCustom = data.useCustom
    if (!data.useCustom) {
      // Overrides the shared rawScript with an empty script.
      taskParams.rawScript = ''
    }
    if (data.startupScript?.includes('flink')) {
      taskParams.runMode = data.runMode
      taskParams.others = data.others
    }
    if (data.startupScript?.includes('spark')) {
      taskParams.deployMode = data.deployMode
      taskParams.master = data.master
      taskParams.masterUrl = data.masterUrl
    }
    if (data.startupScript === 'seatunnel.sh') {
      taskParams.deployMode = data.deployMode
      taskParams.others = data.others
    }
  }

  if (data.taskType === 'SWITCH') {
    taskParams.switchResult = {}
    taskParams.switchResult.dependTaskList = data.dependTaskList
    taskParams.switchResult.nextNode = data.nextNode
  }

  if (data.taskType === 'CONDITIONS') {
    taskParams.dependence = {
      relation: data.relation,
      dependTaskList: data.dependTaskList
    }
    taskParams.conditionResult = {}
    if (data.successBranch) {
      taskParams.conditionResult.successNode = [data.successBranch]
    }
    if (data.failedBranch) {
      taskParams.conditionResult.failedNode = [data.failedBranch]
    }
  }

  if (data.taskType === 'DATAX') {
    taskParams.customConfig = data.customConfig ? 1 : 0
    if (taskParams.customConfig === 0) {
      taskParams.dsType = data.dsType
      taskParams.dataSource = data.dataSource
      taskParams.dtType = data.dtType
      taskParams.dataTarget = data.dataTarget
      taskParams.sql = data.sql
      taskParams.targetTable = data.targetTable
      taskParams.jobSpeedByte = data.jobSpeedByte
      taskParams.jobSpeedRecord = data.jobSpeedRecord
      taskParams.preStatements = data.preStatements
      taskParams.postStatements = data.postStatements
    } else {
      taskParams.json = data.json
      forceInVarcharParams = true
    }
    taskParams.xms = data.xms
    taskParams.xmx = data.xmx
  }

  if (data.taskType === 'DEPENDENT') {
    taskParams.dependence = {
      checkInterval: data.checkInterval,
      failurePolicy: data.failurePolicy,
      failureWaitingTime: data.failureWaitingTime,
      relation: data.relation,
      dependTaskList: data.dependTaskList
    }
  }

  if (data.taskType === 'EMR') {
    taskParams.type = data.type
    taskParams.programType = data.programType
    taskParams.jobFlowDefineJson = data.jobFlowDefineJson
    taskParams.stepsDefineJson = data.stepsDefineJson
  }

  if (data.taskType === 'ZEPPELIN') {
    taskParams.noteId = data.noteId
    taskParams.paragraphId = data.paragraphId
    taskParams.restEndpoint = data.restEndpoint
    taskParams.username = data.username
    taskParams.password = data.password
    taskParams.productionNoteDirectory = data.productionNoteDirectory
    taskParams.parameters = data.parameters
    taskParams.datasource = data.datasource
    taskParams.type = data.type
  }

  if (data.taskType === 'ALIYUN_SERVERLESS_SPARK') {
    taskParams.workspaceId = data.workspaceId
    taskParams.resourceQueueId = data.resourceQueueId
    taskParams.codeType = data.codeType
    taskParams.jobName = data.jobName
    taskParams.engineReleaseVersion = data.engineReleaseVersion
    taskParams.entryPoint = data.entryPoint
    taskParams.entryPointArguments = data.entryPointArguments
    taskParams.sparkSubmitParameters = data.sparkSubmitParameters
    taskParams.isProduction = data.isProduction
    taskParams.type = data.type
    taskParams.datasource = data.datasource
  }

  if (data.taskType === 'K8S') {
    taskParams.namespace = data.namespace
    taskParams.minCpuCores = data.minCpuCores
    taskParams.minMemorySpace = data.minMemorySpace
    taskParams.image = data.image
    taskParams.imagePullPolicy = data.imagePullPolicy
    taskParams.command = data.command
    taskParams.args = data.args
    taskParams.customizedLabels = data.customizedLabels
    taskParams.nodeSelectors = data.nodeSelectors
    taskParams.datasource = data.datasource
    taskParams.type = data.type
    taskParams.kubeConfig = data.kubeConfig
    taskParams.pullSecret = data.pullSecret
  }

  if (data.taskType === 'JUPYTER') {
    taskParams.condaEnvName = data.condaEnvName
    taskParams.inputNotePath = data.inputNotePath
    taskParams.outputNotePath = data.outputNotePath
    taskParams.parameters = data.parameters
    taskParams.kernel = data.kernel
    taskParams.engine = data.engine
    taskParams.executionTimeout = data.executionTimeout
    taskParams.startTimeout = data.startTimeout
    taskParams.others = data.others
  }

  if (data.taskType === 'MLFLOW') {
    taskParams.algorithm = data.algorithm
    taskParams.params = data.params
    taskParams.searchParams = data.searchParams
    taskParams.dataPath = data.dataPath
    taskParams.experimentName = data.experimentName
    taskParams.modelName = data.modelName
    taskParams.mlflowTrackingUri = data.mlflowTrackingUri
    taskParams.mlflowJobType = data.mlflowJobType
    taskParams.automlTool = data.automlTool
    taskParams.registerModel = data.registerModel
    taskParams.mlflowTaskType = data.mlflowTaskType
    taskParams.deployType = data.deployType
    taskParams.deployPort = data.deployPort
    taskParams.deployModelKey = data.deployModelKey
    taskParams.mlflowProjectRepository = data.mlflowProjectRepository
    taskParams.mlflowProjectVersion = data.mlflowProjectVersion
  }

  if (data.taskType === 'DVC') {
    taskParams.dvcTaskType = data.dvcTaskType
    taskParams.dvcRepository = data.dvcRepository
    taskParams.dvcVersion = data.dvcVersion
    taskParams.dvcDataLocation = data.dvcDataLocation
    taskParams.dvcMessage = data.dvcMessage
    taskParams.dvcLoadSaveDataPath = data.dvcLoadSaveDataPath
    taskParams.dvcStoreUrl = data.dvcStoreUrl
  }

  if (data.taskType === 'SAGEMAKER') {
    taskParams.sagemakerRequestJson = data.sagemakerRequestJson
    taskParams.username = data.username
    taskParams.password = data.password
    taskParams.datasource = data.datasource
    taskParams.type = data.type
    taskParams.awsRegion = data.awsRegion
  }

  if (data.taskType === 'PYTORCH') {
    taskParams.script = data.script
    taskParams.scriptParams = data.scriptParams
    taskParams.pythonPath = data.pythonPath
    taskParams.isCreateEnvironment = data.isCreateEnvironment
    taskParams.pythonLauncher = data.pythonLauncher
    taskParams.pythonEnvTool = data.pythonEnvTool
    taskParams.requirements = data.requirements
    taskParams.condaPythonVersion = data.condaPythonVersion
  }

  if (data.taskType === 'DINKY') {
    taskParams.address = data.address
    taskParams.taskId = data.taskId
    taskParams.online = data.online
  }

  if (data.taskType === 'OPENMLDB') {
    taskParams.zk = data.zk
    taskParams.zkPath = data.zkPath
    taskParams.executeMode = data.executeMode
    taskParams.sql = data.sql
  }

  if (data.taskType === 'CHUNJUN') {
    taskParams.customConfig = data.customConfig ? 1 : 0
    taskParams.json = data.json
    taskParams.deployMode = data.deployMode
    taskParams.others = data.others
  }

  if (data.taskType === 'HIVECLI') {
    taskParams.hiveCliTaskExecutionType = data.hiveCliTaskExecutionType
    taskParams.hiveSqlScript = data.hiveSqlScript
    taskParams.hiveCliOptions = data.hiveCliOptions
  }

  if (data.taskType === 'DMS') {
    taskParams.isRestartTask = data.isRestartTask
    taskParams.isJsonFormat = data.isJsonFormat
    taskParams.jsonData = data.jsonData
    taskParams.migrationType = data.migrationType
    taskParams.replicationTaskIdentifier = data.replicationTaskIdentifier
    taskParams.sourceEndpointArn = data.sourceEndpointArn
    taskParams.targetEndpointArn = data.targetEndpointArn
    taskParams.replicationInstanceArn = data.replicationInstanceArn
    taskParams.tableMappings = data.tableMappings
    taskParams.replicationTaskArn = data.replicationTaskArn
  }

  if (data.taskType === 'DATASYNC') {
    taskParams.jsonFormat = data.jsonFormat
    taskParams.json = data.json
    taskParams.destinationLocationArn = data.destinationLocationArn
    taskParams.sourceLocationArn = data.sourceLocationArn
    taskParams.name = data.name
    taskParams.cloudWatchLogGroupArn = data.cloudWatchLogGroupArn
  }

  if (data.taskType === 'KUBEFLOW') {
    taskParams.yamlContent = data.yamlContent
    taskParams.namespace = data.namespace
  }

  if (data.taskType === 'LINKIS') {
    taskParams.useCustom = data.useCustom
    taskParams.paramScript = data.paramScript
    taskParams.rawScript = data.rawScript
  }

  if (data.taskType === 'DATA_FACTORY') {
    taskParams.factoryName = data.factoryName
    taskParams.resourceGroupName = data.resourceGroupName
    taskParams.pipelineName = data.pipelineName
  }

  if (data.taskType === 'REMOTESHELL') {
    taskParams.type = data.type
    taskParams.datasource = data.datasource
  }

  // One selected strategy is sent as-is; two selected strategies collapse
  // into the combined 'WARNFAILED' value. Anything else yields ''.
  let timeoutNotifyStrategy = ''
  if (data.timeoutNotifyStrategy) {
    if (data.timeoutNotifyStrategy.length === 1) {
      timeoutNotifyStrategy = data.timeoutNotifyStrategy[0]
    }
    if (data.timeoutNotifyStrategy.length === 2) {
      timeoutNotifyStrategy = 'WARNFAILED'
    }
  }

  // Copy each local parameter instead of editing the caller's objects.
  // `any` is kept on purpose: the element shape is declared outside this block.
  const localParams = data.localParams?.map((item: any) => ({
    ...item,
    ...(forceInVarcharParams ? { direct: 'IN', type: 'VARCHAR' } : {}),
    value: item.value || ''
  }))

  return {
    workflowDefinitionCode: data.workflowDefinitionName
      ? String(data.workflowDefinitionName)
      : '',
    upstreamCodes: data.preTasks?.join(','),
    taskDefinitionJsonObj: {
      code: data.code,
      delayTime: data.delayTime ? String(data.delayTime) : '0',
      description: data.description,
      environmentCode: data.environmentCode || -1,
      failRetryInterval: data.failRetryInterval
        ? String(data.failRetryInterval)
        : '0',
      failRetryTimes: data.failRetryTimes ? String(data.failRetryTimes) : '0',
      flag: data.flag,
      name: data.name,
      taskGroupId: data.taskGroupId,
      taskGroupPriority: data.taskGroupPriority,
      taskParams: {
        localParams,
        initScript: data.initScript,
        rawScript: data.rawScript,
        resourceList: data.resourceList?.length
          ? data.resourceList.map((fullName: string) => ({
              resourceName: `${fullName}`
            }))
          : [],
        // Type-specific fields win over the shared defaults above.
        ...taskParams
      },
      taskPriority: data.taskPriority,
      taskType: data.taskType,
      // With the timeout switched off, neutral values are sent regardless of
      // what is still present in the form model.
      timeout: data.timeoutFlag ? data.timeout : 0,
      timeoutFlag: data.timeoutFlag ? 'OPEN' : 'CLOSE',
      timeoutNotifyStrategy: data.timeoutFlag ? timeoutNotifyStrategy : '',
      workerGroup: data.workerGroup,
      cpuQuota: data.cpuQuota || -1,
      memoryMax: data.memoryMax || -1,
      taskExecuteType: data.taskExecuteType
    }
  } as {
    workflowDefinitionCode: string
    upstreamCodes: string
    taskDefinitionJsonObj: object
  }
}