in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java [298:416]
private DataStreamSink<?> createStreamSink(
        DataStream<RowData> dataStream,
        StorageDescriptor sd,
        Properties tableProps,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder,
        final int parallelism) {
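    // Copy the catalog table options into a Flink Configuration so the sink options below can be read from it.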
    org.apache.flink.configuration.Configuration conf =
            new org.apache.flink.configuration.Configuration();
    catalogTable.getOptions().forEach(conf::setString);
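    // Streaming writes to a partitioned Hive table require a partition commit policy
    // (e.g. sink.partition-commit.policy.kind = 'metastore,success-file'); fail fast if none is configured.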
    String commitPolicies = conf.getString(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
    if (!getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) {
        throw new FlinkHiveException(
                String.format(
                        "Streaming write to partitioned hive table %s without providing a commit policy. "
                                + "Make sure to set a proper value for %s",
                        identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
    }
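    // Computes the target Hive partition for each incoming RowData, using the configured
    // partition policy, partition field, time pattern and storage formats.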
    HiveRowDataPartitionComputer partComputer = new HiveRowDataPartitionComputer(
            jobConf,
            hiveShim,
            hiveVersion,
            JobConfUtils.getDefaultPartitionName(jobConf),
            tableSchema.getFieldNames(),
            tableSchema.getFieldDataTypes(),
            getPartitionKeyArray(),
            partitionPolicy,
            partitionField,
            timePattern,
            inputFormat,
            outputFormat,
            serializationLib);
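    // Buckets follow the computed partitions; part files roll over by size or by time interval.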
    TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
    HiveRollingPolicy rollingPolicy =
            new HiveRollingPolicy(
                    conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                    conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
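    // When auto compaction is enabled, mark the written part files with an "uncompacted" prefix
    // so the downstream compact stage can identify which files it needs to merge.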
    boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
    if (autoCompaction) {
        fileNamingBuilder.withPartPrefix(
                convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
    }
    OutputFileConfig outputFileConfig = fileNamingBuilder.build();
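    // Choose the writer: fall back to Hive's MapReduce RecordWriter when
    // TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER is set or no native bulk writer is available,
    // otherwise use Flink's native Parquet/ORC bulk writer.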
    org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
    BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
    if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
        builder =
                bucketsBuilderForMRWriter(
                        recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
        LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
    } else {
        Optional<BulkWriter.Factory<RowData>> bulkFactory =
                createBulkWriterFactory(getPartitionKeyArray(), sd);
        if (bulkFactory.isPresent()) {
            builder =
                    StreamingFileSink.forBulkFormat(
                            path,
                            new FileSystemTableSink.ProjectionBulkFactory(
                                    bulkFactory.get(), partComputer))
                            .withBucketAssigner(assigner)
                            .withRollingPolicy(rollingPolicy)
                            .withOutputFileConfig(outputFileConfig);
            LOG.info("Hive streaming sink: Use native parquet&orc writer.");
        } else {
            builder =
                    bucketsBuilderForMRWriter(
                            recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
            LOG.info(
                    "Hive streaming sink: Use MapReduce RecordWriter "
                            + "writer because BulkWriter Factory not available.");
        }
    }
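    // Build the writer stream. With auto compaction, small files are merged up to the configured
    // target size (COMPACTION_FILE_SIZE, defaulting to the rolling file size); the InLong metric
    // and audit settings plus dirty-data handling are passed through in either branch.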
    long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
    DataStream<PartitionCommitInfo> writerStream;
    if (autoCompaction) {
        long compactionSize =
                conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                        .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                        .getBytes();
        writerStream =
                StreamingSink.compactionWriter(
                        dataStream,
                        bucketCheckInterval,
                        builder,
                        fsFactory(),
                        path,
                        createCompactReaderFactory(sd, tableProps),
                        compactionSize,
                        parallelism,
                        inlongMetric,
                        auditHostAndPorts,
                        dirtyOptions,
                        dirtySink);
    } else {
        writerStream =
                StreamingSink.writer(
                        dataStream,
                        bucketCheckInterval,
                        builder,
                        parallelism,
                        inlongMetric,
                        auditHostAndPorts,
                        dirtyOptions,
                        dirtySink);
    }
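    // Attach the partition-commit stage, which applies the configured commit policies
    // (metastore registration, success files, etc.), and terminate with the final sink node.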
    return StreamingSink.sink(
            writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
}