public void prepare()

in streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java [124:195]


  /**
   * Prepares this reader: detects the streams configuration, builds the S3 client from
   * {@code s3ReaderConfiguration}, resolves the list of keys to read, and creates the
   * bounded output queue and the single-thread executor.
   *
   * <p>The configured reader path is listed as a prefix in the configured bucket. If the
   * first page of that listing contains any common prefixes or object summaries, the path
   * is treated as a 'directory' and every page of the listing is collected into
   * {@code files}; otherwise the reader path itself is recorded as the single file to read.
   *
   * @param configurationObject not used by this method
   */
  public void prepare(Object configurationObject) {

    streamsConfiguration = StreamsConfigurator.detectConfiguration();

    lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);

    // Connect to S3
    synchronized (this) {
      // Create the credentials Object
      AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());

      ClientConfiguration clientConfig = new ClientConfiguration();
      clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));

      // We do not want path style access
      S3ClientOptions clientOptions = new S3ClientOptions();
      clientOptions.setPathStyleAccess(false);

      this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
      if (StringUtils.isNotEmpty(s3ReaderConfiguration.getRegion())) {
        this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
      }
      this.amazonS3Client.setS3ClientOptions(clientOptions);
    }

    final ListObjectsRequest request = new ListObjectsRequest()
        .withBucketName(this.s3ReaderConfiguration.getBucket())
        .withPrefix(s3ReaderConfiguration.getReaderPath())
        .withMaxKeys(500);

    ObjectListing listing = this.amazonS3Client.listObjects(request);

    this.files = new ArrayList<>();

    /*
     * If you can list files that are in this path, then you must be dealing with a directory
     * if you cannot list files that are in this path, then you are most likely dealing with
     * a simple file.
     */
    // Both flags are derived from the first page only and then applied to every page.
    final boolean hasCommonPrefixes = !listing.getCommonPrefixes().isEmpty();
    final boolean hasObjectSummaries = !listing.getObjectSummaries().isEmpty();

    if (hasCommonPrefixes || hasObjectSummaries) {
      // Handle the 'directory' use case
      while (true) {
        if (hasCommonPrefixes) {
          this.files.addAll(listing.getCommonPrefixes());
        } else {
          for (final S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
            this.files.add(objectSummary.getKey());
          }
        }

        // Consume the current page BEFORE deciding whether to continue. Checking
        // isTruncated() on a freshly fetched page (as a do/while after the fetch would)
        // exits before the final, non-truncated page is ever added to the file list.
        if (!listing.isTruncated()) {
          break;
        }

        // get the next batch.
        listing = this.amazonS3Client.listNextBatchOfObjects(listing);
      }
    } else {
      // handle the single file use-case
      this.files.add(s3ReaderConfiguration.getReaderPath());
    }

    if (this.files.isEmpty()) {
      LOGGER.error("There are no files to read");
    }

    this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getQueueSize().intValue()));
    this.executor = Executors.newSingleThreadExecutor();
  }