public TripEvent next()

in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/utils/TaxiEventReader.java [80:144]


  /**
   * Returns the buffered trip event and pre-parses the following one.
   *
   * <p>The reader keeps a one-element look-ahead in the {@code next} field: each call parses the
   * next readable line into {@code next} and returns the value {@code next} held before the call.
   * When the current S3 object is exhausted (or cannot be read), the next object from
   * {@code s3Objects} is opened and reading continues there. Lines that {@link TripEvent} rejects
   * with an {@link IllegalArgumentException} are skipped with a warning.
   *
   * <p>Once no further S3 objects are available, {@code hasNext} is set to {@code false} and the
   * currently buffered event is returned without being replaced.
   *
   * @return the event that was buffered before this call; what this is on the very first call
   *     depends on how {@code next} is initialised outside this method
   */
  public TripEvent next() {
    // Iterate instead of recursing: a long run of unparseable lines or empty objects must not
    // grow the call stack.
    while (true) {
      String nextLine = null;

      // objectStream is null until the first S3 object has been opened
      if (objectStream != null) {
        try {
          nextLine = objectStream.readLine();
        } catch (IOException e) {
          // best effort: give up on the current object and move on to the next one, but leave a
          // trace so that truncated input does not go unnoticed
          LOG.warn("failed to read from current object, continuing with the next one", e);
        }
      }

      if (nextLine != null) {
        TripEvent result = next;

        try {
          // parse the next event and return the current one; if parsing throws, the buffered
          // event in 'next' is left untouched
          next = new TripEvent(nextLine);

          return result;
        } catch (IllegalArgumentException e) {
          // if the current line cannot be parsed, just skip it and emit a warning
          LOG.warn("ignoring line: {}", nextLine);

          continue;
        }
      }

      if (!s3Objects.hasNext()) {
        // there is no further object to read, so the buffered event is the last one
        hasNext = false;

        return next;
      }

      // current object is exhausted (or none was opened yet): open the next one and retry
      openNextS3Object();
    }
  }

  /** Closes the previously opened S3 object, if any, and opens a line reader on the next one. */
  private void openNextS3Object() {
    S3ObjectSummary objectSummary = s3Objects.next();
    String bucket = objectSummary.getBucketName();
    String key = objectSummary.getKey();

    // if another object has been previously read, close it before opening another one
    if (s3Object != null) {
      try {
        s3Object.close();
      } catch (IOException e) {
        // pass the exception as the throwable argument so the stack trace is logged
        LOG.error("failed to close previous object", e);
      }
    }

    LOG.info("reading object {}/{}", bucket, key);

    s3Object = s3.getObject(bucket, key);
    InputStream stream = new BufferedInputStream(s3Object.getObjectContent());

    try {
      stream = new CompressorStreamFactory().createCompressorInputStream(stream);
    } catch (CompressorException e) {
      // if we cannot decompress a stream, that's fine, as it probably is just a stream of
      // uncompressed data
      LOG.debug("unable to decompress stream: {}", e.getMessage());
    }

    // NOTE(review): decodes with the platform default charset; confirm whether an explicit
    // charset (e.g. UTF-8) should be passed here
    objectStream = new BufferedReader(new InputStreamReader(stream));
  }