flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java [100:252]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds a {@link ClientResourceManager} whose user class loader is created from the given jars.
   *
   * @param jars URLs of the jars passed to the user class loader
   * @param tableConfig a {@link TableConfig}; a clone of its configuration is used
   * @return a new {@link ClientResourceManager}
   */
  @Override
  public Object createResourceManager(List<URL> jars, Object tableConfig) {
    // Operate on a clone of the table configuration rather than on the instance itself.
    Configuration clonedConf = ((TableConfig) tableConfig).getConfiguration().clone();
    ClassLoader parentLoader = Thread.currentThread().getContextClassLoader();
    // The inner user class loader is given its own copy of the cloned configuration.
    Configuration loaderConf = new Configuration(clonedConf);
    ClientWrapperClassLoader wrappedLoader = new ClientWrapperClassLoader(
            ClientClassloaderUtil.buildUserClassLoader(jars, parentLoader, loaderConf),
            clonedConf);
    return new ClientResourceManager(clonedConf, wrappedLoader);
  }

  /**
   * Creates a {@link FunctionCatalog} that uses a resource manager newly built from {@code jars}.
   *
   * @param tableConfig a {@link TableConfig}
   * @param catalogManager a {@link CatalogManager}
   * @param moduleManager a {@link ModuleManager}
   * @param jars URLs of the jars handed to {@link #createResourceManager(List, Object)}
   * @return a new {@link FunctionCatalog}
   */
  @Override
  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
    TableConfig config = (TableConfig) tableConfig;
    ResourceManager functionResources = (ResourceManager) createResourceManager(jars, config);
    return new FunctionCatalog(
            config,
            functionResources,
            (CatalogManager) catalogManager,
            (ModuleManager) moduleManager);
  }

  /**
   * Intentionally a no-op in this shim: both arguments are ignored.
   *
   * @param batchConfig unused
   * @param streamConfig unused
   */
  @Override
  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
    // do nothing
  }

  /**
   * Assembles a Scala {@code StreamTableEnvironmentImpl} from the already-created components.
   *
   * <p>The planner and executor come from {@code createPlannerAndExecutor}; a resource manager is
   * created from {@code jars} and the table config for this environment.
   *
   * @param environmentSettingsObj an {@link EnvironmentSettings}
   * @param senvObj a Java {@link StreamExecutionEnvironment}, wrapped into its Scala counterpart
   * @param tableConfigObj a {@link TableConfig}
   * @param moduleManagerObj a {@link ModuleManager}
   * @param functionCatalogObj a {@link FunctionCatalog}
   * @param catalogManagerObj a {@link CatalogManager}
   * @param jars URLs of the jars used to build the resource manager
   * @param classLoader class loader forwarded to {@code createPlannerAndExecutor}
   * @return the new Scala stream table environment
   */
  @Override
  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
                                                       Object senvObj,
                                                       Object tableConfigObj,
                                                       Object moduleManagerObj,
                                                       Object functionCatalogObj,
                                                       Object catalogManagerObj,
                                                       List<URL> jars,
                                                       ClassLoader classLoader) {
    // Recover the concrete Flink types from the untyped shim arguments.
    EnvironmentSettings settings = (EnvironmentSettings) environmentSettingsObj;
    StreamExecutionEnvironment javaEnv = (StreamExecutionEnvironment) senvObj;
    TableConfig config = (TableConfig) tableConfigObj;
    ModuleManager modules = (ModuleManager) moduleManagerObj;
    FunctionCatalog functions = (FunctionCatalog) functionCatalogObj;
    CatalogManager catalogs = (CatalogManager) catalogManagerObj;

    ImmutablePair<Object, Object> plannerAndExecutor = createPlannerAndExecutor(
            classLoader, settings, javaEnv, config, modules, functions, catalogs);
    Planner planner = (Planner) plannerAndExecutor.left;
    Executor executor = (Executor) plannerAndExecutor.right;

    ResourceManager resources = (ResourceManager) createResourceManager(jars, config);
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment scalaEnv =
            new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(javaEnv);

    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(
            catalogs,
            modules,
            resources,
            functions,
            config,
            scalaEnv,
            planner,
            executor,
            settings.isStreamingMode());
  }

  /**
   * Assembles a Java {@link StreamTableEnvironmentImpl} from the already-created components.
   *
   * <p>The planner and executor come from {@code createPlannerAndExecutor}; a resource manager is
   * created from {@code jars} and the table config for this environment.
   *
   * @param environmentSettingsObj an {@link EnvironmentSettings}
   * @param senvObj a {@link StreamExecutionEnvironment}
   * @param tableConfigObj a {@link TableConfig}
   * @param moduleManagerObj a {@link ModuleManager}
   * @param functionCatalogObj a {@link FunctionCatalog}
   * @param catalogManagerObj a {@link CatalogManager}
   * @param jars URLs of the jars used to build the resource manager
   * @param classLoader class loader forwarded to {@code createPlannerAndExecutor}
   * @return the new Java stream table environment
   */
  @Override
  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
                                                      Object senvObj,
                                                      Object tableConfigObj,
                                                      Object moduleManagerObj,
                                                      Object functionCatalogObj,
                                                      Object catalogManagerObj,
                                                      List<URL> jars,
                                                      ClassLoader classLoader) {
    // Recover the concrete Flink types from the untyped shim arguments.
    EnvironmentSettings settings = (EnvironmentSettings) environmentSettingsObj;
    StreamExecutionEnvironment streamEnv = (StreamExecutionEnvironment) senvObj;
    TableConfig config = (TableConfig) tableConfigObj;
    ModuleManager modules = (ModuleManager) moduleManagerObj;
    FunctionCatalog functions = (FunctionCatalog) functionCatalogObj;
    CatalogManager catalogs = (CatalogManager) catalogManagerObj;

    ImmutablePair<Object, Object> plannerAndExecutor = createPlannerAndExecutor(
            classLoader, settings, streamEnv, config, modules, functions, catalogs);
    Planner planner = (Planner) plannerAndExecutor.left;
    Executor executor = (Executor) plannerAndExecutor.right;

    ResourceManager resources = (ResourceManager) createResourceManager(jars, config);

    return new StreamTableEnvironmentImpl(
            catalogs,
            modules,
            resources,
            functions,
            config,
            streamEnv,
            planner,
            executor,
            settings.isStreamingMode());
  }

  /**
   * Wraps an existing environment in a factory that always hands back that same instance.
   *
   * @param streamExecutionEnvironment the {@link StreamExecutionEnvironment} to return; the cast
   *     is performed each time the factory is invoked, not when the factory is created
   * @return a {@link StreamExecutionEnvironmentFactory} that ignores the configuration it is given
   */
  @Override
  public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
    StreamExecutionEnvironmentFactory fixedFactory =
            ignoredConfiguration -> (StreamExecutionEnvironment) streamExecutionEnvironment;
    return fixedFactory;
  }

  /**
   * Builds a {@link CatalogManager} with an in-memory default catalog.
   *
   * <p>The default catalog is named {@code default_catalog} and its default database
   * {@code default_database}; the current thread's context class loader is used.
   *
   * @param config a {@link ReadableConfig}
   * @return the new {@link CatalogManager}
   */
  @Override
  public Object createCatalogManager(Object config) {
    ReadableConfig readableConfig = (ReadableConfig) config;
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    GenericInMemoryCatalog inMemoryCatalog =
            new GenericInMemoryCatalog("default_catalog", "default_database");
    return CatalogManager.newBuilder()
            .classLoader(contextLoader)
            .config(readableConfig)
            .defaultCatalog("default_catalog", inMemoryCatalog)
            .build();
  }

  /**
   * Resolves the PyFlink library folder and returns its content as a PYTHONPATH fragment.
   *
   * <p>When {@code flink.execution.mode} is {@code yarn-application} (case-insensitive) the folder
   * is {@code lib/python} under the current working directory; otherwise it is
   * {@code opt/python} under the {@code FLINK_HOME} environment variable.
   *
   * @param properties interpreter properties; only {@code flink.execution.mode} is read
   * @return the path string produced for the resolved folder
   * @throws IOException if {@code FLINK_HOME} is blank or unset outside yarn-application mode, or
   *     if the resolved folder is rejected
   */
  @Override
  public String getPyFlinkPythonPath(Properties properties) throws IOException {
    boolean yarnApplicationMode =
            "yarn-application".equalsIgnoreCase(properties.getProperty("flink.execution.mode"));
    if (yarnApplicationMode) {
      // for yarn application mode, FLINK_HOME is container working directory
      String workingDir = new File(".").getAbsolutePath();
      return getPyFlinkPythonPath(new File(workingDir + "/lib/python"));
    }

    String flinkHomeEnv = System.getenv("FLINK_HOME");
    if (!StringUtils.isNotBlank(flinkHomeEnv)) {
      throw new IOException("No FLINK_HOME is specified");
    }
    return getPyFlinkPythonPath(new File(flinkHomeEnv + "/opt/python"));
  }

  /**
   * Concatenates the absolute paths of all entries directly inside the PyFlink folder.
   *
   * <p>Every entry is followed by a {@code ':'}, so a non-empty result ends with a trailing colon.
   *
   * @param pyFlinkFolder folder whose direct children are added to the path
   * @return the colon-terminated paths, or an empty string if the folder has no entries
   * @throws IOException if the folder does not exist, is not a directory, or cannot be listed
   */
  private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
    LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
    if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
      throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
              pyFlinkFolder.getAbsolutePath()));
    }
    // listFiles() signals an I/O error (e.g. unreadable directory) by returning null instead of
    // throwing, so it has to be checked explicitly to avoid a NullPointerException.
    File[] depFiles = pyFlinkFolder.listFiles();
    if (depFiles == null) {
      throw new IOException(String.format("Unable to list files of PyFlink folder %s",
              pyFlinkFolder.getAbsolutePath()));
    }
    StringBuilder builder = new StringBuilder();
    for (File file : depFiles) {
      String absolutePath = file.getAbsolutePath();
      LOGGER.info("Adding extracted file {} to PYTHONPATH", absolutePath);
      builder.append(absolutePath).append(':');
    }
    return builder.toString();
  }

  /**
   * Creates a {@link CollectStreamTableSink} for the given target address and port.
   *
   * @param targetAddress address passed to the sink
   * @param targetPort port passed to the sink
   * @param serializer a {@code TypeSerializer<Tuple2<Boolean, Row>>}; the generic type cannot be
   *     verified at runtime
   * @return the new sink
   */
  @Override
  public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) {
    // Unchecked by necessity: the shim interface only exposes the serializer as Object.
    @SuppressWarnings("unchecked")
    TypeSerializer<Tuple2<Boolean, Row>> rowSerializer =
            (TypeSerializer<Tuple2<Boolean, Row>>) serializer;
    return new CollectStreamTableSink(targetAddress, targetPort, rowSerializer);
  }

  /**
   * Executes the table and materializes all result rows into a list.
   *
   * @param table a {@link Table}
   * @return a new list containing every row returned by the execution
   * @throws Exception if executing the table, iterating the result, or closing the iterator fails
   */
  @Override
  public List collectToList(Object table) throws Exception {
    // collect() hands back a closeable iterator; close it once drained (or on failure) so the
    // resources behind the result are not leaked.
    try (org.apache.flink.util.CloseableIterator<Row> rows = ((Table) table).execute().collect()) {
      return Lists.newArrayList(rows);
    }
  }

  /**
   * Compares two rows after giving both of them the kind {@link RowKind#INSERT}.
   *
   * <p>Side effect: the kind of both argument rows is overwritten and not restored.
   *
   * @param row1 a {@link Row}
   * @param row2 a {@link Row}
   * @return the result of {@code row1.equals(row2)} once both kinds are {@code INSERT}
   */
  @Override
  public boolean rowEquals(Object row1, Object row2) {
    Row left = (Row) row1;
    Row right = (Row) row2;
    // Align the kinds first so both rows carry the same kind when equals() is evaluated.
    for (Row row : new Row[] {left, right}) {
      row.setKind(RowKind.INSERT);
    }
    return left.equals(right);
  }

  @Override
  public Object fromDataSet(Object btenv, Object ds) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java [100:252]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds a {@link ClientResourceManager} whose user class loader is created from the given jars.
   *
   * @param jars URLs of the jars passed to the user class loader
   * @param tableConfig a {@link TableConfig}; a clone of its configuration is used
   * @return a new {@link ClientResourceManager}
   */
  @Override
  public Object createResourceManager(List<URL> jars, Object tableConfig) {
    // Operate on a clone of the table configuration rather than on the instance itself.
    Configuration clonedConf = ((TableConfig) tableConfig).getConfiguration().clone();
    ClassLoader parentLoader = Thread.currentThread().getContextClassLoader();
    // The inner user class loader is given its own copy of the cloned configuration.
    Configuration loaderConf = new Configuration(clonedConf);
    ClientWrapperClassLoader wrappedLoader = new ClientWrapperClassLoader(
            ClientClassloaderUtil.buildUserClassLoader(jars, parentLoader, loaderConf),
            clonedConf);
    return new ClientResourceManager(clonedConf, wrappedLoader);
  }

  /**
   * Creates a {@link FunctionCatalog} that uses a resource manager newly built from {@code jars}.
   *
   * @param tableConfig a {@link TableConfig}
   * @param catalogManager a {@link CatalogManager}
   * @param moduleManager a {@link ModuleManager}
   * @param jars URLs of the jars handed to {@link #createResourceManager(List, Object)}
   * @return a new {@link FunctionCatalog}
   */
  @Override
  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
    TableConfig config = (TableConfig) tableConfig;
    ResourceManager functionResources = (ResourceManager) createResourceManager(jars, config);
    return new FunctionCatalog(
            config,
            functionResources,
            (CatalogManager) catalogManager,
            (ModuleManager) moduleManager);
  }

  /**
   * Intentionally a no-op in this shim: both arguments are ignored.
   *
   * @param batchConfig unused
   * @param streamConfig unused
   */
  @Override
  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
    // do nothing
  }

  /**
   * Assembles a Scala {@code StreamTableEnvironmentImpl} from the already-created components.
   *
   * <p>The planner and executor come from {@code createPlannerAndExecutor}; a resource manager is
   * created from {@code jars} and the table config for this environment.
   *
   * @param environmentSettingsObj an {@link EnvironmentSettings}
   * @param senvObj a Java {@link StreamExecutionEnvironment}, wrapped into its Scala counterpart
   * @param tableConfigObj a {@link TableConfig}
   * @param moduleManagerObj a {@link ModuleManager}
   * @param functionCatalogObj a {@link FunctionCatalog}
   * @param catalogManagerObj a {@link CatalogManager}
   * @param jars URLs of the jars used to build the resource manager
   * @param classLoader class loader forwarded to {@code createPlannerAndExecutor}
   * @return the new Scala stream table environment
   */
  @Override
  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
                                                       Object senvObj,
                                                       Object tableConfigObj,
                                                       Object moduleManagerObj,
                                                       Object functionCatalogObj,
                                                       Object catalogManagerObj,
                                                       List<URL> jars,
                                                       ClassLoader classLoader) {
    // Recover the concrete Flink types from the untyped shim arguments.
    EnvironmentSettings settings = (EnvironmentSettings) environmentSettingsObj;
    StreamExecutionEnvironment javaEnv = (StreamExecutionEnvironment) senvObj;
    TableConfig config = (TableConfig) tableConfigObj;
    ModuleManager modules = (ModuleManager) moduleManagerObj;
    FunctionCatalog functions = (FunctionCatalog) functionCatalogObj;
    CatalogManager catalogs = (CatalogManager) catalogManagerObj;

    ImmutablePair<Object, Object> plannerAndExecutor = createPlannerAndExecutor(
            classLoader, settings, javaEnv, config, modules, functions, catalogs);
    Planner planner = (Planner) plannerAndExecutor.left;
    Executor executor = (Executor) plannerAndExecutor.right;

    ResourceManager resources = (ResourceManager) createResourceManager(jars, config);
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment scalaEnv =
            new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(javaEnv);

    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(
            catalogs,
            modules,
            resources,
            functions,
            config,
            scalaEnv,
            planner,
            executor,
            settings.isStreamingMode());
  }

  /**
   * Assembles a Java {@link StreamTableEnvironmentImpl} from the already-created components.
   *
   * <p>The planner and executor come from {@code createPlannerAndExecutor}; a resource manager is
   * created from {@code jars} and the table config for this environment.
   *
   * @param environmentSettingsObj an {@link EnvironmentSettings}
   * @param senvObj a {@link StreamExecutionEnvironment}
   * @param tableConfigObj a {@link TableConfig}
   * @param moduleManagerObj a {@link ModuleManager}
   * @param functionCatalogObj a {@link FunctionCatalog}
   * @param catalogManagerObj a {@link CatalogManager}
   * @param jars URLs of the jars used to build the resource manager
   * @param classLoader class loader forwarded to {@code createPlannerAndExecutor}
   * @return the new Java stream table environment
   */
  @Override
  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
                                                      Object senvObj,
                                                      Object tableConfigObj,
                                                      Object moduleManagerObj,
                                                      Object functionCatalogObj,
                                                      Object catalogManagerObj,
                                                      List<URL> jars,
                                                      ClassLoader classLoader) {
    // Recover the concrete Flink types from the untyped shim arguments.
    EnvironmentSettings settings = (EnvironmentSettings) environmentSettingsObj;
    StreamExecutionEnvironment streamEnv = (StreamExecutionEnvironment) senvObj;
    TableConfig config = (TableConfig) tableConfigObj;
    ModuleManager modules = (ModuleManager) moduleManagerObj;
    FunctionCatalog functions = (FunctionCatalog) functionCatalogObj;
    CatalogManager catalogs = (CatalogManager) catalogManagerObj;

    ImmutablePair<Object, Object> plannerAndExecutor = createPlannerAndExecutor(
            classLoader, settings, streamEnv, config, modules, functions, catalogs);
    Planner planner = (Planner) plannerAndExecutor.left;
    Executor executor = (Executor) plannerAndExecutor.right;

    ResourceManager resources = (ResourceManager) createResourceManager(jars, config);

    return new StreamTableEnvironmentImpl(
            catalogs,
            modules,
            resources,
            functions,
            config,
            streamEnv,
            planner,
            executor,
            settings.isStreamingMode());
  }

  /**
   * Wraps an existing environment in a factory that always hands back that same instance.
   *
   * @param streamExecutionEnvironment the {@link StreamExecutionEnvironment} to return; the cast
   *     is performed each time the factory is invoked, not when the factory is created
   * @return a {@link StreamExecutionEnvironmentFactory} that ignores the configuration it is given
   */
  @Override
  public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
    StreamExecutionEnvironmentFactory fixedFactory =
            ignoredConfiguration -> (StreamExecutionEnvironment) streamExecutionEnvironment;
    return fixedFactory;
  }

  /**
   * Builds a {@link CatalogManager} with an in-memory default catalog.
   *
   * <p>The default catalog is named {@code default_catalog} and its default database
   * {@code default_database}; the current thread's context class loader is used.
   *
   * @param config a {@link ReadableConfig}
   * @return the new {@link CatalogManager}
   */
  @Override
  public Object createCatalogManager(Object config) {
    ReadableConfig readableConfig = (ReadableConfig) config;
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    GenericInMemoryCatalog inMemoryCatalog =
            new GenericInMemoryCatalog("default_catalog", "default_database");
    return CatalogManager.newBuilder()
            .classLoader(contextLoader)
            .config(readableConfig)
            .defaultCatalog("default_catalog", inMemoryCatalog)
            .build();
  }

  /**
   * Resolves the PyFlink library folder and returns its content as a PYTHONPATH fragment.
   *
   * <p>When {@code flink.execution.mode} is {@code yarn-application} (case-insensitive) the folder
   * is {@code lib/python} under the current working directory; otherwise it is
   * {@code opt/python} under the {@code FLINK_HOME} environment variable.
   *
   * @param properties interpreter properties; only {@code flink.execution.mode} is read
   * @return the path string produced for the resolved folder
   * @throws IOException if {@code FLINK_HOME} is blank or unset outside yarn-application mode, or
   *     if the resolved folder is rejected
   */
  @Override
  public String getPyFlinkPythonPath(Properties properties) throws IOException {
    boolean yarnApplicationMode =
            "yarn-application".equalsIgnoreCase(properties.getProperty("flink.execution.mode"));
    if (yarnApplicationMode) {
      // for yarn application mode, FLINK_HOME is container working directory
      String workingDir = new File(".").getAbsolutePath();
      return getPyFlinkPythonPath(new File(workingDir + "/lib/python"));
    }

    String flinkHomeEnv = System.getenv("FLINK_HOME");
    if (!StringUtils.isNotBlank(flinkHomeEnv)) {
      throw new IOException("No FLINK_HOME is specified");
    }
    return getPyFlinkPythonPath(new File(flinkHomeEnv + "/opt/python"));
  }

  /**
   * Concatenates the absolute paths of all entries directly inside the PyFlink folder.
   *
   * <p>Every entry is followed by a {@code ':'}, so a non-empty result ends with a trailing colon.
   *
   * @param pyFlinkFolder folder whose direct children are added to the path
   * @return the colon-terminated paths, or an empty string if the folder has no entries
   * @throws IOException if the folder does not exist, is not a directory, or cannot be listed
   */
  private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
    LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
    if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
      throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
              pyFlinkFolder.getAbsolutePath()));
    }
    // listFiles() signals an I/O error (e.g. unreadable directory) by returning null instead of
    // throwing, so it has to be checked explicitly to avoid a NullPointerException.
    File[] depFiles = pyFlinkFolder.listFiles();
    if (depFiles == null) {
      throw new IOException(String.format("Unable to list files of PyFlink folder %s",
              pyFlinkFolder.getAbsolutePath()));
    }
    StringBuilder builder = new StringBuilder();
    for (File file : depFiles) {
      String absolutePath = file.getAbsolutePath();
      LOGGER.info("Adding extracted file {} to PYTHONPATH", absolutePath);
      builder.append(absolutePath).append(':');
    }
    return builder.toString();
  }

  /**
   * Creates a {@link CollectStreamTableSink} for the given target address and port.
   *
   * @param targetAddress address passed to the sink
   * @param targetPort port passed to the sink
   * @param serializer a {@code TypeSerializer<Tuple2<Boolean, Row>>}; the generic type cannot be
   *     verified at runtime
   * @return the new sink
   */
  @Override
  public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) {
    // Unchecked by necessity: the shim interface only exposes the serializer as Object.
    @SuppressWarnings("unchecked")
    TypeSerializer<Tuple2<Boolean, Row>> rowSerializer =
            (TypeSerializer<Tuple2<Boolean, Row>>) serializer;
    return new CollectStreamTableSink(targetAddress, targetPort, rowSerializer);
  }

  /**
   * Executes the table and materializes all result rows into a list.
   *
   * @param table a {@link Table}
   * @return a new list containing every row returned by the execution
   * @throws Exception if executing the table, iterating the result, or closing the iterator fails
   */
  @Override
  public List collectToList(Object table) throws Exception {
    // collect() hands back a closeable iterator; close it once drained (or on failure) so the
    // resources behind the result are not leaked.
    try (org.apache.flink.util.CloseableIterator<Row> rows = ((Table) table).execute().collect()) {
      return Lists.newArrayList(rows);
    }
  }

  /**
   * Compares two rows after giving both of them the kind {@link RowKind#INSERT}.
   *
   * <p>Side effect: the kind of both argument rows is overwritten and not restored.
   *
   * @param row1 a {@link Row}
   * @param row2 a {@link Row}
   * @return the result of {@code row1.equals(row2)} once both kinds are {@code INSERT}
   */
  @Override
  public boolean rowEquals(Object row1, Object row2) {
    Row left = (Row) row1;
    Row right = (Row) row2;
    // Align the kinds first so both rows carry the same kind when equals() is evaluated.
    for (Row row : new Row[] {left, right}) {
      row.setKind(RowKind.INSERT);
    }
    return left.equals(right);
  }

  @Override
  public Object fromDataSet(Object btenv, Object ds) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



