private DataStreamSink<?> createStreamSink(...)

in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java [298:416]


    private DataStreamSink<?> createStreamSink(
            DataStream<RowData> dataStream,
            StorageDescriptor sd,
            Properties tableProps,
            HiveWriterFactory recordWriterFactory,
            OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder,
            final int parallelism) {
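        // Copy all catalog table options into a Flink Configuration so the
        // filesystem-connector sink options can be read below.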
        org.apache.flink.configuration.Configuration conf =
                new org.apache.flink.configuration.Configuration();
        catalogTable.getOptions().forEach(conf::setString);

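        // Streaming writes to a partitioned Hive table require a partition commit
        // policy (e.g. metastore and/or success-file); fail fast if none is configured.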
        String commitPolicies = conf.getString(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
        if (!getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) {
            throw new FlinkHiveException(
                    String.format(
                            "Streaming write to partitioned hive table %s without providing a commit policy. "
                                    + "Make sure to set a proper value for %s",
                            identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
        }

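        // Computes the target Hive partition for each incoming RowData record,
        // honoring the configured partition policy, partition field and time pattern.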
        HiveRowDataPartitionComputer partComputer = new HiveRowDataPartitionComputer(
                jobConf,
                hiveShim,
                hiveVersion,
                JobConfUtils.getDefaultPartitionName(jobConf),
                tableSchema.getFieldNames(),
                tableSchema.getFieldDataTypes(),
                getPartitionKeyArray(),
                partitionPolicy,
                partitionField,
                timePattern,
                inputFormat,
                outputFormat,
                serializationLib);

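        // The bucket assigner routes records to partition directories; the rolling
        // policy closes in-progress files once they exceed the configured size or age.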
        TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
        HiveRollingPolicy rollingPolicy =
                new HiveRollingPolicy(
                        conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                        conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

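        // With auto compaction enabled, writers emit files under an "uncompacted"
        // prefix so the compaction stage can later merge them into the final files.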
        boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
        if (autoCompaction) {
            fileNamingBuilder.withPartPrefix(
                    convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
        }
        OutputFileConfig outputFileConfig = fileNamingBuilder.build();

        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());

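        // Prefer Flink's native bulk (Parquet/ORC) writers; fall back to the Hive
        // MapReduce RecordWriter when forced by configuration or when no bulk
        // writer factory is available for the table's storage format.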
        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
        if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
            builder =
                    bucketsBuilderForMRWriter(
                            recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
            LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
        } else {
            Optional<BulkWriter.Factory<RowData>> bulkFactory =
                    createBulkWriterFactory(getPartitionKeyArray(), sd);
            if (bulkFactory.isPresent()) {
                builder =
                        StreamingFileSink.forBulkFormat(
                                path,
                                new FileSystemTableSink.ProjectionBulkFactory(
                                        bulkFactory.get(), partComputer))
                                .withBucketAssigner(assigner)
                                .withRollingPolicy(rollingPolicy)
                                .withOutputFileConfig(outputFileConfig);
                LOG.info("Hive streaming sink: Use native parquet&orc writer.");
            } else {
                builder =
                        bucketsBuilderForMRWriter(
                                recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
                LOG.info(
                        "Hive streaming sink: Use MapReduce RecordWriter "
                                + "writer because BulkWriter Factory not available.");
            }
        }

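        // How often the writer checks whether in-progress files should be rolled.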
        long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();

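        // Build the writer stage. With auto compaction it also wires in the compaction
        // operators, using COMPACTION_FILE_SIZE (defaulting to the rolling file size)
        // as the target size for merged files.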
        DataStream<PartitionCommitInfo> writerStream;
        if (autoCompaction) {
            long compactionSize =
                    conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                            .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                            .getBytes();

            writerStream =
                    StreamingSink.compactionWriter(
                            dataStream,
                            bucketCheckInterval,
                            builder,
                            fsFactory(),
                            path,
                            createCompactReaderFactory(sd, tableProps),
                            compactionSize,
                            parallelism,
                            inlongMetric,
                            auditHostAndPorts,
                            dirtyOptions,
                            dirtySink);
        } else {
            writerStream =
                    StreamingSink.writer(
                            dataStream,
                            bucketCheckInterval,
                            builder,
                            parallelism,
                            inlongMetric,
                            auditHostAndPorts,
                            dirtyOptions,
                            dirtySink);
        }

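        // Final stage: commit finished files and, for partitioned tables, register new
        // partitions with the Hive metastore according to the configured commit policies.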
        return StreamingSink.sink(
                writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
    }