function()

in index.js [328:456]


            function (callback) {
                tryNumber++;

                // build the reference to the pending batch, with an atomic add
                // of the current file
                var now = common.now();
                var item = {
                    Key: {
                        batchId: {
                            S: thisBatchId
                        },
                        s3Prefix: {
                            S: s3Info.prefix
                        }
                    },
                    TableName: batchTable,
                    UpdateExpression: "add writeDates :appendFileDate, size :size set #stat = :open, lastUpdate = :updateTime, entryMap = list_append(if_not_exists(entryMap, :emptyList), :entry)",
                    ExpressionAttributeNames: {
                        "#stat": 'status'
                    },
                    ExpressionAttributeValues: {
                        ":entry": {
                            L: [{
                                M: {
                                    "file": {S: itemEntry},
                                    "size": {N: '' + s3Info.size}
                                }
                            }
                            ]
                        },
                        ":emptyList": {
                            L: []
                        },
                        ":appendFileDate": {
                            NS: ['' + now]
                        },
                        ":updateTime": {
                            N: '' + now
                        },
                        ":open": {
                            S: open
                        },
                        ":size": {
                            N: '' + s3Info.size
                        }
                    },
                    /*
					 * current batch can't be locked
					 */
                    ConditionExpression: "#stat = :open or attribute_not_exists(#stat)"
                };

                logger.debug(JSON.stringify(item));

                // add the file to the pending batch
                dynamoDB.updateItem(item, function (err, data) {
                    if (err) {
                        let waitFor = Math.min(Math.pow(tryNumber, 2) * 10, maxRetryMS);

                        if (err.code === provisionedThroughputExceeded) {
                            logger.warn("Provisioned Throughput Exceeded on addition of " + s3Info.prefix + " to pending batch " + thisBatchId + ". Trying again in " + waitFor + " ms");
                            setTimeout(callback, waitFor);
                        } else if (err.code === conditionCheckFailed) {
                            // the batch I have a reference to was locked so
                            // reload the current batch ID from the config
                            var configReloadRequest = {
                                Key: {
                                    s3Prefix: {
                                        S: s3Info.prefix
                                    }
                                },
                                TableName: configTable,
                                /*
								 * we need a consistent read here to ensure we
								 * get the latest batch ID
								 */
                                ConsistentRead: true
                            };
                            dynamoDB.getItem(configReloadRequest, function (err, data) {
                                configReloads++;
                                if (err) {
                                    if (err === provisionedThroughputExceeded) {
                                        logger.warn("Provisioned Throughput Exceeded on reload of " + configTable + " due to locked batch write");
                                        callback();
                                    } else {
                                        console.log(err);
                                        callback(err);
                                    }
                                } else {
                                    if (data.Item.currentBatch.S === thisBatchId) {
                                        // we've obtained the same batch ID back
                                        // from the configuration as we have
                                        // now, meaning it hasn't yet rotated
                                        logger.warn("Batch " + thisBatchId + " still current after configuration reload attempt " + configReloads + ". Recycling in " + waitFor + " ms.");

                                        // because the batch hasn't been
                                        // reloaded on the configuration, we'll
                                        // backoff here for a moment to let that
                                        // happen
                                        setTimeout(callback, waitFor);
                                    } else {
                                        // we've got an updated batch id, so use
                                        // this in the next cycle of file add
                                        thisBatchId = data.Item.currentBatch.S;

                                        logger.warn("Obtained new Batch ID " + thisBatchId + " after configuration reload. Attempt " + configReloads);

                                        /*
										 * callback immediately, as we should
										 * now have a valid and open batch to
										 * use
										 */
                                        callback();
                                    }
                                }
                            });
                        } else {
                            asyncError = err;
                            proceed = true;
                            callback();
                        }
                    } else {
                        // no error - the file was added to the batch, so mark
                        // the operation as OK so async will not retry
                        proceed = true;
                        callback();
                    }
                });
            },