async function main()

in integrations/amplitude-sam/src/handlers/s3-trigger-processor/index.js [139:237]


/**
 * Handles an S3 event for a cohort export archive: downloads and unpacks the
 * archive, makes sure the matching `AMP_Cohort_<cohort>` segment exists, and
 * queues one message per CSV row (that has an email) for the entering,
 * existing and exiting files, in that order.
 *
 * Only the first record of the event is processed.
 *
 * @param {object} event - S3 event; reads `event.Records[0].s3.bucket.name`
 *     and `event.Records[0].s3.object.key`.
 * @returns {Promise<void>} Resolves once every row has been queued.
 * @throws {Error} If the archive lacks the entering, existing or exiting
 *     file. This is raised before any segment is created or message queued,
 *     so a retry never duplicates partial work.
 */
async function main(event){

    // NOTE(review): S3 event notifications URL-encode object keys (e.g. spaces
    // arrive as '+'). getFile is not visible here -- confirm it decodes the key.
    const s3Bucket = event.Records[0].s3.bucket.name;
    const s3Key = event.Records[0].s3.object.key;
    log.info(s3Bucket);
    log.info(s3Key);
    const tarball = await getFile(s3Bucket, s3Key);
    log.info(tarball);

    const files = await decompressFile(tarball, dest);
    log.info(files);

    // Classify the extracted files by the marker embedded in their name.
    // If several files carry the same marker, the last one wins.
    const filenames = { entering: null, existing: null, exiting: null };
    for (const file of files) {
        if (file.path.indexOf('entering_Active_Users') > -1){
            filenames.entering = file.path;
        } else if (file.path.indexOf('exiting_Active_Users') > -1){
            filenames.exiting = file.path;
        } else if (file.path.indexOf('existing_Active_Users') > -1){
            filenames.existing = file.path;
        }
    }

    // Fail fast: the cohort name comes from the entering file, and reading a
    // missing file would fail anyway -- but only after side effects had
    // already happened (bogus segment created, some rows already queued).
    const missing = Object.keys(filenames).filter(kind => filenames[kind] === null);
    if (missing.length > 0){
        throw new Error(`Cohort export "${s3Key}" is missing expected file(s): ${missing.join(', ')}`);
    }

    //TODO: this is delicate, but couldn't find a better way since no good chars to split on.
    // The cohort is everything after '_entering_' (10 chars), minus '.csv'.
    const cohort = filenames.entering
        .substring(filenames.entering.indexOf('_entering_') + 10)
        .replace('.csv','');

    const segments = await getSegments(pinpointAppId);
    log.info(JSON.stringify(segments, null, 2));

    const segmentExists = segments.some(segment => segment.Name === `AMP_Cohort_${cohort}`);
    if (!segmentExists){
        await createSegment(pinpointAppId, cohort);
    }

    // Order matters to downstream consumers only if they rely on it; it is
    // kept identical to the original: entering, existing, exiting.
    await enqueueCohortFile(filenames.entering, cohort, 'entering', 'scrubbedEntering');
    await enqueueCohortFile(filenames.existing, cohort, 'existing', 'scrubbedExisting');
    await enqueueCohortFile(filenames.exiting, cohort, 'exiting', 'scrubbedExiting');
}

/**
 * Reads one extracted CSV from /tmp and queues a message for every row whose
 * `email` column is not the empty string.
 *
 * @param {string} filename - Path of the CSV relative to /tmp.
 * @param {string} cohort - Cohort name passed through to the queue message.
 * @param {string} action - 'entering', 'existing' or 'exiting'.
 * @param {string} logLabel - Prefix used when logging the filtered row count.
 * @returns {Promise<void>}
 */
async function enqueueCohortFile(filename, cohort, action, logLabel){
    log.debug(filename);
    const rows = await csv().fromFile(`/tmp/${filename}`);
    log.debug(rows.length);

    const rowsWithEmail = rows.filter(row => row.email !== '');
    log.debug(`${logLabel}: ${rowsWithEmail.length}`);

    // Sent one at a time, as before, so queue writes stay ordered and unbursted.
    for (const row of rowsWithEmail) {
        await sendMessageToQueue(pinpointAppId, row, cohort, action);
    }
}