in source/lambda/requestArchives/index.js [47:179]
async function handler(payload) {
const startTime = new Date().getTime();
// Track filenames processed in this invocation to supplement the DynamoDB duplicate check,
// in case the GSI has not yet been updated with a recently added filename
const processed = [];
console.log(
`Starting partition: ${payload.nextPartition}. Last partition: ${payload.maxPartition}`
);
console.log(`Checking progress in DynamoDB`);
const pid = payload.nextPartition;
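// Resume from the highest file number already recorded for this partition so a re-run
// does not request the same archives again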
let partitionMaxProcessedFileNumber = await db.getPartitionMaxProcessedFileNumber(
pid
);
console.log(`Max Processed File Number : ${partitionMaxProcessedFileNumber}`);
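// The Athena results for this partition are delivered as a CSV file; readResultsCSV
// parses it into one record per archive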
const resultsCSV = await readAthenaPartition(pid);
console.log(`Reading athena results file: ${resultsCSV}`);
const lines = await readResultsCSV(resultsCSV);
let processedSize = 0;
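// Each CSV record describes one Glacier archive: row number, size, archive id,
// SHA-256 tree hash, description and creation date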
for (const line of lines) {
const {
row_num: ifn,
size: sz,
archiveid: aid,
sha256treehash: sha,
archivedescription: descr,
creationdate,
} = line;
// Coerce the size to a number before summing (CSV fields may be parsed as strings)
processedSize += Number(sz);
if (ifn <= partitionMaxProcessedFileNumber) {
continue;
}
console.log(`${ifn} : ${aid}`);
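// Derive the target filename from the archive id and its description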
let fname = parseFileName(aid, descr);
// Duplicate filename - append the archive creation date as a suffix
if (processed.includes(fname) || (await db.filenameExists(fname))) {
fname += `-${creationdate}`;
}
console.log(`${fname}`);
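// Initiate an asynchronous Glacier archive-retrieval job; Glacier publishes to the
// SNS topic once the archive is ready to download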
const glacierJob = await glacier
.initiateJob({
accountId: "-",
jobParameters: {
Type: "archive-retrieval",
ArchiveId: aid,
Tier: TIER,
SNSTopic: SNS_TOPIC,
},
vaultName: VAULT,
})
.promise();
const jobId = glacierJob.jobId;
const cdt = moment().format();
const cc = calculateNumberOfChunks(sz);
const rc = 0; // Retry count is initialized to 0
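// Record the retrieval job in the status table; attributes mirror the variables above
// (archive id, Glacier job id, file number, partition, tree hash, size, request time,
// description, filename, chunk count, retry count)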
await dynamodb
.putItem({
Item: AWS.DynamoDB.Converter.marshall({
aid,
jobId,
ifn,
pid,
sha,
sz,
cdt,
descr,
fname,
cc,
rc,
}),
TableName: STATUS_TABLE,
})
.promise();
processed.push(fname);
partitionMaxProcessedFileNumber = ifn;
}
// Advance to the next partition for the following invocation
payload.nextPartition = pid + 1;
// Read throttled bytes data from DynamoDB and add it to the quota calculation
const throttledBytesItem = await db.getItem('throttling');
let throttledBytes = 0;
let throttledErrorCount = 0;
if (throttledBytesItem.Item && 'throttledBytes' in throttledBytesItem.Item) {
throttledBytes = parseInt(throttledBytesItem.Item.throttledBytes.N, 10);
throttledErrorCount = parseInt(throttledBytesItem.Item.errorCount.N, 10);
}
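// Spread retrievals over 24 hours: the delay scales with the share of the daily quota
// (DQL) consumed by this batch plus any previously throttled bytes. For example, a batch
// that consumed 25% of the quota waits up to 0.25 * 86400 = 21600 seconds, less the time
// this invocation already took.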
// Calculate the timeout before the next batch
const dailyQuota = Number(DQL);
const endTime = new Date().getTime();
const timeTaken = Math.floor((endTime - startTime) / 1000);
const processedShare = (processedSize + throttledBytes) / dailyQuota;
let timeout = Math.round(86400 * processedShare) - timeTaken;
timeout = timeout < 0 ? 0 : timeout;
// if there are some throttled bytes but timeout is 0, set it to MIN_THROTTLING_DELAY in seconds
if (throttledBytes > 0 && timeout === 0) {
timeout = MIN_THROTTLING_DELAY;
}
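// The caller (e.g. the orchestrating state machine) is expected to wait `timeout`
// seconds before requesting the next partition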
payload.timeout = timeout;
console.log(`Processed: ${processedSize}`);
console.log(`Throttled Bytes: ${throttledBytes}`);
console.log(`Processed Share: ${processedShare}`);
console.log(`Timeout: ${timeout}`);
// decrement throttled bytes data on DDB
if (throttledBytes > 0) {
await db.decrementThrottledBytes(throttledBytes, throttledErrorCount);
}
return payload;
}