in catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java [722:851]
/**
 * Drops the schema identified by {@code ident} and, unless filesystem operations are disabled,
 * removes the storage locations of its managed filesets plus the schema's own location(s) when
 * they are empty.
 *
 * @param ident the schema identifier; its namespace carries the metalake (level 0) and catalog
 *     (level 1) names, and {@code ident.name()} is the schema name
 * @param cascade if false and the schema still contains filesets, the drop fails; if true, the
 *     schema is dropped together with its filesets
 * @return true if the schema entity was deleted, false if the schema did not exist or the
 *     entity-store deletion failed
 * @throws NonEmptySchemaException if {@code cascade} is false and the schema contains filesets
 */
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
  try {
    // Build the fileset namespace under this schema so its filesets can be enumerated.
    Namespace filesetNs =
        NamespaceUtil.ofFileset(
            ident.namespace().level(0), // metalake name
            ident.namespace().level(1), // catalog name
            ident.name() // schema name
            );
    List<FilesetEntity> filesets =
        store.list(filesetNs, FilesetEntity.class, Entity.EntityType.FILESET);
    // Refuse to drop a non-empty schema unless the caller asked for a cascading drop.
    if (!filesets.isEmpty() && !cascade) {
      throw new NonEmptySchemaException("Schema %s is not empty", ident);
    }
    // Resolve the schema's storage paths from its properties BEFORE the entity is deleted,
    // since they are needed for cleanup afterwards. NOTE(review): getAndCheckSchemaPaths
    // presumably derives/validates the location map from the schema properties — confirm
    // against the helper's implementation.
    SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
    Map<String, String> properties =
        Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
    Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(), properties);

    // Delete the schema entity (and, with cascade, its children) from the entity store.
    boolean dropped = super.dropSchema(ident, cascade);
    // Evict cached fileset entries regardless of the drop outcome so stale entries are never
    // served afterwards.
    filesetCache.invalidateAll(
        filesets.stream().map(FilesetEntity::nameIdentifier).collect(Collectors.toList()));

    // When filesystem operations are disabled, only metadata is removed — skip all storage
    // cleanup below.
    if (disableFSOps) {
      return dropped;
    }

    // If the schema entity is failed to be deleted, we should not delete the storage location
    // and return false immediately.
    if (!dropped) {
      return false;
    }

    // Delete all the managed filesets no matter whether the storage location is under the
    // schema path or not.
    // The reason why we delete the managed fileset's storage location one by one is because we
    // may mis-delete the storage location of the external fileset if it happens to be under
    // the schema path.
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    filesets
        .parallelStream()
        .filter(f -> f.filesetType() == Fileset.Type.MANAGED)
        .forEach(
            f -> {
              ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
              try {
                // parallelStream uses forkjoin thread pool, which has a different classloader
                // than the catalog thread. We need to set the context classloader to the
                // catalog's classloader to avoid classloading issues.
                Thread.currentThread().setContextClassLoader(cl);
                f.storageLocations()
                    .forEach(
                        (locationName, location) -> {
                          try {
                            Path filesetPath = new Path(location);
                            FileSystem fs = getFileSystem(filesetPath, conf);
                            if (fs.exists(filesetPath)) {
                              // Best-effort: a delete returning false is logged but does not
                              // abort the schema drop.
                              if (!fs.delete(filesetPath, true)) {
                                LOG.warn(
                                    "Failed to delete fileset {} location: {} with location name: {}",
                                    f.name(),
                                    filesetPath,
                                    locationName);
                              }
                            }
                          } catch (IOException ioe) {
                            // Deliberately swallowed: per-location cleanup is best-effort and
                            // must not fail the overall drop.
                            LOG.warn(
                                "Failed to delete fileset {} location: {} with location name: {}",
                                f.name(),
                                location,
                                locationName,
                                ioe);
                          }
                        });
              } finally {
                // Always restore the worker thread's original context classloader.
                Thread.currentThread().setContextClassLoader(oldCl);
              }
            });

    // Delete the schema path if it exists and is empty.
    if (!schemaPaths.isEmpty()) {
      // Records a deletion failure so it can be rethrown after all locations have been
      // attempted. NOTE(review): only the most recent failure survives when multiple schema
      // locations fail — earlier ones are visible only in the warn logs.
      AtomicReference<RuntimeException> exception = new AtomicReference<>();
      schemaPaths.forEach(
          (locationName, schemaPath) -> {
            try {
              FileSystem fs = getFileSystem(schemaPath, conf);
              if (fs.exists(schemaPath)) {
                FileStatus[] statuses = fs.listStatus(schemaPath);
                // Only remove the schema directory when it is empty; leftover content (e.g.
                // an external fileset's data) keeps the directory in place.
                if (statuses.length == 0) {
                  if (fs.delete(schemaPath, true)) {
                    LOG.info(
                        "Deleted schema {} location {} with location name {}",
                        ident,
                        schemaPath,
                        locationName);
                  } else {
                    LOG.warn(
                        "Failed to delete schema {} because it has files/folders under location {} with location name {}",
                        ident,
                        schemaPath,
                        locationName);
                  }
                }
              }
            } catch (IOException ioe) {
              LOG.warn(
                  "Failed to delete schema {} location {} with location name {}",
                  ident,
                  schemaPath,
                  locationName,
                  ioe);
              exception.set(
                  new RuntimeException("Failed to delete schema " + ident + " location", ioe));
            }
          });
      // Unlike fileset cleanup, a failure to remove the schema's own location is surfaced to
      // the caller.
      if (exception.get() != null) {
        throw exception.get();
      }
    }

    LOG.info("Deleted schema {}", ident);
    return true;
  } catch (NoSuchEntityException ne) {
    // The schema entity is already gone; treat the drop as a no-op.
    LOG.warn("Schema {} does not exist", ident);
    return false;
  } catch (IOException ioe) {
    // Entity-store access failures (list/get) surface here; wrap with schema context.
    throw new RuntimeException("Failed to delete schema " + ident + " location", ioe);
  }
}