public boolean dropSchema()

in catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java [722:851]


  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
    // Drops the schema entity and, unless filesystem operations are disabled, cleans up storage:
    //  1. every storage location of each MANAGED fileset in the schema is deleted (best effort;
    //     failures are only logged);
    //  2. each schema location is deleted only if it exists and is empty.
    // Returns false if a NoSuchEntityException is raised (e.g. the schema does not exist) or if
    // the schema entity could not be dropped. Throws NonEmptySchemaException when the schema still
    // has filesets and cascade is false, and a RuntimeException when inspecting or deleting a
    // schema location fails with an IOException (the schema entity is already dropped by then).
    try {
      Namespace filesetNs =
          NamespaceUtil.ofFileset(
              ident.namespace().level(0), // metalake name
              ident.namespace().level(1), // catalog name
              ident.name() // schema name
              );

      List<FilesetEntity> filesets =
          store.list(filesetNs, FilesetEntity.class, Entity.EntityType.FILESET);
      if (!filesets.isEmpty() && !cascade) {
        throw new NonEmptySchemaException("Schema %s is not empty", ident);
      }

      // Resolve the schema paths from the stored properties before the entity is removed.
      SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
      Map<String, String> properties =
          Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
      Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(), properties);

      boolean dropped = super.dropSchema(ident, cascade);
      filesetCache.invalidateAll(
          filesets.stream().map(FilesetEntity::nameIdentifier).collect(Collectors.toList()));
      if (disableFSOps) {
        return dropped;
      }

      // If the schema entity is failed to be deleted, we should not delete the storage location
      // and return false immediately.
      if (!dropped) {
        return false;
      }

      // Delete all the managed filesets no matter whether the storage location is under the
      // schema path or not.
      // The reason why we delete the managed fileset's storage location one by one is because we
      // may mis-delete the storage location of the external fileset if it happens to be under
      // the schema path.
      ClassLoader cl = Thread.currentThread().getContextClassLoader();
      filesets
          .parallelStream()
          .filter(f -> f.filesetType() == Fileset.Type.MANAGED)
          .forEach(
              f -> {
                ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
                try {
                  // parallelStream uses forkjoin thread pool, which has a different classloader
                  // than the catalog thread. We need to set the context classloader to the
                  // catalog's classloader to avoid classloading issues.
                  Thread.currentThread().setContextClassLoader(cl);
                  f.storageLocations()
                      .forEach(
                          (locationName, location) -> {
                            try {
                              Path filesetPath = new Path(location);
                              FileSystem fs = getFileSystem(filesetPath, conf);
                              if (fs.exists(filesetPath)) {
                                if (!fs.delete(filesetPath, true)) {
                                  LOG.warn(
                                      "Failed to delete fileset {} location: {} with location name: {}",
                                      f.name(),
                                      filesetPath,
                                      locationName);
                                }
                              }
                            } catch (IOException ioe) {
                              LOG.warn(
                                  "Failed to delete fileset {} location: {} with location name: {}",
                                  f.name(),
                                  location,
                                  locationName,
                                  ioe);
                            }
                          });
                } finally {
                  Thread.currentThread().setContextClassLoader(oldCl);
                }
              });

      // Delete each schema path only if it exists and is empty. Every location is attempted; the
      // first I/O failure is rethrown afterwards, with any later causes attached as suppressed so
      // that no failure is lost.
      RuntimeException failure = null;
      for (Map.Entry<String, Path> entry : schemaPaths.entrySet()) {
        String locationName = entry.getKey();
        Path schemaPath = entry.getValue();
        try {
          FileSystem fs = getFileSystem(schemaPath, conf);
          if (fs.exists(schemaPath) && fs.listStatus(schemaPath).length == 0) {
            if (fs.delete(schemaPath, true)) {
              LOG.info(
                  "Deleted schema {} location {} with location name {}",
                  ident,
                  schemaPath,
                  locationName);
            } else {
              // The directory was just listed as empty, so a false return is not caused by
              // leftover files/folders; report it as a plain deletion failure.
              LOG.warn(
                  "Failed to delete schema {} location {} with location name {}",
                  ident,
                  schemaPath,
                  locationName);
            }
          }
        } catch (IOException ioe) {
          LOG.warn(
              "Failed to delete schema {} location {} with location name {}",
              ident,
              schemaPath,
              locationName,
              ioe);
          if (failure == null) {
            failure = new RuntimeException("Failed to delete schema " + ident + " location", ioe);
          } else {
            failure.addSuppressed(ioe);
          }
        }
      }
      if (failure != null) {
        throw failure;
      }

      LOG.info("Deleted schema {}", ident);
      return true;

    } catch (NoSuchEntityException ne) {
      LOG.warn("Schema {} does not exist", ident);
      return false;
    } catch (IOException ioe) {
      throw new RuntimeException("Failed to delete schema " + ident + " location", ioe);
    }
  }