public EnumMap getSubscopedCreds()

in polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java [73:179]


  /**
   * Produces the Azure access properties (SAS token, account host and expiration time) for the
   * given read/write locations.
   *
   * <p>One location is picked (a write location if any exist, otherwise a read location) and
   * parsed as an {@code AzureLocation}; its storage account, endpoint and container drive the SAS
   * generation. Depending on the endpoint, either the blob or the ADLS user-delegation SAS helper
   * is invoked.
   *
   * @param diagnostics diagnostics handle; not referenced in this method body
   * @param storageConfig Azure storage configuration; its tenant id is used to obtain the access
   *     token
   * @param allowListOperation when {@code true}, the list permission is enabled on the SAS
   * @param allowedReadLocations locations to read from; a non-empty set enables the read
   *     permission
   * @param allowedWriteLocations locations to write to; a non-empty set enables the add, write
   *     and delete permissions
   * @return map holding {@code AZURE_SAS_TOKEN}, {@code AZURE_ACCOUNT_HOST} and {@code
   *     EXPIRATION_TIME} (epoch milliseconds, as a string)
   * @throws IllegalArgumentException if both location sets are empty
   * @throws RuntimeException if the location's endpoint is neither the blob nor the ADLS endpoint
   */
  public EnumMap<StorageAccessProperty, String> getSubscopedCreds(
      @Nonnull PolarisDiagnostics diagnostics,
      @Nonnull AzureStorageConfigurationInfo storageConfig,
      boolean allowListOperation,
      @Nonnull Set<String> allowedReadLocations,
      @Nonnull Set<String> allowedWriteLocations) {
    // Write locations take precedence; read locations are consulted only when nothing is writable.
    Set<String> candidateLocations =
        allowedWriteLocations.isEmpty() ? allowedReadLocations : allowedWriteLocations;
    String rawLocation =
        candidateLocations.stream()
            .findAny()
            .orElseThrow(() -> new IllegalArgumentException("Expect valid location"));

    // schema://<container_name>@<account_name>.<endpoint>/<file_path>
    AzureLocation azureLocation = new AzureLocation(rawLocation);
    validateAccountAndContainer(azureLocation, allowedReadLocations, allowedWriteLocations);

    String storageAccount = azureLocation.getStorageAccount();
    String endpoint = azureLocation.getEndpoint();
    String container = azureLocation.getContainer();
    String filePath = azureLocation.getFilePath();
    String storageDnsName = storageAccount + "." + endpoint;

    boolean grantRead = !allowedReadLocations.isEmpty();
    boolean grantWrite = !allowedWriteLocations.isEmpty();

    // Permissions for the blob endpoint.
    BlobSasPermission blobPermission = new BlobSasPermission();
    if (allowListOperation) {
      // container level
      blobPermission.setListPermission(true);
    }
    if (grantRead) {
      blobPermission.setReadPermission(true);
    }
    if (grantWrite) {
      blobPermission.setAddPermission(true);
      blobPermission.setWritePermission(true);
      blobPermission.setDeletePermission(true);
    }

    // Same permission set, expressed for Data Lake storage (ADLS).
    PathSasPermission pathPermission = new PathSasPermission();
    if (allowListOperation) {
      pathPermission.setListPermission(true);
    }
    if (grantRead) {
      pathPermission.setReadPermission(true);
    }
    if (grantWrite) {
      pathPermission.setAddPermission(true);
      pathPermission.setWritePermission(true);
      pathPermission.setDeletePermission(true);
    }

    Instant now = Instant.now();
    // 1 hr to sync with AWS and GCP access tokens
    OffsetDateTime sasExpiry = now.plusSeconds(3600).atOffset(ZoneOffset.UTC);

    AccessToken accessToken = getAccessToken(storageConfig.getTenantId());

    // Validity window of the user delegation key. The end is the configured credential duration,
    // capped at 7 days minus 1 minute: Azure requires the end time to be <= 7 days from the
    // current time, and the extra minute absorbs clock skew between client and server.
    OffsetDateTime keyStart = now.truncatedTo(ChronoUnit.SECONDS).atOffset(ZoneOffset.UTC);
    int configuredDurationSeconds =
        FeatureConfiguration.loadConfig(FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS);
    Instant requestedEnd = now.plusSeconds(configuredDurationSeconds);
    Instant latestAllowedEnd = now.plus(7, ChronoUnit.DAYS).minusSeconds(60);
    Instant keyEndInstant = latestAllowedEnd;
    if (requestedEnd.isBefore(latestAllowedEnd)) {
      keyEndInstant = requestedEnd;
    }
    OffsetDateTime keyEnd = keyEndInstant.atOffset(ZoneOffset.UTC);

    LOGGER
        .atDebug()
        .addKeyValue("allowedListAction", allowListOperation)
        .addKeyValue("allowedReadLoc", allowedReadLocations)
        .addKeyValue("allowedWriteLoc", allowedWriteLocations)
        .addKeyValue("location", rawLocation)
        .addKeyValue("storageAccount", storageAccount)
        .addKeyValue("endpoint", endpoint)
        .addKeyValue("container", container)
        .addKeyValue("filePath", filePath)
        .log("Subscope Azure SAS");

    // Only the blob and ADLS endpoints are handled; anything else is rejected.
    boolean blobEndpoint = endpoint.equalsIgnoreCase(AzureLocation.BLOB_ENDPOINT);
    boolean adlsEndpoint =
        !blobEndpoint && endpoint.equalsIgnoreCase(AzureLocation.ADLS_ENDPOINT);
    if (!blobEndpoint && !adlsEndpoint) {
      throw new RuntimeException(String.format("Endpoint %s not supported", endpoint));
    }

    String sasToken;
    if (blobEndpoint) {
      sasToken =
          getBlobUserDelegationSas(
              keyStart,
              keyEnd,
              sasExpiry,
              storageDnsName,
              container,
              blobPermission,
              Mono.just(accessToken));
    } else {
      sasToken =
          getAdlsUserDelegationSas(
              keyStart,
              keyEnd,
              sasExpiry,
              storageDnsName,
              container,
              pathPermission,
              Mono.just(accessToken));
    }

    EnumMap<StorageAccessProperty, String> accessProperties =
        new EnumMap<>(StorageAccessProperty.class);
    accessProperties.put(StorageAccessProperty.AZURE_SAS_TOKEN, sasToken);
    accessProperties.put(StorageAccessProperty.AZURE_ACCOUNT_HOST, storageDnsName);
    // NOTE(review): the reported expiration is the delegation-key end time, while a separate
    // fixed one-hour value (sasExpiry) is handed to the SAS helpers. If the helpers use that value
    // as the token's own expiry, the two can disagree whenever the configured duration is not
    // 3600 seconds -- confirm against the helper implementations.
    accessProperties.put(
        StorageAccessProperty.EXPIRATION_TIME, String.valueOf(keyEndInstant.toEpochMilli()));
    return accessProperties;
  }