in linkis-web/src/common/service/execute.js [36:181]
function Execute(data) {
this.executeTimout = null;
this.statusTimeout = null;
this.diagnosisTimeout = null;
this.diagnosisTimer = 300;
this.beginTime = Date.now();
this.progress = 0;
this.id = null;
this.status = null;
// Only used by the /api/jobhistory/${id}/get interface(仅/api/jobhistory/${id}/get接口使用)
this.taskID = null;
this.postType = process.env.NODE_ENV === 'sandbox' ? 'http' : (data.data.postType || 'socket');
delete data.data.postType;
this.data = data;
this.executionCode = null;
this.model = 'normal';
this.fromLine = data.fromLine || 1;
this.runType = data.data.runType;
this.event = new Vue();
this.run = false;
// Store the relevant information of the result set, use the log and result path inside(存储结果集的相关信息,用到里面的日志和结果路径)
this.resultsetInfo = null;
// Stores all information in the result set directory, which can be used to get a single result set(存储结果集目录下的所有信息,可用于拿到单个结果集)
this.resultList = [];
// the path of the current result set(当前结果集的path)
this.currentResultPath = '';
this.isDiagnosisFailed = false;
// Used to control the url to get the result set(用来控制获取结果集的url)
this.getResultUrl = data.getResultUrl || 'filesystem',
this.on('execute', () => {
this.run = true;
// The comment is reserved for when the request is initiated, when the first interface is not returned for a long time(注释是留作发起请求时,长时间没返回第一个接口时)
// timeoutCheck(this);
});
this.on('execute:queryState', () => {
this.queryStatus({ isKill: false });
this.queryProgress();
});
this.on('stateEnd', () => {
this.getResultPath();
});
this.on('getResultList', () => {
this.getResultList();
});
this.on('getResult', () => {
this.getFirstResult();
});
this.on('stop', () => {
clearTimeout(this.executeTimout);
clearTimeout(this.statusTimeout);
this.run = false;
});
this.on('kill', () => {
if (this.postType === 'http') {
this.queryStatus({ isKill: true });
}
});
// data is the return data of the interface(data是接口的返回数据)
// execute.data is the request data sent from the foreground to the background(execute.data是前台发送至后台的请求数据)
this.on('data', ({ data, execute }) => {
if (execute.postType !== 'socket') return;
// Here, we judge the situation that the first interface execute of websocket will directly return errorMsg(这里对websocket第一个接口execute直接会返回errorMsg的情况进行判断)
if (Object.prototype.hasOwnProperty.call(data.data, 'errorMsg')) {
execute.trigger('stop');
execute.trigger('error');
return;
}
const method = execute.data.method;
if (!execute.taskID) {
const socketTag = data.data && data.data.websocketTag;
if (execute.data.websocketTag === socketTag && data.method === method) {
// clearTimeout(execute.executeTimout);
timeoutCheck(execute);
api.fetch('/jobhistory/list', {
pageSize: 100,
status: 'Running,Inited,Scheduled',
}, 'get').then((rst) => {
execute.trigger('updateResource', rst.tasks.length);
});
deconstructExecute(execute, data.data);
this.trigger('steps', 'Inited');
}
} else {
const id = data.data && data.data.taskID;
let prefix = method.slice(0, method.lastIndexOf('/') + 1);
if (execute.taskID !== id) {
// For the case where the expired data is not updated when the temporary script is right-clicked multiple times(针对临时脚本多次右键点击的,过期数据不更新的情况)
if (data.data.status) {
return execute.trigger('updateExpireHistory', data.data);
}
return;
}
if (data.method === prefix + `${execute.id}/status`) {
reawakening(execute);
deconstructStatus(execute, data.data);
} else if (data.method === prefix + `${execute.id}/progressWithResource`) {
reawakening(execute);
execute.trigger('progress', data.data);
} else if (data.method === prefix + `${execute.id}/log`) {
reawakening(execute);
execute.trigger('log', data.data.log);
} else if (data.method === prefix + `${execute.id}/waitingSize`) {
const log = `**Waiting queue:${i18n.t('message.common.execute.waitingQueue', { num: data.data.waitingSize })}`;
execute.trigger('log', log);
execute.trigger('progress', { progress: 0, progressInfo: [], waitingSize: data.data.waitingSize });
clearTimeout(execute.executeTimout);
clearTimeout(execute.statusTimeout);
} else if (data.method === prefix + `${execute.id}/runtimeTuning`) {
execute.trigger('diagnosis', data.data.diagnosticInfo);
} else {
if (data.data.status) {
execute.trigger('updateExpireHistory', data.data);
}
clearTimeout(execute.executeTimout);
clearTimeout(execute.statusTimeout);
}
}
});
this.on('downgrade', ({ execute }) => {
execute.postType = 'http';
// When the script is running, if the socket is downgraded to http, the http interface is used to poll the progress(脚本运行中,由socket降级成http则使用http接口轮询进度)
// There are two possibilities when the websocket is downgraded, 1. The task has been started and the connection is disconnected before returning the id, 2. One is that the connection is lost before starting(当websocket降级时有两种可能,1,,任务已经启动,在返回id前就断了连接, 2.一种是在启动前就失去连接)
if (execute.run) {
if (this.id && this.taskID) {
execute.queryStatus({isKill: false});
} else {
// run the task again(重新跑任务)
console.warn('[websocket降级没有获取到taskID]');
// this.execute(); // Do not actively help users start tasks(不主动帮用户启动任务)
}
}
});
this.on('dataError', ({ data, execute }) => {
execute.run = false;
if (data.data.status === -1) {
router.push('/login');
Message.warning(data.message);
} else {
this.trigger('notice', {
type: 'warning',
msg: data.message,
autoJoin: false,
});
}
});
}