in catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java [241:414]
/**
 * Creates a fileset whose data may live in several named storage locations.
 *
 * <p>The request is fully validated before any filesystem side effect happens:
 *
 * <ol>
 *   <li>location names must not be blank and the fileset must not already exist (cache, then
 *       store);
 *   <li>the parent schema must exist;
 *   <li>an {@link Fileset.Type#EXTERNAL} fileset needs every storage location set explicitly;
 *       otherwise at least one of catalog/schema location or {@code storageLocations} must be
 *       set;
 *   <li>the final path of every location is resolved (and formalized unless filesystem
 *       operations are disabled), the string identifier and the location hierarchy are checked;
 *   <li>only then are missing directories created and the entity persisted and cached.
 * </ol>
 *
 * @param ident identifier of the fileset to create
 * @param comment comment stored with the fileset
 * @param type fileset type
 * @param storageLocations map from location name to storage location; names must not be blank
 * @param properties fileset properties; after default location properties are applied they must
 *     contain the string identifier property
 * @return the created fileset, also put into the fileset cache
 * @throws NoSuchSchemaException if the parent schema does not exist
 * @throws FilesetAlreadyExistsException if the fileset already exists
 * @throws IllegalArgumentException if any of the validations above fails
 */
public Fileset createMultipleLocationFileset(
    NameIdentifier ident,
    String comment,
    Fileset.Type type,
    Map<String, String> storageLocations,
    Map<String, String> properties)
    throws NoSuchSchemaException, FilesetAlreadyExistsException {
  storageLocations.forEach(
      (name, path) -> {
        if (StringUtils.isBlank(name)) {
          throw new IllegalArgumentException("Location name must not be blank");
        }
      });

  // Check if the fileset already existed in cache first. If it does, it means the fileset is
  // already created, so we should throw an exception.
  if (filesetCache.getIfPresent(ident) != null) {
    throw new FilesetAlreadyExistsException("Fileset %s already exists", ident);
  }
  try {
    if (store.exists(ident, Entity.EntityType.FILESET)) {
      throw new FilesetAlreadyExistsException("Fileset %s already exists", ident);
    }
  } catch (IOException ioe) {
    throw new RuntimeException("Failed to check if fileset " + ident + " exists", ioe);
  }

  SchemaEntity schemaEntity;
  NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
  try {
    schemaEntity = store.get(schemaIdent, Entity.EntityType.SCHEMA, SchemaEntity.class);
  } catch (NoSuchEntityException exception) {
    throw new NoSuchSchemaException(exception, SCHEMA_DOES_NOT_EXIST_MSG, schemaIdent);
  } catch (IOException ioe) {
    throw new RuntimeException("Failed to load schema " + schemaIdent, ioe);
  }

  // For external fileset, every storage location must be set explicitly.
  if (type == Fileset.Type.EXTERNAL) {
    if (storageLocations.isEmpty()) {
      throw new IllegalArgumentException(
          "Storage location must be set for external fileset " + ident);
    }
    storageLocations.forEach(
        (locationName, location) -> {
          if (StringUtils.isBlank(location)) {
            throw new IllegalArgumentException(
                "Storage location must be set for external fileset "
                    + ident
                    + " with location name "
                    + locationName);
          }
        });
  }

  // Either catalog property "location", or schema property "location", or storageLocation must be
  // set for managed fileset.
  Map<String, Path> schemaPaths =
      getAndCheckSchemaPaths(schemaIdent.name(), schemaEntity.properties());
  if (schemaPaths.isEmpty() && storageLocations.isEmpty()) {
    throw new IllegalArgumentException(
        "Storage location must be set for fileset "
            + ident
            + " when it's catalog and schema location are not set");
  }

  storageLocations.forEach((k, location) -> checkPlaceholderValue(location));

  Map<String, Path> filesetPaths =
      calculateFilesetPaths(
          schemaIdent.name(), ident.name(), storageLocations, schemaPaths, properties);
  properties = setDefaultLocationIfAbsent(properties, filesetPaths);

  // Check the identifier before any directory is created, so that a rejected request does not
  // leave orphan directories behind.
  StringIdentifier stringId = StringIdentifier.fromProperties(properties);
  Preconditions.checkArgument(stringId != null, "Property String identifier should not be null");

  // Phase 1: resolve the final path of every location without creating anything yet.
  ImmutableMap.Builder<String, Path> filesetPathsBuilder = ImmutableMap.builder();
  if (disableFSOps) {
    for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
      String locationName = entry.getKey();
      Path location = entry.getValue();
      // If the location does not have scheme and filesystem operations are disabled in the
      // server side, we cannot formalize the path by filesystem, neither can we do in the
      // client side, so we should throw an exception here.
      if (location.toUri().getScheme() == null) {
        throw new IllegalArgumentException(
            "Storage location must have scheme for fileset if filesystem operations are "
                + "disabled in the server side, location: "
                + location
                + ", location name: "
                + locationName);
      }
      filesetPathsBuilder.put(locationName, location);
    }
  } else {
    try {
      // formalize the path to avoid path without scheme, uri, authority, etc.
      for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
        filesetPathsBuilder.put(entry.getKey(), formalizePath(entry.getValue(), conf));
      }
    } catch (IOException ioe) {
      throw new RuntimeException("Failed to create fileset " + ident, ioe);
    }
  }
  Map<String, Path> resolvedPaths = filesetPathsBuilder.build();

  // Materialize eagerly: Maps.transformValues returns a lazy view that would otherwise be
  // persisted in the entity and cached fileset and re-evaluated on every access.
  Map<String, String> formattedStorageLocations =
      ImmutableMap.copyOf(Maps.transformValues(resolvedPaths, Path::toString));

  // Phase 2: validate the locations against the schema locations before creating directories.
  validateLocationHierarchy(
      Maps.transformValues(schemaPaths, Path::toString), formattedStorageLocations);

  // Phase 3: create missing directories (only when filesystem operations are enabled).
  if (!disableFSOps) {
    try {
      for (Map.Entry<String, Path> entry : resolvedPaths.entrySet()) {
        String locationName = entry.getKey();
        Path formalizePath = entry.getValue();
        FileSystem fs = getFileSystem(formalizePath, conf);
        if (!fs.exists(formalizePath)) {
          if (!fs.mkdirs(formalizePath)) {
            throw new RuntimeException(
                "Failed to create fileset "
                    + ident
                    + " location "
                    + formalizePath
                    + " with location name "
                    + locationName);
          }
          LOG.info(
              "Created fileset {} location {} with location name {}",
              ident,
              formalizePath,
              locationName);
        } else {
          LOG.info(
              "Fileset {} manages the existing location {} with location name {}",
              ident,
              formalizePath,
              locationName);
        }
      }
    } catch (IOException ioe) {
      throw new RuntimeException("Failed to create fileset " + ident, ioe);
    }
  }

  FilesetEntity filesetEntity =
      FilesetEntity.builder()
          .withName(ident.name())
          .withId(stringId.id())
          .withNamespace(ident.namespace())
          .withComment(comment)
          .withFilesetType(type)
          // Store the storageLocation to the store. If the "storageLocation" is null for managed
          // fileset, Gravitino will get and store the location based on the catalog/schema's
          // location and store it to the store.
          .withStorageLocations(formattedStorageLocations)
          .withProperties(properties)
          .withAuditInfo(
              AuditInfo.builder()
                  .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
                  .withCreateTime(Instant.now())
                  .build())
          .build();
  try {
    store.put(filesetEntity, true /* overwrite */);
  } catch (IOException ioe) {
    throw new RuntimeException("Failed to create fileset " + ident, ioe);
  }

  HadoopFileset fileset =
      HadoopFileset.builder()
          .withName(ident.name())
          .withComment(comment)
          .withType(type)
          .withStorageLocations(formattedStorageLocations)
          .withProperties(filesetEntity.properties())
          .withAuditInfo(filesetEntity.auditInfo())
          .build();
  filesetCache.put(ident, fileset);
  return fileset;
}