constructor()

in cdk/lib/observer-data-export.ts [28:254]


	/**
	 * Defines the resources for the Observer data export:
	 *
	 * - a bucket shared with the Unifida IAM user (read-only) and with the
	 *   Airflow Cloud Composer principal (read/write on one prefix, read on the
	 *   public RSA key);
	 * - a bucket for MD5 fingerprints;
	 * - a scheduled AppFlow flow that copies `Observer_Subscriber_Data__c`
	 *   from Salesforce into a short-lived transfer bucket as CSV;
	 * - an S3 notification -> SQS queue (with dead letter queue) -> Lambda
	 *   chain triggered by objects created in the transfer bucket;
	 * - alarms on the dead letter queue and on failed AppFlow executions.
	 *
	 * @param scope - CDK app that owns this stack.
	 * @param id - Construct id of the stack.
	 * @param props - Stack props; `props.stage` is used in the queue names.
	 */
	constructor(scope: App, id: string, props: GuStackProps) {
		super(scope, id, props);

		const app = 'observer-data-export';
		const unifidaPublicRsaKeyFilePath = `Public_keys/unifida_public_rsa_key.pem`;
		const observerNewspaperSubscribersFolder = `Observer_newspaper_subscribers`;

		// Values reused by several resources below.
		const stageLowerCase = this.stage.toLowerCase();
		const salesforceTransferName = `salesforce-observer-data-transfer-${this.stage}`;
		const alarmsTopicName = `alarms-handler-topic-${this.stage}`;
		// Used for both the Lambda timeout and the queue visibility timeout.
		// NOTE(review): AWS guidance suggests a visibility timeout of at least
		// six times the function timeout; here the two are equal — confirm
		// this is intentional.
		const processingTimeout = Duration.seconds(300);

		// ---- Buckets -------------------------------------------------------

		// Builds a lifecycle rule expiring objects under `prefix` after 28 days.
		const expireAfter28Days = (prefix: string) => ({
			expiration: Duration.days(28),
			prefix,
		});

		const unifidaSharedBucket = new Bucket(this, 'Bucket', {
			bucketName: `${app}-${stageLowerCase}`,
			lifecycleRules: [
				expireAfter28Days('Observer_newsletter_eligible/'),
				expireAfter28Days('Observer_newsletter_subscribers/'),
				expireAfter28Days('Observer_newspaper_subscribers/'),
			],
		});

		const fingerprintsBucket = new Bucket(this, 'Md5FingerprintsBucket', {
			bucketName: `${app}-md5-fingerprints-${stageLowerCase}`,
		});

		// ---- Access to the shared bucket -----------------------------------

		const unifidaIamUser = new User(this, 'UnifidaUser', {
			userName: `unifida-${stageLowerCase}`,
		});
		unifidaSharedBucket.grantRead(unifidaIamUser);

		const airflowUserArnParameter = new GuStringParameter(
			this,
			`${app}-airflow-cloud-composer-user-arn`,
			{
				description: `Airflow cloud composer user ARN (Ophan Account)`,
			},
		);
		const airflowPrincipal = new ArnPrincipal(
			airflowUserArnParameter.valueAsString,
		);

		unifidaSharedBucket.grantReadWrite(
			airflowPrincipal,
			`Observer_newsletter_eligible/*`,
		);
		unifidaSharedBucket.grantRead(
			airflowPrincipal,
			unifidaPublicRsaKeyFilePath,
		);

		// ---- Salesforce -> S3 transfer (AppFlow) ---------------------------

		// Landing bucket for the AppFlow output; objects expire after one day.
		const transferBucket = new Bucket(
			this,
			'SalesforceObserverDataTransferBucket',
			{
				bucketName: `salesforce-observer-data-transfer-${stageLowerCase}`,
				lifecycleRules: [{ expiration: Duration.days(1) }],
			},
		);

		const appFlowWriteActions = [
			's3:PutObject',
			's3:AbortMultipartUpload',
			's3:ListMultipartUploadParts',
			's3:ListBucketMultipartUploads',
			's3:GetBucketAcl',
			's3:PutObjectAcl',
		];
		transferBucket.grantWrite(
			new ServicePrincipal('appflow.amazonaws.com'),
			'*',
			appFlowWriteActions,
		);

		// One projection task selecting every source field...
		const projectionTask = {
			sourceFields,
			connectorOperator: { salesforce: 'PROJECTION' },
			taskType: 'Filter',
		};
		// ...followed by one mapping task per field. The destination column is
		// the field name without a trailing `__c`, lower-cased.
		const mappingTasks = sourceFields.map((sourceField) => {
			const destinationField = sourceField.replace(/__c$/, '').toLowerCase();
			return {
				sourceFields: [sourceField],
				taskType: 'Map',
				destinationField,
				connectorOperator: { salesforce: 'NO_OP' },
			};
		});

		const flow = new CfnFlow(this, 'SalesforceObserverDataTransferFlow', {
			flowName: salesforceTransferName,
			flowStatus: 'Active',
			description: `Observer-only data is extracted from Salesforce on a weekly schedule and transferred to a designated S3 bucket in AWS. When a new file is created in this bucket, an S3 event notification sends a message to an SQS queue, which triggers a Lambda function. The function encrypts the file and uploads it to another S3 bucket shared with Tortoise Media.`,
			sourceFlowConfig: {
				connectorType: 'Salesforce',
				connectorProfileName: salesforceTransferName,
				sourceConnectorProperties: {
					salesforce: { object: 'Observer_Subscriber_Data__c' },
				},
			},
			destinationFlowConfigList: [
				{
					connectorType: 'S3',
					destinationConnectorProperties: {
						s3: {
							bucketName: transferBucket.bucketName,
							s3OutputFormatConfig: { fileType: 'CSV' },
						},
					},
				},
			],
			tasks: [projectionTask, ...mappingTasks],
			triggerConfig: {
				triggerType: 'Scheduled',
				triggerProperties: {
					scheduleExpression: 'cron(0 7 ? * TUE *)', // At 07:00 AM UTC every Tuesday
					dataPullMode: 'Complete',
				},
			},
		});

		// ---- Queue + Lambda processing new transfer files ------------------

		const lambdaDefaultConfig: Pick<
			GuFunctionProps,
			'app' | 'memorySize' | 'fileName' | 'runtime' | 'timeout' | 'environment'
		> = {
			app,
			memorySize: 1024,
			fileName: `${app}.zip`,
			runtime: nodeVersion,
			timeout: processingTimeout,
		};

		const deadLetterQueue = new Queue(this, `dead-letters-${app}-queue`, {
			queueName: `dead-letters-${app}-queue-${props.stage}`,
			retentionPeriod: Duration.days(1),
		});

		// A message is moved to the dead letter queue after two receives.
		const processingQueue = new Queue(this, `${app}-queue`, {
			queueName: `${app}-queue-${props.stage}`,
			deadLetterQueue: {
				queue: deadLetterQueue,
				maxReceiveCount: 2,
			},
			visibilityTimeout: processingTimeout,
		});

		// The function may read the public key and the transfer files, and may
		// write to the newspaper-subscribers folder and the fingerprints bucket.
		const readInputsStatement = new PolicyStatement({
			actions: ['s3:GetObject'],
			resources: [
				`${unifidaSharedBucket.bucketArn}/${unifidaPublicRsaKeyFilePath}`,
				`${transferBucket.bucketArn}/*`,
			],
		});
		const writeSharedFolderStatement = new PolicyStatement({
			actions: ['s3:PutObject'],
			resources: [
				`${unifidaSharedBucket.bucketArn}/${observerNewspaperSubscribersFolder}/*`,
			],
		});
		const writeFingerprintsStatement = new PolicyStatement({
			actions: ['s3:PutObject'],
			resources: [`${fingerprintsBucket.bucketArn}/*`],
		});

		const encryptAndUploadLambda = new GuLambdaFunction(
			this,
			'EncryptAndUploadObserverDataLambda',
			{
				...lambdaDefaultConfig,
				handler: 'encryptAndUploadObserverData.handler',
				functionName: `encrypt-and-upload-observer-data-${this.stage}`,
				environment: {
					Stage: this.stage,
					UnifidaSharedBucketName: unifidaSharedBucket.bucketName,
					UnifidaPublicRsaKeyFilePath: unifidaPublicRsaKeyFilePath,
					ObserverNewspaperSubscribersFolder:
						observerNewspaperSubscribersFolder,
					Md5FingerprintsBucketName: fingerprintsBucket.bucketName,
				},
				initialPolicy: [
					readInputsStatement,
					writeSharedFolderStatement,
					writeFingerprintsStatement,
				],
				events: [new SqsEventSource(processingQueue)],
			},
		);

		// Every object created in the transfer bucket enqueues a message.
		transferBucket.addEventNotification(
			EventType.OBJECT_CREATED,
			new SqsDestination(processingQueue),
		);

		// ---- Alarms --------------------------------------------------------

		// Both alarms fire as soon as a single one-minute datapoint is above
		// zero; missing data is treated as not breaching.
		const alarmOnFirstFailure = {
			app,
			snsTopicName: alarmsTopicName,
			comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD,
			treatMissingData: TreatMissingData.NOT_BREACHING,
			threshold: 0,
			evaluationPeriods: 1,
			datapointsToAlarm: 1,
			actionsEnabled: true,
		};

		const deadLettersVisibleMetric = deadLetterQueue
			.metric('ApproximateNumberOfMessagesVisible')
			.with({ statistic: 'Sum', period: Duration.minutes(1) });

		new GuAlarm(
			this,
			`encrypt-and-upload-observer-data-${this.stage}-lambda-alarm`,
			{
				...alarmOnFirstFailure,
				alarmName: `${this.stage}: Failed to encrypt & upload Observer-only data to S3 bucket shared with Unifida (Tortoise's dev team)`,
				alarmDescription: `Fix: check logs for lambda ${encryptAndUploadLambda.functionName} and redrive event from dead letter queue ${deadLetterQueue.queueName}.`,
				metric: deadLettersVisibleMetric,
			},
		);

		const failedFlowExecutionsMetric = new Metric({
			namespace: 'AWS/AppFlow',
			metricName: 'FlowExecutionsFailed',
			dimensionsMap: {
				FlowName: flow.flowName,
			},
			statistic: 'Sum',
			period: Duration.minutes(1),
		});

		new GuAlarm(this, `${flow.flowName}-flow-alarm`, {
			...alarmOnFirstFailure,
			alarmName: `${this.stage}: Failed to transfer Observer-only data from Salesforce to AWS (via AppFlow)`,
			alarmDescription: `Debug: view "Run history" in the dashboard for flow ${flow.flowName}. Manual fix: upload today's unencrypted CSV file anywhere inside the ${transferBucket.bucketName} bucket.`,
			metric: failedFlowExecutionsMetric,
		});
	}