in streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java [124:195]
/**
 * Prepares this reader: builds the S3 client from {@code s3ReaderConfiguration},
 * resolves the list of keys to read under the configured reader path, and
 * allocates the output queue and the executor.
 *
 * <p>If listing the configured prefix returns any common prefixes or object
 * summaries, the path is treated as a "directory" and every page of the listing
 * is collected into {@code files}. Otherwise the reader path itself is treated
 * as a single file.
 *
 * @param configurationObject not read by this method; configuration is taken from
 *                            {@code s3ReaderConfiguration} and the detected
 *                            streams configuration
 */
public void prepare(Object configurationObject) {
  streamsConfiguration = StreamsConfigurator.detectConfiguration();
  lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);

  // Connect to S3
  synchronized (this) {
    // Create the credentials Object
    AWSCredentials credentials =
        new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());

    ClientConfiguration clientConfig = new ClientConfiguration();
    clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));

    // We do not want path style access
    S3ClientOptions clientOptions = new S3ClientOptions();
    clientOptions.setPathStyleAccess(false);

    this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
    // The region is optional; it is only applied when one is configured.
    if (StringUtils.isNotEmpty(s3ReaderConfiguration.getRegion())) {
      this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
    }
    this.amazonS3Client.setS3ClientOptions(clientOptions);
  }

  final ListObjectsRequest request = new ListObjectsRequest()
      .withBucketName(this.s3ReaderConfiguration.getBucket())
      .withPrefix(s3ReaderConfiguration.getReaderPath())
      .withMaxKeys(500);

  ObjectListing listing = this.amazonS3Client.listObjects(request);

  this.files = new ArrayList<>();

  /*
   * If you can list files that are in this path, then you must be dealing with a directory.
   * If you cannot list files that are in this path, then you are most likely dealing with
   * a simple file.
   */
  // Both flags are taken from the first page only; the choice between prefixes and
  // object summaries is then applied to every subsequent page.
  boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0;
  boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0;

  if (hasCommonPrefixes || hasObjectSummaries) {
    // Handle the 'directory' use case
    while (true) {
      if (hasCommonPrefixes) {
        this.files.addAll(listing.getCommonPrefixes());
      } else {
        for (final S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
          this.files.add(objectSummary.getKey());
        }
      }

      // Test truncation on the page that was just consumed. Fetching the next
      // page before testing would skip the final (non-truncated) page entirely.
      if (!listing.isTruncated()) {
        break;
      }

      // get the next batch.
      listing = this.amazonS3Client.listNextBatchOfObjects(listing);
    }
  } else {
    // handle the single file use-case
    this.files.add(s3ReaderConfiguration.getReaderPath());
  }

  if (this.files.isEmpty()) {
    LOGGER.error("There are no files to read");
  }

  // Bounded queue sized from the streams configuration; a single worker thread does the reading.
  this.persistQueue = Queues.synchronizedQueue(
      new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getQueueSize().intValue()));
  this.executor = Executors.newSingleThreadExecutor();
}