public Fileset createMultipleLocationFileset()

in catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java [241:414]


  /**
   * Creates a fileset backed by one or more named storage locations.
   *
   * <p>The fileset must not exist yet. This is checked against the in-memory fileset cache first
   * and then the entity store. Its parent schema must exist.
   *
   * <p>For an {@link Fileset.Type#EXTERNAL} fileset every storage location must be given
   * explicitly and be non-blank. For a managed fileset, the final paths are computed by {@code
   * calculateFilesetPaths} from the given locations and the catalog/schema locations.
   *
   * <p>Unless filesystem operations are disabled, all paths are formalized and checked with {@code
   * validateLocationHierarchy} before any directory is created. A request rejected by that check
   * therefore does not leave new directories behind. When filesystem operations are disabled,
   * every path must already carry a scheme because it cannot be formalized here.
   *
   * <p>On success the entity is written to the store and the resulting fileset is put in the
   * fileset cache.
   *
   * @param ident the identifier of the fileset to create.
   * @param comment the comment of the fileset.
   * @param type the type of the fileset.
   * @param storageLocations location name to storage location; {@code null} is treated as an empty
   *     map.
   * @param properties the fileset properties; expected to carry the Gravitino string identifier.
   * @return the created fileset.
   * @throws NoSuchSchemaException if the parent schema does not exist.
   * @throws FilesetAlreadyExistsException if the fileset already exists.
   * @throws IllegalArgumentException if a location name is blank, a required location is missing,
   *     or a location lacks a scheme while filesystem operations are disabled.
   * @throws RuntimeException if the entity store or the filesystem fails.
   */
  public Fileset createMultipleLocationFileset(
      NameIdentifier ident,
      String comment,
      Fileset.Type type,
      Map<String, String> storageLocations,
      Map<String, String> properties)
      throws NoSuchSchemaException, FilesetAlreadyExistsException {
    // Treat a missing map as "no explicit locations" so callers get the explicit validation
    // errors below instead of a NullPointerException.
    Map<String, String> locations =
        storageLocations == null ? ImmutableMap.<String, String>of() : storageLocations;
    locations.forEach(
        (name, path) -> {
          if (StringUtils.isBlank(name)) {
            throw new IllegalArgumentException("Location name must not be blank");
          }
        });

    // Check if the fileset already existed in cache first. If it does, it means the fileset is
    // already created, so we should throw an exception.
    if (filesetCache.getIfPresent(ident) != null) {
      throw new FilesetAlreadyExistsException("Fileset %s already exists", ident);
    }

    try {
      if (store.exists(ident, Entity.EntityType.FILESET)) {
        throw new FilesetAlreadyExistsException("Fileset %s already exists", ident);
      }
    } catch (IOException ioe) {
      throw new RuntimeException("Failed to check if fileset " + ident + " exists", ioe);
    }

    SchemaEntity schemaEntity;
    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
    try {
      schemaEntity = store.get(schemaIdent, Entity.EntityType.SCHEMA, SchemaEntity.class);
    } catch (NoSuchEntityException exception) {
      throw new NoSuchSchemaException(exception, SCHEMA_DOES_NOT_EXIST_MSG, schemaIdent);
    } catch (IOException ioe) {
      throw new RuntimeException("Failed to load schema " + schemaIdent, ioe);
    }

    // For external fileset, every storage location must be set explicitly.
    if (type == Fileset.Type.EXTERNAL) {
      if (locations.isEmpty()) {
        throw new IllegalArgumentException(
            "Storage location must be set for external fileset " + ident);
      }
      locations.forEach(
          (locationName, location) -> {
            if (StringUtils.isBlank(location)) {
              throw new IllegalArgumentException(
                  "Storage location must be set for external fileset "
                      + ident
                      + " with location name "
                      + locationName);
            }
          });
    }

    // Either catalog property "location", or schema property "location", or storageLocation must be
    // set for managed fileset.
    Map<String, Path> schemaPaths =
        getAndCheckSchemaPaths(schemaIdent.name(), schemaEntity.properties());
    if (schemaPaths.isEmpty() && locations.isEmpty()) {
      throw new IllegalArgumentException(
          "Storage location must be set for fileset "
              + ident
              + " when it's catalog and schema location are not set");
    }
    locations.forEach((k, location) -> checkPlaceholderValue(location));

    Map<String, Path> filesetPaths =
        calculateFilesetPaths(schemaIdent.name(), ident.name(), locations, schemaPaths, properties);
    properties = setDefaultLocationIfAbsent(properties, filesetPaths);

    // Phase 1: resolve the final path of every location without touching the filesystem.
    ImmutableMap.Builder<String, Path> filesetPathsBuilder = ImmutableMap.builder();
    if (disableFSOps) {
      filesetPaths.forEach(
          (locationName, location) -> {
            // If the location does not have scheme and filesystem operations are disabled in the
            // server side, we cannot formalize the path by filesystem, neither can we do in the
            // client side, so we should throw an exception here.
            if (location.toUri().getScheme() == null) {
              throw new IllegalArgumentException(
                  "Storage location must have scheme for fileset if filesystem operations are "
                      + "disabled in the server side, location: "
                      + location
                      + ", location name: "
                      + locationName);
            }

            filesetPathsBuilder.put(locationName, location);
          });
    } else {
      try {
        // formalize the path to avoid path without scheme, uri, authority, etc.
        for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
          filesetPathsBuilder.put(entry.getKey(), formalizePath(entry.getValue(), conf));
        }
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create fileset " + ident, ioe);
      }
    }
    Map<String, Path> resolvedPaths = filesetPathsBuilder.build();

    // Maps.transformValues returns a lazy, non-serializable view that re-applies the function on
    // every access; copy it once since it is handed to the stored entity and the cached fileset.
    Map<String, String> formattedStorageLocations =
        ImmutableMap.copyOf(Maps.transformValues(resolvedPaths, Path::toString));

    // Phase 2: validate before any directory is created, so a rejected request leaves no side
    // effects on the filesystem.
    validateLocationHierarchy(
        Maps.transformValues(schemaPaths, Path::toString), formattedStorageLocations);

    // Phase 3: create the missing directories.
    if (!disableFSOps) {
      try {
        for (Map.Entry<String, Path> entry : resolvedPaths.entrySet()) {
          Path formalizePath = entry.getValue();
          FileSystem fs = getFileSystem(formalizePath, conf);
          if (!fs.exists(formalizePath)) {
            if (!fs.mkdirs(formalizePath)) {
              throw new RuntimeException(
                  "Failed to create fileset "
                      + ident
                      + " location "
                      + formalizePath
                      + " with location name "
                      + entry.getKey());
            }

            LOG.info(
                "Created fileset {} location {} with location name {}",
                ident,
                formalizePath,
                entry.getKey());
          } else {
            LOG.info(
                "Fileset {} manages the existing location {} with location name {}",
                ident,
                formalizePath,
                entry.getKey());
          }
        }
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create fileset " + ident, ioe);
      }
    }

    StringIdentifier stringId = StringIdentifier.fromProperties(properties);
    Preconditions.checkArgument(stringId != null, "Property String identifier should not be null");

    FilesetEntity filesetEntity =
        FilesetEntity.builder()
            .withName(ident.name())
            .withId(stringId.id())
            .withNamespace(ident.namespace())
            .withComment(comment)
            .withFilesetType(type)
            // Store the resolved storage locations. For a managed fileset without an explicit
            // location, these were derived from the catalog/schema location above.
            .withStorageLocations(formattedStorageLocations)
            .withProperties(properties)
            .withAuditInfo(
                AuditInfo.builder()
                    .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
                    .withCreateTime(Instant.now())
                    .build())
            .build();

    try {
      store.put(filesetEntity, true /* overwrite */);
    } catch (IOException ioe) {
      throw new RuntimeException("Failed to create fileset " + ident, ioe);
    }

    HadoopFileset fileset =
        HadoopFileset.builder()
            .withName(ident.name())
            .withComment(comment)
            .withType(type)
            .withStorageLocations(formattedStorageLocations)
            .withProperties(filesetEntity.properties())
            .withAuditInfo(filesetEntity.auditInfo())
            .build();
    filesetCache.put(ident, fileset);
    return fileset;
  }