flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java [375:394]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Executes the given INSERT operations and blocks until the launched Flink job completes.
   *
   * @param operations parsed INSERT modify operations to execute as one job
   * @param context interpreter context used for local properties and paragraph output
   * @throws IOException if the job fails, the wait is interrupted, or writing output fails
   */
  private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
    if (!isBatch) {
      // Flag this paragraph as a streaming insert so downstream handling can react.
      context.getLocalProperties().put("flink.streaming.insert_into", "true");
    }
    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
    checkState(tableResult.getJobClient().isPresent());
    try {
      // Block until the insert job finishes (or fails).
      tableResult.await();
      JobClient jobClient = tableResult.getJobClient().get();
      if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
        context.out.write("Insertion successfully.\n");
      } else {
        throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new IOException("Flink job is interrupted", e);
    } catch (ExecutionException e) {
      throw new IOException("Flink job is failed", e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java [375:394]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Executes the given INSERT operations and blocks until the launched Flink job completes.
   *
   * @param operations parsed INSERT modify operations to execute as one job
   * @param context interpreter context used for local properties and paragraph output
   * @throws IOException if the job fails, the wait is interrupted, or writing output fails
   */
  private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
    if (!isBatch) {
      // Flag this paragraph as a streaming insert so downstream handling can react.
      context.getLocalProperties().put("flink.streaming.insert_into", "true");
    }
    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
    checkState(tableResult.getJobClient().isPresent());
    try {
      // Block until the insert job finishes (or fails).
      tableResult.await();
      JobClient jobClient = tableResult.getJobClient().get();
      if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
        context.out.write("Insertion successfully.\n");
      } else {
        throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new IOException("Flink job is interrupted", e);
    } catch (ExecutionException e) {
      throw new IOException("Flink job is failed", e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java [375:394]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Executes the given INSERT operations and blocks until the launched Flink job completes.
   *
   * @param operations parsed INSERT modify operations to execute as one job
   * @param context interpreter context used for local properties and paragraph output
   * @throws IOException if the job fails, the wait is interrupted, or writing output fails
   */
  private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
    if (!isBatch) {
      // Flag this paragraph as a streaming insert so downstream handling can react.
      context.getLocalProperties().put("flink.streaming.insert_into", "true");
    }
    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
    checkState(tableResult.getJobClient().isPresent());
    try {
      // Block until the insert job finishes (or fails).
      tableResult.await();
      JobClient jobClient = tableResult.getJobClient().get();
      if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
        context.out.write("Insertion successfully.\n");
      } else {
        throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new IOException("Flink job is interrupted", e);
    } catch (ExecutionException e) {
      throw new IOException("Flink job is failed", e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



