in integrations/amplitude-sam/src/handlers/s3-trigger-processor/index.js [139:237]
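/**
 * Entry point for the S3 trigger: an Amplitude cohort export (a tarball of
 * entering/existing/exiting Active Users CSVs) has landed in S3. Download and
 * unpack it, make sure a matching Pinpoint segment exists, then queue every
 * row that has an email address onto SQS.
 */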
async function main(event) {
  // Pull the bucket and key of the uploaded cohort export from the S3 trigger event.
  const s3Bucket = event.Records[0].s3.bucket.name;
  const s3Key = event.Records[0].s3.object.key;
  log.info(s3Bucket);
  log.info(s3Key);

  // Download the tarball from S3 and unpack it into the Lambda /tmp directory.
  const tarball = await getFile(s3Bucket, s3Key);
  log.info(tarball);
  const files = await decompressFile(tarball, dest);
  log.info(files);
  // Parse the file names. The export is expected to contain one CSV per cohort state:
  // entering, existing, and exiting Active Users.
  let cohort;
  let enteringFilename = null;
  let exitingFilename = null;
  let existingFilename = null;
  files.forEach(file => {
    if (file.path.indexOf('entering_Active_Users') > -1) {
      enteringFilename = file.path;
      // TODO: this is delicate, but couldn't find a better way since there are no good chars to split on.
      // '_entering_' is 10 characters long; take everything after it (minus '.csv') as the cohort identifier.
      cohort = enteringFilename.substring(enteringFilename.indexOf('_entering_') + 10).replace('.csv', '');
    } else if (file.path.indexOf('exiting_Active_Users') > -1) {
      exitingFilename = file.path;
    } else if (file.path.indexOf('existing_Active_Users') > -1) {
      existingFilename = file.path;
    }
  });
  // Look up the Pinpoint segments for this app and create the cohort segment if it doesn't exist yet.
  const segments = await getSegments(pinpointAppId);
  log.info(JSON.stringify(segments, null, 2));
  const segmentExists = segments.some(segment => segment.Name === `AMP_Cohort_${cohort}`);
  if (!segmentExists) {
    await createSegment(pinpointAppId, cohort);
  }
  // TODO: DRY. The three cohort files below are processed the same way (see the sketch after this function).
  // Process entering users: drop rows without an email, then queue each one.
  log.debug(enteringFilename);
  const entering = await csv().fromFile(`/tmp/${enteringFilename}`);
  log.debug(entering.length);
  const scrubbedEntering = entering.filter(row => row.email !== '');
  log.debug(`scrubbedEntering: ${scrubbedEntering.length}`);
  // Add to SQS queue, one message per user.
  for (const row of scrubbedEntering) {
    await sendMessageToQueue(pinpointAppId, row, cohort, 'entering');
  }
  // Process existing users the same way.
  log.debug(existingFilename);
  const existing = await csv().fromFile(`/tmp/${existingFilename}`);
  log.debug(existing.length);
  const scrubbedExisting = existing.filter(row => row.email !== '');
  log.debug(`scrubbedExisting: ${scrubbedExisting.length}`);
  // Add to SQS queue.
  for (const row of scrubbedExisting) {
    await sendMessageToQueue(pinpointAppId, row, cohort, 'existing');
  }
  // Process exiting users the same way.
  log.debug(exitingFilename);
  const exiting = await csv().fromFile(`/tmp/${exitingFilename}`);
  log.debug(exiting.length);
  const scrubbedExiting = exiting.filter(row => row.email !== '');
  log.debug(`scrubbedExiting: ${scrubbedExiting.length}`);
  // Add to SQS queue.
  for (const row of scrubbedExiting) {
    await sendMessageToQueue(pinpointAppId, row, cohort, 'exiting');
  }
}
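
// A possible shape for the DRY TODO above (a sketch, not wired in yet): one helper
// that scrubs a single cohort CSV and queues each row. It reuses the same csv()
// parser, log, and sendMessageToQueue helpers already used in main(); the name
// processCohortFile is hypothetical.
async function processCohortFile(filename, appId, cohort, direction) {
  if (!filename) {
    log.debug(`no ${direction} file found in archive, skipping`);
    return 0;
  }
  const rows = await csv().fromFile(`/tmp/${filename}`);
  const scrubbed = rows.filter(row => row.email !== '');
  log.debug(`scrubbed ${direction}: ${scrubbed.length}`);
  for (const row of scrubbed) {
    await sendMessageToQueue(appId, row, cohort, direction);
  }
  return scrubbed.length;
}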