public void processSource()

in src/main/java/com/amazonaws/services/cloudtrail/processinglibrary/reader/EventReader.java [122:186]


    /**
     * Processes one CloudTrail source: filters it, downloads and parses each of its
     * log files, emits the contained events, and reports progress along the way.
     * <p>
     * The source is reported as successfully processed when it is rejected by the
     * source filter, or when every one of its log files was downloaded, parsed and
     * emitted without error. Regardless of outcome, {@code cleanupMessage} is invoked
     * with the collected outcome flags and the end of processing is reported.
     * <p>
     * A {@code CallbackException} raised while filtering or processing is passed to the
     * exception handler and stops processing of any remaining log files of this source.
     *
     * @param source the source to process; it is cast to {@code SQSBasedSource} to
     *               obtain its log files when it passes the source filter
     */
    public void processSource(CloudTrailSource source) {
        boolean sourceFilteredOut = false;
        boolean allDownloadsSucceeded = true;
        boolean sourceSucceeded = false;

        final ProgressStatus sourceStatus = new ProgressStatus(
                ProgressState.processSource, new BasicProcessSourceInfo(source, sourceSucceeded));
        final Object sourceReportObject = progressReporter.reportStart(sourceStatus);

        try {
            if (sourceFilter.filterSource(source)) {
                final SQSBasedSource sqsSource = (SQSBasedSource) source;

                // Snapshot the expected number of logs up front; the source only counts as
                // processed when the number of successfully handled logs reaches this value.
                final int totalLogs = sqsSource.getLogs().size();
                int succeededLogs = 0;

                for (CloudTrailLog ctLog : sqsSource.getLogs()) {
                    boolean logSucceeded = false;
                    final ProgressStatus logStatus = new ProgressStatus(
                            ProgressState.processLog, new BasicProcessLogInfo(source, ctLog, logSucceeded));
                    final Object logReportObject = progressReporter.reportStart(logStatus);

                    try {
                        final byte[] logBytes = s3Manager.downloadLog(ctLog, source);

                        if (logBytes == null) {
                            // Download failed: remember it for cleanup and move on to the next log.
                            allDownloadsSucceeded = false;
                        } else {
                            try (GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(logBytes));
                                 EventSerializer serializer = getEventSerializer(gzipStream, ctLog)) {

                                emitEvents(serializer);

                                // Only reached when every event of this log was emitted.
                                succeededLogs++;
                                logSucceeded = true;

                            } catch (IllegalArgumentException | IOException e) {
                                LibraryUtils.handleException(exceptionHandler, logStatus, e, "Failed to parse log file.");
                            }
                        }
                    } finally {
                        // Always close out the per-log progress report, whatever the outcome.
                        LibraryUtils.endToProcess(progressReporter, logSucceeded, logStatus, logReportObject);
                    }
                }

                sourceSucceeded = (succeededLogs == totalLogs);

            } else {
                // Rejected by the source filter: nothing to download, treated as a success.
                logger.debug("AWSCloudTrailSource " + source + " has been filtered out.");
                sourceSucceeded = true;
                sourceFilteredOut = true;
            }

        } catch (CallbackException ex) {
            exceptionHandler.handleException(ex);

        } finally {
            cleanupMessage(sourceFilteredOut, allDownloadsSucceeded, sourceSucceeded, source);
            // Close out the per-source progress report.
            LibraryUtils.endToProcess(progressReporter, sourceSucceeded, sourceStatus, sourceReportObject);
        }
    }