in seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts [30:361]
export function useNodeModel(
type: string,
transformType: string,
predecessorsNodeId: string,
schemaError: any,
currentNodeId: string,
refForm: any
) {
const { createColumns } = useModelColumns()
const route = useRoute()
const state = reactive({
inputColumns: [] as TableColumns,
inputTableData: [] as ModelRecord[],
inputLoading: false,
outputColumns: [] as TableColumns,
outputTableData: [] as ModelRecord[],
allTableData: [],
schemaError: {} as any,
optionsOutputTableData: [],
transformOptions: {} as any,
secondTransformOptions: {} as any,
outputLoading: false,
currentTable: '',
selectedKeys: [] as string[],
tables: [] as string[],
columnSelectable: false,
database: '',
datasourceInstanceId: '',
datasourceName: '',
inputTableWidth: DefaultTableWidth,
outputTableWidth: DefaultTableWidth,
format: ''
})
let tempOutputTables = [] as ModelRecord[]
const getModelData = () => {
// The sink or transform model does not have an output table structure, and
// the input table structure will be rendered according to the upstream node
// id to obtain the output table structure.
if (type === 'sink' || type === 'transform') {
// if sql-transform, request when sql is not empty
if(transformType === 'Sql' && !refForm.value.getValues()?.query) return
// Request the data of the previous node.
predecessorsNodeId && queryTaskDetail(
route.params.jobDefinitionCode as string,
predecessorsNodeId
).then((res: any) => {
state.tables = res.outputSchema.map((o: any) => o.tableName)
state.currentTable = res.outputSchema[0].tableName
state.allTableData = [{ database: res.outputSchema[0].database, tableInfos: res.outputSchema }] as any
if (res.transformOptions) {
state.transformOptions = res.transformOptions
}
onSwitchTable(state.currentTable)
})
} else {
modelInfo(state.datasourceInstanceId, [{
database: state.database,
tables: state.tables
}]).then((res: any) => {
state.allTableData = res
onSwitchTable(state.currentTable)
})
}
}
const getColumns = () => {
const { inputColumns, outputColumns, inputTableWidth, outputTableWidth } =
createColumns({
nodeType: type,
columnSelectable: state.columnSelectable,
transformType,
outputTableData: state.outputTableData
}) as {
inputColumns: TableColumns
outputColumns: TableColumns
inputTableWidth: number
outputTableWidth: number
outputTableData: ModelRecord[]
}
state.inputColumns = inputColumns
state.outputColumns = outputColumns
state.inputTableWidth = inputTableWidth
state.outputTableWidth = outputTableWidth
}
const getSqlTransformOutputData = () => {
let sqlQuery = refForm.value.getValues()?.query as string
return sqlModelInfo(route.params.jobDefinitionCode as string, predecessorsNodeId, {
"sourceFieldName": null,
"query": sqlQuery
}).then((res: any) => res.fields)
}
// Model indicates list click events.
const onSwitchTable = (table: string) => {
state.currentTable = table
if (state.format === 'COMPATIBLE_DEBEZIUM_JSON') return
(state.allTableData[0] as any).tableInfos.forEach(async (t: any) => {
if (t.tableName === state.currentTable) {
tempOutputTables = t.fields
state.inputTableData = t.fields.filter((f: any) => !f.isSplit)
if (state.columnSelectable) {
if (state.optionsOutputTableData && (state.optionsOutputTableData[0] as any).tableName === state.currentTable) {
// The default assignment of the source node output table structure.
const result = state.optionsOutputTableData ? (state.optionsOutputTableData[0] as any).fields : tempOutputTables.filter((row: ModelRecord) =>
state.selectedKeys.includes(row.name))
state.outputTableData = result.filter((r: any) => !r.unSupport)
} else {
state.outputTableData = tempOutputTables.filter((t: any) => !t.unSupport)
}
// Assign default values to the optional input table structure of the source node.
state.selectedKeys = state.outputTableData.filter((o: any) => !o.unSupport).map((o: any) => o.name)
} else {
if (transformType === 'FieldMapper') {
// When the transform is empty, the data of the input model is directly copied to the output model.
if (!state.secondTransformOptions && !state.transformOptions) {
state.outputTableData = state.inputTableData.map((i: any) => {
i.original_field = i.name
return i
})
return false
}
const transformOptions = state.transformOptions.changeOrders ? state.transformOptions : state.secondTransformOptions
state.outputTableData = state.optionsOutputTableData ?
(state.optionsOutputTableData[0] as any).fields :
t.fields.map((f: any, i: number) => {
return {
...f,
original_field: (transformOptions && Object.keys(transformOptions).length > 0) ?
transformOptions.changeOrders[i].sourceFieldName :
f.name
}
})
state.outputTableData = state.outputTableData.map((o: any, i: number) => {
if (!o.original_field) {
o.original_field = transformOptions.changeOrders[i].sourceFieldName
}
return o
})
state.outputTableData.forEach((o: any) => {
o.isError = o.original_field === state.schemaError.fieldName
})
} else if (transformType === 'MultiFieldSplit') {
state.outputTableData = state.optionsOutputTableData ? (state.optionsOutputTableData[0] as any).fields.map((f: any) => {
if (
state.transformOptions.splits ||
(
state.secondTransformOptions &&
state.secondTransformOptions.splits
)
) {
// When the data is echoed, it is judged whether the original
// field has a split field, and if so, the split button of the
// original field is disabled.
const transformOptions = state.transformOptions.splits ? state.transformOptions : state.secondTransformOptions
const needSplitDisabled = transformOptions.splits.map((t: any) => t.sourceFieldName)
f.splitDisabled = needSplitDisabled.includes(f.name)
// Add the split field to the original field value.
transformOptions.splits.forEach((t: any) => {
if (t.outputFields.includes(f.name)) {
f.original_field = t.sourceFieldName
f.separator = t.separator
f.isSplit = true
}
})
if (!f.original_field) {
f.original_field = f.name
}
}
return f
}) : t.fields.map((f: any) => {
return {
...f,
original_field: f.name
}
})
} else if (transformType === 'Copy') {
state.outputTableData = state.optionsOutputTableData ?
(state.optionsOutputTableData[0] as any).fields.map((f: any) => {
if (
state.transformOptions.copyList ||
(
state.secondTransformOptions &&
state.secondTransformOptions.copyList
)
) {
const transformOptions = state.transformOptions.copyList ? state.transformOptions : state.secondTransformOptions
// Echo and judge the delete button of the copied data. and add the copied field to the original field value.
transformOptions.copyList.forEach((t: any) => {
const inputTableData = state.inputTableData.map((i: any) => i.name)
if (!inputTableData.includes(f.name)) {
f.copyTimes = -1
if (t.targetFieldName === f.name) {
f.original_field = t.sourceFieldName
} else {
// Request the data of the current node.
currentNodeId && queryTaskDetail(
route.params.jobDefinitionCode as string,
currentNodeId
).then((res: any) => {
res.transformOptions && res.transformOptions.copyList.forEach((t: any) => {
if (t.targetFieldName === f.name) {
f.original_field = t.sourceFieldName
}
})
})
}
}
})
if (f.copyTimes !== -1) {
f.original_field = f.name
}
}
return f
}) :
t.fields.map((f: any) => {
return {
...f,
original_field: f.name
}
})
} else if(transformType === 'Sql'){
let table = await getSqlTransformOutputData()
state.outputTableData = table
} else if(transformType === 'JsonPath'){
// extract the dest_field in columns
const fixedInput = refForm.value.getValues().columns.replace(/=/g, ':');
const destFieldMatches = fixedInput.match(/"dest_field"\s*:\s*"([^"]*)"/g);
const destFields = destFieldMatches.map(match => match.match(/"([^"]+)"$/)[1]);
state.outputTableData = state.inputTableData.map(item => ({ ...item }));
destFields.map(item => {
state.outputTableData.push(
{
"format":"",
"type": "TEXT",
"name": item,
"comment": "",
"primaryKey": false,
"defaultValue": '',
"nullable": false,
"unSupport": false,
"outputDataType": "TEXT"
}
)
});
}
else {
state.outputTableData = t.fields
}
}
}
})
getColumns()
}
const onUpdatedCheckedRowKeys = (keys: any[]) => {
state.selectedKeys = keys
state.outputTableData = tempOutputTables.filter((row: ModelRecord) =>
keys.includes(row.name)
)
}
const onInit = (info: any) => {
state.currentTable = info.tables.length ? info.tables[0] : ''
state.tables = type === 'sink' ? [] : info.tables || []
state.columnSelectable = info.columnSelectable || false
state.database = info.database || ''
state.datasourceInstanceId = info.datasourceInstanceId || ''
state.format = info.format
state.transformOptions = info.transformOptions
// It is used to judge the output model of multiple continuously operable transform nodes.
state.secondTransformOptions = info.transformOptions
state.schemaError = schemaError
if (
(info.sceneMode && info.sceneMode === 'SINGLE_TABLE') ||
transformType === 'FieldMapper' ||
transformType === 'MultiFieldSplit' ||
transformType === 'Copy'
) {
state.optionsOutputTableData = info.outputSchema
}
// check debezium
if (state.format === 'COMPATIBLE_DEBEZIUM_JSON') {
state.inputTableData = [
{ name: 'topic', type: 'string' },
{ name: 'key', type: 'string' },
{ name: 'value', type: 'string' }
] as any
state.outputTableData = [
{ name: 'topic', type: 'string' },
{ name: 'key', type: 'string' },
{ name: 'value', type: 'string' }
] as any
} else {
(type === 'transform' || state.datasourceInstanceId) &&
(type === 'transform' || state.database) &&
((type === 'sink' || type === 'transform') || state.tables.length) &&
getModelData()
}
getColumns()
}
watch(
() => useI18n().locale,
() => {
getColumns()
}
)
return {
state,
onSwitchTable,
onUpdatedCheckedRowKeys,
onInit
}
}