in hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java [56:502]
static {
for (char c = 0; c < ' '; c++) {
CHAR_TO_ESCAPE.set(c);
}
/*
* ASCII 01-1F are HTTP control characters that need to be escaped.
* \u000A and \u000D are \n and \r, respectively.
*/
char[] clist = new char[] {'\u0001', '\u0002', '\u0003', '\u0004',
'\u0005', '\u0006', '\u0007', '\u0008', '\u0009', '\n', '\u000B',
'\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012',
'\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019',
'\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F',
'"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{',
'[', ']', '^'};
for (char c : clist) {
CHAR_TO_ESCAPE.set(c);
}
}
private static boolean needsEscaping(char c) {
return c < CHAR_TO_ESCAPE.size() && CHAR_TO_ESCAPE.get(c);
}
/**
* Make partition path from partition spec.
*
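* <p>For example (illustrative values only), a spec of {@code {dt=2021-01-01, hr=10}} in Hive
* style with the separator suffix yields {@code "dt=2021-01-01/hr=10/"}; without Hive style
* and without the suffix it yields {@code "2021-01-01/10"}.
*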
* @param partitionKVs The partition key value mapping
* @param hivePartition Whether the partition path is in Hive style,
* e.g. {partition key} = {partition value}
* @param sepSuffix Whether to append the path separator as suffix
* @return an escaped, valid partition name
*/
public static String generatePartitionPath(
LinkedHashMap<String, String> partitionKVs,
boolean hivePartition,
boolean sepSuffix) {
if (partitionKVs.isEmpty()) {
return "";
}
StringBuilder suffixBuf = new StringBuilder();
int i = 0;
for (Map.Entry<String, String> e : partitionKVs.entrySet()) {
if (i > 0) {
suffixBuf.append(StoragePath.SEPARATOR);
}
if (hivePartition) {
suffixBuf.append(escapePathName(e.getKey()));
suffixBuf.append('=');
}
suffixBuf.append(escapePathName(e.getValue()));
i++;
}
if (sepSuffix) {
suffixBuf.append(StoragePath.SEPARATOR);
}
return suffixBuf.toString();
}
/**
* Escapes a path name.
*
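* <p>For example (illustrative value), {@code escapePathName("12:30:45")} returns
* {@code "12%3A30%3A45"}, since {@code ':'} is one of the characters that require escaping.
*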
* @param path The path to escape.
* @return An escaped path name.
*/
private static String escapePathName(String path) {
if (path == null || path.length() == 0) {
throw new TableException("Path should not be null or empty: " + path);
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < path.length(); i++) {
char c = path.charAt(i);
if (needsEscaping(c)) {
sb.append('%');
sb.append(String.format("%1$02X", (int) c));
} else {
sb.append(c);
}
}
return sb.toString();
}
/**
* Generates the partition key value mapping from a path, with special-case handling for:
* - date time type partition fields
* - fields that do not exist in the schema
* Common method used by both COW and MOR tables.
*
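* <p>For example (illustrative values), a file path {@code /tbl/dt=2021-01-01/f1.parquet}
* with {@code partPathField} set to {@code "dt"}, Hive style partitioning enabled and a
* non date-time {@code dt} field in the schema yields a single entry mapping {@code dt}
* to the value resolved from {@code "2021-01-01"}.
*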
* @param filePath Partition file path
* @param fieldNames Table field names
* @param fieldTypes Table field types
* @param partDefaultName FlinkOptions.PARTITION_DEFAULT_NAME
* @param partPathField FlinkOptions.PARTITION_PATH_FIELD
* @param hiveStylePartitioning FlinkOptions.HIVE_STYLE_PARTITIONING
* @return Sequential partition specs
*/
public static LinkedHashMap<String, Object> generatePartitionSpecs(
String filePath,
List<String> fieldNames,
List<DataType> fieldTypes,
String partDefaultName,
String partPathField,
boolean hiveStylePartitioning) {
String[] partitionKeys = partPathField == null || FlinkOptions.PARTITION_PATH_FIELD.defaultValue().equals(partPathField)
? new String[0]
: partPathField.split(",");
LinkedHashMap<String, String> partSpec = extractPartitionKeyValues(
new org.apache.hadoop.fs.Path(filePath).getParent(),
hiveStylePartitioning,
partitionKeys);
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> {
final int idx = fieldNames.indexOf(k);
if (idx == -1) {
// in the rare case that the partition field does not exist in the schema,
// fall back to reading it from the data file
return;
}
DataType fieldType = fieldTypes.get(idx);
if (!DataTypeUtils.isDatetimeType(fieldType)) {
// date time type partition fields are specially formatted in the path and are
// instead read directly from the data file to avoid format mismatch or precision loss
partObjects.put(k, DataTypeUtils.resolvePartition(partDefaultName.equals(v) ? null : v, fieldType));
}
});
return partObjects;
}
/**
* Generates partition key value mapping from path.
*
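* <p>For example (illustrative paths), with partition keys {@code [dt, hr]}:
*
* <p><pre>
* -- Hive style:     /tbl/dt=2021-01-01/hr=10  ->  {dt: 2021-01-01, hr: 10}
* -- non-Hive style: /tbl/2021-01-01/10        ->  {dt: 2021-01-01, hr: 10}
* </pre>
*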
* @param currPath Partition file path
* @param hivePartition Whether the partition path is in Hive style
* @param partitionKeys Partition keys
* @return Sequential partition specs.
*/
public static LinkedHashMap<String, String> extractPartitionKeyValues(
Path currPath,
boolean hivePartition,
String[] partitionKeys) {
LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>();
if (partitionKeys.length == 0) {
return fullPartSpec;
}
List<String[]> kvs = new ArrayList<>();
int curDepth = 0;
do {
String component = currPath.getName();
final String[] kv = new String[2];
if (hivePartition) {
Matcher m = HIVE_PARTITION_NAME_PATTERN.matcher(component);
if (m.matches()) {
String k = unescapePathName(m.group(1));
String v = unescapePathName(m.group(2));
kv[0] = k;
kv[1] = v;
}
} else {
kv[0] = partitionKeys[partitionKeys.length - 1 - curDepth];
kv[1] = unescapePathName(component);
}
kvs.add(kv);
currPath = currPath.getParent();
curDepth++;
} while (currPath != null && !currPath.getName().isEmpty() && curDepth < partitionKeys.length);
// reverse the list since the path components were visited from the leaf directory up to the table's base directory
for (int i = kvs.size(); i > 0; i--) {
fullPartSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
}
return fullPartSpec;
}
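/**
* Unescapes a path name by decoding the {@code %XX} sequences produced by
* {@link #escapePathName}, e.g. {@code "12%3A30%3A45"} becomes {@code "12:30:45"}.
*
* @param path The escaped path name
* @return The unescaped path name
*/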
public static String unescapePathName(String path) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < path.length(); i++) {
char c = path.charAt(i);
if (c == '%' && i + 2 < path.length()) {
int code = -1;
try {
code = Integer.parseInt(path.substring(i + 1, i + 3), 16);
} catch (Exception ignored) {
// do nothing
}
if (code >= 0) {
sb.append((char) code);
i += 2;
continue;
}
}
sb.append(c);
}
return sb.toString();
}
/**
* Searches all partitions under the given path.
*
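* <p>For example (illustrative layout), a base path {@code /tbl} that contains the partitions
* {@code dt=2021-01-01} and {@code dt=2021-01-02} yields two tuples, each pairing the
* extracted spec (e.g. {@code {dt: 2021-01-01}}) with its partition path.
*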
* @param fs File system
* @param path Search path
* @param hivePartition Whether the partition path is in Hive style
* @param partitionKeys Partition keys
* @return all partition key value mapping in sequence of the given path
*/
public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartKeyValueAndPaths(
FileSystem fs,
Path path,
boolean hivePartition,
String[] partitionKeys) {
// expectLevel starts from 0, e.g. base_path/level0/level1/level2
FileStatus[] generatedParts = getFileStatusRecursively(path, partitionKeys.length, fs);
List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
for (FileStatus part : generatedParts) {
ret.add(
new Tuple2<>(
extractPartitionKeyValues(part.getPath(), hivePartition, partitionKeys),
part.getPath()));
}
return ret;
}
public static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, Configuration conf) {
return getFileStatusRecursively(path, expectLevel, HadoopFSUtils.getFs(path.toString(), conf));
}
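/**
* Returns the file statuses that sit exactly {@code expectLevel} directory levels under
* the given path, skipping hidden files and directories; returns an empty array if the
* path cannot be listed.
*/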
public static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, FileSystem fs) {
ArrayList<FileStatus> result = new ArrayList<>();
try {
FileStatus fileStatus = fs.getFileStatus(path);
listStatusRecursively(fs, fileStatus, 0, expectLevel, result);
} catch (IOException ignore) {
return new FileStatus[0];
}
return result.toArray(new FileStatus[0]);
}
private static void listStatusRecursively(
FileSystem fs,
FileStatus fileStatus,
int level,
int expectLevel,
List<FileStatus> results) throws IOException {
if (expectLevel == level && !isHiddenFile(fileStatus)) {
results.add(fileStatus);
return;
}
if (fileStatus.isDirectory() && !isHiddenFile(fileStatus)) {
for (FileStatus stat : fs.listStatus(fileStatus.getPath())) {
listStatusRecursively(fs, stat, level + 1, expectLevel, results);
}
}
}
private static boolean isHiddenFile(FileStatus fileStatus) {
String name = fileStatus.getPath().getName();
// log file names are prefixed with '.' but should not be treated as hidden files
return name.startsWith("_") || (name.startsWith(".") && !name.contains(".log."));
}
/**
* Returns the partition path keys and values as a list of maps, where each map in the list
* is a mapping of the partition key names to their actual partition values. For example, say
* there are file paths with partition keys [key1, key2, key3]:
*
* <p><pre>
* -- file:/// ... key1=val1/key2=val2/key3=val3
* -- file:/// ... key1=val4/key2=val5/key3=val6
* </pre>
*
* <p>The returned list is [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
*
* @param path The base path
* @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
* @param hivePartition Whether the partition path is in Hive style
*/
public static List<Map<String, String>> getPartitions(
Path path,
Configuration hadoopConf,
List<String> partitionKeys,
String defaultParName,
boolean hivePartition) {
try {
return FilePathUtils
.searchPartKeyValueAndPaths(
HadoopFSUtils.getFs(path.toString(), hadoopConf),
path,
hivePartition,
partitionKeys.toArray(new String[0]))
.stream()
.map(tuple2 -> tuple2.f0)
.map(spec -> {
LinkedHashMap<String, String> ret = new LinkedHashMap<>();
spec.forEach((k, v) -> ret.put(k, defaultParName.equals(v) ? null : v));
return ret;
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new TableException("Fetch partitions fail.", e);
}
}
/**
* Reorders the partition key value mapping based on the given partition key sequence.
*
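* <p>For example (illustrative values), partition keys {@code [dt, hr]} and a mapping
* {@code {hr: 10, dt: 2021-01-01}} are reordered to {@code {dt: 2021-01-01, hr: 10}};
* a mapping that misses any of the keys triggers a {@link TableException}.
*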
* @param partitionKVs The partition key and value mapping
* @param partitionKeys The partition key list
*/
public static LinkedHashMap<String, String> validateAndReorderPartitions(
Map<String, String> partitionKVs,
List<String> partitionKeys) {
if (partitionKeys.size() == 0) {
// in case the partition fields are not in schema
return new LinkedHashMap<>(partitionKVs);
}
LinkedHashMap<String, String> map = new LinkedHashMap<>();
for (String k : partitionKeys) {
if (!partitionKVs.containsKey(k)) {
throw new TableException("Partition keys are: " + partitionKeys
+ ", incomplete partition spec: " + partitionKVs);
}
map.put(k, partitionKVs.get(k));
}
return map;
}
/**
* Returns all the file paths that are the parents of the data files.
*
* @param path The base path
* @param conf The Flink configuration
* @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
*/
public static Path[] getReadPaths(
Path path,
org.apache.flink.configuration.Configuration conf,
Configuration hadoopConf,
List<String> partitionKeys) {
if (partitionKeys.isEmpty()) {
return new Path[] {path};
} else {
final String defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
List<Map<String, String>> partitionPaths =
getPartitions(path, hadoopConf, partitionKeys, defaultParName, hivePartition);
return partitionPath2ReadPath(path, partitionKeys, partitionPaths, hivePartition);
}
}
/**
* Transforms the given partition key value mapping to read paths.
*
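* <p>For example (illustrative values), a base path {@code /tbl}, partition keys {@code [dt]}
* and a spec {@code {dt: 2021-01-01}} in Hive style resolve to the read path
* {@code /tbl/dt=2021-01-01}.
*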
* @param path The base path
* @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
* @param hivePartition Whether the partition path is in Hive style
* @see #getReadPaths
*/
public static Path[] partitionPath2ReadPath(
Path path,
List<String> partitionKeys,
List<Map<String, String>> partitionPaths,
boolean hivePartition) {
return partitionPaths.stream()
.map(m -> validateAndReorderPartitions(m, partitionKeys))
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition, true))
.map(n -> new Path(path, n))
.toArray(Path[]::new);
}
/**
* Transforms the given partition key value mapping to relative partition paths.
*
* @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
* @param hivePartition Whether the partition path is in Hive style
* @see #getReadPaths
*/
public static Set<String> toRelativePartitionPaths(
List<String> partitionKeys,
List<Map<String, String>> partitionPaths,
boolean hivePartition) {
return partitionPaths.stream()
.map(m -> validateAndReorderPartitions(m, partitionKeys))
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition, false))
.collect(Collectors.toSet());
}
/**
* Transforms the array of Hadoop paths to Flink paths.
*/
public static org.apache.flink.core.fs.Path[] toFlinkPaths(Path[] paths) {
return Arrays.stream(paths)
.map(FilePathUtils::toFlinkPath)
.toArray(org.apache.flink.core.fs.Path[]::new);
}
/**
* Transforms the Hadoop path to Flink path.
*/
public static org.apache.flink.core.fs.Path toFlinkPath(Path path) {
return new org.apache.flink.core.fs.Path(path.toUri());
}
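/**
* Transforms the Hudi storage path to Flink path.
*/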
public static org.apache.flink.core.fs.Path toFlinkPath(StoragePath path) {
return new org.apache.flink.core.fs.Path(path.toUri());
}
/**
* Extracts the partition keys with given configuration.
*
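* <p>For example (illustrative value), a configured partition path field of {@code "dt,hr"}
* yields {@code ["dt", "hr"]}; the default (unset) value yields an empty array.
*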
* @param conf The flink configuration
* @return array of the partition fields
*/
public static String[] extractPartitionKeys(org.apache.flink.configuration.Configuration conf) {
if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD)) {
return new String[0];
}
return conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
}
/**
* Extracts the hive sync partition fields with given configuration.
*
* @param conf The flink configuration
* @return array of the hive partition fields
*/
public static String[] extractHivePartitionFields(org.apache.flink.configuration.Configuration conf) {
if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)) {
return extractPartitionKeys(conf);
}
return conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS).split(",");
}
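/**
* Returns whether the given path name is in Hive style,
* i.e. of the form {partition key}={partition value}.
*/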
public static boolean isHiveStylePartitioning(String path) {
return HIVE_PARTITION_NAME_PATTERN.matcher(path).matches();
}
}