in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/utils/TaxiEventReader.java [80:144]
/**
 * Returns the event currently buffered in {@code next} and refills the buffer with the
 * next parseable line from the S3 objects being read.
 *
 * <p>Lines are read from the currently open object. When that object yields no further
 * line (end of stream, read error, or no object has been opened yet), the next entry of
 * {@code s3Objects} is opened, transparently decompressed if a known compression format
 * is detected, and reading continues there. Lines for which the {@link TripEvent}
 * constructor throws {@link IllegalArgumentException} are logged and skipped.
 *
 * <p>When no line is left and {@code s3Objects} has no further entry, {@code hasNext} is
 * set to {@code false} and the event still held in {@code next} is returned.
 *
 * @return the event that was buffered in {@code next} before the buffer was refilled, or
 *         the event still buffered once all objects are exhausted
 */
public TripEvent next() {
    // Iterate rather than recurse: a long run of unparseable lines or empty objects would
    // otherwise add one stack frame per skipped line/object and could overflow the stack.
    while (true) {
        String nextLine = null;

        // No stream exists before the first object has been opened; treat that like an
        // exhausted stream instead of relying on a NullPointerException.
        if (objectStream != null) {
            try {
                nextLine = objectStream.readLine();
            } catch (IOException e) {
                // Best effort: the remainder of this object is abandoned and the next S3
                // object will be opened and read subsequently.
                LOG.warn("failed to read from current object, continuing with the next one", e);
            }
        }

        if (nextLine != null) {
            TripEvent result = next;
            try {
                // parse the next event and return the current one
                next = new TripEvent(nextLine);
                return result;
            } catch (IllegalArgumentException e) {
                // if the current line cannot be parsed, just skip it and emit a warning
                LOG.warn("ignoring line: {}", nextLine);
                continue;
            }
        }

        if (!s3Objects.hasNext()) {
            // there is no further object to parse
            hasNext = false;
            return next;
        }

        // try to open the next S3 object
        S3ObjectSummary objectSummary = s3Objects.next();
        String bucket = objectSummary.getBucketName();
        String key = objectSummary.getKey();

        // if another object has been previously read, close it before opening another one
        if (s3Object != null) {
            try {
                s3Object.close();
            } catch (IOException e) {
                // Pass the exception as the trailing argument without a placeholder so the
                // logger records it (including the stack trace) as the cause.
                LOG.error("failed to close object", e);
            }
        }

        LOG.info("reading object {}/{}", bucket, key);
        s3Object = s3.getObject(bucket, key);

        // Buffered so the compressor factory can inspect the stream signature and rewind.
        InputStream stream = new BufferedInputStream(s3Object.getObjectContent());
        try {
            stream = new CompressorStreamFactory().createCompressorInputStream(stream);
        } catch (CompressorException e) {
            // if we cannot decompress a stream, that's fine, as it probably is just a stream of uncompressed data
            LOG.debug("unable to decompress stream: {}", e.getMessage());
        }

        // Decode with an explicit charset so the result does not depend on the platform default.
        objectStream = new BufferedReader(
                new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8));

        // loop around to read the first line from the newly opened stream
    }
}