in flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java [432:527]
    private DataStreamSink<?> createBatchCompactSink(
            DataStream<RowData> dataStream,
            DataStructureConverter converter,
            HiveWriterFactory recordWriterFactory,
            TableMetaStoreFactory metaStoreFactory,
            OutputFileConfig fileNaming,
            String stagingParentDir,
            StorageDescriptor sd,
            Properties tableProps,
            boolean isToLocal,
            boolean overwrite,
            final int sinkParallelism,
            final int compactParallelism,
            boolean sinkParallelismConfigured,
            boolean compactParallelismConfigured)
            throws IOException {
        String[] partitionColumns = getPartitionKeyArray();

        // Copy the catalog table options into a Flink Configuration so the connector
        // options can be read below.
        org.apache.flink.configuration.Configuration conf =
                new org.apache.flink.configuration.Configuration();
        catalogTable.getOptions().forEach(conf::setString);

        HadoopFileSystemFactory fsFactory = fsFactory();

        // Staging directory that holds the written files until compaction and the final commit.
        org.apache.flink.core.fs.Path tmpPath =
                new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf));

        // Factory for the policies (metastore, success-file, custom) applied when
        // partitions are committed at the end of the batch job.
        PartitionCommitPolicyFactory partitionCommitPolicyFactory =
                new PartitionCommitPolicyFactory(
                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                        conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
                        conf.get(
                                FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
                        conf.get(
                                FileSystemConnectorOptions
                                        .SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));

        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());

        // Buckets builder used by the compact operator to rewrite small staged files,
        // and the reader factory it uses to read them back.
        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder =
                getBucketsBuilder(path, recordWriterFactory, sd, fileNaming, conf);
        CompactReader.Factory<RowData> readerFactory = createCompactReaderFactory(sd, tableProps);

        HiveOutputFormatFactory outputFormatFactory =
                new HiveOutputFormatFactory(recordWriterFactory);
        // Computes the target Hive partition of each row from the partition columns.
        HiveRowPartitionComputer partitionComputer =
                new HiveRowPartitionComputer(
                        hiveShim,
                        JobConfUtils.getDefaultPartitionName(jobConf),
                        resolvedSchema.getColumnNames().toArray(new String[0]),
                        resolvedSchema.getColumnDataTypes().toArray(new DataType[0]),
                        partitionColumns);

        // Convert internal RowData records to external Rows for the Hive writers.
        SingleOutputStreamOperator<Row> map =
                dataStream.map(value -> (Row) converter.toExternal(value));
        map.getTransformation().setParallelism(sinkParallelism, sinkParallelismConfigured);

        // Write the staged files and emit their metadata (CoordinatorInput) to the
        // compact coordinator.
        DataStream<CoordinatorInput> writerDataStream =
                map.transform(
                        BATCH_COMPACT_WRITER_OP_NAME,
                        TypeInformation.of(CoordinatorInput.class),
                        new BatchFileWriter<>(
                                fsFactory,
                                tmpPath,
                                partitionColumns,
                                dynamicGrouping,
                                staticPartitionSpec,
                                outputFormatFactory,
                                partitionComputer,
                                fileNaming));
        writerDataStream
                .getTransformation()
                .setParallelism(sinkParallelism, sinkParallelismConfigured);

        // Files are compacted when their average size is below compactAverageSize;
        // compactTargetSize is the desired size of the rewritten files.
        long compactAverageSize = conf.get(HiveOptions.COMPACT_SMALL_FILES_AVG_SIZE).getBytes();
        long compactTargetSize =
                conf.getOptional(FileSystemConnectorOptions.COMPACTION_FILE_SIZE)
                        .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                        .getBytes();

        // Assemble the remaining topology: compact coordinator, compact operator and
        // the partition committer.
        return BatchSink.createBatchCompactSink(
                writerDataStream,
                builder,
                readerFactory,
                fsFactory,
                metaStoreFactory,
                partitionCommitPolicyFactory,
                partitionColumns,
                staticPartitionSpec,
                tmpPath,
                identifier,
                compactAverageSize,
                compactTargetSize,
                isToLocal,
                overwrite,
                compactParallelism,
                compactParallelismConfigured);
    }
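
Usage sketch (not part of the file above; table names and size values are illustrative): the constants read in this method correspond to the table option keys 'compaction.small-files.avg-size' (HiveOptions.COMPACT_SMALL_FILES_AVG_SIZE), 'compaction.file-size' (FileSystemConnectorOptions.COMPACTION_FILE_SIZE) and 'sink.rolling-policy.file-size' (SINK_ROLLING_POLICY_FILE_SIZE). Assuming a Flink version whose Hive sink supports compaction in batch mode, and assuming 'auto-compaction'='true' is what routes the surrounding class onto this compact-sink path, a batch insert could exercise it roughly as follows.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveBatchCompactionExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Assumption: a HiveCatalog named "hive" has already been registered on this environment
        // and both tables exist in it.
        tEnv.useCatalog("hive");

        // The OPTIONS hint overrides the sink table's options for this statement only.
        // If dynamic table options are disabled in your Flink version, enable
        // 'table.dynamic-table-options.enabled' first.
        tEnv.executeSql(
                        "INSERT INTO target_table /*+ OPTIONS("
                                + "'auto-compaction'='true', "
                                + "'compaction.small-files.avg-size'='16MB', "
                                + "'compaction.file-size'='128MB') */ "
                                + "SELECT * FROM source_table")
                .await();
    }
}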