public ExecuteResponse stream_execute()

in zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java [138:265]


  /**
   * Executes the given request through {@code asyncStub} and streams every response chunk
   * into {@code interpreterOutput} as it arrives. The calling thread blocks until the stream
   * is completed or fails.
   *
   * <p>Side effects: {@code maybeKernelFailed} is reset to {@code false} before the call and
   * set to {@code true} when the stream reports an error via {@code onError}.
   *
   * <p>If the calling thread is interrupted while waiting, the interruption is logged, the
   * thread's interrupt status is restored, and the method returns the response built so far
   * without waiting for the stream to finish.
   *
   * @param request the execute request sent to the kernel
   * @param interpreterOutput the stream that receives the formatted kernel output
   * @return a response whose status is {@code SUCCESS} unless any streamed response carried
   *         status {@code ERROR} or the call itself failed, in which case it is {@code ERROR}
   */
  public ExecuteResponse stream_execute(ExecuteRequest request,
                                        final InterpreterOutputStream interpreterOutput) {
    final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
        .setStatus(ExecuteStatus.SUCCESS);
    // Doubles as the completion flag and as the monitor the caller waits on.
    final AtomicBoolean completedFlag = new AtomicBoolean(false);
    maybeKernelFailed = false;
    LOGGER.debug("stream_execute code:\n" + request.getCode());
    asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
      // Type of the most recently handled response; only written within this method.
      OutputType lastOutputType = null;

      @Override
      public void onNext(ExecuteResponse executeResponse) {
        LOGGER.debug("Interpreter Streaming Output: " + executeResponse.getType() +
                "\t" + executeResponse.getOutput());
        switch (executeResponse.getType()) {
          case TEXT:
            try {
              // Skip the regular text handling when checkForShinyApp reports a match.
              if (checkForShinyApp(executeResponse.getOutput())) {
                break;
              }
              if (executeResponse.getOutput().startsWith("%")) {
                // the output from jupyter kernel may specify the format already.
                interpreterOutput.write((executeResponse.getOutput()).getBytes());
              } else {
                // Add %text only when there is a current output whose type is neither
                // TEXT nor HTML.
                // Reason :
                // 1. if no `%text`, it will be treated as previous output type.
                // 2. Always prepend `%text `, there will be an extra line separator,
                // because `%text ` appends line separator first.
                InterpreterResultMessageOutput curOutput =
                        interpreterOutput.getInterpreterOutput().getCurrentOutput();
                if (curOutput != null && curOutput.getType() != InterpreterResult.Type.HTML &&
                        curOutput.getType() != InterpreterResult.Type.TEXT) {
                  interpreterOutput.write("%text ".getBytes());
                }
                // explicitly use html output for ir kernel in some cases. otherwise some
                // R packages don't work. e.g. googlevis
                if (kernel.equals("ir") && executeResponse.getOutput()
                        .contains("<script type=\"text/javascript\">")) {
                  interpreterOutput.write("\n%html ".getBytes());
                }
                interpreterOutput.write(executeResponse.getOutput().getBytes());
              }
              interpreterOutput.getInterpreterOutput().flush();
            } catch (IOException e) {
              LOGGER.error("Unexpected IOException", e);
            }
            break;
          case PNG:
          case JPEG:
            try {
              interpreterOutput.write(("\n%img " + executeResponse.getOutput()).getBytes());
              interpreterOutput.getInterpreterOutput().flush();
            } catch (IOException e) {
              LOGGER.error("Unexpected IOException", e);
            }
            break;
          case HTML:
            try {
              interpreterOutput.write(("\n%html " + executeResponse.getOutput()).getBytes());
              interpreterOutput.getInterpreterOutput().flush();
            } catch (IOException e) {
              LOGGER.error("Unexpected IOException", e);
            }
            break;
          case CLEAR:
            interpreterOutput.getInterpreterOutput().clear();
            break;
          default:
            LOGGER.error("Unrecognized type:" + executeResponse.getType());
        }

        lastOutputType = executeResponse.getType();
        if (executeResponse.getStatus() == ExecuteStatus.ERROR) {
          // set the finalResponse to ERROR if any ERROR happens, otherwise the finalResponse would
          // be SUCCESS.
          finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
        }
      }

      @Override
      public void onError(Throwable throwable) {
        try {
          // only output the extra error when no error message is displayed before.
          if (finalResponseBuilder.getStatus() != null &&
                  finalResponseBuilder.getStatus() != ExecuteStatus.ERROR) {
            interpreterOutput.getInterpreterOutput().write("\n%text " +
                    ExceptionUtils.getStackTrace(throwable));
            interpreterOutput.getInterpreterOutput().flush();
          }
        } catch (IOException e) {
          LOGGER.error("Unexpected IOException", e);
        }
        LOGGER.error("Fail to call IPython grpc", throwable);
        finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
        maybeKernelFailed = true;
        // Set the flag and notify under the same monitor, mirroring onCompleted().
        synchronized (completedFlag) {
          completedFlag.set(true);
          completedFlag.notify();
        }
      }

      @Override
      public void onCompleted() {
        synchronized (completedFlag) {
          try {
            LOGGER.debug("stream_execute is completed");
            interpreterOutput.getInterpreterOutput().flush();
          } catch (IOException e) {
            LOGGER.error("Unexpected IOException", e);
          }
          completedFlag.set(true);
          completedFlag.notify();
        }
      }
    });

    synchronized (completedFlag) {
      // Re-check the flag in a loop: Object.wait() may return spuriously, and returning
      // before the stream has finished would hand back a response that is still changing.
      while (!completedFlag.get()) {
        try {
          completedFlag.wait();
        } catch (InterruptedException e) {
          LOGGER.error("Unexpected Interruption", e);
          // Restore the interrupt status so callers can observe the interruption, and stop
          // waiting (a further wait() would throw again immediately).
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return finalResponseBuilder.build();
  }