private DataStreamSink consume()

in flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java [233:367]


    /**
     * Builds the sink that writes {@code dataStream} either into the Hive table identified by this
     * sink or, for an {@code INSERT ... DIRECTORY} statement, into a plain directory.
     *
     * <p>For an insert-directory statement the storage descriptor and table properties are derived
     * from the catalog table options. Otherwise they are fetched from the Hive metastore. A batch
     * sink is created when {@code isBounded} is true, otherwise a streaming sink. Streaming sinks
     * reject overwrite.
     *
     * @param providerContext context forwarded to the streaming sink
     * @param dataStream the rows to write
     * @param isBounded whether the input is bounded; selects the batch or the streaming sink
     * @param converter converter forwarded to the batch sink
     * @return the created sink
     * @throws CatalogException if the Hive metastore cannot be queried
     * @throws FlinkHiveException if the output format class cannot be loaded or instantiated
     * @throws FlinkRuntimeException if building the sink fails with an {@link IOException}
     *     (reported as a staging-dir failure)
     * @throws IllegalStateException if overwrite is requested for an unbounded input
     */
    private DataStreamSink<?> consume(
            ProviderContext providerContext,
            DataStream<RowData> dataStream,
            boolean isBounded,
            DataStructureConverter converter) {
        final Map<String, String> options = catalogTable.getOptions();
        checkAcidTable(options, identifier.toObjectPath());

        final StorageDescriptor sd;
        final Properties tableProps;
        final boolean isInsertDirectory =
                Boolean.parseBoolean(
                        options.getOrDefault(
                                CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_INSERT_DIRECTORY,
                                "false"));
        boolean isToLocal = false;
        if (isInsertDirectory) {
            isToLocal =
                    Boolean.parseBoolean(
                            options.getOrDefault(
                                    CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX
                                            + IS_TO_LOCAL_DIRECTORY,
                                    "false"));
            sd =
                    org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
                                    identifier.getDatabaseName(), identifier.getObjectName())
                            .getSd();
            // Build the HiveConf once and use the same instance for both the default storage
            // format and the STORED AS parsing, so they see one consistent configuration.
            HiveConf hiveConf = HiveConfUtils.create(jobConf);
            HiveTableUtil.setDefaultStorageFormatForDirectory(sd, hiveConf);
            HiveTableUtil.extractRowFormat(sd, options);
            HiveTableUtil.extractStoredAs(sd, options, hiveConf);
            HiveTableUtil.extractLocation(sd, options);
            tableProps = new Properties();
            tableProps.putAll(sd.getSerdeInfo().getParameters());
            tableProps.putAll(options);
        } else {
            try (HiveMetastoreClientWrapper client =
                    HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
                Table table =
                        client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
                sd = table.getSd();
                tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
            } catch (TException e) {
                throw new CatalogException("Failed to query Hive metaStore", e);
            }
        }

        try {
            Class hiveOutputFormatClz =
                    hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
            boolean isCompressed =
                    jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
            HiveWriterFactory writerFactory =
                    new HiveWriterFactory(
                            jobConf,
                            hiveOutputFormatClz,
                            sd.getSerdeInfo(),
                            resolvedSchema,
                            getPartitionKeyArray(),
                            tableProps,
                            hiveShim,
                            isCompressed);
            // Constructor#newInstance wraps constructor exceptions in InvocationTargetException.
            // The deprecated Class#newInstance rethrew them unchecked, bypassing the catch below.
            HiveOutputFormat<?, ?> outputFormat =
                    (HiveOutputFormat<?, ?>)
                            hiveOutputFormatClz.getDeclaredConstructor().newInstance();
            String extension = Utilities.getFileExtension(jobConf, isCompressed, outputFormat);

            OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
                    OutputFileConfig.builder()
                            .withPartPrefix("part-" + UUID.randomUUID())
                            .withPartSuffix(extension == null ? "" : extension);

            final int sinkParallelism =
                    Optional.ofNullable(configuredSinkParallelism)
                            .orElse(dataStream.getParallelism());
            final boolean sinkParallelismConfigured = configuredSinkParallelism != null;
            if (isBounded) {
                TableMetaStoreFactory msFactory =
                        isInsertDirectory
                                ? new EmptyMetaStoreFactory(
                                        new org.apache.flink.core.fs.Path(sd.getLocation()))
                                : msFactory(autoGatherStatistic);
                // By default the destination location is the parent of the staging directory.
                String stagingParentDir = sd.getLocation();
                if (isToLocal) {
                    // The destination is a local file system path. Stage under the configured
                    // (non-local) scratch dir first; the result is moved to the local path later.
                    Path rootScratchDirPath =
                            new Path(
                                    HiveConf.getVar(
                                            HiveConfUtils.create(jobConf),
                                            HiveConf.ConfVars.SCRATCHDIR));
                    // TODO: may append something more meaningful than a timestamp, like query ID
                    Path scratchDir =
                            new Path(
                                    rootScratchDirPath, String.valueOf(System.currentTimeMillis()));
                    stagingParentDir = scratchDir.toUri().toString();
                }
                return createBatchSink(
                        dataStream,
                        converter,
                        writerFactory,
                        msFactory,
                        fileNamingBuilder,
                        stagingParentDir,
                        sd,
                        tableProps,
                        isToLocal,
                        overwrite,
                        sinkParallelism,
                        sinkParallelismConfigured);
            } else {
                if (overwrite) {
                    throw new IllegalStateException("Streaming mode not support overwrite.");
                }
                return createStreamSink(
                        providerContext,
                        dataStream,
                        sd,
                        tableProps,
                        writerFactory,
                        fileNamingBuilder,
                        sinkParallelism,
                        sinkParallelismConfigured);
            }
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to create staging dir", e);
        } catch (ClassNotFoundException e) {
            throw new FlinkHiveException("Failed to get output format class", e);
        } catch (ReflectiveOperationException e) {
            // Covers IllegalAccessException, InstantiationException, NoSuchMethodException and
            // InvocationTargetException from instantiating the output format.
            throw new FlinkHiveException("Failed to instantiate output format instance", e);
        }
    }