public static List<String> getMetadataObjectLocation()

in core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java [444:558]


  /**
   * Collects the storage locations associated with a metadata object.
   *
   * <p>The lookup is best effort: any exception raised while loading the catalog, schema, table or
   * fileset (including the argument checks and the "unsupported type" error below) is logged at
   * WARN level and whatever has been collected so far is returned.
   *
   * <ul>
   *   <li>METALAKE and TOPIC: no locations.
   *   <li>CATALOG: for a "hive" provider, the value returned by {@code getHiveDefaultLocation}.
   *   <li>SCHEMA: for a RELATIONAL "hive" catalog, the schema's {@code HIVE_LOCATION} property;
   *       for a FILESET "hadoop" catalog, the schema's {@code FILESET_SCHEMA_LOCATION} property,
   *       falling back to the catalog's {@code FILESET_CATALOG_LOCATION} plus the schema name.
   *   <li>TABLE: for a "hive" provider, the table's {@code HiveConstants.LOCATION} property.
   *   <li>FILESET: the fileset's storage location.
   * </ul>
   *
   * @param ident the identifier of the metadata object
   * @param type the entity type of the metadata object
   * @return a mutable list of locations; empty when authorization is disabled (no access control
   *     dispatcher is configured), when the object has no location, or when the lookup failed
   */
  public static List<String> getMetadataObjectLocation(
      NameIdentifier ident, Entity.EntityType type) {
    List<String> locations = new ArrayList<>();

    // If we don't enable authorization, the location should return empty collection.
    if (GravitinoEnv.getInstance().accessControlDispatcher() == null) {
      return locations;
    }

    try {
      switch (type) {
        case METALAKE:
          break;

        case CATALOG:
          {
            Catalog catalogObj = GravitinoEnv.getInstance().catalogDispatcher().loadCatalog(ident);
            if ("hive".equals(catalogObj.provider())) {
              // The Hive default schema location is Hive warehouse directory
              String defaultSchemaLocation =
                  getHiveDefaultLocation(ident.namespace().level(0), ident.name());
              if (defaultSchemaLocation != null && !defaultSchemaLocation.isEmpty()) {
                locations.add(defaultSchemaLocation);
              }
            }
            break;
          }

        case SCHEMA:
          {
            // The parent catalog is addressed by the first two levels of the schema's namespace.
            Catalog catalogObj =
                GravitinoEnv.getInstance()
                    .catalogDispatcher()
                    .loadCatalog(
                        NameIdentifier.of(ident.namespace().level(0), ident.namespace().level(1)));
            Schema schema = GravitinoEnv.getInstance().schemaDispatcher().loadSchema(ident);

            switch (catalogObj.type()) {
              case RELATIONAL:
                if ("hive".equals(catalogObj.provider())
                    && schema.properties().containsKey(HIVE_LOCATION)) {
                  String schemaLocation = schema.properties().get(HIVE_LOCATION);
                  if (StringUtils.isNotBlank(schemaLocation)) {
                    locations.add(schemaLocation);
                  } else {
                    LOG.warn("Schema {} location is not found", ident);
                  }
                }
                break;

              case FILESET:
                if ("hadoop".equals(catalogObj.provider())) {
                  // Prefer the schema's own location. A missing key and a blank value are treated
                  // the same way, so both fall through to the catalog-level location.
                  String schemaLocation = schema.properties().get(FILESET_SCHEMA_LOCATION);
                  if (StringUtils.isNotBlank(schemaLocation)) {
                    locations.add(schemaLocation);
                  } else {
                    // The catalog location must be read from the catalog's properties, not the
                    // schema's; guard against a catalog that exposes no properties at all.
                    String catalogLocation =
                        catalogObj.properties() == null
                            ? null
                            : catalogObj.properties().get(FILESET_CATALOG_LOCATION);
                    if (StringUtils.isNotBlank(catalogLocation)) {
                      locations.add(catalogLocation + "/" + schema.name());
                    } else {
                      LOG.warn("Schema {} location is not found", ident);
                    }
                  }
                }
                break;

              default:
                LOG.warn("Unsupported catalog type {}", catalogObj.type());
                break;
            }
            break;
          }

        case TABLE:
          {
            Catalog catalogObj =
                GravitinoEnv.getInstance()
                    .catalogDispatcher()
                    .loadCatalog(
                        NameIdentifier.of(ident.namespace().level(0), ident.namespace().level(1)));
            if ("hive".equals(catalogObj.provider())) {
              Table table = GravitinoEnv.getInstance().tableDispatcher().loadTable(ident);
              if (table.properties().containsKey(HiveConstants.LOCATION)) {
                String tableLocation = table.properties().get(HiveConstants.LOCATION);
                if (StringUtils.isNotBlank(tableLocation)) {
                  locations.add(tableLocation);
                } else {
                  LOG.warn("Table {} location is not found", ident);
                }
              }
            }
            break;
          }

        case FILESET:
          {
            FilesetDispatcher filesetDispatcher = GravitinoEnv.getInstance().filesetDispatcher();
            Fileset fileset = filesetDispatcher.loadFileset(ident);
            Preconditions.checkArgument(
                fileset != null, String.format("Fileset %s is not found", ident));
            String filesetLocation = fileset.storageLocation();
            Preconditions.checkArgument(
                filesetLocation != null, String.format("Fileset %s location is not found", ident));
            locations.add(filesetLocation);
            break;
          }

        case TOPIC:
          // Topic doesn't have locations now.
          break;

        default:
          // NOTE: this exception is caught by the handler below, so an unsupported type results
          // in a warning and an empty list rather than a failure for the caller.
          throw new AuthorizationPluginException(
              "Failed to get location paths for metadata object %s type %s", ident, type);
      }
    } catch (Exception e) {
      // Deliberately best effort: report the problem and return what has been collected.
      LOG.warn("Failed to get location paths for metadata object {} type {}", ident, type, e);
    }

    return locations;
  }