in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java [148:179]
/**
 * Copies the read/write row counts from the job summary metrics onto the given job instance
 * records.
 *
 * <p>Only records that have both an instance id and a job engine id are sent to
 * {@code jobMetricsService.getALLJobSummaryMetrics}; a record is updated only when the returned
 * map holds an entry for its instance id. This method never propagates an exception: any failure
 * is logged once and the records are left with whatever values were set before the failure.
 *
 * @param records job instance records to enrich in place; a {@code null} or empty list is a no-op
 * @param jobMode job mode forwarded unchanged to the metrics service
 */
private void jobPipelineSummaryMetrics(List<SeaTunnelJobInstanceDto> records, JobMode jobMode) {
    if (records == null || records.isEmpty()) {
        // Nothing to enrich, so there is no point in querying the metrics service.
        return;
    }

    // Declared outside the try block so the failure log can report which instances were involved.
    ArrayList<Long> jobInstanceIdList = new ArrayList<>();
    try {
        HashMap<Long, Long> jobInstanceIdAndJobEngineIdMap = new HashMap<>();
        for (SeaTunnelJobInstanceDto jobInstance : records) {
            if (jobInstance == null
                    || jobInstance.getId() == null
                    || jobInstance.getJobEngineId() == null) {
                continue;
            }
            jobInstanceIdList.add(jobInstance.getId());
            jobInstanceIdAndJobEngineIdMap.put(
                    jobInstance.getId(), Long.valueOf(jobInstance.getJobEngineId()));
        }

        Map<Long, JobSummaryMetricsRes> jobSummaryMetrics =
                jobMetricsService.getALLJobSummaryMetrics(
                        jobInstanceIdAndJobEngineIdMap, jobInstanceIdList, jobMode);

        for (SeaTunnelJobInstanceDto taskInstance : records) {
            if (taskInstance == null) {
                continue;
            }
            // Look the entry up once instead of once per accessor call.
            JobSummaryMetricsRes metrics = jobSummaryMetrics.get(taskInstance.getId());
            if (metrics != null) {
                taskInstance.setWriteRowCount(metrics.getWriteRowCount());
                taskInstance.setReadRowCount(metrics.getReadRowCount());
            }
        }
    } catch (Exception e) {
        // Log a single entry with one stack trace. The exception is passed as the trailing
        // argument (not bound to a placeholder) so the logger records it as the throwable.
        log.error("instances {} set instance and engine id error", jobInstanceIdList, e);
    }
}