private FlinkBaseTaskWriter buildBaseWriter()

in amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkTaskWriterBuilder.java [116:205]


  /**
   * Builds a {@link FlinkBaseTaskWriter} that writes base-store files for the table.
   *
   * <p>For keyed tables the writer targets the keyed table's base table; for unkeyed tables it
   * targets the table itself. When {@code locationKind} is {@link HiveLocationKind#INSTANT},
   * files are written directly to the table's Hive location via {@link
   * AdaptHiveOutputFileFactory}; otherwise a {@link CommonOutputFileFactory} rooted at the base
   * location is used.
   *
   * @param locationKind where the produced files should be placed (base location vs. Hive
   *     location)
   * @return a task writer producing base-store data files
   * @throws IllegalArgumentException if a transaction id has been set on this builder; base
   *     writers must not carry one
   */
  private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
    Preconditions.checkArgument(
        transactionId == null,
        "Transaction id must be null when building a base writer, but was: %s",
        transactionId);
    FileFormat fileFormat =
        FileFormat.valueOf(
            table
                .properties()
                .getOrDefault(
                    TableProperties.BASE_FILE_FORMAT, TableProperties.BASE_FILE_FORMAT_DEFAULT)
                .toUpperCase(Locale.ENGLISH));
    long fileSizeBytes =
        PropertyUtil.propertyAsLong(
            table.properties(),
            TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
            TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);

    // Resolve the write target: keyed tables write into their base table, unkeyed tables into
    // themselves. Only keyed tables carry a primary key spec; it stays null otherwise.
    String baseLocation;
    EncryptionManager encryptionManager;
    Schema schema;
    Table icebergTable;
    PrimaryKeySpec primaryKeySpec = null;
    if (table.isKeyedTable()) {
      KeyedTable keyedTable = table.asKeyedTable();
      baseLocation = keyedTable.baseLocation();
      encryptionManager = keyedTable.baseTable().encryption();
      schema = keyedTable.baseTable().schema();
      primaryKeySpec = keyedTable.primaryKeySpec();
      icebergTable = keyedTable.baseTable();
    } else {
      // Renamed from "table" to avoid shadowing the builder field of the same name.
      UnkeyedTable unkeyedTable = this.table.asUnkeyedTable();
      baseLocation = unkeyedTable.location();
      encryptionManager = unkeyedTable.encryption();
      schema = unkeyedTable.schema();
      icebergTable = unkeyedTable;
    }

    // Re-align the Flink-derived schema's field ids with the table schema so the written files
    // reference the table's column ids rather than freshly assigned ones.
    Schema selectSchema =
        TypeUtil.reassignIds(
            FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
    boolean hiveConsistentWriteEnabled =
        PropertyUtil.propertyAsBoolean(
            table.properties(),
            HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
            HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

    // Hive-instant writes go straight to the Hive location; all other location kinds use the
    // common factory rooted at the resolved base location.
    OutputFileFactory outputFileFactory =
        locationKind == HiveLocationKind.INSTANT
            ? new AdaptHiveOutputFileFactory(
                ((SupportHive) table).hiveLocation(),
                table.spec(),
                fileFormat,
                table.io(),
                encryptionManager,
                partitionId,
                taskId,
                transactionId,
                hiveConsistentWriteEnabled)
            : new CommonOutputFileFactory(
                baseLocation,
                table.spec(),
                fileFormat,
                table.io(),
                encryptionManager,
                partitionId,
                taskId,
                transactionId);
    // Hive tables need the Hive-adapted appender; plain Iceberg tables use the stock Flink one.
    FileAppenderFactory<RowData> appenderFactory =
        TableTypeUtil.isHive(table)
            ? new AdaptHiveFlinkAppenderFactory(
                schema, flinkSchema, table.properties(), table.spec())
            : new FlinkAppenderFactory(
                icebergTable,
                schema,
                flinkSchema,
                table.properties(),
                table.spec(),
                null,
                null,
                null);
    return new FlinkBaseTaskWriter(
        fileFormat,
        appenderFactory,
        outputFileFactory,
        table.io(),
        fileSizeBytes,
        mask,
        selectSchema,
        flinkSchema,
        table.spec(),
        primaryKeySpec);
  }