private DataStreamSink<?> createBatchCompactSink()

in flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java [432:527]


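    /**
     * Creates the batch-mode sink used when auto compaction is enabled: rows are first
     * written to staging files, small files are then compacted, and the results are
     * committed as table partitions.
     */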
    private DataStreamSink<?> createBatchCompactSink(
            DataStream<RowData> dataStream,
            DataStructureConverter converter,
            HiveWriterFactory recordWriterFactory,
            TableMetaStoreFactory metaStoreFactory,
            OutputFileConfig fileNaming,
            String stagingParentDir,
            StorageDescriptor sd,
            Properties tableProps,
            boolean isToLocal,
            boolean overwrite,
            final int sinkParallelism,
            final int compactParallelism,
            boolean sinkParallelismConfigured,
            boolean compactParallelismConfigured)
            throws IOException {
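        // Copy the catalog table's options into a Flink Configuration and resolve the
        // temporary staging directory where in-progress files are written.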
        String[] partitionColumns = getPartitionKeyArray();
        org.apache.flink.configuration.Configuration conf =
                new org.apache.flink.configuration.Configuration();
        catalogTable.getOptions().forEach(conf::setString);
        HadoopFileSystemFactory fsFactory = fsFactory();
        org.apache.flink.core.fs.Path tmpPath =
                new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf));

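        // The commit policy controls how a finished partition is published, e.g. by
        // registering it in the metastore or by writing a success file.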
        PartitionCommitPolicyFactory partitionCommitPolicyFactory =
                new PartitionCommitPolicyFactory(
                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                        conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
                        conf.get(
                                FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
                        conf.get(
                                FileSystemConnectorOptions
                                        .SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));

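        // Buckets builder the compact stage uses to write the merged files in the
        // table's storage format at the table location.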
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder =
                getBucketsBuilder(path, recordWriterFactory, sd, fileNaming, conf);

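        // Reader factory the compactor uses to read back the staged files before merging.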
        CompactReader.Factory<RowData> readerFactory = createCompactReaderFactory(sd, tableProps);

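        // Output format plus a partition computer that derives each row's target partition,
        // substituting Hive's default partition name for null partition values.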
        HiveOutputFormatFactory outputFormatFactory =
                new HiveOutputFormatFactory(recordWriterFactory);
        HiveRowPartitionComputer partitionComputer =
                new HiveRowPartitionComputer(
                        hiveShim,
                        JobConfUtils.getDefaultPartitionName(jobConf),
                        resolvedSchema.getColumnNames().toArray(new String[0]),
                        resolvedSchema.getColumnDataTypes().toArray(new DataType[0]),
                        partitionColumns);

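        // Convert internal RowData records to external Rows for the Hive writers,
        // keeping the configured sink parallelism.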
        SingleOutputStreamOperator<Row> map =
                dataStream.map(value -> (Row) converter.toExternal(value));
        map.getTransformation().setParallelism(sinkParallelism, sinkParallelismConfigured);

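        // Write staging files and emit CoordinatorInput metadata describing them for
        // the downstream compact coordinator.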
        DataStream<CoordinatorInput> writerDataStream =
                map.transform(
                        BATCH_COMPACT_WRITER_OP_NAME,
                        TypeInformation.of(CoordinatorInput.class),
                        new BatchFileWriter<>(
                                fsFactory,
                                tmpPath,
                                partitionColumns,
                                dynamicGrouping,
                                staticPartitionSpec,
                                outputFormatFactory,
                                partitionComputer,
                                fileNaming));
        writerDataStream
                .getTransformation()
                .setParallelism(sinkParallelism, sinkParallelismConfigured);

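        // Compaction is triggered when the written files' average size falls below this threshold.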
        long compactAverageSize = conf.get(HiveOptions.COMPACT_SMALL_FILES_AVG_SIZE).getBytes();

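        // Target size for merged files: an explicit compaction file size wins; otherwise
        // fall back to the rolling policy's file size.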
        long compactTargetSize =
                conf.getOptional(FileSystemConnectorOptions.COMPACTION_FILE_SIZE)
                        .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                        .getBytes();

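        // Assemble the final sink: the writer stream feeds the compact coordinator and
        // compact operator, and finished partitions are committed via the commit policies.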
        return BatchSink.createBatchCompactSink(
                writerDataStream,
                builder,
                readerFactory,
                fsFactory,
                metaStoreFactory,
                partitionCommitPolicyFactory,
                partitionColumns,
                staticPartitionSpec,
                tmpPath,
                identifier,
                compactAverageSize,
                compactTargetSize,
                isToLocal,
                overwrite,
                compactParallelism,
                compactParallelismConfigured);
    }
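
A minimal sketch of the target-size fallback above, assuming only option constants this method already references; import paths follow recent Flink releases, and the 128mb value is illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.connector.file.table.FileSystemConnectorOptions;

    public class CompactTargetSizeSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Illustrative assumption: only the rolling-policy size is configured.
            conf.set(
                    FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE,
                    MemorySize.parse("128mb"));

            // COMPACTION_FILE_SIZE wins when set; otherwise the rolling-policy size
            // applies, mirroring how compactTargetSize is resolved above.
            long targetSize =
                    conf.getOptional(FileSystemConnectorOptions.COMPACTION_FILE_SIZE)
                            .orElse(
                                    conf.get(
                                            FileSystemConnectorOptions
                                                    .SINK_ROLLING_POLICY_FILE_SIZE))
                            .getBytes();

            System.out.println(targetSize); // 134217728 bytes (128 MiB)
        }
    }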