amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatDynamicSource.java [77:247]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedFormatDynamicSource
    implements ScanTableSource,
        SupportsFilterPushDown,
        SupportsProjectionPushDown,
        SupportsLimitPushDown,
        SupportsWatermarkPushDown,
        LookupTableSource {

  private static final Logger LOG = LoggerFactory.getLogger(MixedFormatDynamicSource.class);

  protected final String tableName;

  protected final ScanTableSource mixedFormatDynamicSource;
  protected final MixedTable mixedTable;
  protected final Map<String, String> properties;

  protected int[] projectFields;
  protected List<Expression> filters;
  protected ResolvedExpression flinkExpression;
  protected final MixedFormatTableLoader tableLoader;

  @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;

  /**
   * @param tableName tableName
   * @param mixedFormatDynamicSource underlying source
   * @param mixedTable mixedTable
   * @param properties With all mixed-format table properties and sql options
   * @param tableLoader
   */
  public MixedFormatDynamicSource(
      String tableName,
      ScanTableSource mixedFormatDynamicSource,
      MixedTable mixedTable,
      Map<String, String> properties,
      MixedFormatTableLoader tableLoader) {
    this.tableName = tableName;
    this.mixedFormatDynamicSource = mixedFormatDynamicSource;
    this.mixedTable = mixedTable;
    this.properties = properties;
    this.tableLoader = tableLoader;
  }

  public MixedFormatDynamicSource(
      String tableName,
      ScanTableSource mixedFormatDynamicSource,
      MixedTable mixedTable,
      Map<String, String> properties,
      MixedFormatTableLoader tableLoader,
      int[] projectFields,
      List<Expression> filters,
      ResolvedExpression flinkExpression) {
    this.tableName = tableName;
    this.mixedFormatDynamicSource = mixedFormatDynamicSource;
    this.mixedTable = mixedTable;
    this.properties = properties;
    this.tableLoader = tableLoader;
    this.projectFields = projectFields;
    this.filters = filters;
    this.flinkExpression = flinkExpression;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    return mixedFormatDynamicSource.getChangelogMode();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    ScanRuntimeProvider origin = mixedFormatDynamicSource.getScanRuntimeProvider(scanContext);
    Preconditions.checkArgument(
        origin instanceof DataStreamScanProvider,
        "file or log ScanRuntimeProvider should be DataStreamScanProvider, but provided is "
            + origin.getClass());
    return origin;
  }

  @Override
  public DynamicTableSource copy() {
    return new MixedFormatDynamicSource(
        tableName,
        mixedFormatDynamicSource,
        mixedTable,
        properties,
        tableLoader,
        projectFields,
        filters,
        flinkExpression);
  }

  @Override
  public String asSummaryString() {
    return "Mixed-format Dynamic Source";
  }

  @Override
  public Result applyFilters(List<ResolvedExpression> filters) {
    IcebergAndFlinkFilters icebergAndFlinkFilters =
        FilterUtil.convertFlinkExpressToIceberg(filters);
    this.filters = icebergAndFlinkFilters.expressions();

    if (filters.size() == 1) {
      flinkExpression = filters.get(0);
    } else if (filters.size() >= 2) {
      flinkExpression = and(filters.get(0), filters.get(1));
      for (int i = 2; i < filters.size(); i++) {
        flinkExpression = and(flinkExpression, filters.subList(i, i + 1).get(0));
      }
    }

    if (mixedFormatDynamicSource instanceof SupportsFilterPushDown) {
      return ((SupportsFilterPushDown) mixedFormatDynamicSource).applyFilters(filters);
    } else {
      return Result.of(Collections.emptyList(), filters);
    }
  }

  @Override
  public boolean supportsNestedProjection() {
    if (mixedFormatDynamicSource instanceof SupportsProjectionPushDown) {
      return ((SupportsProjectionPushDown) mixedFormatDynamicSource).supportsNestedProjection();
    } else {
      return false;
    }
  }

  protected CallExpression and(ResolvedExpression left, ResolvedExpression right) {
    return CallExpression.permanent(
        FunctionIdentifier.of(BuiltInFunctionDefinitions.AND.getName()),
        BuiltInFunctionDefinitions.AND,
        Arrays.asList(left, right),
        DataTypes.BOOLEAN());
  }

  @Override
  public void applyProjection(int[][] projectedFields, DataType producedDataType) {
    projectFields = new int[projectedFields.length];
    for (int i = 0; i < projectedFields.length; i++) {
      Preconditions.checkArgument(
          projectedFields[i].length == 1, "Don't support nested projection now.");
      projectFields[i] = projectedFields[i][0];
    }

    if (mixedFormatDynamicSource instanceof SupportsProjectionPushDown) {
      ((SupportsProjectionPushDown) mixedFormatDynamicSource)
          .applyProjection(projectedFields, producedDataType);
    }
  }

  @Override
  public void applyLimit(long newLimit) {
    if (mixedFormatDynamicSource instanceof SupportsLimitPushDown) {
      ((SupportsLimitPushDown) mixedFormatDynamicSource).applyLimit(newLimit);
    }
  }

  @Override
  public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
    if (mixedFormatDynamicSource instanceof SupportsWatermarkPushDown) {
      ((SupportsWatermarkPushDown) mixedFormatDynamicSource).applyWatermark(watermarkStrategy);
    }
  }

  @Override
  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    int[] joinKeys = new int[context.getKeys().length];
    for (int i = 0; i < context.getKeys().length; i++) {
      Preconditions.checkArgument(
          context.getKeys()[i].length == 1,
          "Mixed-format lookup join doesn't support the row field as a joining key.");
      joinKeys[i] = context.getKeys()[i][0];
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/src/main/java/org/apache/amoro/flink/table/MixedFormatDynamicSource.java [77:247]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedFormatDynamicSource
    implements ScanTableSource,
        SupportsFilterPushDown,
        SupportsProjectionPushDown,
        SupportsLimitPushDown,
        SupportsWatermarkPushDown,
        LookupTableSource {

  private static final Logger LOG = LoggerFactory.getLogger(MixedFormatDynamicSource.class);

  protected final String tableName;

  protected final ScanTableSource mixedFormatDynamicSource;
  protected final MixedTable mixedTable;
  protected final Map<String, String> properties;

  protected int[] projectFields;
  protected List<Expression> filters;
  protected ResolvedExpression flinkExpression;
  protected final MixedFormatTableLoader tableLoader;

  @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;

  /**
   * @param tableName tableName
   * @param mixedFormatDynamicSource underlying source
   * @param mixedTable mixedTable
   * @param properties With all mixed-format table properties and sql options
   * @param tableLoader
   */
  public MixedFormatDynamicSource(
      String tableName,
      ScanTableSource mixedFormatDynamicSource,
      MixedTable mixedTable,
      Map<String, String> properties,
      MixedFormatTableLoader tableLoader) {
    this.tableName = tableName;
    this.mixedFormatDynamicSource = mixedFormatDynamicSource;
    this.mixedTable = mixedTable;
    this.properties = properties;
    this.tableLoader = tableLoader;
  }

  public MixedFormatDynamicSource(
      String tableName,
      ScanTableSource mixedFormatDynamicSource,
      MixedTable mixedTable,
      Map<String, String> properties,
      MixedFormatTableLoader tableLoader,
      int[] projectFields,
      List<Expression> filters,
      ResolvedExpression flinkExpression) {
    this.tableName = tableName;
    this.mixedFormatDynamicSource = mixedFormatDynamicSource;
    this.mixedTable = mixedTable;
    this.properties = properties;
    this.tableLoader = tableLoader;
    this.projectFields = projectFields;
    this.filters = filters;
    this.flinkExpression = flinkExpression;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    return mixedFormatDynamicSource.getChangelogMode();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    ScanRuntimeProvider origin = mixedFormatDynamicSource.getScanRuntimeProvider(scanContext);
    Preconditions.checkArgument(
        origin instanceof DataStreamScanProvider,
        "file or log ScanRuntimeProvider should be DataStreamScanProvider, but provided is "
            + origin.getClass());
    return origin;
  }

  @Override
  public DynamicTableSource copy() {
    return new MixedFormatDynamicSource(
        tableName,
        mixedFormatDynamicSource,
        mixedTable,
        properties,
        tableLoader,
        projectFields,
        filters,
        flinkExpression);
  }

  @Override
  public String asSummaryString() {
    return "Mixed-format Dynamic Source";
  }

  @Override
  public Result applyFilters(List<ResolvedExpression> filters) {
    IcebergAndFlinkFilters icebergAndFlinkFilters =
        FilterUtil.convertFlinkExpressToIceberg(filters);
    this.filters = icebergAndFlinkFilters.expressions();

    if (filters.size() == 1) {
      flinkExpression = filters.get(0);
    } else if (filters.size() >= 2) {
      flinkExpression = and(filters.get(0), filters.get(1));
      for (int i = 2; i < filters.size(); i++) {
        flinkExpression = and(flinkExpression, filters.subList(i, i + 1).get(0));
      }
    }

    if (mixedFormatDynamicSource instanceof SupportsFilterPushDown) {
      return ((SupportsFilterPushDown) mixedFormatDynamicSource).applyFilters(filters);
    } else {
      return Result.of(Collections.emptyList(), filters);
    }
  }

  @Override
  public boolean supportsNestedProjection() {
    if (mixedFormatDynamicSource instanceof SupportsProjectionPushDown) {
      return ((SupportsProjectionPushDown) mixedFormatDynamicSource).supportsNestedProjection();
    } else {
      return false;
    }
  }

  protected CallExpression and(ResolvedExpression left, ResolvedExpression right) {
    return CallExpression.permanent(
        FunctionIdentifier.of(BuiltInFunctionDefinitions.AND.getName()),
        BuiltInFunctionDefinitions.AND,
        Arrays.asList(left, right),
        DataTypes.BOOLEAN());
  }

  @Override
  public void applyProjection(int[][] projectedFields, DataType producedDataType) {
    projectFields = new int[projectedFields.length];
    for (int i = 0; i < projectedFields.length; i++) {
      Preconditions.checkArgument(
          projectedFields[i].length == 1, "Don't support nested projection now.");
      projectFields[i] = projectedFields[i][0];
    }

    if (mixedFormatDynamicSource instanceof SupportsProjectionPushDown) {
      ((SupportsProjectionPushDown) mixedFormatDynamicSource)
          .applyProjection(projectedFields, producedDataType);
    }
  }

  @Override
  public void applyLimit(long newLimit) {
    if (mixedFormatDynamicSource instanceof SupportsLimitPushDown) {
      ((SupportsLimitPushDown) mixedFormatDynamicSource).applyLimit(newLimit);
    }
  }

  @Override
  public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
    if (mixedFormatDynamicSource instanceof SupportsWatermarkPushDown) {
      ((SupportsWatermarkPushDown) mixedFormatDynamicSource).applyWatermark(watermarkStrategy);
    }
  }

  @Override
  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    int[] joinKeys = new int[context.getKeys().length];
    for (int i = 0; i < context.getKeys().length; i++) {
      Preconditions.checkArgument(
          context.getKeys()[i].length == 1,
          "Mixed-format lookup join doesn't support the row field as a joining key.");
      joinKeys[i] = context.getKeys()[i][0];
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



