private TableFunction getLookupFunction()

in flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java [159:276]


    /**
     * Assembles the {@code FileSystemLookupFunction} that serves lookups against this Hive table.
     *
     * <p>The partitions handed to the lookup function depend on the table layout and read mode:
     *
     * <ul>
     *   <li>no partition keys: the single partition the fetcher context returns for an empty
     *       partition-value list;
     *   <li>partition keys present and {@code isStreamingSource()} is true: only the partition
     *       that sorts first in descending comparator order;
     *   <li>partition keys present otherwise: every partition listed by the fetcher context.
     * </ul>
     *
     * @param keys forwarded unchanged to the {@code FileSystemLookupFunction} constructor
     * @return the lookup function wired with the selected fetcher, its context and a reader
     */
    private TableFunction<RowData> getLookupFunction(int[] keys) {
        final List<String> partitionKeys = catalogTable.getPartitionKeys();

        PartitionFetcher.Context<HiveTablePartition> fetcherContext =
                new HiveTablePartitionFetcherContext(
                        tablePath,
                        hiveShim,
                        new JobConfWrapper(jobConf),
                        partitionKeys,
                        configuration,
                        JobConfUtils.getDefaultPartitionName(jobConf));

        // The fetchers come from static factories that take the table path as an argument, so
        // their lambdas have no enclosing instance to capture.
        final PartitionFetcher<HiveTablePartition> partitionFetcher;
        if (partitionKeys.isEmpty()) {
            partitionFetcher = wholeTableFetcher(tablePath);
        } else if (isStreamingSource()) {
            partitionFetcher = latestPartitionFetcher(tablePath);
        } else {
            partitionFetcher = allPartitionsFetcher(tablePath);
        }

        final DataType physicalRowType = catalogTable.getResolvedSchema().toPhysicalRowDataType();
        final DataType[] fieldTypes =
                DataType.getFieldDataTypes(physicalRowType).toArray(new DataType[0]);
        final String[] fieldNames = DataType.getFieldNames(physicalRowType).toArray(new String[0]);

        PartitionReader<HiveTablePartition, RowData> partitionReader =
                new HiveInputFormatPartitionReader(
                        flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM),
                        jobConf,
                        hiveVersion,
                        tablePath,
                        fieldTypes,
                        fieldNames,
                        partitionKeys,
                        projectedFields,
                        flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));

        return new FileSystemLookupFunction<>(
                partitionFetcher,
                fetcherContext,
                partitionReader,
                (RowType) producedDataType.getLogicalType(),
                keys,
                hiveTableReloadInterval);
    }

    /**
     * Fetcher for a table without partition keys: requests the partition for an empty
     * partition-value list and returns it as a one-element list.
     */
    private static PartitionFetcher<HiveTablePartition> wholeTableFetcher(
            ObjectPath tableFullPath) {
        return context -> {
            List<HiveTablePartition> fetched = new ArrayList<>();
            HiveTablePartition tablePartition =
                    context.getPartition(new ArrayList<>())
                            .orElseThrow(() -> partitionFetchFailure(tableFullPath));
            fetched.add(tablePartition);
            return fetched;
        };
    }

    /**
     * Fetcher for a streaming-read partitioned table: returns only the partition that ranks
     * first when the comparable partition values are sorted in descending order, and fails if
     * the table has no partitions at all.
     */
    private static PartitionFetcher<HiveTablePartition> latestPartitionFetcher(
            ObjectPath tableFullPath) {
        return context -> {
            List<HiveTablePartition> fetched = new ArrayList<>();
            List<PartitionFetcher.Context.ComparablePartitionValue> candidates =
                    context.getComparablePartitionValueList();
            if (candidates.isEmpty()) {
                throw new IllegalArgumentException(
                        String.format(
                                "At least one partition is required when set '%s' to 'latest' in temporal join,"
                                        + " but actual partition number is '%s' for hive table %s",
                                STREAMING_SOURCE_PARTITION_INCLUDE.key(),
                                candidates.size(),
                                tableFullPath));
            }

            // Descending order places the greatest partition at index 0.
            candidates.sort(
                    (left, right) -> right.getComparator().compareTo(left.getComparator()));
            PartitionFetcher.Context.ComparablePartitionValue newest = candidates.get(0);
            List<String> newestSpec = (List<String>) newest.getPartitionValue();
            fetched.add(
                    context.getPartition(newestSpec)
                            .orElseThrow(() -> partitionFetchFailure(tableFullPath)));
            return fetched;
        };
    }

    /**
     * Fetcher for a bounded-read partitioned table: resolves every partition value listed by the
     * context, in listing order.
     */
    private static PartitionFetcher<HiveTablePartition> allPartitionsFetcher(
            ObjectPath tableFullPath) {
        return context -> {
            List<HiveTablePartition> fetched = new ArrayList<>();
            for (PartitionFetcher.Context.ComparablePartitionValue candidate :
                    context.getComparablePartitionValueList()) {
                List<String> partitionSpec = (List<String>) candidate.getPartitionValue();
                fetched.add(
                        context.getPartition(partitionSpec)
                                .orElseThrow(() -> partitionFetchFailure(tableFullPath)));
            }
            return fetched;
        };
    }

    /** Builds the exception raised when the context cannot resolve a requested partition. */
    private static IllegalArgumentException partitionFetchFailure(ObjectPath tableFullPath) {
        return new IllegalArgumentException(
                String.format("Fetch partition fail for hive table %s.", tableFullPath));
    }