in src/main/java/com/amazonaws/services/cloudtrail/processinglibrary/reader/EventReader.java [122:186]
    public void processSource(CloudTrailSource source) {
        // Runs one source through the filter and, if accepted, downloads, decompresses and emits
        // every log it references. Progress is reported at source level and at log level, and the
        // three outcome flags below are handed to cleanupMessage(...) once processing ends.
        boolean sourceSucceeded = false;
        boolean allDownloadsSucceeded = true;
        boolean sourceFilteredOut = false;

        // The token returned by reportStart is passed back when the end of processing is reported.
        final ProgressStatus sourceStatus =
                new ProgressStatus(ProgressState.processSource, new BasicProcessSourceInfo(source, sourceSucceeded));
        final Object sourceReportToken = progressReporter.reportStart(sourceStatus);

        try {
            if (sourceFilter.filterSource(source)) {
                final SQSBasedSource sqsSource = (SQSBasedSource) source;

                // Counts down once per log whose events were emitted without a parse failure;
                // reaching zero means every log of this source was processed.
                int logsRemaining = sqsSource.getLogs().size();

                for (CloudTrailLog trailLog : sqsSource.getLogs()) {
                    boolean logSucceeded = false;
                    final ProgressStatus logStatus = new ProgressStatus(
                            ProgressState.processLog, new BasicProcessLogInfo(source, trailLog, logSucceeded));
                    final Object logReportToken = progressReporter.reportStart(logStatus);

                    try {
                        final byte[] rawLogBytes = s3Manager.downloadLog(trailLog, source);
                        if (rawLogBytes == null) {
                            // Download failed: record it and move on to the next log.
                            allDownloadsSucceeded = false;
                        } else {
                            try (GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(rawLogBytes));
                                 EventSerializer serializer = getEventSerializer(gzipStream, trailLog)) {
                                emitEvents(serializer);
                                logSucceeded = true;
                                logsRemaining--;
                            } catch (IllegalArgumentException | IOException e) {
                                // A parse failure is routed to the exception handler; the loop continues.
                                LibraryUtils.handleException(exceptionHandler, logStatus, e, "Failed to parse log file.");
                            }
                        }
                    } finally {
                        // Always report the end of this log, whether it succeeded or not.
                        LibraryUtils.endToProcess(progressReporter, logSucceeded, logStatus, logReportToken);
                    }
                }

                sourceSucceeded = (logsRemaining == 0);
            } else {
                // The filter rejected the source: nothing to download, and this counts as a success.
                logger.debug("AWSCloudTrailSource " + source + " has been filtered out.");
                sourceFilteredOut = true;
                sourceSucceeded = true;
            }
        } catch (CallbackException ex) {
            exceptionHandler.handleException(ex);
        } finally {
            // Cleanup runs before the end-of-source report, on every exit path.
            cleanupMessage(sourceFilteredOut, allDownloadsSucceeded, sourceSucceeded, source);
            LibraryUtils.endToProcess(progressReporter, sourceSucceeded, sourceStatus, sourceReportToken);
        }
    }