in wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java [297:363]
public void onTaskEnd(SparkListenerTaskEnd taskEndSpark) {
super.onTaskEnd(taskEndSpark);
Task taskEnd=new TaskEnd();
TaskInfo taskInfo= taskEndSpark.taskInfo();
taskEnd.setID(taskInfo.id());
taskEnd.setEventame("OnTaskGettingResult");
taskEnd.setHostIP(taskInfo.host());
taskEnd.setStringExecutorID(taskInfo.executorId());
taskEnd.setTaskStatus(taskInfo.status());
taskEnd.setTaskID(taskInfo.taskId());
taskEnd.setIndex(taskInfo.index());
taskEnd.setLaunchTime(taskInfo.launchTime());
taskEnd.setFinishTime(taskInfo.finishTime());
taskEnd.setDurationTime(taskInfo.duration());
taskEnd.setGettingTime(taskInfo.gettingResultTime());
// this. taskGettingResult.setStageID(taskGettingResult.stageId());
//this.taskEnd.setPartition(taskInfo.partitionId());
if(taskInfo.failed()){
taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED);
}
else if(taskInfo.finished()){
taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED);
}
else if(taskInfo.killed()){
taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED);
}
else if(taskInfo.running()){
taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING);
}
else if(taskInfo.successful()){
taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL);
}
else if(taskInfo.speculative()){
taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE);
}
else {
taskEnd.setTaskStatusForRunning(null);
}
TaskMetrics taskMetrics= taskEndSpark.taskMetrics();
TaskMetric taskMetric= new TaskMetric();
taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime());
taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime());
taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime());
taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled());
taskMetric.setExecutorRunTime(taskMetrics.executorRunTime());
taskMetric.setjvmGCTime(taskMetrics.jvmGCTime());
taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory());
taskMetric.setResultSize(taskMetrics.resultSize());
taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime());
taskEnd.setTaskMetric(taskMetric);
this.listOfTasks.add(taskEnd);
this.taskObjects.add(taskEnd);
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(this.taskObjects);
producer.send(new ProducerRecord(kafkaTopic, "TaskEnd", baos.toByteArray()));
this.taskObjects= new ArrayList<>();
} catch (Exception e) {
e.printStackTrace();
}
}