async function handler()

in source/lambda/requestArchives/index.js [47:179]
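Processes a single Athena results partition per invocation: for each archive row not yet handled it initiates a Glacier archive-retrieval job, records the job metadata in DynamoDB, and finally computes a pacing timeout so the overall retrieval stays within the daily quota.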


async function handler(payload) {
    const startTime = new Date().getTime();

    // Track filenames processed in this invocation to supplement the DynamoDB
    // check, in case the GSI has not yet been synced for recently added filenames.
    const processed = [];

    console.log(
        `Starting partition: ${payload.nextPartition}. Last partition: ${payload.maxPartition}`
    );

    console.log(`Checking progress in DynamoDB`);
    const pid = payload.nextPartition;
    let partitionMaxProcessedFileNumber = await db.getPartitionMaxProcessedFileNumber(
        pid
    );
    console.log(`Max Processed File Number: ${partitionMaxProcessedFileNumber}`);

    const resultsCSV = await readAthenaPartition(pid);
    console.log(`Reading Athena results file: ${resultsCSV}`);

    const lines = await readResultsCSV(resultsCSV);

    let processedSize = 0;

    for (const line of lines) {
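        // Each CSV row describes one Glacier archive; the columns are destructured
        // into the short aliases used in the loop and in the status-table item.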
        const {
            row_num: ifn,
            size: sz,
            archiveid: aid,
            sha256treehash: sha,
            archivedescription: descr,
            creationdate,
        } = line;

        processedSize += Number(sz); // CSV fields are strings; coerce before summing

        // Skip rows already handled by a previous run over this partition
        if (Number(ifn) <= partitionMaxProcessedFileNumber) {
            continue;
        }

        console.log(`${ifn} : ${aid}`);
        let fname = parseFileName(aid, descr);

        // Duplicate filename: disambiguate with a creation-date suffix
        if (processed.includes(fname) || (await db.filenameExists(fname))) {
            fname += `-${creationdate}`;
        }

        console.log(fname);
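        // Initiate an asynchronous archive-retrieval job; Glacier publishes to
        // the SNS topic once the archive is staged for download.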
        const glacierJob = await glacier
            .initiateJob({
                accountId: "-",
                jobParameters: {
                    Type: "archive-retrieval",
                    ArchiveId: aid,
                    Tier: TIER,
                    SNSTopic: SNS_TOPIC,
                },
                vaultName: VAULT,
            })
            .promise();

        const jobId = glacierJob.jobId;

        const cdt = moment().format();
        const cc = calculateNumberOfChunks(sz);
        const rc = 0; // Retry count is initialized to 0
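        // Record the retrieval job and archive metadata in the status table so the
        // completion notification can be matched back to this archive.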
        await dynamodb
            .putItem({
                Item: AWS.DynamoDB.Converter.marshall({
                    aid,
                    jobId,
                    ifn,
                    pid,
                    sha,
                    sz,
                    cdt,
                    descr,
                    fname,
                    cc,
                    rc,
                }),
                TableName: STATUS_TABLE,
            })
            .promise();

        processed.push(fname);
        partitionMaxProcessedFileNumber = ifn;
    }

    // Advance to the next partition for the following invocation
    payload.nextPartition = pid + 1;

    // Read throttled-bytes data from DynamoDB and include it in the pacing calculation
    const throttledBytesItem = await db.getItem('throttling');
    let throttledBytes = 0;
    let throttledErrorCount = 0;
    if (Object.keys(throttledBytesItem).length !== 0 && 'throttledBytes' in throttledBytesItem.Item) {
        throttledBytes = parseInt(throttledBytesItem.Item.throttledBytes.N, 10);
        throttledErrorCount = parseInt(throttledBytesItem.Item.errorCount.N, 10);
    }

    // Calculate the timeout before the next batch starts
    const dailyQuota = Number(DQL);
    const endTime = new Date().getTime();
    const timeTaken = Math.floor((endTime - startTime) / 1000);

    const processedShare = (processedSize + throttledBytes) / dailyQuota;
    let timeout = Math.round(86400 * processedShare) - timeTaken;
    timeout = Math.max(timeout, 0);
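    // For example: with a 4 TB daily quota and 1 TB processed (including previously
    // throttled bytes), processedShare = 0.25, so this batch should span
    // 0.25 * 86400 = 21600 seconds of the day, less the time already taken.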

    // If bytes were throttled but the computed timeout is 0, wait at least MIN_THROTTLING_DELAY seconds
    if (throttledBytes > 0 && timeout === 0) {
        timeout = MIN_THROTTLING_DELAY;
    }
        
    payload.timeout = timeout;

    console.log(`Processed: ${processedSize}`);
    console.log(`Throttled Bytes: ${throttledBytes}`);
    console.log(`Processed Share: ${processedShare}`);
    console.log(`Timeout: ${timeout}`);

    // Decrement the throttled-bytes counters in DynamoDB now that they are accounted for
    if (throttledBytes > 0) {
        await db.decrementThrottledBytes(throttledBytes, throttledErrorCount);
    }

    return payload;
}
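
The nextPartition and timeout fields in the returned payload imply that an external orchestrator re-invokes the handler in a loop and waits timeout seconds between batches. Below is a minimal driver sketch, assuming the module exports handler and that a payload carrying only nextPartition and maxPartition is enough to start; both are assumptions, not part of the documented interface.

// Hypothetical driver, for illustration only; module path and export are assumed.
const { handler } = require("./source/lambda/requestArchives/index");

async function driveAllPartitions(maxPartition) {
    let payload = { nextPartition: 0, maxPartition };
    while (payload.nextPartition <= payload.maxPartition) {
        payload = await handler(payload);
        // Honor the pacing delay computed by the handler before the next batch.
        await new Promise((resolve) => setTimeout(resolve, payload.timeout * 1000));
    }
}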