in src/main/java/com/amazonaws/services/cloudtrail/processinglibrary/reader/EventReader.java [122:186]
/**
 * Runs one {@link CloudTrailSource} through the filter / download / parse / emit pipeline.
 * <p>
 * The source is first offered to {@code sourceFilter}. A rejected source is marked as
 * successfully processed and as filtered out, and no logs are touched. An accepted source is
 * cast to {@link SQSBasedSource} and each of its logs is downloaded through {@code s3Manager},
 * gunzipped, and handed to {@code emitEvents}. The source counts as successful only when every
 * log was emitted without a download or parse failure.
 * <p>
 * Start and end of both the source and each individual log are reported to
 * {@code progressReporter}. Regardless of outcome, {@code cleanupMessage} is invoked with the
 * resulting flags before the end of source processing is reported.
 *
 * @param source the source to process; must be an {@link SQSBasedSource} when it passes the filter
 */
public void processSource (CloudTrailSource source) {
    boolean sourceFilteredOut = false;
    boolean everyDownloadSucceeded = true;
    boolean sourceSucceeded = false;

    ProgressStatus sourceStatus = new ProgressStatus(ProgressState.processSource, new BasicProcessSourceInfo(source, sourceSucceeded));
    final Object sourceReportToken = progressReporter.reportStart(sourceStatus);

    try {
        // Guard clause: a rejected source needs no log processing; the finally block below
        // still performs cleanup and end-of-source reporting.
        if (!sourceFilter.filterSource(source)) {
            logger.debug("AWSCloudTrailSource " + source + " has been filtered out.");
            sourceSucceeded = true;
            sourceFilteredOut = true;
            return;
        }

        SQSBasedSource sqsSource = (SQSBasedSource) source;
        int totalLogs = sqsSource.getLogs().size();
        int emittedLogs = 0;

        for (CloudTrailLog ctLog : sqsSource.getLogs()) {
            // Begin progress tracking for this individual log.
            boolean logSucceeded = false;
            ProgressStatus logStatus = new ProgressStatus(ProgressState.processLog, new BasicProcessLogInfo(source, ctLog, logSucceeded));
            final Object logReportToken = progressReporter.reportStart(logStatus);

            try {
                byte[] rawLog = s3Manager.downloadLog(ctLog, source);
                if (rawLog == null) {
                    // Download failed: remember it and move on to the next log.
                    everyDownloadSucceeded = false;
                } else {
                    try (GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(rawLog));
                         EventSerializer serializer = getEventSerializer(gzipIn, ctLog)) {
                        emitEvents(serializer);
                        // Only logs whose events were emitted count towards source success.
                        emittedLogs++;
                        logSucceeded = true;
                    } catch (IllegalArgumentException | IOException e) {
                        LibraryUtils.handleException(exceptionHandler, logStatus, e, "Failed to parse log file.");
                    }
                }
            } finally {
                // Always close out the per-log progress report, whatever happened above.
                LibraryUtils.endToProcess(progressReporter, logSucceeded, logStatus, logReportToken);
            }
        }

        // The source succeeds only when every one of its logs was emitted.
        if (emittedLogs == totalLogs) {
            sourceSucceeded = true;
        }
    } catch (CallbackException ex) {
        exceptionHandler.handleException(ex);
    } finally {
        cleanupMessage(sourceFilteredOut, everyDownloadSucceeded, sourceSucceeded, source);
        // Close out the per-source progress report.
        LibraryUtils.endToProcess(progressReporter, sourceSucceeded, sourceStatus, sourceReportToken);
    }
}