// function processPendingBatch() — from index.js [602:813]


    /**
     * Evaluates the currently pending batch for an S3 prefix and, when any
     * configured flush condition is met (entry count >= batchSize.N, total
     * size >= batchSizeBytes.N, or age > batchTimeoutSecs.N), locks the batch
     * record, rotates the prefix configuration onto a fresh batch ID, and
     * kicks off manifest creation via createManifest().
     *
     * Relies on module-scope collaborators: dynamoDB, logger, context,
     * common, batchTable, configTable, uuid, createManifest, and the
     * status/error string constants (open, locked, error,
     * provisionedThroughputExceeded, conditionCheckFailed).
     *
     * @param {Object} config - DynamoDB config item for the prefix; flush
     *     thresholds are read from batchSize.N, batchSizeBytes.N and
     *     batchTimeoutSecs.N (all optional).
     * @param {String} thisBatchId - ID of the pending batch to evaluate.
     * @param {Object} s3Info - carries the S3 prefix in s3Info.prefix.
     */
    function processPendingBatch(config, thisBatchId, s3Info) {
        // make the request for the current batch
        var currentBatchRequest = {
            Key: {
                batchId: {
                    S: thisBatchId
                },
                s3Prefix: {
                    S: s3Info.prefix
                }
            },
            TableName: batchTable,
            ConsistentRead: true
        };

        logger.debug("Loading current Batch record from prefix config");
        logger.debug(JSON.stringify(currentBatchRequest));

        dynamoDB.getItem(currentBatchRequest, function (err, data) {
            if (err) {
                /*
                 * FIX: compare the error *code* against the constant, as the
                 * lock path below does. The previous `err === ...` compared an
                 * Error object to a string and could never match, so throttled
                 * reads were treated as fatal errors.
                 */
                if (err.code === provisionedThroughputExceeded) {
                    logger.warn("Provisioned Throughput Exceeded on read of " + batchTable);
                    callback();
                } else {
                    logger.error(JSON.stringify(err));
                    context.done(error, JSON.stringify(err));
                }
            } else if (!data || !data.Item) {
                var msg = "No open pending Batch " + thisBatchId;
                logger.error(msg);
                context.done(null, msg);
            } else {
                // first step is to resolve the earliest writeDate as the batch
                // creation date
                var batchCreateDate;
                data.Item.writeDates.NS.forEach(function (item) {
                    var t = parseInt(item, 10);

                    if (!batchCreateDate || t < batchCreateDate) {
                        batchCreateDate = t;
                    }
                });

                var lastUpdateTime = data.Item.lastUpdate.N;

                /*
                 * grab the pending entries from the locked batch. We have 2
                 * copies - a batch that uses a StringSet ('entries') from pre
                 * 2.7.8, and a List ('entryMap') from 2.7.9
                 */
                let pendingEntries = {};
                let pendingEntryCount = 0;
                if (data.Item.entryMap) {
                    pendingEntryCount += data.Item.entryMap.L.length;
                    pendingEntries["entryMap"] = data.Item.entryMap.L;
                }
                /*
                 * FIX: guard on the attribute we actually dereference
                 * ('entries'). The previous check tested 'entrySet', which
                 * could throw on a missing 'entries' attribute or silently
                 * skip legacy string-set entries. The 'entrySet' key in
                 * pendingEntries is preserved for downstream consumers.
                 */
                if (data.Item.entries) {
                    pendingEntryCount += data.Item.entries.SS.length;
                    pendingEntries["entrySet"] = data.Item.entries.SS;
                }
                var doProcessBatch = false;

                /*
                 * check whether the current batch is bigger than the
                 * configured max count, size, or older than configured max age.
                 * FIX: guard config.batchSize before reading .N — previously
                 * an unconfigured batchSize threw here, even though the else
                 * branch (and the timeout/size checks below) were guarded.
                 */
                if (config.batchSize && config.batchSize.N && pendingEntryCount >= parseInt(config.batchSize.N, 10)) {
                    logger.info("Batch count " + config.batchSize.N + " reached");
                    doProcessBatch = true;
                } else {
                    if (config.batchSize && config.batchSize.N) {
                        logger.debug("Current batch count of " + pendingEntryCount + " below batch limit of " + config.batchSize.N);
                    }
                }

                if (config.batchTimeoutSecs && config.batchTimeoutSecs.N && pendingEntryCount > 0 && common.now() - batchCreateDate > parseInt(config.batchTimeoutSecs.N, 10)) {
                    logger.info("Batch age " + config.batchTimeoutSecs.N + " seconds reached");
                    doProcessBatch = true;
                } else {
                    if (config.batchTimeoutSecs && config.batchTimeoutSecs.N) {
                        logger.debug("Current batch age of " + (common.now() - batchCreateDate) + " seconds below batch timeout: "
                            + (config.batchTimeoutSecs.N ? config.batchTimeoutSecs.N : "None Defined"));
                    }
                }

                // FIX: guard data.Item.size existence before dereferencing .N
                if (config.batchSizeBytes && config.batchSizeBytes.N && pendingEntryCount > 0 && data.Item.size && parseInt(config.batchSizeBytes.N, 10) <= parseInt(data.Item.size.N, 10)) {
                    logger.info("Batch size " + config.batchSizeBytes.N + " bytes reached");
                    doProcessBatch = true;
                } else {
                    if (data.Item.size && data.Item.size.N) {
                        logger.debug("Current batch size of " + data.Item.size.N + " below batch threshold or not configured");
                    }
                }

                if (doProcessBatch) {
                    // set the current batch to locked status
                    var updateCurrentBatchStatus = {
                        Key: {
                            batchId: {
                                S: thisBatchId
                            },
                            s3Prefix: {
                                S: s3Info.prefix
                            }
                        },
                        TableName: batchTable,
                        AttributeUpdates: {
                            status: {
                                Action: 'PUT',
                                Value: {
                                    S: locked
                                }
                            },
                            lastUpdate: {
                                Action: 'PUT',
                                Value: {
                                    N: '' + common.now()
                                }
                            }
                        },
                        /*
                         * the batch to be processed has to be 'open', otherwise
                         * we'll have multiple processes all handling a single
                         * batch
                         */
                        Expected: {
                            status: {
                                AttributeValueList: [{
                                    S: open
                                }],
                                ComparisonOperator: 'EQ'
                            }
                        },
                        /*
                         * add the ALL_NEW return values so we have the most up
                         * to date version of the entries string set
                         */
                        ReturnValues: "ALL_NEW"
                    };

                    logger.debug("Attempting to lock Batch for processing");
                    logger.debug(JSON.stringify(updateCurrentBatchStatus));

                    common.retryableUpdate(dynamoDB, updateCurrentBatchStatus, function (err, data) {
                        if (err) {
                            if (err.code === conditionCheckFailed) {
                                /*
                                 * some other Lambda function has locked the
                                 * batch - this is OK and we'll just exit
                                 * quietly
                                 */
                                logger.debug("Batch is ready to be processed, but another thread has locked it for loading");
                                context.done(null, null);
                            } else if (err.code === provisionedThroughputExceeded) {
                                logger.error("Provisioned Throughput Exceeded on " + batchTable + " while trying to lock Batch");
                                context.done(error, JSON.stringify(err));
                            } else {
                                logger.error("Unhandled exception while trying to lock Batch " + thisBatchId);
                                logger.error(JSON.stringify(err));
                                context.done(error, JSON.stringify(err));
                            }
                        } else {
                            if (!data || !data.Attributes) {
                                var e = "Unable to extract latest pending entries set from Locked batch";
                                logger.error(e);
                                context.done(error, e);
                            } else {
                                /*
                                 * assign the loaded configuration a new batch
                                 * ID
                                 */
                                var allocateNewBatchRequest = {
                                    Key: {
                                        s3Prefix: {
                                            S: s3Info.prefix
                                        }
                                    },
                                    TableName: configTable,
                                    AttributeUpdates: {
                                        currentBatch: {
                                            Action: 'PUT',
                                            Value: {
                                                S: uuid.v4()
                                            }
                                        },
                                        lastBatchRotation: {
                                            Action: 'PUT',
                                            Value: {
                                                S: common.getFormattedDate()
                                            }
                                        }
                                    }
                                };

                                logger.debug("Allocating new Batch ID for future processing");
                                logger.debug(JSON.stringify(allocateNewBatchRequest));

                                common.retryableUpdate(dynamoDB, allocateNewBatchRequest, function (err, data) {
                                    if (err) {
                                        logger.error("Error while allocating new Pending Batch ID");
                                        logger.error(JSON.stringify(err));
                                        context.done(error, JSON.stringify(err));
                                    } else {
                                        // OK - let's create the manifest file
                                        createManifest(config, thisBatchId, s3Info, pendingEntries);
                                    }
                                });
                            }
                        }
                    });
                } else {
                    logger.info("No pending batch flush required");
                    context.done(null, null);
                }
            }
        });
    };