public void printRunningLogStreamingly()

in PluginsAndFeatures/azure-toolkit-for-eclipse/com.microsoft.azuretools.hdinsight/src/com/microsoft/azuretools/hdinsight/spark/common2/SparkSubmitHelper.java [122:215]


    public void printRunningLogStreamingly(/* Project project, */ int id, IClusterDetail clusterDetail,
            Map<String, String> postEventProperty) throws IOException {
        try {
            boolean isFailedJob = false;
            boolean isKilledJob = false;

            int from_index = 0;
            int pre_index;
            int times = 0;
            HDInsightUtil.getSparkSubmissionToolWindowView()
                    .setInfo("======================Begin printing out spark job log.=======================");
            while (true) {
                pre_index = from_index;
                if (HDInsightUtil.getSparkSubmissionToolWindowView().getJobStatusManager().isJobKilled()) {
                    isKilledJob = true;
                    break;
                }

                from_index = printoutJobLog(/* project, */id, from_index, clusterDetail);
                HttpResponse statusHttpResponse = SparkBatchSubmission.getInstance()
                        .getBatchSparkJobStatus(clusterDetail.getConnectionUrl() + "/livy/batches", id);

                SparkSubmitResponse status = new Gson().fromJson(statusHttpResponse.getMessage(),
                        new TypeToken<SparkSubmitResponse>() {
                        }.getType());

                // only the lines of the log are same between two http requests,
                // we try to get the job status
                if (from_index == pre_index) {
                    String finalStatus = status.getState().toLowerCase();
                    if (finalStatus.equals("error") || finalStatus.equals("success") || finalStatus.equals("dead")) {
                        if (finalStatus.equals("error") || finalStatus.equals("dead")) {
                            isFailedJob = true;
                        }

                        if (!HDInsightUtil.getSparkSubmissionToolWindowView().getJobStatusManager().isJobKilled()) {
                            printoutJobLog(id, from_index, clusterDetail);
                            HDInsightUtil.getSparkSubmissionToolWindowView().setInfo(
                                    "======================Finish printing out spark job log.=======================");
                        } else {
                            isKilledJob = true;
                        }
                        break;
                    }
                }

                Thread.sleep(getIntervalTime(times));
                times++;
            }

            if (isKilledJob) {
                postEventProperty.put("IsKilled", "true");
                AppInsightsClient.create(Messages.SparkSubmissionButtonClickEvent,
                        Activator.getDefault().getBundle().getVersion().toString(), postEventProperty);
                EventUtil.logEvent(EventType.info, HDINSIGHT, Messages.SparkSubmissionButtonClickEvent, null);
                return;
            }

            if (isFailedJob) {
                postEventProperty.put("IsRunningSucceed", "false");
                HttpResponse httpResponse = SparkBatchSubmission.getInstance()
                        .getBatchJobFullLog(clusterDetail.getConnectionUrl() + "/livy/batches", id);
                SparkSubmitResponse submitStatus = new Gson().fromJson(httpResponse.getMessage(),
                        new TypeToken<SparkSubmitResponse>() {
                        }.getType());
                if (submitStatus != null && submitStatus.getLog() != null && submitStatus.getLog().size() > 0) {
                    List<String> logs = submitStatus.getLog();
                    postEventProperty.put("SubmitFailedReason", truncateTelemetryMessage(logs.get(logs.size() - 1)));
                }

                HDInsightUtil.getSparkSubmissionToolWindowView().setError("Error : Your submitted job run failed");
            } else {
                postEventProperty.put("IsRunningSucceed", "true");
                HDInsightUtil.getSparkSubmissionToolWindowView()
                        .setInfo("The Spark application completed successfully");
            }

            AppInsightsClient.create(Messages.SparkSubmissionButtonClickEvent,
                    Activator.getDefault().getBundle().getVersion().toString(), postEventProperty);
            EventUtil.logEvent(EventType.info, HDINSIGHT, Messages.SparkSubmissionButtonClickEvent, null);
        } catch (Exception e) {
            if (HDInsightUtil.getSparkSubmissionToolWindowView().getJobStatusManager().isJobKilled() == false) {
                HDInsightUtil.getSparkSubmissionToolWindowView()
                        .setError("Error : Failed to getting running log. Exception : " + e.toString());

                postEventProperty.put("SubmitFailedReason", truncateTelemetryMessage(e.toString()));
            } else {
                postEventProperty.put("IsKilled", "true");
            }
            AppInsightsClient.create(Messages.SparkSubmissionButtonClickEvent,
                    Activator.getDefault().getBundle().getVersion().toString(), postEventProperty);
            EventUtil.logEvent(EventType.info, HDINSIGHT, Messages.SparkSubmissionButtonClickEvent, null);
        }
    }