in index.js [328:456]
function (callback) {
tryNumber++;
// build the reference to the pending batch, with an atomic add
// of the current file
var now = common.now();
var item = {
Key: {
batchId: {
S: thisBatchId
},
s3Prefix: {
S: s3Info.prefix
}
},
TableName: batchTable,
UpdateExpression: "add writeDates :appendFileDate, size :size set #stat = :open, lastUpdate = :updateTime, entryMap = list_append(if_not_exists(entryMap, :emptyList), :entry)",
ExpressionAttributeNames: {
"#stat": 'status'
},
ExpressionAttributeValues: {
":entry": {
L: [{
M: {
"file": {S: itemEntry},
"size": {N: '' + s3Info.size}
}
}
]
},
":emptyList": {
L: []
},
":appendFileDate": {
NS: ['' + now]
},
":updateTime": {
N: '' + now
},
":open": {
S: open
},
":size": {
N: '' + s3Info.size
}
},
/*
* current batch can't be locked
*/
ConditionExpression: "#stat = :open or attribute_not_exists(#stat)"
};
logger.debug(JSON.stringify(item));
// add the file to the pending batch
dynamoDB.updateItem(item, function (err, data) {
if (err) {
let waitFor = Math.min(Math.pow(tryNumber, 2) * 10, maxRetryMS);
if (err.code === provisionedThroughputExceeded) {
logger.warn("Provisioned Throughput Exceeded on addition of " + s3Info.prefix + " to pending batch " + thisBatchId + ". Trying again in " + waitFor + " ms");
setTimeout(callback, waitFor);
} else if (err.code === conditionCheckFailed) {
// the batch I have a reference to was locked so
// reload the current batch ID from the config
var configReloadRequest = {
Key: {
s3Prefix: {
S: s3Info.prefix
}
},
TableName: configTable,
/*
* we need a consistent read here to ensure we
* get the latest batch ID
*/
ConsistentRead: true
};
dynamoDB.getItem(configReloadRequest, function (err, data) {
configReloads++;
if (err) {
if (err === provisionedThroughputExceeded) {
logger.warn("Provisioned Throughput Exceeded on reload of " + configTable + " due to locked batch write");
callback();
} else {
console.log(err);
callback(err);
}
} else {
if (data.Item.currentBatch.S === thisBatchId) {
// we've obtained the same batch ID back
// from the configuration as we have
// now, meaning it hasn't yet rotated
logger.warn("Batch " + thisBatchId + " still current after configuration reload attempt " + configReloads + ". Recycling in " + waitFor + " ms.");
// because the batch hasn't been
// reloaded on the configuration, we'll
// backoff here for a moment to let that
// happen
setTimeout(callback, waitFor);
} else {
// we've got an updated batch id, so use
// this in the next cycle of file add
thisBatchId = data.Item.currentBatch.S;
logger.warn("Obtained new Batch ID " + thisBatchId + " after configuration reload. Attempt " + configReloads);
/*
* callback immediately, as we should
* now have a valid and open batch to
* use
*/
callback();
}
}
});
} else {
asyncError = err;
proceed = true;
callback();
}
} else {
// no error - the file was added to the batch, so mark
// the operation as OK so async will not retry
proceed = true;
callback();
}
});
},