public Observable getSubmissionLog()

in Utils/azure-toolkit-ide-hdinsight-libs/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/CosmosServerlessSparkBatchJob.java [319:401]


    /**
     * Builds the stream of submission-stage log lines for this job.
     *
     * <p>The pipeline works in two phases:
     * <ol>
     *   <li>Poll {@code getSparkBatchJobRequest()} until either the job has ended or the response
     *       carries a non-empty Livy API URL. A response without a scheduler state is treated as
     *       an error and retried (bounded by {@code getRetriesMax()}, spaced by
     *       {@code getDelaySeconds()}).</li>
     *   <li>If the job already ended, emit a single tool message with the scheduler/running state.
     *       Otherwise repeatedly request Livy submission logs, advancing the log start index by the
     *       number of lines received, until the running state is no longer "starting" or the
     *       scheduler state is "ended"; a final tool message with both states is emitted then.</li>
     * </ol>
     *
     * <p>Note that the "Trying to get livy URL..." control message is written when this method is
     * called, not when the returned observable is subscribed.
     *
     * <p>NOTE(review): once the retry budget is used up the {@code retryWhen} notifier completes
     * instead of signalling an error, and the following {@code repeatWhen} may then re-poll --
     * confirm this is the intended way to handle a persistently missing scheduler state.
     *
     * @return an observable of log lines: Livy log lines while the job is being submitted, plus
     *         tool-generated status messages
     */
    public Observable<SparkLogLine> getSubmissionLog() {
        // Section markers Livy emits even when the section is empty; compared trimmed and lower-cased.
        final ImmutableSet<String> ignoredEmptyLines = ImmutableSet.of("stdout:", "stderr:", "yarn diagnostics:");
        final int livyUrlPollDelayMillis = 3000;
        final int maxLogLinesPerRequest = 128;
        final int logPollDelayMillis = 1000;
        final String jobStatesFormat = "Job scheduler state: %s. Job running state: %s.";

        // We need to repeatedly call getSparkBatchJobRequest() since the "livyServerApi" field does not always exist
        // in the response: it only appears after a while, and before that we can't get the "livyServerApi" field.
        ctrlInfo("Trying to get livy URL...");
        return getSparkBatchJobRequest()
                // A response without scheduler state is considered a transient failure and is retried below.
                .flatMap(batchResp ->
                        getJobSchedulerState(batchResp) == null
                                ? Observable.error(new IOException("Failed to get scheduler state of the job."))
                                : Observable.just(batchResp)
                )
                .retryWhen(err ->
                        err.zipWith(Observable.range(1, getRetriesMax()), (n, i) -> i)
                                .delay(getDelaySeconds(), TimeUnit.SECONDS)
                )
                // Keep polling until the job has ended or the Livy URL is available; only that last response
                // is let through to the next stage.
                .repeatWhen(ob -> ob.delay(livyUrlPollDelayMillis, TimeUnit.MILLISECONDS))
                .takeUntil(batchResp -> isJobEnded(batchResp) || StringUtils.isNotEmpty(getLivyAPI(batchResp)))
                .filter(batchResp -> isJobEnded(batchResp) || StringUtils.isNotEmpty(getLivyAPI(batchResp)))
                .flatMap(job -> {
                    if (isJobEnded(job)) {
                        // The job finished before any Livy URL showed up: just report the final states.
                        final String jobState = getJobState(job);
                        final String schedulerState = getJobSchedulerState(job);
                        final String message = String.format(jobStatesFormat, schedulerState, jobState);
                        return Observable.just(new SparkLogLine(TOOL, Info, message));
                    } else {
                        return Observable.just(job)
                                .doOnNext(batchResp -> {
                                    ctrlInfo("Successfully get livy URL: " + batchResp.properties().livyServerAPI());
                                    ctrlInfo("Trying to retrieve livy submission logs...");
                                    // After test we find batch id won't be provided until the job is in running state
                                    // However, since only one spark job will be run on the cluster, the batch ID should always be 0
                                    setBatchId(0);
                                })
                                .map(batchResp -> batchResp.properties().livyServerAPI())
                                // Get submission log
                                .flatMap(livyUrl ->
                                        // defer() so that every repeat re-reads the current batch id and log start index
                                        Observable.defer(() -> getSubmissionLogRequest(livyUrl, getBatchId(), getLogStartIndex(), maxLogLinesPerRequest))
                                                .map(sparkJobLog -> Optional.ofNullable(sparkJobLog.getLog()).orElse(Collections.<String>emptyList()))
                                                // Advance by the raw number of lines received, before any filtering
                                                .doOnNext(logs -> setLogStartIndex(getLogStartIndex() + logs.size()))
                                                // Locale.ROOT keeps the comparison against the ASCII markers independent
                                                // of the JVM default locale
                                                .map(logs -> logs.stream()
                                                        .filter(logLine -> !ignoredEmptyLines.contains(logLine.trim().toLowerCase(java.util.Locale.ROOT)))
                                                        .collect(Collectors.toList()))
                                                .flatMap(logLines -> {
                                                    if (!logLines.isEmpty()) {
                                                        // New lines arrived: pair them with placeholder states so polling
                                                        // continues without an extra status request
                                                        return Observable.just(Triple.of(logLines, SparkBatchJobState.STARTING.toString(), SchedulerState.SCHEDULED.toString()));
                                                    } else {
                                                        // Nothing new: ask the service for the actual job states
                                                        return getSparkBatchJobRequest()
                                                                .map(batchResp -> Triple.of(logLines, getJobState(batchResp), getJobSchedulerState(batchResp)));
                                                    }
                                                })
                                                // On any failure of the log request, fall back to a (delayed) status query
                                                // carrying an empty log list
                                                .onErrorResumeNext(errors ->
                                                        getSparkBatchJobRequest()
                                                                .delay(getDelaySeconds(), TimeUnit.SECONDS)
                                                                .map(batchResp -> Triple.of(new ArrayList<>(), getJobState(batchResp), getJobSchedulerState(batchResp)))
                                                )
                                                .repeatWhen(ob -> ob.delay(logPollDelayMillis, TimeUnit.MILLISECONDS))
                                                // Continuously get livy log until job is not in Starting state or job is in Ended scheduler state
                                                .takeUntil(logAndStatesTriple ->
                                                        isSubmissionStageOver(logAndStatesTriple.getMiddle(), logAndStatesTriple.getRight()))
                                                .flatMap(logAndStatesTriple -> {
                                                    final String jobRunningState = logAndStatesTriple.getMiddle();
                                                    final String jobSchedulerState = logAndStatesTriple.getRight();
                                                    if (isSubmissionStageOver(jobRunningState, jobSchedulerState)) {
                                                        // Terminating element: report the states instead of log lines
                                                        final String message = String.format(jobStatesFormat, jobSchedulerState, jobRunningState);
                                                        return Observable.just(
                                                                new SparkLogLine(TOOL, Info, message));
                                                    } else {
                                                        return Observable.from(logAndStatesTriple.getLeft())
                                                                .map(line -> new SparkLogLine(LIVY, Log, line));
                                                    }
                                                })
                                );
                    }
                });
    }

    /**
     * Tells whether the given states mean that submission-log polling should stop: the running state
     * is known and is not "starting" (case-insensitive), or the scheduler state is known and is "ended"
     * (case-insensitive).
     *
     * @param jobRunningState   running state of the job, may be {@code null} when unknown
     * @param jobSchedulerState scheduler state of the job, may be {@code null} when unknown
     * @return {@code true} if polling for submission logs should stop
     */
    private boolean isSubmissionStageOver(final String jobRunningState, final String jobSchedulerState) {
        final boolean leftStartingState = jobRunningState != null
                && !jobRunningState.equalsIgnoreCase(SparkBatchJobState.STARTING.toString());
        final boolean schedulerEnded = jobSchedulerState != null
                && jobSchedulerState.equalsIgnoreCase(SchedulerState.ENDED.toString());
        return leftStartingState || schedulerEnded;
    }