public void initialize(URI name, Configuration originalConf)

in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java [574:854]

  public void initialize(URI name, Configuration originalConf)
      throws IOException {
    // get the host; this is guaranteed to be non-null, non-empty
    bucket = name.getHost();
    AuditSpan span = null;
    // track initialization duration; will only be set after
    // statistics are set up.
    Optional<DurationTracker> trackInitialization = Optional.empty();
    try {
      LOG.debug("Initializing S3AFileSystem for {}", bucket);
      if (LOG.isTraceEnabled()) {
        // log a full trace for deep diagnostics of where an object is created,
        // for tracking down memory leak issues.
        LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}",
            name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false),
            new RuntimeException(super.toString()));
      }
      // clone the configuration into one with propagated bucket options
      Configuration conf = propagateBucketOptions(originalConf, bucket);
      // HADOOP-17894. remove references to s3a stores in JCEKS credentials.
      conf = ProviderUtils.excludeIncompatibleCredentialProviders(
          conf, S3AFileSystem.class);
      String arn = String.format(ARN_BUCKET_OPTION, bucket);
      String configuredArn = conf.getTrimmed(arn, "");
      if (!configuredArn.isEmpty()) {
        accessPoint = ArnResource.accessPointFromArn(configuredArn);
        LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
        bucket = accessPoint.getFullArn();
      } else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
        LOG.warn("Access Point usage is required because \"{}\" is enabled," +
            " but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
        throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
      }

      // fix up the classloader of the configuration to be whatever
      // classloader loaded this filesystem.
      // See: HADOOP-17372 and follow-up on HADOOP-18993
      S3AUtils.maybeIsolateClassloader(conf, this.getClass().getClassLoader());

      // patch the Hadoop security providers
      patchSecurityCredentialProviders(conf);
      // look for delegation token support early.
      boolean delegationTokensEnabled = hasDelegationTokenBinding(conf);
      if (delegationTokensEnabled) {
        LOG.debug("Using delegation tokens");
      }
      // set the URI, this will do any fixup of the URI to remove secrets,
      // canonicalize.
      setUri(name, delegationTokensEnabled);
      super.initialize(uri, conf);
      setConf(conf);

      // initialize statistics, after which statistics
      // can be collected.
      instrumentation = new S3AInstrumentation(uri);
      initializeStatisticsBinding();

      // track initialization duration.
      // this should really be done in a onceTrackingDuration() call,
      // but then all methods below would need to be in the lambda and
      // it would create a merge/backport headache for all.
      trackInitialization = Optional.of(
          instrumentation.trackDuration(FileSystemStatisticNames.FILESYSTEM_INITIALIZATION));

      s3aInternals = createS3AInternals();

      // look for encryption data
      // DT Bindings may override this
      setEncryptionSecrets(
          buildEncryptionSecrets(bucket, conf));

      invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);

      // If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
      isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());

      isAnalyticsAcceleratorEnabled = StreamIntegration.determineInputStreamType(conf)
          .equals(InputStreamType.Analytics);

      // Create the appropriate fsHandler instance using a factory method
      fsHandler = createFileSystemHandler();
      fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
      // Username is the current user at the time the FS was instantiated.
      owner = UserGroupInformation.getCurrentUser();
      username = owner.getShortUserName();
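      // default working directory: /user/<username>, qualified against this filesystem's URI.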
      workingDir = new Path("/user", username)
          .makeQualified(this.uri, this.getWorkingDirectory());

      maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
      partSize = getMultipartSizeProperty(conf,
          MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
      multiPartThreshold = getMultipartSizeProperty(conf,
          MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);

      // check but do not store the block size
      longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
      enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);

      // determine and cache the endpoints
      endpoint = accessPoint == null
          ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
          : accessPoint.getEndpoint();

      configuredRegion = accessPoint == null
          ? conf.getTrimmed(AWS_REGION)
          : accessPoint.getRegion();

      fipsEnabled = conf.getBoolean(FIPS_ENDPOINT, ENDPOINT_FIPS_DEFAULT);

      // is this an S3Express store?
      s3ExpressStore = isS3ExpressStore(bucket, endpoint);

      // should the delete also purge uploads?
      // happens if explicitly enabled, or if the store is S3Express storage.
      dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
          s3ExpressStore);

      this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
          DEFAULT_MULTIPART_UPLOAD_ENABLED);
      // multipart copy and upload are the same; this just makes it explicit
      this.isMultipartCopyEnabled = isMultipartUploadEnabled;

      int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
      if (listVersion < 1 || listVersion > 2) {
        LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
            "version 2", listVersion);
      }
      useListV1 = (listVersion == 1);
      if (accessPoint != null && useListV1) {
        LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported by" +
            " access points. Upgrading to V2");
        useListV1 = false;
      }
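      // is conditional create support enabled?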
      conditionalCreateEnabled = conf.getBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED,
          DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED);

      signerManager = new SignerManager(bucket, this, conf, owner);
      signerManager.initCustomSigners();

      // start auditing
      // extra configuration will be passed down later.
      initializeAuditService();

      // create the requestFactory.
      // requires the audit manager to be initialized.
      requestFactory = createRequestFactory();

      // create an initial span for all other operations.
      span = createSpan(INITIALIZE_SPAN, bucket, null);

      // creates the AWS client, including overriding auth chain if
      // the FS came with a DT
      // this may do some patching of the configuration (e.g. setting
      // the encryption algorithms)
      // requires the audit manager to be initialized.
      ClientManager clientManager = createClientManager(name, delegationTokensEnabled);

      inputPolicy = S3AInputPolicy.getPolicy(
          conf.getTrimmed(INPUT_FADVISE,
              Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
          S3AInputPolicy.Normal);
      LOG.debug("Input fadvise policy = {}", inputPolicy);
      changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
      LOG.debug("Change detection policy = {}", changeDetectionPolicy);
      boolean magicCommitterEnabled = conf.getBoolean(
          CommitConstants.MAGIC_COMMITTER_ENABLED,
          CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
      LOG.debug("Filesystem support for magic committers {} enabled",
          magicCommitterEnabled ? "is" : "is not");
      committerIntegration = new MagicCommitIntegration(
          this, magicCommitterEnabled);

      boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true);

      if (!blockUploadEnabled) {
        LOG.warn("The \"slow\" output stream is no longer supported");
      }
      blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
          DEFAULT_FAST_UPLOAD_BUFFER);
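      // create the factory for the configured block output buffering mechanism.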
      blockFactory = S3ADataBlocks.createFactory(createStoreContext(), blockOutputBuffer);
      blockOutputActiveBlocks = intOption(conf,
          FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
      // If CSE is enabled, do multipart uploads serially.
      if (isCSEEnabled) {
        blockOutputActiveBlocks = 1;
      }
      LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
              " queue limit={}; multipart={}",
          blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled);
      // verify there's no S3Guard in the store config.
      checkNoS3Guard(this.getUri(), getConf());

      // read in performance options and parse them to a list of flags.
      performanceFlags = buildFlagSet(
          PerformanceFlagEnum.class,
          conf,
          FS_S3A_PERFORMANCE_FLAGS,
          true);
      // performance creation flag for code which wants performance
      // at the risk of overwrites.
      // this uses the performance flags as the default and then
      // updates the performance flags to match.
      // a bit convoluted.
      boolean performanceCreation = conf.getBoolean(FS_S3A_CREATE_PERFORMANCE,
          performanceFlags.enabled(PerformanceFlagEnum.Create));
      performanceFlags.set(PerformanceFlagEnum.Create, performanceCreation);
      // freeze.
      performanceFlags.makeImmutable();

      LOG.debug("{} = {}", FS_S3A_CREATE_PERFORMANCE, performanceCreation);

      pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
          BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
      checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
              "page size out of range: %s", pageSize);
      listing = new Listing(listingOperationCallbacks, createStoreContext());
      // now the open file logic
      openFileHelper = new OpenFileSupport(
          changeDetectionPolicy,
          longBytesOption(conf, READAHEAD_RANGE,
              DEFAULT_READAHEAD_RANGE, 0),
          username,
          intOption(conf, IO_FILE_BUFFER_SIZE_KEY,
              IO_FILE_BUFFER_SIZE_DEFAULT, 0),
          longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
              DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
          inputPolicy);
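      // cache the scheme; fall back to "s3a" if the URI does not declare one.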
      scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
      optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
          OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
      LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
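      // is S3 Access Grants support enabled?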
      s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);

      int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);

      // now create and initialize the store
      store = createS3AStore(clientManager, rateLimitCapacity);
      // the s3 client is created through the store, rather than
      // directly through the client manager.
      // this is to aid mocking.
      s3Client = getStore().getOrCreateS3Client();

      // get the input stream factory requirements.
      final StreamFactoryRequirements factoryRequirements =
          getStore().factoryRequirements();

      // If the input stream can issue get requests outside spans,
      // the auditor is forced to disable rejection of unaudited requests.
      final EnumSet<AuditorFlags> flags = EnumSet.noneOf(AuditorFlags.class);
      if (factoryRequirements.requires(ExpectUnauditedGetRequests)) {
        flags.add(AuditorFlags.PermitOutOfBandOperations);
      }
      getAuditManager().setAuditFlags(flags);
      // get the vector IO context from the factory.
      vectoredIOContext = factoryRequirements.vectoredIOContext();

      // thread pool init requires store to be created and
      // the stream factory requirements to include its own requirements.
      initThreadPools();

      // The filesystem is now ready to perform operations against
      // S3
      // This initiates a probe against S3 for the bucket existing.
      doBucketProbing();
      initMultipartUploads(conf);
      trackInitialization.ifPresent(DurationTracker::close);
    } catch (SdkException e) {
      // amazon client exception: stop all services then throw the translation
      cleanupWithLogger(LOG, span);
      stopAllServices();
      trackInitialization.ifPresent(DurationTracker::failed);
      throw translateException("initializing ", new Path(name), e);
    } catch (IOException | RuntimeException e) {
      // other exceptions: stop the services.
      cleanupWithLogger(LOG, span);
      stopAllServices();
      trackInitialization.ifPresent(DurationTracker::failed);
      throw e;
    }
  }
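
For context, a minimal sketch (not part of S3AFileSystem.java) of how this initialization path is normally reached from client code: FileSystem.get() instantiates the filesystem registered for the s3a scheme and invokes initialize(uri, conf), i.e. the method above. The bucket name and region below are illustrative only.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AInitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // per-bucket option: propagateBucketOptions() folds fs.s3a.bucket.NAME.* keys
    // into the base fs.s3a.* keys during initialize().
    conf.set("fs.s3a.bucket.example-bucket.endpoint.region", "eu-west-1");
    // FileSystem.get() creates and caches the S3AFileSystem instance,
    // calling initialize(uri, conf) on it.
    try (FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf)) {
      System.out.println("working directory: " + fs.getWorkingDirectory());
      System.out.println("root exists: " + fs.exists(new Path("/")));
    }
  }
}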