in PluginsAndFeatures/azure-toolkit-for-eclipse/com.microsoft.azuretools.hdinsight/src/com/microsoft/azuretools/hdinsight/spark/common2/SparkSubmitHelper.java [122:215]
/**
 * Streams the log of a submitted Spark batch job into the Spark submission tool window until the
 * job reaches a final state or is killed from the tool window, then posts the outcome as telemetry.
 *
 * <p>Each polling round checks the kill flag, prints any new log lines through
 * {@code printoutJobLog}, and only when that round produced no new lines asks the
 * {@code /livy/batches} endpoint for the job state. The states {@code error}, {@code dead} and
 * {@code success} (compared case-insensitively) end the loop; {@code error} and {@code dead} count
 * as failure. Rounds are separated by a sleep of {@code getIntervalTime(times)}.
 *
 * <p>Any exception raised while polling is caught and reported in the tool window and in the
 * telemetry properties instead of being propagated. If the polling thread was interrupted, its
 * interrupt flag is restored before this method returns.
 *
 * @param id                batch id of the submitted job, passed to the {@code /livy/batches} calls
 * @param clusterDetail     cluster whose connection URL is used to build the request URLs
 * @param postEventProperty mutable telemetry properties; this method adds {@code IsKilled},
 *                          {@code IsRunningSucceed} and/or {@code SubmitFailedReason} before posting
 * @throws IOException declared in the signature; failures inside the polling loop are caught and
 *                     reported rather than rethrown
 */
public void printRunningLogStreamingly(/* Project project, */ int id, IClusterDetail clusterDetail,
        Map<String, String> postEventProperty) throws IOException {
    try {
        final Gson gson = new Gson();
        boolean isFailedJob = false;
        boolean isKilledJob = false;
        int fromIndex = 0;
        int times = 0;
        HDInsightUtil.getSparkSubmissionToolWindowView()
                .setInfo("======================Begin printing out spark job log.=======================");
        while (true) {
            final int previousIndex = fromIndex;
            if (HDInsightUtil.getSparkSubmissionToolWindowView().getJobStatusManager().isJobKilled()) {
                isKilledJob = true;
                break;
            }
            fromIndex = printoutJobLog(/* project, */ id, fromIndex, clusterDetail);
            // Only when this round produced no new log lines do we ask for the job state;
            // while the log is still growing the state is never consulted.
            if (fromIndex == previousIndex) {
                HttpResponse statusHttpResponse = SparkBatchSubmission.getInstance()
                        .getBatchSparkJobStatus(clusterDetail.getConnectionUrl() + "/livy/batches", id);
                SparkSubmitResponse status = gson.fromJson(statusHttpResponse.getMessage(),
                        new TypeToken<SparkSubmitResponse>() {
                        }.getType());
                String state = status == null ? null : status.getState();
                if (state == null) {
                    // Gson yields null for an empty body; surface a readable error
                    // instead of a bare NullPointerException.
                    throw new IOException("No job state found in the status response of Spark batch job " + id);
                }
                boolean failedState = "error".equalsIgnoreCase(state) || "dead".equalsIgnoreCase(state);
                if (failedState || "success".equalsIgnoreCase(state)) {
                    if (failedState) {
                        isFailedJob = true;
                    }
                    // Re-check the kill flag: the user may have killed the job while the
                    // status request was in flight.
                    if (!HDInsightUtil.getSparkSubmissionToolWindowView().getJobStatusManager().isJobKilled()) {
                        // Print once more so lines written just before the final state are not lost.
                        printoutJobLog(id, fromIndex, clusterDetail);
                        HDInsightUtil.getSparkSubmissionToolWindowView().setInfo(
                                "======================Finish printing out spark job log.=======================");
                    } else {
                        isKilledJob = true;
                    }
                    break;
                }
            }
            Thread.sleep(getIntervalTime(times));
            times++;
        }

        if (isKilledJob) {
            postEventProperty.put("IsKilled", "true");
            postRunningLogTelemetry(postEventProperty);
            return;
        }

        if (isFailedJob) {
            postEventProperty.put("IsRunningSucceed", "false");
            HttpResponse httpResponse = SparkBatchSubmission.getInstance()
                    .getBatchJobFullLog(clusterDetail.getConnectionUrl() + "/livy/batches", id);
            SparkSubmitResponse submitStatus = gson.fromJson(httpResponse.getMessage(),
                    new TypeToken<SparkSubmitResponse>() {
                    }.getType());
            List<String> logs = submitStatus == null ? null : submitStatus.getLog();
            if (logs != null && !logs.isEmpty()) {
                // The last line of the full log is recorded as the failure reason.
                postEventProperty.put("SubmitFailedReason", truncateTelemetryMessage(logs.get(logs.size() - 1)));
            }
            HDInsightUtil.getSparkSubmissionToolWindowView().setError("Error : Your submitted job run failed");
        } else {
            postEventProperty.put("IsRunningSucceed", "true");
            HDInsightUtil.getSparkSubmissionToolWindowView()
                    .setInfo("The Spark application completed successfully");
        }
        postRunningLogTelemetry(postEventProperty);
    } catch (Exception e) {
        if (!HDInsightUtil.getSparkSubmissionToolWindowView().getJobStatusManager().isJobKilled()) {
            HDInsightUtil.getSparkSubmissionToolWindowView()
                    .setError("Error : Failed to getting running log. Exception : " + e.toString());
            postEventProperty.put("SubmitFailedReason", truncateTelemetryMessage(e.toString()));
        } else {
            // A failure after the user killed the job is reported as a kill, not as an error.
            postEventProperty.put("IsKilled", "true");
        }
        postRunningLogTelemetry(postEventProperty);
        if (e instanceof InterruptedException) {
            // Thread.sleep cleared the interrupt flag when it threw; restore it so the
            // owner of this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Posts the Spark submission button-click event with the given properties to Application
 * Insights and records the matching info event through {@code EventUtil}.
 *
 * @param postEventProperty telemetry properties to attach to the Application Insights event
 */
private void postRunningLogTelemetry(Map<String, String> postEventProperty) {
    AppInsightsClient.create(Messages.SparkSubmissionButtonClickEvent,
            Activator.getDefault().getBundle().getVersion().toString(), postEventProperty);
    EventUtil.logEvent(EventType.info, HDINSIGHT, Messages.SparkSubmissionButtonClickEvent, null);
}