in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java [574:854]
public void initialize(URI name, Configuration originalConf)
throws IOException {
// get the host; this is guaranteed to be non-null, non-empty
bucket = name.getHost();
AuditSpan span = null;
// track initialization duration; will only be set after
// statistics are set up.
Optional<DurationTracker> trackInitialization = Optional.empty();
try {
LOG.debug("Initializing S3AFileSystem for {}", bucket);
if (LOG.isTraceEnabled()) {
// log a full trace for deep diagnostics of where an object is created,
// for tracking down memory leak issues.
LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}",
name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false),
new RuntimeException(super.toString()));
}
// clone the configuration into one with propagated bucket options
Configuration conf = propagateBucketOptions(originalConf, bucket);
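// Illustrative example of option propagation (bucket name "data" is
// hypothetical): a per-bucket setting such as
//   fs.s3a.bucket.data.endpoint = s3.eu-west-1.amazonaws.com
// is copied over the base fs.s3a.endpoint value in the cloned
// configuration, so later option lookups in this method see the
// bucket-specific value.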
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
conf = ProviderUtils.excludeIncompatibleCredentialProviders(
conf, S3AFileSystem.class);
String arn = String.format(ARN_BUCKET_OPTION, bucket);
String configuredArn = conf.getTrimmed(arn, "");
if (!configuredArn.isEmpty()) {
accessPoint = ArnResource.accessPointFromArn(configuredArn);
LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
bucket = accessPoint.getFullArn();
} else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
LOG.warn("Access Point usage is required because \"{}\" is enabled," +
" but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
}
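// Illustrative access point binding (account ID and names are
// hypothetical):
//   fs.s3a.bucket.data.accesspoint.arn =
//       arn:aws:s3:eu-west-1:123456789012:accesspoint/data-ap
// Once set, "bucket" holds the full ARN and all requests are routed
// through the access point rather than the bucket's own endpoint.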
// fix up the classloader of the configuration to be whatever
// classloader loaded this filesystem.
// See: HADOOP-17372 and follow-up on HADOOP-18993
S3AUtils.maybeIsolateClassloader(conf, this.getClass().getClassLoader());
// patch the Hadoop security providers
patchSecurityCredentialProviders(conf);
// look for delegation token support early.
boolean delegationTokensEnabled = hasDelegationTokenBinding(conf);
if (delegationTokensEnabled) {
LOG.debug("Using delegation tokens");
}
// set the URI; this strips any secrets from the URI and
// canonicalizes it.
setUri(name, delegationTokensEnabled);
super.initialize(uri, conf);
setConf(conf);
// initialize the statistics binding, after which statistics
// can be collected.
instrumentation = new S3AInstrumentation(uri);
initializeStatisticsBinding();
// track initialization duration.
// this should really be done in a onceTrackingDuration() call,
// but that would require all the code below to live inside the
// lambda, creating a merge/backport headache.
trackInitialization = Optional.of(
instrumentation.trackDuration(FileSystemStatisticNames.FILESYSTEM_INITIALIZATION));
s3aInternals = createS3AInternals();
// look for encryption data
// DT Bindings may override this
setEncryptionSecrets(
buildEncryptionSecrets(bucket, conf));
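// Example encryption settings (the key ARN is hypothetical):
//   fs.s3a.encryption.algorithm = SSE-KMS
//   fs.s3a.encryption.key = arn:aws:kms:eu-west-1:123456789012:key/example
// A delegation token binding may replace these secrets later in startup.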
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
// If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());
isAnalyticsAcceleratorEnabled = StreamIntegration.determineInputStreamType(conf)
.equals(InputStreamType.Analytics);
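// Sketch of the selection logic, assuming the fs.s3a.input.stream.type
// option read by StreamIntegration:
//   fs.s3a.input.stream.type = analytics
// enables the analytics-accelerator stream; other values leave
// isAnalyticsAcceleratorEnabled false.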
// Create the appropriate fsHandler instance using a factory method
fsHandler = createFileSystemHandler();
fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
// Username is the current user at the time the FS was instantiated.
owner = UserGroupInformation.getCurrentUser();
username = owner.getShortUserName();
workingDir = new Path("/user", username)
.makeQualified(this.uri, this.getWorkingDirectory());
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
partSize = getMultipartSizeProperty(conf,
MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
multiPartThreshold = getMultipartSizeProperty(conf,
MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
// check but do not store the block size
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
// determine and cache the endpoints
endpoint = accessPoint == null
? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
: accessPoint.getEndpoint();
configuredRegion = accessPoint == null
? conf.getTrimmed(AWS_REGION)
: accessPoint.getRegion();
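// Example endpoint/region settings for a non-access-point bucket:
//   fs.s3a.endpoint        = s3.eu-west-1.amazonaws.com
//   fs.s3a.endpoint.region = eu-west-1
// For access points, both values are derived from the ARN instead.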
fipsEnabled = conf.getBoolean(FIPS_ENDPOINT, ENDPOINT_FIPS_DEFAULT);
// is this an S3Express store?
s3ExpressStore = isS3ExpressStore(bucket, endpoint);
// should the delete also purge uploads?
// happens if explicitly enabled, or if the store is S3Express storage.
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
s3ExpressStore);
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
if (listVersion < 1 || listVersion > 2) {
LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
"version 2", listVersion);
}
useListV1 = (listVersion == 1);
if (accessPoint != null && useListV1) {
LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported in by" +
" access points. Upgrading to V2");
useListV1 = false;
}
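// Example: forcing the original ListObjects API (not valid with
// access points):
//   fs.s3a.list.version = 1
// Any other value results in the default V2 listing API being used.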
conditionalCreateEnabled = conf.getBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED,
DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED);
signerManager = new SignerManager(bucket, this, conf, owner);
signerManager.initCustomSigners();
// start auditing
// extra configuration will be passed down later.
initializeAuditService();
// create the requestFactory.
// requires the audit manager to be initialized.
requestFactory = createRequestFactory();
// create an initial span for all other operations.
span = createSpan(INITIALIZE_SPAN, bucket, null);
// creates the AWS client, including overriding auth chain if
// the FS came with a DT
// this may do some patching of the configuration (e.g. setting
// the encryption algorithms)
// requires the audit manager to be initialized.
ClientManager clientManager = createClientManager(name, delegationTokensEnabled);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
S3AInputPolicy.Normal);
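// Example read policy; "random" suits seek-heavy columnar formats
// such as ORC/Parquet, "sequential" suits whole-file reads:
//   fs.s3a.experimental.input.fadvise = random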
LOG.debug("Input fadvise policy = {}", inputPolicy);
changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
LOG.debug("Change detection policy = {}", changeDetectionPolicy);
boolean magicCommitterEnabled = conf.getBoolean(
CommitConstants.MAGIC_COMMITTER_ENABLED,
CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
LOG.debug("Filesystem support for magic committers {} enabled",
magicCommitterEnabled ? "is" : "is not");
committerIntegration = new MagicCommitIntegration(
this, magicCommitterEnabled);
boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true);
if (!blockUploadEnabled) {
LOG.warn("The \"slow\" output stream is no longer supported");
}
blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
DEFAULT_FAST_UPLOAD_BUFFER);
blockFactory = S3ADataBlocks.createFactory(createStoreContext(), blockOutputBuffer);
blockOutputActiveBlocks = intOption(conf,
FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
// If CSE is enabled, do multipart uploads serially.
if (isCSEEnabled) {
blockOutputActiveBlocks = 1;
}
LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
" queue limit={}; multipart={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled);
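// Example block output tuning (values are illustrative):
//   fs.s3a.fast.upload.buffer        = disk   (or "array"/"bytebuffer")
//   fs.s3a.fast.upload.active.blocks = 4
// "disk" buffers blocks in local files; the heap and byte-buffer
// options trade memory consumption for IO.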
// verify there's no S3Guard in the store config.
checkNoS3Guard(this.getUri(), getConf());
// read in performance options and parse them to a list of flags.
performanceFlags = buildFlagSet(
PerformanceFlagEnum.class,
conf,
FS_S3A_PERFORMANCE_FLAGS,
true);
// performance creation flag for code which wants create performance
// at the risk of overwriting existing files.
// the performance flags supply the default value; the resolved value
// is then written back so the flag set stays consistent with the option.
boolean performanceCreation = conf.getBoolean(FS_S3A_CREATE_PERFORMANCE,
performanceFlags.enabled(PerformanceFlagEnum.Create));
performanceFlags.set(PerformanceFlagEnum.Create, performanceCreation);
// freeze.
performanceFlags.makeImmutable();
LOG.debug("{} = {}", FS_S3A_CREATE_PERFORMANCE, performanceCreation);
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
"page size out of range: %s", pageSize);
listing = new Listing(listingOperationCallbacks, createStoreContext());
// now the open file logic
openFileHelper = new OpenFileSupport(
changeDetectionPolicy,
longBytesOption(conf, READAHEAD_RANGE,
DEFAULT_READAHEAD_RANGE, 0),
username,
intOption(conf, IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT, 0),
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
inputPolicy);
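// Illustrative use of the openFile() support configured here (the path
// is hypothetical); a per-file read policy overrides the fs-wide
// fadvise option:
//   fs.openFile(path)
//       .opt("fs.option.openfile.read.policy", "random")
//       .build()
//       .get();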
scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create and initialize the store
store = createS3AStore(clientManager, rateLimitCapacity);
// the s3 client is created through the store, rather than
// directly through the client manager.
// this is to aid mocking.
s3Client = getStore().getOrCreateS3Client();
// get the input stream factory requirements.
final StreamFactoryRequirements factoryRequirements =
getStore().factoryRequirements();
// If the input stream can issue get requests outside spans,
// the auditor is forced to disable rejection of unaudited requests.
final EnumSet<AuditorFlags> flags = EnumSet.noneOf(AuditorFlags.class);
if (factoryRequirements.requires(ExpectUnauditedGetRequests)) {
flags.add(AuditorFlags.PermitOutOfBandOperations);
}
getAuditManager().setAuditFlags(flags);
// get the vector IO context from the factory.
vectoredIOContext = factoryRequirements.vectoredIOContext();
// thread pool init requires store to be created and
// the stream factory requirements to include its own requirements.
initThreadPools();
// The filesystem is now ready to perform operations against S3.
// This initiates a probe against S3 for the bucket's existence.
doBucketProbing();
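// Example probe setting, assuming the fs.s3a.bucket.probe option:
//   fs.s3a.bucket.probe = 0
// skips the existence check entirely, deferring missing-bucket
// failures to the first real request.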
initMultipartUploads(conf);
trackInitialization.ifPresent(DurationTracker::close);
} catch (SdkException e) {
// AWS SDK exception: stop all services, then throw the translated exception
cleanupWithLogger(LOG, span);
stopAllServices();
trackInitialization.ifPresent(DurationTracker::failed);
throw translateException("initializing ", new Path(name), e);
} catch (IOException | RuntimeException e) {
// other exceptions: stop the services.
cleanupWithLogger(LOG, span);
stopAllServices();
trackInitialization.ifPresent(DurationTracker::failed);
throw e;
}
}