async function upsertEndpoints()

in integrations/amplitude-sam/src/handlers/sqs-queue-processor/index.js [31:88]


/**
 * Builds one EMAIL endpoint per queue record and upserts them all with a
 * single `pinpoint.updateEndpointsBatch` call.
 *
 * Each record's `body` is a JSON document describing a user's cohort
 * membership change. Fields read explicitly here are `action`, `cohort`,
 * `email`, `amplitude_id`, `user_id` and `applicationId`; every other field
 * not listed in the module-level `attributesToIgnore` is forwarded as an
 * `AMP_<field>` user attribute.
 *
 * NOTE(review): the whole batch is sent to the `applicationId` of the LAST
 * record, so all records are presumably expected to target the same
 * application — confirm against the producer.
 *
 * @param {Array<{body: string}>} records - Queue records whose `body` is a
 *     JSON string.
 * @returns {Promise<Object|undefined>} The response of `updateEndpointsBatch`,
 *     or `undefined` when there were no records or the call failed (the
 *     failure is logged, not rethrown).
 * @throws {SyntaxError} If a record body is not valid JSON (parsing happens
 *     outside the try/catch).
 */
async function upsertEndpoints(records) {
    // With no records there is no applicationId to target; skip the call
    // instead of dereferencing an undefined (or stale) endpoint.
    if (!records || records.length === 0) {
        log.debug('No records to upsert; skipping updateEndpointsBatch.');
        return undefined;
    }

    const endpoints = [];
    let applicationId;

    // Example body fields (from the original note):
    // "amplitude_id","user_id","name","a_prop","persona","username","email"
    for (const record of records) {
        // Block-scoped on purpose: these used to be implicit globals, which
        // throws in strict mode and leaks state between invocations otherwise.
        const endpoint = JSON.parse(record.body);
        const userAttributes = {};

        // Translate the cohort transition into an Active/InActive flag.
        // Any other action leaves the cohort attribute unset.
        switch (endpoint.action) {
            case 'entering':
            case 'existing':
                userAttributes[`AMP_Cohort_${endpoint.cohort}`] = ['Active'];
                break;

            case 'exiting':
                userAttributes[`AMP_Cohort_${endpoint.cohort}`] = ['InActive'];
                break;
        }

        // Populate user attributes, ignoring common attributes. Values are
        // wrapped in a single-element array, matching the cohort flag above.
        for (const [key, value] of Object.entries(endpoint)) {
            if (attributesToIgnore.indexOf(key) === -1) {
                userAttributes[`AMP_${key}`] = [value];
            }
        }

        endpoints.push({
            Address: endpoint.email,
            ChannelType: 'EMAIL',
            Id: endpoint.amplitude_id, // TODO: is this unique per endpoint?
            User: {
                UserAttributes: userAttributes,
                UserId: endpoint.user_id
            }
        });

        // Preserve the original behaviour: the last record decides the target.
        applicationId = endpoint.applicationId;
    }

    const params = {
        ApplicationId: applicationId,
        EndpointBatchRequest: {
            Item: endpoints
        }
    };

    log.trace(JSON.stringify(params, null, 2));

    try {
        const data = await pinpoint.updateEndpointsBatch(params).promise();
        log.trace(data);
        log.debug(`Upserted ${endpoints.length} endpoints.`);
        return data;
    }
    catch (err) {
        // Best-effort: failures are logged and surfaced to the caller as
        // `undefined` rather than rethrown.
        log.error(err, err.stack);
        return undefined;
    }
}