flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java [245:321]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Registers a table sink under the given name on the supplied table environment.
   *
   * @param stenv table environment; cast to
   *     {@code org.apache.flink.table.api.internal.TableEnvironmentInternal}
   * @param tableName name the sink is registered under
   * @param collectTableSink sink to register; cast to {@code TableSink}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerTableSink(Object stenv, String tableName, Object collectTableSink) {
    final org.apache.flink.table.api.internal.TableEnvironmentInternal internalEnv =
            (org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv;
    internalEnv.registerTableSinkInternal(tableName, (TableSink) collectTableSink);
  }

  /**
   * Registers a scalar function as a temporary system function.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param scalarFunction function instance; cast to {@code ScalarFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerScalarFunction(Object btenv, String name, Object scalarFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    final ScalarFunction function = (ScalarFunction) scalarFunction;
    tableEnv.createTemporarySystemFunction(name, function);
  }

  /**
   * Registers a table function on the supplied table environment.
   *
   * <p>Unlike {@code registerScalarFunction}, this goes through {@code registerFunction}
   * rather than {@code createTemporarySystemFunction}.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param tableFunction function instance; cast to {@code TableFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerTableFunction(Object btenv, String name, Object tableFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    // The raw cast is kept inline so overload resolution stays exactly as before.
    tableEnv.registerFunction(name, (TableFunction) tableFunction);
  }

  /**
   * Registers an aggregate function on the supplied table environment.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param aggregateFunction function instance; cast to {@code AggregateFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    // The raw cast is kept inline so overload resolution stays exactly as before.
    tableEnv.registerFunction(name, (AggregateFunction) aggregateFunction);
  }

  /**
   * Registers a table aggregate function on the supplied table environment.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param tableAggregateFunction function instance; cast to {@code TableAggregateFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    // The raw cast is kept inline so overload resolution stays exactly as before.
    tableEnv.registerFunction(name, (TableAggregateFunction) tableAggregateFunction);
  }

  /**
   * Intentionally a no-op in this shim: none of the arguments are read.
   *
   * <p>Historical note carried over from the original comment: Flink 1.11 binds the
   * CatalogManager to the parser, which prevents blink and flink from sharing the same
   * CatalogManager; the workaround was to always reset the CatalogTableSchemaResolver before
   * running any Flink code. NOTE(review): presumably that workaround is no longer needed for
   * the Flink version this shim targets -- confirm against the shims for older versions.
   *
   * @param catalogManager unused by this implementation
   * @param parserObject unused by this implementation
   * @param environmentSetting unused by this implementation
   */
  @Override
  public void setCatalogManagerSchemaResolver(Object catalogManager,
                                              Object parserObject,
                                              Object environmentSetting) {
    // Nothing to do here; see the Javadoc.
  }

  /**
   * Merges the configuration derived from the active custom command line into
   * {@code effectiveConfig}.
   *
   * @param cliFrontend CLI frontend; cast to {@code CliFrontend}
   * @param commandLine parsed command line; cast to {@code CommandLine}
   * @param effectiveConfig configuration to update in place; cast to {@code Configuration}
   * @return the same {@code effectiveConfig} object that was passed in, after the merge
   * @throws RuntimeException wrapping any {@code FlinkException} raised while merging
   * @throws ClassCastException if an argument has an unexpected runtime type
   */
  @Override
  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
    final CliFrontend frontend = (CliFrontend) cliFrontend;
    final CommandLine parsedCommandLine = (CommandLine) commandLine;
    final CustomCommandLine activeCommandLine =
            frontend.validateAndGetActiveCommandLine(parsedCommandLine);
    try {
      final Configuration target = (Configuration) effectiveConfig;
      final Configuration fromCommandLine = activeCommandLine.toConfiguration(parsedCommandLine);
      target.addAll(fromCommandLine);
      return effectiveConfig;
    } catch (FlinkException e) {
      // NOTE(review): the FlinkException presumably comes from toConfiguration rather than
      // addAll; the message text is kept unchanged for compatibility.
      throw new RuntimeException("Fail to call addAll", e);
    }
  }

  /**
   * Sets {@code ExecutionOptions.RUNTIME_MODE} to {@code RuntimeExecutionMode.BATCH} on the
   * configuration held by the given table config.
   *
   * @param tableConfig table config to modify; cast to {@code TableConfig}
   * @throws ClassCastException if {@code tableConfig} is not a {@code TableConfig}
   */
  @Override
  public void setBatchRuntimeMode(Object tableConfig) {
    final TableConfig config = (TableConfig) tableConfig;
    config.getConfiguration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
  }

  /**
   * Intentionally a no-op in this shim: the argument is ignored.
   *
   * <p>NOTE(review): presumably the old planner cannot be selected in the Flink version this
   * shim targets -- confirm against the shims for older versions.
   *
   * @param tableConfig unused by this implementation
   */
  @Override
  public void setOldPlanner(Object tableConfig) {
    // Nothing to do here; see the Javadoc.
  }

  /**
   * Renders a row as an array of strings using the table's resolved schema and the time zone
   * configured under {@code TableConfigOptions.LOCAL_TIME_ZONE}.
   *
   * @param row row to render; cast to {@code Row}
   * @param table table whose resolved schema describes the row; cast to {@code Table}
   * @param tableConfig table config supplying the local time zone; cast to {@code TableConfig}
   * @return the result of {@code PrintUtils.rowToString} for the row
   * @throws ClassCastException if an argument has an unexpected runtime type
   */
  @Override
  public String[] rowToString(Object row, Object table, Object tableConfig) {
    final TableConfig config = (TableConfig) tableConfig;
    final String configuredZone = config.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);

    // The option's default value stands for "use the JVM's zone"; anything else is parsed as a zone id.
    final ZoneId zoneId;
    if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(configuredZone)) {
      zoneId = ZoneId.systemDefault();
    } else {
      zoneId = ZoneId.of(configuredZone);
    }

    final ResolvedSchema schema = ((Table) table).getResolvedSchema();
    return PrintUtils.rowToString((Row) row, schema, zoneId);
  }

  @Override
  public boolean isTimeIndicatorType(Object type) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java [261:337]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Registers a table sink under the given name on the supplied table environment.
   *
   * @param stenv table environment; cast to
   *     {@code org.apache.flink.table.api.internal.TableEnvironmentInternal}
   * @param tableName name the sink is registered under
   * @param collectTableSink sink to register; cast to {@code TableSink}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerTableSink(Object stenv, String tableName, Object collectTableSink) {
    final org.apache.flink.table.api.internal.TableEnvironmentInternal internalEnv =
            (org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv;
    internalEnv.registerTableSinkInternal(tableName, (TableSink) collectTableSink);
  }

  /**
   * Registers a scalar function as a temporary system function.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param scalarFunction function instance; cast to {@code ScalarFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerScalarFunction(Object btenv, String name, Object scalarFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    final ScalarFunction function = (ScalarFunction) scalarFunction;
    tableEnv.createTemporarySystemFunction(name, function);
  }

  /**
   * Registers a table function on the supplied table environment.
   *
   * <p>Unlike {@code registerScalarFunction}, this goes through {@code registerFunction}
   * rather than {@code createTemporarySystemFunction}.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param tableFunction function instance; cast to {@code TableFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerTableFunction(Object btenv, String name, Object tableFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    // The raw cast is kept inline so overload resolution stays exactly as before.
    tableEnv.registerFunction(name, (TableFunction) tableFunction);
  }

  /**
   * Registers an aggregate function on the supplied table environment.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param aggregateFunction function instance; cast to {@code AggregateFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    // The raw cast is kept inline so overload resolution stays exactly as before.
    tableEnv.registerFunction(name, (AggregateFunction) aggregateFunction);
  }

  /**
   * Registers a table aggregate function on the supplied table environment.
   *
   * @param btenv table environment; cast to {@code StreamTableEnvironmentImpl}
   * @param name name the function is registered under
   * @param tableAggregateFunction function instance; cast to {@code TableAggregateFunction}
   * @throws ClassCastException if either object argument has an unexpected runtime type
   */
  @Override
  public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) {
    final StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) btenv;
    // The raw cast is kept inline so overload resolution stays exactly as before.
    tableEnv.registerFunction(name, (TableAggregateFunction) tableAggregateFunction);
  }

  /**
   * Intentionally a no-op in this shim: none of the arguments are read.
   *
   * <p>Historical note carried over from the original comment: Flink 1.11 binds the
   * CatalogManager to the parser, which prevents blink and flink from sharing the same
   * CatalogManager; the workaround was to always reset the CatalogTableSchemaResolver before
   * running any Flink code. NOTE(review): presumably that workaround is no longer needed for
   * the Flink version this shim targets -- confirm against the shims for older versions.
   *
   * @param catalogManager unused by this implementation
   * @param parserObject unused by this implementation
   * @param environmentSetting unused by this implementation
   */
  @Override
  public void setCatalogManagerSchemaResolver(Object catalogManager,
                                              Object parserObject,
                                              Object environmentSetting) {
    // Nothing to do here; see the Javadoc.
  }

  /**
   * Merges the configuration derived from the active custom command line into
   * {@code effectiveConfig}.
   *
   * @param cliFrontend CLI frontend; cast to {@code CliFrontend}
   * @param commandLine parsed command line; cast to {@code CommandLine}
   * @param effectiveConfig configuration to update in place; cast to {@code Configuration}
   * @return the same {@code effectiveConfig} object that was passed in, after the merge
   * @throws RuntimeException wrapping any {@code FlinkException} raised while merging
   * @throws ClassCastException if an argument has an unexpected runtime type
   */
  @Override
  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
    final CliFrontend frontend = (CliFrontend) cliFrontend;
    final CommandLine parsedCommandLine = (CommandLine) commandLine;
    final CustomCommandLine activeCommandLine =
            frontend.validateAndGetActiveCommandLine(parsedCommandLine);
    try {
      final Configuration target = (Configuration) effectiveConfig;
      final Configuration fromCommandLine = activeCommandLine.toConfiguration(parsedCommandLine);
      target.addAll(fromCommandLine);
      return effectiveConfig;
    } catch (FlinkException e) {
      // NOTE(review): the FlinkException presumably comes from toConfiguration rather than
      // addAll; the message text is kept unchanged for compatibility.
      throw new RuntimeException("Fail to call addAll", e);
    }
  }

  /**
   * Sets {@code ExecutionOptions.RUNTIME_MODE} to {@code RuntimeExecutionMode.BATCH} on the
   * configuration held by the given table config.
   *
   * @param tableConfig table config to modify; cast to {@code TableConfig}
   * @throws ClassCastException if {@code tableConfig} is not a {@code TableConfig}
   */
  @Override
  public void setBatchRuntimeMode(Object tableConfig) {
    final TableConfig config = (TableConfig) tableConfig;
    config.getConfiguration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
  }

  /**
   * Intentionally a no-op in this shim: the argument is ignored.
   *
   * <p>NOTE(review): presumably the old planner cannot be selected in the Flink version this
   * shim targets -- confirm against the shims for older versions.
   *
   * @param tableConfig unused by this implementation
   */
  @Override
  public void setOldPlanner(Object tableConfig) {
    // Nothing to do here; see the Javadoc.
  }

  /**
   * Renders a row as an array of strings using the table's resolved schema and the time zone
   * configured under {@code TableConfigOptions.LOCAL_TIME_ZONE}.
   *
   * @param row row to render; cast to {@code Row}
   * @param table table whose resolved schema describes the row; cast to {@code Table}
   * @param tableConfig table config supplying the local time zone; cast to {@code TableConfig}
   * @return the result of {@code PrintUtils.rowToString} for the row
   * @throws ClassCastException if an argument has an unexpected runtime type
   */
  @Override
  public String[] rowToString(Object row, Object table, Object tableConfig) {
    final TableConfig config = (TableConfig) tableConfig;
    final String configuredZone = config.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);

    // The option's default value stands for "use the JVM's zone"; anything else is parsed as a zone id.
    final ZoneId zoneId;
    if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(configuredZone)) {
      zoneId = ZoneId.systemDefault();
    } else {
      zoneId = ZoneId.of(configuredZone);
    }

    final ResolvedSchema schema = ((Table) table).getResolvedSchema();
    return PrintUtils.rowToString((Row) row, schema, zoneId);
  }

  @Override
  public boolean isTimeIndicatorType(Object type) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



