public void onTaskEnd()

in wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java [297:363]


    /**
     * Handles Spark's task-end event: copies the task's identity, timing, status and
     * metrics into a {@link TaskEnd} record, remembers it in {@code listOfTasks}, and
     * publishes the pending {@code taskObjects} batch to Kafka under the key
     * {@code "TaskEnd"}.
     *
     * <p>Publishing is best-effort: a serialization or send failure is reported on
     * stderr and not propagated, so it cannot break the Spark listener bus. In that
     * case the pending batch is kept and will be part of the next publish attempt.
     *
     * @param taskEndSpark the task-end event delivered by Spark
     */
    public void onTaskEnd(SparkListenerTaskEnd taskEndSpark) {
        super.onTaskEnd(taskEndSpark);

        Task taskEnd = new TaskEnd();
        TaskInfo taskInfo = taskEndSpark.taskInfo();
        taskEnd.setID(taskInfo.id());
        // This handler reports task completion; the previous "OnTaskGettingResult" label
        // was a copy-paste leftover that mislabelled every task-end event.
        taskEnd.setEventame("OnTaskEnd");
        taskEnd.setHostIP(taskInfo.host());
        taskEnd.setStringExecutorID(taskInfo.executorId());
        taskEnd.setTaskStatus(taskInfo.status());
        taskEnd.setTaskID(taskInfo.taskId());
        taskEnd.setIndex(taskInfo.index());
        taskEnd.setLaunchTime(taskInfo.launchTime());
        taskEnd.setFinishTime(taskInfo.finishTime());
        taskEnd.setDurationTime(taskInfo.duration());
        taskEnd.setGettingTime(taskInfo.gettingResultTime());

        // Terminal failure states are tested before finished(): Spark's finished() only
        // means "a finish time was recorded", which is also true for killed tasks, so
        // testing it first made the KILLED branch unreachable.
        if (taskInfo.failed()) {
            taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED);
        } else if (taskInfo.killed()) {
            taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED);
        } else if (taskInfo.finished()) {
            taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED);
        } else if (taskInfo.running()) {
            taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING);
        } else if (taskInfo.successful()) {
            // NOTE(review): Spark documents running() as the negation of finished(), so
            // this branch and the ones below look unreachable. They are kept in their
            // original order so that successful tasks are still reported as FINISHED;
            // confirm whether SUCCESSFUL should take precedence instead.
            taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL);
        } else if (taskInfo.speculative()) {
            taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE);
        } else {
            taskEnd.setTaskStatusForRunning(null);
        }

        // Spark may deliver null task metrics when the task has failed. An unpopulated
        // TaskMetric is still attached in that case so the record keeps its usual shape
        // instead of the handler dying with a NullPointerException.
        TaskMetrics taskMetrics = taskEndSpark.taskMetrics();
        TaskMetric taskMetric = new TaskMetric();
        if (taskMetrics != null) {
            taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime());
            taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime());
            taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime());
            taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled());
            taskMetric.setExecutorRunTime(taskMetrics.executorRunTime());
            taskMetric.setjvmGCTime(taskMetrics.jvmGCTime());
            taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory());
            taskMetric.setResultSize(taskMetrics.resultSize());
            taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime());
        }
        taskEnd.setTaskMetric(taskMetric);

        this.listOfTasks.add(taskEnd);
        this.taskObjects.add(taskEnd);

        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // Closing the object stream flushes its internal buffer, so the byte array
            // read below is guaranteed to hold the complete serialized batch.
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(this.taskObjects);
            }
            producer.send(new ProducerRecord(kafkaTopic, "TaskEnd", baos.toByteArray()));
            // Start a fresh batch only after the send call returned without throwing.
            this.taskObjects = new ArrayList<>();
        } catch (Exception e) {
            // Best-effort publishing: report the failure but never let it escape into
            // Spark's listener bus.
            e.printStackTrace();
        }
    }