in flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java [159:276]
/**
 * Builds a {@link FileSystemLookupFunction} over this Hive table.
 *
 * <p>Which partitions the function loads depends on the table:
 *
 * <ul>
 *   <li>non-partitioned table: the single partition representing the whole table;
 *   <li>partitioned table read as a streaming source: only the latest partition, i.e. the one
 *       with the greatest comparator value;
 *   <li>partitioned table read as a bounded source: every partition.
 * </ul>
 *
 * @param keys lookup key field indices handed to the lookup function (presumably positions in
 *     the produced row type — confirm against {@code FileSystemLookupFunction})
 * @return the lookup function, configured with {@code hiveTableReloadInterval}
 */
private TableFunction<RowData> getLookupFunction(int[] keys) {
    final String defaultPartitionName = JobConfUtils.getDefaultPartitionName(jobConf);
    final List<String> partitionKeys = catalogTable.getPartitionKeys();
    PartitionFetcher.Context<HiveTablePartition> fetcherContext =
            new HiveTablePartitionFetcherContext(
                    tablePath,
                    hiveShim,
                    new JobConfWrapper(jobConf),
                    partitionKeys,
                    configuration,
                    defaultPartitionName);

    // The fetchers are created by static factories. Their lambdas therefore cannot capture
    // `this`; they capture only the (serializable) table path.
    final PartitionFetcher<HiveTablePartition> partitionFetcher;
    if (partitionKeys.isEmpty()) {
        partitionFetcher = createNonPartitionedTableFetcher(tablePath);
    } else if (isStreamingSource()) {
        partitionFetcher = createLatestPartitionFetcher(tablePath);
    } else {
        partitionFetcher = createAllPartitionsFetcher(tablePath);
    }

    final DataType physicalRowDataType =
            catalogTable.getResolvedSchema().toPhysicalRowDataType();
    PartitionReader<HiveTablePartition, RowData> partitionReader =
            new HiveInputFormatPartitionReader(
                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM),
                    jobConf,
                    hiveVersion,
                    tablePath,
                    DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
                    DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
                    partitionKeys,
                    projectedFields,
                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
    return new FileSystemLookupFunction<>(
            partitionFetcher,
            fetcherContext,
            partitionReader,
            (RowType) producedDataType.getLogicalType(),
            keys,
            hiveTableReloadInterval);
}

/**
 * Creates a fetcher for a non-partitioned table.
 *
 * <p>The fetcher asks the context for the partition addressed by an empty partition-value
 * list, which represents the whole table.
 */
private static PartitionFetcher<HiveTablePartition> createNonPartitionedTableFetcher(
        ObjectPath tablePath) {
    return context -> {
        List<HiveTablePartition> partitions = new ArrayList<>();
        partitions.add(
                context.getPartition(new ArrayList<>())
                        .orElseThrow(() -> partitionFetchFailure(tablePath)));
        return partitions;
    };
}

/**
 * Creates a fetcher for a streaming-read partitioned table.
 *
 * <p>The fetcher returns only the partition with the greatest comparator value. It fails if the
 * table has no partition.
 */
@SuppressWarnings("unchecked")
private static PartitionFetcher<HiveTablePartition> createLatestPartitionFetcher(
        ObjectPath tablePath) {
    return context -> {
        List<PartitionFetcher.Context.ComparablePartitionValue> comparablePartitionValues =
                context.getComparablePartitionValueList();
        // Only the maximum is needed, so a linear scan replaces the former full descending
        // sort, and the context's list is left unmodified. On ties Stream.max keeps the
        // earliest element, which is what the stable sort followed by get(0) selected.
        PartitionFetcher.Context.ComparablePartitionValue maxPartition =
                comparablePartitionValues.stream()
                        .max((o1, o2) -> o1.getComparator().compareTo(o2.getComparator()))
                        .orElseThrow(
                                () ->
                                        new IllegalArgumentException(
                                                String.format(
                                                        "At least one partition is required when set '%s' to 'latest' in temporal join,"
                                                                + " but actual partition number is '%s' for hive table %s",
                                                        STREAMING_SOURCE_PARTITION_INCLUDE.key(),
                                                        comparablePartitionValues.size(),
                                                        tablePath)));
        List<HiveTablePartition> partitions = new ArrayList<>();
        partitions.add(
                context.getPartition((List<String>) maxPartition.getPartitionValue())
                        .orElseThrow(() -> partitionFetchFailure(tablePath)));
        return partitions;
    };
}

/**
 * Creates a fetcher for a bounded-read partitioned Hive table.
 *
 * <p>The fetcher returns every partition listed by the context, in the context's order.
 */
@SuppressWarnings("unchecked")
private static PartitionFetcher<HiveTablePartition> createAllPartitionsFetcher(
        ObjectPath tablePath) {
    return context -> {
        List<HiveTablePartition> partitions = new ArrayList<>();
        for (PartitionFetcher.Context.ComparablePartitionValue comparablePartitionValue :
                context.getComparablePartitionValueList()) {
            partitions.add(
                    context.getPartition(
                                    (List<String>) comparablePartitionValue.getPartitionValue())
                            .orElseThrow(() -> partitionFetchFailure(tablePath)));
        }
        return partitions;
    };
}

/**
 * Builds the error raised when the fetcher context cannot resolve a partition of {@code
 * tablePath}.
 */
private static IllegalArgumentException partitionFetchFailure(ObjectPath tablePath) {
    return new IllegalArgumentException(
            String.format("Fetch partition fail for hive table %s.", tablePath));
}