cdk/lib/observer-data-export.ts (285 lines of code) (raw):
import { GuAlarm } from '@guardian/cdk/lib/constructs/cloudwatch';
import type { GuStackProps } from '@guardian/cdk/lib/constructs/core';
import { GuStack, GuStringParameter } from '@guardian/cdk/lib/constructs/core';
import {
type GuFunctionProps,
GuLambdaFunction,
} from '@guardian/cdk/lib/constructs/lambda';
import { type App, Duration } from 'aws-cdk-lib';
import { CfnFlow } from 'aws-cdk-lib/aws-appflow';
import {
ComparisonOperator,
Metric,
TreatMissingData,
} from 'aws-cdk-lib/aws-cloudwatch';
import {
ArnPrincipal,
PolicyStatement,
ServicePrincipal,
User,
} from 'aws-cdk-lib/aws-iam';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { Bucket, EventType } from 'aws-cdk-lib/aws-s3';
import { SqsDestination } from 'aws-cdk-lib/aws-s3-notifications';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { nodeVersion } from './node-version';
export class ObserverDataExport extends GuStack {
constructor(scope: App, id: string, props: GuStackProps) {
super(scope, id, props);
const app = 'observer-data-export';
const unifidaPublicRsaKeyFilePath = `Public_keys/unifida_public_rsa_key.pem`;
const observerNewspaperSubscribersFolder = `Observer_newspaper_subscribers`;
const sharedBucket = new Bucket(this, 'Bucket', {
bucketName: `${app}-${this.stage.toLowerCase()}`,
lifecycleRules: [
'Observer_newsletter_eligible/',
'Observer_newsletter_subscribers/',
'Observer_newspaper_subscribers/',
].map((prefix) => ({
expiration: Duration.days(28),
prefix,
})),
});
const md5FingerprintsBucket = new Bucket(this, 'Md5FingerprintsBucket', {
bucketName: `${app}-md5-fingerprints-${this.stage.toLowerCase()}`,
});
const unifidaUser = new User(this, 'UnifidaUser', {
userName: `unifida-${this.stage.toLowerCase()}`,
});
sharedBucket.grantRead(unifidaUser);
const airflowCloudComposerUserArnParameter = new GuStringParameter(
this,
`${app}-airflow-cloud-composer-user-arn`,
{
description: `Airflow cloud composer user ARN (Ophan Account)`,
},
);
sharedBucket.grantReadWrite(
new ArnPrincipal(airflowCloudComposerUserArnParameter.valueAsString),
`Observer_newsletter_eligible/*`,
);
sharedBucket.grantRead(
new ArnPrincipal(airflowCloudComposerUserArnParameter.valueAsString),
unifidaPublicRsaKeyFilePath,
);
const salesforceObserverDataTransferBucket = new Bucket(
this,
'SalesforceObserverDataTransferBucket',
{
bucketName: `salesforce-observer-data-transfer-${this.stage.toLowerCase()}`,
lifecycleRules: [{ expiration: Duration.days(1) }],
},
);
salesforceObserverDataTransferBucket.grantWrite(
new ServicePrincipal('appflow.amazonaws.com'),
'*',
[
's3:PutObject',
's3:AbortMultipartUpload',
's3:ListMultipartUploadParts',
's3:ListBucketMultipartUploads',
's3:GetBucketAcl',
's3:PutObjectAcl',
],
);
const flow = new CfnFlow(this, 'SalesforceObserverDataTransferFlow', {
flowName: `salesforce-observer-data-transfer-${this.stage}`,
flowStatus: 'Active',
description: `Observer-only data is extracted from Salesforce on a weekly schedule and transferred to a designated S3 bucket in AWS. When a new file is created in this bucket, an S3 event notification sends a message to an SQS queue, which triggers a Lambda function. The function encrypts the file and uploads it to another S3 bucket shared with Tortoise Media.`,
sourceFlowConfig: {
connectorType: 'Salesforce',
connectorProfileName: `salesforce-observer-data-transfer-${this.stage}`,
sourceConnectorProperties: {
salesforce: {
object: 'Observer_Subscriber_Data__c',
},
},
},
destinationFlowConfigList: [
{
connectorType: 'S3',
destinationConnectorProperties: {
s3: {
bucketName: salesforceObserverDataTransferBucket.bucketName,
s3OutputFormatConfig: {
fileType: 'CSV',
},
},
},
},
],
tasks: [
{
sourceFields,
connectorOperator: { salesforce: 'PROJECTION' },
taskType: 'Filter',
},
...sourceFields.map((field) => ({
sourceFields: [field],
taskType: 'Map',
destinationField: field.replace(/__c$/, '').toLowerCase(),
connectorOperator: { salesforce: 'NO_OP' },
})),
],
triggerConfig: {
triggerType: 'Scheduled',
triggerProperties: {
scheduleExpression: 'cron(0 7 ? * TUE *)', // At 07:00 AM UTC every Tuesday
dataPullMode: 'Complete',
},
},
});
const lambdaDefaultConfig: Pick<
GuFunctionProps,
'app' | 'memorySize' | 'fileName' | 'runtime' | 'timeout' | 'environment'
> = {
app,
memorySize: 1024,
fileName: `${app}.zip`,
runtime: nodeVersion,
timeout: Duration.seconds(300),
};
const deadLetterQueue = new Queue(this, `dead-letters-${app}-queue`, {
queueName: `dead-letters-${app}-queue-${props.stage}`,
retentionPeriod: Duration.days(1),
});
const queue = new Queue(this, `${app}-queue`, {
queueName: `${app}-queue-${props.stage}`,
deadLetterQueue: {
queue: deadLetterQueue,
maxReceiveCount: 2,
},
visibilityTimeout: Duration.seconds(300),
});
const lambda = new GuLambdaFunction(
this,
'EncryptAndUploadObserverDataLambda',
{
...lambdaDefaultConfig,
environment: {
Stage: this.stage,
UnifidaSharedBucketName: sharedBucket.bucketName,
UnifidaPublicRsaKeyFilePath: unifidaPublicRsaKeyFilePath,
ObserverNewspaperSubscribersFolder:
observerNewspaperSubscribersFolder,
Md5FingerprintsBucketName: md5FingerprintsBucket.bucketName,
},
handler: 'encryptAndUploadObserverData.handler',
functionName: `encrypt-and-upload-observer-data-${this.stage}`,
initialPolicy: [
new PolicyStatement({
actions: ['s3:GetObject'],
resources: [
`${sharedBucket.bucketArn}/${unifidaPublicRsaKeyFilePath}`,
`${salesforceObserverDataTransferBucket.bucketArn}/*`,
],
}),
new PolicyStatement({
actions: ['s3:PutObject'],
resources: [
`${sharedBucket.bucketArn}/${observerNewspaperSubscribersFolder}/*`,
],
}),
new PolicyStatement({
actions: ['s3:PutObject'],
resources: [`${md5FingerprintsBucket.bucketArn}/*`],
}),
],
events: [new SqsEventSource(queue)],
},
);
salesforceObserverDataTransferBucket.addEventNotification(
EventType.OBJECT_CREATED,
new SqsDestination(queue),
);
new GuAlarm(
this,
`encrypt-and-upload-observer-data-${this.stage}-lambda-alarm`,
{
app,
snsTopicName: `alarms-handler-topic-${this.stage}`,
alarmName: `${this.stage}: Failed to encrypt & upload Observer-only data to S3 bucket shared with Unifida (Tortoise's dev team)`,
alarmDescription: `Fix: check logs for lambda ${lambda.functionName} and redrive event from dead letter queue ${deadLetterQueue.queueName}.`,
metric: deadLetterQueue
.metric('ApproximateNumberOfMessagesVisible')
.with({ statistic: 'Sum', period: Duration.minutes(1) }),
comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD,
treatMissingData: TreatMissingData.NOT_BREACHING,
threshold: 0,
evaluationPeriods: 1,
datapointsToAlarm: 1,
actionsEnabled: true,
},
);
new GuAlarm(this, `${flow.flowName}-flow-alarm`, {
app,
snsTopicName: `alarms-handler-topic-${this.stage}`,
alarmName: `${this.stage}: Failed to transfer Observer-only data from Salesforce to AWS (via AppFlow)`,
alarmDescription: `Debug: view "Run history" in the dashboard for flow ${flow.flowName}. Manual fix: upload today's unencrypted CSV file anywhere inside the ${salesforceObserverDataTransferBucket.bucketName} bucket.`,
metric: new Metric({
namespace: 'AWS/AppFlow',
metricName: 'FlowExecutionsFailed',
dimensionsMap: {
FlowName: flow.flowName,
},
statistic: 'Sum',
period: Duration.minutes(1),
}),
comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD,
treatMissingData: TreatMissingData.NOT_BREACHING,
threshold: 0,
evaluationPeriods: 1,
datapointsToAlarm: 1,
actionsEnabled: true,
});
}
}
const sourceFields = [
'Subscription_ID__c',
'Subscriber_ID__c',
'Status__c',
'Product__c',
'Product_Delivery_SKU__c',
'Frequency__c',
'Regular_Price__c',
'Initial_Offer_Price__c',
'Initial_Offer_Duration_Months__c',
'In_Life_Discount_Applied_To_Invoice__c',
'In_Life_Discount_Remaining_Months__c',
'Acquired_Timestamp__c',
'First_Delivery_Date__c',
'Latest_Recorded_Delivery_Date__c',
'Last_Invoice_Number__c',
'Last_Invoice_Date__c',
'Last_Invoice_Net_Amount__c',
'Last_Payment_Date__c',
'Last_Payment_Amount__c',
'Payment_Method__c',
'Next_Invoice_Date__c',
'Next_Invoice_Holiday_Credits_Applied__c',
'Next_Invoice_Preview_Amount__c',
'Cancellation_Notification_Date__c',
'Cancellation_Reason__c',
'Cancellation_Effective_Date__c',
'Cancellation_Refund_Date__c',
'Cancellation_Refund_Amount__c',
'Customer_Account_Created_Timestamp__c',
'Billing_Title__c',
'Billing_First_Name__c',
'Billing_Last_Name__c',
'Billing_Email_Address__c',
'Billing_Phone_Number__c',
'Billing_Street__c',
'Billing_City__c',
'Billing_County__c',
'Billing_Postcode__c',
'Delivery_Title__c',
'Delivery_First_Name__c',
'Delivery_Last_Name__c',
'Delivery_Street__c',
'Delivery_City__c',
'Delivery_County__c',
'Delivery_Postcode__c',
'Delivery_Phone_Number__c',
'Delivery_Instructions__c',
];