in amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkTaskWriterBuilder.java [116:205]
/**
 * Assembles the {@link FlinkBaseTaskWriter} used for base-store writes.
 *
 * <p>The file format is read from {@code TableProperties.BASE_FILE_FORMAT} and the target file
 * size from {@code TableProperties.WRITE_TARGET_FILE_SIZE_BYTES}. For a keyed table the
 * location, encryption manager and schema come from its base table and the primary key spec is
 * carried along; for an unkeyed table they come from the table itself and the primary key spec
 * is {@code null}.
 *
 * @param locationKind when equal to {@code HiveLocationKind.INSTANT} files are produced through
 *     an {@link AdaptHiveOutputFileFactory} rooted at the Hive location; otherwise a {@link
 *     CommonOutputFileFactory} rooted at the base location is used
 * @return a writer configured with the resolved format, factories, schema and partition spec
 * @throws IllegalArgumentException if a transaction id has been set on this builder
 */
private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
  // Base writers must be built without a transaction id.
  Preconditions.checkArgument(transactionId == null);

  String formatName =
      table
          .properties()
          .getOrDefault(TableProperties.BASE_FILE_FORMAT, TableProperties.BASE_FILE_FORMAT_DEFAULT);
  FileFormat baseFormat = FileFormat.valueOf(formatName.toUpperCase(Locale.ENGLISH));

  long targetFileSize =
      PropertyUtil.propertyAsLong(
          table.properties(),
          TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
          TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);

  // Resolve everything that differs between keyed and unkeyed tables up front.
  final String dataLocation;
  final EncryptionManager encryption;
  final Schema tableSchema;
  final Table underlyingTable;
  final PrimaryKeySpec keySpec;
  if (!table.isKeyedTable()) {
    UnkeyedTable unkeyed = table.asUnkeyedTable();
    dataLocation = unkeyed.location();
    encryption = unkeyed.encryption();
    tableSchema = unkeyed.schema();
    underlyingTable = unkeyed;
    keySpec = null;
  } else {
    KeyedTable keyed = table.asKeyedTable();
    dataLocation = keyed.baseLocation();
    underlyingTable = keyed.baseTable();
    encryption = underlyingTable.encryption();
    tableSchema = underlyingTable.schema();
    keySpec = keyed.primaryKeySpec();
  }

  // Project the Flink row type onto the table schema so field ids line up with the table's.
  Schema flinkAsIceberg = FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema));
  Schema projectedSchema = TypeUtil.reassignIds(flinkAsIceberg, tableSchema);

  boolean consistentHiveWrite =
      PropertyUtil.propertyAsBoolean(
          table.properties(),
          HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
          HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

  OutputFileFactory fileFactory;
  if (locationKind == HiveLocationKind.INSTANT) {
    fileFactory =
        new AdaptHiveOutputFileFactory(
            ((SupportHive) table).hiveLocation(),
            table.spec(),
            baseFormat,
            table.io(),
            encryption,
            partitionId,
            taskId,
            transactionId,
            consistentHiveWrite);
  } else {
    fileFactory =
        new CommonOutputFileFactory(
            dataLocation,
            table.spec(),
            baseFormat,
            table.io(),
            encryption,
            partitionId,
            taskId,
            transactionId);
  }

  FileAppenderFactory<RowData> rowAppenderFactory;
  if (TableTypeUtil.isHive(table)) {
    rowAppenderFactory =
        new AdaptHiveFlinkAppenderFactory(
            tableSchema, flinkSchema, table.properties(), table.spec());
  } else {
    // NOTE(review): the three trailing nulls are presumably the delete-file related
    // arguments, which a base writer does not need — confirm against FlinkAppenderFactory.
    rowAppenderFactory =
        new FlinkAppenderFactory(
            underlyingTable,
            tableSchema,
            flinkSchema,
            table.properties(),
            table.spec(),
            null,
            null,
            null);
  }

  return new FlinkBaseTaskWriter(
      baseFormat,
      rowAppenderFactory,
      fileFactory,
      table.io(),
      targetFileSize,
      mask,
      projectedSchema,
      flinkSchema,
      table.spec(),
      keySpec);
}