flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java [212:276]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public InterpreterResult runSqlList(String st, InterpreterContext context) {
    try {
      // "runAsOne" buffers this paragraph's INSERT (modify) operations so they can be
      // submitted together as a single Flink job after the statement loop below.
      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
      if (runAsOne) {
        statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
      }

      // An explicit "jobName" local property overrides the default pipeline (job) name.
      String jobName = context.getLocalProperties().get("jobName");
      if (StringUtils.isNotBlank(jobName)) {
        tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
      }

      // Split the paragraph text into individual statements and execute them in order.
      List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
      for (String sql : sqls) {
        List<Operation> operations = null;
        try {
          operations = sqlParser.parse(sql);
        } catch (SqlParserException e) {
          context.out.write("%text Invalid Sql statement: " + sql + "\n");
          context.out.write(MESSAGE_HELP.toString());
          return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
        }

        try {
          // The splitter yields one statement at a time, so only the first parsed
          // operation is executed.
          callOperation(sql, operations.get(0), context);
          context.out.flush();
        } catch (Throwable e) {
          LOGGER.error("Fail to run sql:" + sql, e);
          try {
            context.out.write("%text Fail to run sql command: " +
                    sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
          } catch (IOException ex) {
            LOGGER.warn("Unexpected exception:", ex);
            return new InterpreterResult(InterpreterResult.Code.ERROR,
                    ExceptionUtils.getStackTrace(e));
          }
          return new InterpreterResult(InterpreterResult.Code.ERROR);
        }
      }

      // Flush the buffered modify operations as one job; the shared lock serializes
      // job submission.
      if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) {
        try {
          lock.lock();
          List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
          if (!modifyOperations.isEmpty()) {
            callInserts(modifyOperations, context);
          }
        } catch (Exception e) {
          LOGGER.error("Fail to execute sql as one job", e);
          return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
        } finally {
          if (lock.isHeldByCurrentThread()) {
            lock.unlock();
          }
        }
      }
    } catch (Exception e) {
      LOGGER.error("Fail to execute sql", e);
      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
    } finally {
      // Always drop this paragraph's buffered operations, even on failure.
      statementOperationsMap.remove(context.getParagraphId());
    }

    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
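
For reference, both "runAsOne" and "jobName" are paragraph-scoped local properties in Zeppelin. A notebook paragraph along the following lines (a hypothetical example: the %flink.bsql binding, table names, and job name are illustrative) would submit both INSERTs as a single job named nightly-sync:

  %flink.bsql(runAsOne=true, jobName="nightly-sync")

  INSERT INTO sink_a SELECT * FROM source_events;
  INSERT INTO sink_b SELECT * FROM source_events;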



flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java [212:276]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  (identical to the Flink115SqlInterpreter.runSqlList body shown above)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java [212:276]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  (identical to the Flink115SqlInterpreter.runSqlList body shown above)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
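
All three shims carry byte-identical copies of this method, so the logic is a natural candidate for extraction into a shared base class. Independent of that, the "runAsOne" path follows a buffer-then-flush pattern: modify operations accumulate per paragraph, a single lock-serialized submission happens at the end, and a finally block guarantees cleanup. A minimal, Flink-free sketch of that pattern (all names here are invented for illustration, not from the Zeppelin codebase):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.locks.ReentrantLock;

  // Toy model of the buffer-then-flush pattern used by runSqlList: statements
  // accumulate per paragraph, then one locked "submission" happens at the end.
  public class RunAsOneSketch {
    private final Map<String, List<String>> buffered = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    public void runStatements(String paragraphId, List<String> statements, boolean runAsOne) {
      buffered.put(paragraphId, new ArrayList<>());
      try {
        for (String stmt : statements) {
          if (runAsOne && stmt.toUpperCase().startsWith("INSERT")) {
            buffered.get(paragraphId).add(stmt);  // defer, do not execute yet
          } else {
            System.out.println("executing immediately: " + stmt);
          }
        }
        List<String> deferred = buffered.getOrDefault(paragraphId, new ArrayList<>());
        if (runAsOne && !deferred.isEmpty()) {
          lock.lock();                            // serialize job submission
          try {
            System.out.println("submitting " + deferred.size() + " inserts as one job");
          } finally {
            lock.unlock();
          }
        }
      } finally {
        buffered.remove(paragraphId);             // always clean up, even on failure
      }
    }

    public static void main(String[] args) {
      new RunAsOneSketch().runStatements(
          "paragraph-1",
          List.of("CREATE TABLE t (x INT)", "INSERT INTO t VALUES (1)", "INSERT INTO t VALUES (2)"),
          true);
    }
  }

The per-paragraph keying keeps concurrent paragraphs from mixing their buffered statements, and the finally-based removal mirrors the cleanup at the end of the method above.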