in index.js [602:813]
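/*
* Decide whether the pending batch identified by thisBatchId for the given
* S3 prefix should be flushed. The batch is flushed when any configured
* threshold is reached: entry count (batchSize), age in seconds
* (batchTimeoutSecs), or accumulated size in bytes (batchSizeBytes). If so,
* the batch is locked, the prefix configuration is rotated onto a new batch
* ID, and a load manifest is generated via createManifest.
*/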
function processPendingBatch(config, thisBatchId, s3Info) {
// make the request for the current batch
var currentBatchRequest = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
ConsistentRead: true
};
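// use a strongly consistent read so the entry count and size checked below
// reflect the most recent writes to the batch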
logger.debug("Loading current Batch record from prefix config");
logger.debug(JSON.stringify(currentBatchRequest));
dynamoDB.getItem(currentBatchRequest, function (err, data) {
if (err) {
if (err.code === provisionedThroughputExceeded) {
logger.warn("Provisioned Throughput Exceeded on read of " + batchTable);
// exit without error - a later invocation will re-check this batch
context.done(null, null);
} else {
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
}
} else if (!data || !data.Item) {
var msg = "No open pending Batch " + thisBatchId;
logger.error(msg);
context.done(null, msg);
} else {
// first step is to resolve the earliest writeDate as the batch
// creation date
var batchCreateDate;
data.Item.writeDates.NS.forEach(function (item) {
var t = parseInt(item, 10);
if (!batchCreateDate || t < batchCreateDate) {
batchCreateDate = t;
}
});
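// writeDates holds epoch-second timestamps (they are compared against
// batchTimeoutSecs below), so the earliest entry serves as the creation time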
var lastUpdateTime = data.Item.lastUpdate.N;
/*
* grab the pending entries from the locked batch. There are two
* possible layouts: a StringSet ('entries') written before 2.7.8,
* and a List ('entryMap') written from 2.7.9 onwards
*/
let pendingEntries = {};
let pendingEntryCount = 0;
if (data.Item.entryMap) {
pendingEntryCount += data.Item.entryMap.L.length;
pendingEntries["entryMap"] = data.Item.entryMap.L;
}
if (data.Item.entries) {
pendingEntryCount += data.Item.entries.SS.length;
pendingEntries["entrySet"] = data.Item.entries.SS;
}
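/*
* for illustration (shapes assumed, not confirmed by this excerpt): a
* StringSet attribute arrives as { SS: ["input/file1.csv", ...] }, while
* a List arrives as { L: [...] } whose typed members depend on the loader
* version that wrote them
*/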
var doProcessBatch = false;
if (config.batchSize && config.batchSize.N && pendingEntryCount >= parseInt(config.batchSize.N)) {
logger.info("Batch count " + config.batchSize.N + " reached");
doProcessBatch = true;
} else if (config.batchSize && config.batchSize.N) {
logger.debug("Current batch count of " + pendingEntryCount + " below batch limit of " + config.batchSize.N);
}
// check whether the current batch is bigger than the configured
// max count, size, or older than configured max age
if (config.batchTimeoutSecs && config.batchTimeoutSecs.N && pendingEntryCount > 0 && common.now() - batchCreateDate > parseInt(config.batchTimeoutSecs.N)) {
logger.info("Batch age " + config.batchTimeoutSecs.N + " seconds reached");
doProcessBatch = true;
} else if (config.batchTimeoutSecs && config.batchTimeoutSecs.N) {
logger.debug("Current batch age of " + (common.now() - batchCreateDate) + " seconds below batch timeout of " + config.batchTimeoutSecs.N + " seconds");
}
if (config.batchSizeBytes && config.batchSizeBytes.N && pendingEntryCount > 0 && data.Item.size && parseInt(config.batchSizeBytes.N) <= parseInt(data.Item.size.N)) {
logger.info("Batch size " + config.batchSizeBytes.N + " bytes reached");
doProcessBatch = true;
} else if (data.Item.size && data.Item.size.N) {
logger.debug("Current batch size of " + data.Item.size.N + " below batch threshold or not configured");
}
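// the three triggers are independent - reaching any one of count, age, or
// byte size is sufficient to flush the batch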
if (doProcessBatch) {
// set the current batch to locked status
var updateCurrentBatchStatus = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
AttributeUpdates: {
status: {
Action: 'PUT',
Value: {
S: locked
}
},
lastUpdate: {
Action: 'PUT',
Value: {
N: '' + common.now()
}
}
},
/*
* the batch to be processed has to be 'open', otherwise
* we'll have multiple processes all handling a single
* batch
*/
Expected: {
status: {
AttributeValueList: [{
S: open
}],
ComparisonOperator: 'EQ'
}
},
/*
* request ALL_NEW return values so we receive the most up to
* date version of the batch entries after the lock is applied
*/
ReturnValues: "ALL_NEW"
};
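/*
* note: AttributeUpdates and Expected are legacy DynamoDB parameters.
* For reference only - a sketch of the equivalent request using the
* expression API (an assumption, not what this module uses) would be:
*
* UpdateExpression: "SET #s = :locked, lastUpdate = :now",
* ConditionExpression: "#s = :open",
* ExpressionAttributeNames: { "#s": "status" },
* ExpressionAttributeValues: {
* ":locked": { S: locked },
* ":open": { S: open },
* ":now": { N: "" + common.now() }
* },
* ReturnValues: "ALL_NEW"
*/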
logger.debug("Attempting to lock Batch for processing");
logger.debug(JSON.stringify(updateCurrentBatchStatus));
common.retryableUpdate(dynamoDB, updateCurrentBatchStatus, function (err, data) {
if (err) {
if (err.code === conditionCheckFailed) {
// some other Lambda function has locked the
// batch - this is OK and we'll just exit
// quietly
logger.debug("Batch is ready to be processed, but another thread has locked it for loading");
context.done(null, null);
} else if (err.code === provisionedThroughputExceeded) {
logger.error("Provisioned Throughput Exceeded on " + batchTable + " while trying to lock Batch");
context.done(error, JSON.stringify(err));
} else {
logger.error("Unhandled exception while trying to lock Batch " + thisBatchId);
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
}
} else {
if (!data || !data.Attributes) {
var e = "Unable to extract latest pending entries set from Locked batch";
logger.error(e);
context.done(error, e);
} else {
/*
* assign the loaded configuration a new batch
* ID
*/
var allocateNewBatchRequest = {
Key: {
s3Prefix: {
S: s3Info.prefix
}
},
TableName: configTable,
AttributeUpdates: {
currentBatch: {
Action: 'PUT',
Value: {
S: uuid.v4()
}
},
lastBatchRotation: {
Action: 'PUT',
Value: {
S: common.getFormattedDate()
}
}
}
};
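/*
* rotating currentBatch to a fresh UUID means that subsequent writes to
* this prefix accumulate in a new open batch while this locked batch is
* loaded
*/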
logger.debug("Allocating new Batch ID for future processing");
logger.debug(JSON.stringify(allocateNewBatchRequest));
common.retryableUpdate(dynamoDB, allocateNewBatchRequest, function (err, data) {
if (err) {
logger.error("Error while allocating new Pending Batch ID");
logger.error(JSON.stringify(err));
context.done(error, JSON.stringify(err));
} else {
// OK - let's create the manifest file
createManifest(config, thisBatchId, s3Info, pendingEntries);
}
});
}
}
});
} else {
logger.info("No pending batch flush required");
context.done(null, null);
}
}
});
}
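/*
* call site (assumed - not shown in this excerpt): typically invoked from
* the Lambda event handler once an incoming S3 object has been added to
* the currently open batch for its prefix
*/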