cdk/lib/salesforce-disaster-recovery.ts (500 lines of code) (raw):
import type { GuStackProps } from '@guardian/cdk/lib/constructs/core';
import { GuStack } from '@guardian/cdk/lib/constructs/core';
import {
	type GuFunctionProps,
	GuLambdaFunction,
} from '@guardian/cdk/lib/constructs/lambda';
import { type App, Duration } from 'aws-cdk-lib';
import { Policy, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import {
	Choice,
	Condition,
	CustomState,
	DefinitionBody,
	Fail,
	JsonPath,
	Pass,
	StateMachine,
	TaskInput,
	Wait,
	WaitTime,
} from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { nodeVersion } from './node-version';
/** Stage-specific configuration for the Salesforce disaster recovery stack. */
interface Props extends GuStackProps {
	// --- Salesforce API access ---

	/** Base URL of the Salesforce API; every REST path (e.g. `/services/data/...`) is appended to it. */
	salesforceApiDomain: string;
	/** Last segment of the ARN of the EventBridge connection used to authenticate the HTTP tasks against Salesforce. */
	salesforceApiConnectionResourceId: string;
	/** Secret name handed to the SaveSalesforceQueryResultToS3 lambda via its `SALESFORCE_OAUTH_SECRET_NAME` variable. */
	salesforceOauthSecretName: string;

	// --- Polling ---

	/** Number of seconds the state machine waits before each poll of the Salesforce query job status. */
	salesforceQueryWaitSeconds: number;
}
/**
 * Stack for the Salesforce disaster recovery re-sync procedure.
 *
 * It deploys a Step Functions state machine that:
 * 1. creates a Salesforce Bulk API query job from the execution input's `query` field;
 * 2. polls the job until it reports `JobComplete`, and fails the execution if the job
 *    reports `Failed` or `Aborted`;
 * 3. saves the query result to S3 under a folder named after the execution start time;
 * 4. feeds that CSV, 50 rows per batch, through a distributed map that invokes the
 *    update-zuora-accounts lambda;
 * 5. writes the rows that failed to a second CSV and, unless the execution name starts
 *    with `health-check-`, publishes a summary to the alarms SNS topic.
 */
export class SalesforceDisasterRecovery extends GuStack {
	constructor(scope: App, id: string, props: Props) {
		super(scope, id, props);

		const app = 'salesforce-disaster-recovery';

		const salesforceApiConnectionArn = `arn:aws:events:${this.region}:${this.account}:connection/${props.salesforceApiConnectionResourceId}`;
		// Secrets backing the EventBridge connection (the `events!connection/` prefix is
		// presumably the naming EventBridge uses for them — confirm if the connection is renamed).
		const salesforceApiConnectionSecretArn = `arn:aws:secretsmanager:${this.region}:${this.account}:secret:events!connection/${app}-${this.stage}-salesforce-api/*`;
		// Single definition of the Bulk API query endpoint. The HTTP tasks AND the IAM
		// conditions that authorise them are derived from it, so they cannot drift apart.
		const salesforceQueryJobsEndpoint = `${props.salesforceApiDomain}/services/data/v59.0/jobs/query`;

		const bucket = new Bucket(this, 'Bucket', {
			bucketName: `${app}-${this.stage.toLowerCase()}`,
		});
		const queryResultFileName = 'query-result.csv';
		const failedRowsFileName = 'failed-rows.csv';
		const snsTopicArn = `arn:aws:sns:${this.region}:${this.account}:alarms-handler-topic-${this.stage}`;
		const runbookUrl =
			'https://docs.google.com/document/d/1_KxFtfKU3-3-PSzaAYG90uONa05AVgoBmyBDyu5SC5c/edit#heading=h.2r6eh2y6rjut';

		/** S3 key of `fileName` inside the folder named after the current execution's start time. */
		const executionFileKey = (fileName: string): string =>
			JsonPath.format(
				`{}/${fileName}`,
				JsonPath.stringAt('$$.Execution.StartTime'),
			);

		/** S3 console link to `fileName` inside the current execution's folder. */
		const executionFileConsoleUrl = (fileName: string): string =>
			JsonPath.format(
				`https://s3.console.aws.amazon.com/s3/object/{}?region={}&prefix={}/{}`,
				bucket.bucketName,
				this.region,
				JsonPath.stringAt('$$.Execution.StartTime'),
				fileName,
			);

		const lambdaDefaultConfig: Pick<
			GuFunctionProps,
			'app' | 'memorySize' | 'fileName' | 'runtime' | 'timeout' | 'environment'
		> = {
			app,
			memorySize: 1024,
			fileName: `${app}.zip`,
			runtime: nodeVersion,
			timeout: Duration.seconds(300),
			environment: { APP: app, STACK: this.stack, STAGE: this.stage },
		};

		// Retry policy shared by both Salesforce HTTP tasks. A 400 means the request
		// itself is wrong, so it is never retried. Any other error is retried with
		// exponential backoff.
		const salesforceHttpRetry = [
			{
				ErrorEquals: ['States.Http.StatusCode.400'],
				MaxAttempts: 0,
			},
			{
				ErrorEquals: ['States.ALL'],
				IntervalSeconds: 5,
				MaxAttempts: 3,
				BackoffRate: 2,
			},
		];

		const createSalesforceQueryJob = new CustomState(
			this,
			'CreateSalesforceQueryJob',
			{
				stateJson: {
					Type: 'Task',
					Resource: 'arn:aws:states:::http:invoke',
					Parameters: {
						ApiEndpoint: salesforceQueryJobsEndpoint,
						Method: 'POST',
						Authentication: {
							ConnectionArn: salesforceApiConnectionArn,
						},
						RequestBody: {
							operation: 'query',
							'query.$': '$.query',
						},
					},
					Retry: salesforceHttpRetry,
				},
			},
		);

		const waitForSalesforceQueryJobToComplete = new Wait(
			this,
			'WaitForSalesforceQueryJobToComplete',
			{
				time: WaitTime.duration(
					Duration.seconds(props.salesforceQueryWaitSeconds),
				),
			},
		);

		// This state runs in a loop. Without a retry, a single transient HTTP error
		// would abort the whole recovery run.
		const getSalesforceQueryJobStatus = new CustomState(
			this,
			'GetSalesforceQueryJobStatus',
			{
				stateJson: {
					Type: 'Task',
					Resource: 'arn:aws:states:::http:invoke',
					Parameters: {
						'ApiEndpoint.$': JsonPath.format(
							`${salesforceQueryJobsEndpoint}/{}`,
							JsonPath.stringAt('$.ResponseBody.id'),
						),
						Method: 'GET',
						Authentication: {
							ConnectionArn: salesforceApiConnectionArn,
						},
					},
					Retry: salesforceHttpRetry,
				},
			},
		);

		// Terminal failure states of the query job. Without this state the polling
		// loop would never end for jobs that will never complete.
		const salesforceQueryJobFailed = new Fail(
			this,
			'SalesforceQueryJobFailed',
			{
				error: 'SalesforceQueryJobFailed',
				cause: 'The Salesforce query job ended in the Failed or Aborted state',
			},
		);

		const saveSalesforceQueryResultToS3Lambda = new GuLambdaFunction(
			this,
			'SaveSalesforceQueryResultToS3Lambda',
			{
				...lambdaDefaultConfig,
				handler: 'saveSalesforceQueryResultToS3.handler',
				functionName: `save-salesforce-query-result-to-s3-${this.stage}`,
				environment: {
					...lambdaDefaultConfig.environment,
					SALESFORCE_API_DOMAIN: props.salesforceApiDomain,
					SALESFORCE_OAUTH_SECRET_NAME: props.salesforceOauthSecretName,
					S3_BUCKET: bucket.bucketName,
				},
				initialPolicy: [
					new PolicyStatement({
						actions: [
							'secretsmanager:GetSecretValue',
							'secretsmanager:DescribeSecret',
						],
						resources: [salesforceApiConnectionSecretArn],
					}),
					new PolicyStatement({
						actions: ['s3:PutObject'],
						resources: [bucket.arnForObjects('*')],
					}),
				],
			},
		);

		const saveSalesforceQueryResultToS3 = new LambdaInvoke(
			this,
			'SaveSalesforceQueryResultToS3',
			{
				lambdaFunction: saveSalesforceQueryResultToS3Lambda,
				payload: TaskInput.fromObject({
					queryJobId: JsonPath.stringAt('$.ResponseBody.id'),
					filePath: executionFileKey(queryResultFileName),
				}),
			},
		);

		const updateZuoraAccountsLambda = new GuLambdaFunction(
			this,
			'UpdateZuoraAccountsLambda',
			{
				...lambdaDefaultConfig,
				timeout: Duration.minutes(15),
				memorySize: 10240,
				handler: 'updateZuoraAccounts.handler',
				functionName: `update-zuora-accounts-${this.stage}`,
				initialPolicy: [
					new PolicyStatement({
						actions: ['secretsmanager:GetSecretValue'],
						resources: [
							`arn:aws:secretsmanager:${this.region}:${this.account}:secret:${this.stage}/Zuora-OAuth/SupportServiceLambdas-*`,
						],
					}),
				],
			},
		);

		const processCsvInDistributedMap = new CustomState(
			this,
			'ProcessCsvInDistributedMap',
			{
				stateJson: {
					Type: 'Map',
					MaxConcurrency: 20,
					ToleratedFailurePercentage: 100,
					Comment: `ToleratedFailurePercentage is set to 100% because we want the distributed map state to complete processing all batches`,
					ItemReader: {
						Resource: 'arn:aws:states:::s3:getObject',
						ReaderConfig: {
							InputType: 'CSV',
							CSVHeaderLocation: 'FIRST_ROW',
						},
						Parameters: {
							Bucket: bucket.bucketName,
							'Key.$': executionFileKey(queryResultFileName),
						},
					},
					ItemBatcher: {
						MaxItemsPerBatch: 50,
					},
					ItemProcessor: {
						ProcessorConfig: {
							Mode: 'DISTRIBUTED',
							ExecutionType: 'STANDARD',
						},
						StartAt: 'UpdateZuoraAccounts',
						States: {
							UpdateZuoraAccounts: {
								Type: 'Task',
								Resource: 'arn:aws:states:::lambda:invoke',
								OutputPath: '$.Payload',
								Parameters: {
									'Payload.$': '$',
									FunctionName: updateZuoraAccountsLambda.functionArn,
								},
								Retry: [
									{
										ErrorEquals: [
											'Lambda.ServiceException',
											'Lambda.AWSLambdaException',
											'Lambda.SdkClientException',
											'Lambda.TooManyRequestsException',
										],
										IntervalSeconds: 2,
										MaxAttempts: 6,
										BackoffRate: 2,
									},
									{
										ErrorEquals: ['ZuoraError'],
										IntervalSeconds: 10,
										MaxAttempts: 5,
										BackoffRate: 5,
									},
								],
								End: true,
							},
						},
					},
					ResultWriter: {
						Resource: 'arn:aws:states:::s3:putObject',
						Parameters: {
							Bucket: bucket.bucketName,
							'Prefix.$': JsonPath.stringAt('$$.Execution.StartTime'),
						},
					},
				},
			},
		);

		// Loads the distributed-map manifest written by ResultWriter and makes its
		// parsed JSON the state output.
		const getMapResult = new CustomState(this, 'GetMapResult', {
			stateJson: {
				Type: 'Task',
				Resource: 'arn:aws:states:::aws-sdk:s3:getObject',
				Parameters: {
					'Bucket.$': JsonPath.stringAt('$.ResultWriterDetails.Bucket'),
					'Key.$': JsonPath.stringAt('$.ResultWriterDetails.Key'),
				},
				ResultSelector: {
					'Payload.$': JsonPath.stringToJson(JsonPath.stringAt('$.Body')),
				},
				OutputPath: '$.Payload',
			},
		});

		const saveFailedRowsToS3 = new LambdaInvoke(this, 'SaveFailedRowsToS3', {
			lambdaFunction: new GuLambdaFunction(this, 'SaveFailedRowsToS3Lambda', {
				...lambdaDefaultConfig,
				memorySize: 10240,
				handler: 'saveFailedRowsToS3.handler',
				functionName: `save-failed-rows-to-s3-${this.stage}`,
				environment: {
					...lambdaDefaultConfig.environment,
					S3_BUCKET: bucket.bucketName,
				},
				initialPolicy: [
					new PolicyStatement({
						actions: ['s3:GetObject', 's3:PutObject'],
						resources: [bucket.arnForObjects('*')],
					}),
				],
			}),
			payload: TaskInput.fromObject({
				'resultFiles.$': '$.ResultFiles.FAILED',
				filePath: executionFileKey(failedRowsFileName),
			}),
			resultSelector: {
				failedRowsCount: JsonPath.numberAt('$.Payload.failedRowsCount'),
			},
		});

		const constructNotificationData = new Pass(
			this,
			'ConstructNotificationData',
			{
				parameters: {
					stateMachineExecutionDetailsUrl: JsonPath.format(
						`https://{}.console.aws.amazon.com/states/home?region={}#/executions/details/{}`,
						this.region,
						this.region,
						JsonPath.stringAt('$$.Execution.Id'),
					),
					queryResultFileUrl: executionFileConsoleUrl(queryResultFileName),
					failedRowsCount: JsonPath.numberAt('$.failedRowsCount'),
					failedRowsFileUrl: executionFileConsoleUrl(failedRowsFileName),
				},
			},
		);

		const sendCompletionNotification = new CustomState(
			this,
			'SendCompletionNotification',
			{
				stateJson: {
					Type: 'Task',
					Resource: 'arn:aws:states:::sns:publish',
					Parameters: {
						TopicArn: snsTopicArn,
						'Message.$': JsonPath.format(
							`Salesforce Disaster Recovery Re-syncing Procedure Completed For ${this.stage}\n\nThis notification is part of the Salesforce Disaster Recovery procedure explained in the runbook below:\n{}\n\nState machine execution details:\n{}\n\nAccounts to sync:\n{}\n\nAccounts that failed to update ({}):\n{}\n`,
							runbookUrl,
							JsonPath.stringAt('$.stateMachineExecutionDetailsUrl'),
							JsonPath.stringAt('$.queryResultFileUrl'),
							JsonPath.stringAt('$.failedRowsCount'),
							JsonPath.stringAt('$.failedRowsFileUrl'),
						),
						MessageAttributes: {
							app: {
								DataType: 'String',
								StringValue: app,
							},
							stage: {
								DataType: 'String',
								StringValue: this.stage,
							},
						},
					},
					// Keep the input (which carries failedRowsCount) and nest the SNS response under it.
					ResultPath: '$.TaskResult',
				},
			},
		);

		// ---- State machine wiring, built from the leaves upwards ----

		const haveAllAccountsBeenResynced = new Choice(
			this,
			'HaveAllAccountsBeenResynced',
		)
			.when(
				Condition.numberEquals('$.failedRowsCount', 0),
				new Pass(this, 'AllAccountsHaveBeenResynced'),
			)
			.otherwise(
				new Pass(this, 'SomeAccountsHaveFailedToUpdate', {
					stateName:
						"SomeAccountsHaveFailedToUpdate - Open state input 'failedRowsFileUrl' to debug",
				}),
			);

		// Health-check executions skip the notification entirely.
		const isHealthCheck = new Choice(this, 'IsHealthCheck')
			.when(
				Condition.stringMatches('$$.Execution.Name', 'health-check-*'),
				new Pass(this, 'HealthCheckSuccessful'),
			)
			.otherwise(
				constructNotificationData
					.next(sendCompletionNotification)
					.next(haveAllAccountsBeenResynced),
			);

		const resyncAccounts = saveSalesforceQueryResultToS3
			.next(processCsvInDistributedMap)
			.next(getMapResult)
			.next(saveFailedRowsToS3)
			.next(isHealthCheck);

		const areThereAccountsToResync = new Choice(
			this,
			'AreThereAccountsToResync',
		)
			.when(
				Condition.numberEquals('$.ResponseBody.numberRecordsProcessed', 0),
				new Pass(this, 'NoAccountsToResync'),
			)
			.otherwise(resyncAccounts);

		const isSalesforceQueryJobCompleted = new Choice(
			this,
			'IsSalesforceQueryJobCompleted',
		)
			.when(
				Condition.stringEquals('$.ResponseBody.state', 'JobComplete'),
				areThereAccountsToResync,
			)
			.when(
				Condition.or(
					Condition.stringEquals('$.ResponseBody.state', 'Failed'),
					Condition.stringEquals('$.ResponseBody.state', 'Aborted'),
				),
				salesforceQueryJobFailed,
			)
			// Still in progress: wait and poll again.
			.otherwise(waitForSalesforceQueryJobToComplete);

		const stateMachine = new StateMachine(
			this,
			'SalesforceDisasterRecoveryStateMachine',
			{
				stateMachineName: `${app}-${this.stage}`,
				definitionBody: DefinitionBody.fromChainable(
					createSalesforceQueryJob
						.next(waitForSalesforceQueryJobToComplete)
						.next(getSalesforceQueryJobStatus)
						.next(isSalesforceQueryJobCompleted),
				),
			},
		);

		stateMachine.role.attachInlinePolicy(
			new Policy(
				this,
				'SalesforceDisasterRecoveryStateMachineRoleAdditionalPolicy',
				{
					statements: [
						// HTTP task: create the query job.
						new PolicyStatement({
							actions: ['states:InvokeHTTPEndpoint'],
							resources: [stateMachine.stateMachineArn],
							conditions: {
								StringEquals: {
									'states:HTTPMethod': 'POST',
									'states:HTTPEndpoint': salesforceQueryJobsEndpoint,
								},
							},
						}),
						// HTTP task: poll the query job status.
						new PolicyStatement({
							actions: ['states:InvokeHTTPEndpoint'],
							resources: [stateMachine.stateMachineArn],
							conditions: {
								StringEquals: {
									'states:HTTPMethod': 'GET',
								},
								StringLike: {
									'states:HTTPEndpoint': `${salesforceQueryJobsEndpoint}/*`,
								},
							},
						}),
						new PolicyStatement({
							actions: ['events:RetrieveConnectionCredentials'],
							resources: [salesforceApiConnectionArn],
						}),
						new PolicyStatement({
							actions: [
								'secretsmanager:GetSecretValue',
								'secretsmanager:DescribeSecret',
							],
							resources: [salesforceApiConnectionSecretArn],
						}),
						// ItemReader reads the CSV; ResultWriter writes the manifest and
						// result files (multipart actions are needed for large exports).
						new PolicyStatement({
							actions: [
								's3:GetObject',
								's3:PutObject',
								's3:ListMultipartUploadParts',
								's3:AbortMultipartUpload',
							],
							resources: [bucket.arnForObjects('*')],
						}),
						// The distributed map runs its batches as child executions of this state machine.
						new PolicyStatement({
							actions: ['states:StartExecution'],
							resources: [stateMachine.stateMachineArn],
						}),
						new PolicyStatement({
							actions: [
								'states:RedriveExecution',
								'states:DescribeExecution',
								'states:StopExecution',
							],
							resources: [
								`arn:aws:states:${this.region}:${this.account}:execution:${stateMachine.stateMachineName}/*`,
							],
						}),
						new PolicyStatement({
							actions: ['lambda:InvokeFunction'],
							resources: [updateZuoraAccountsLambda.functionArn],
						}),
						new PolicyStatement({
							actions: ['sns:Publish'],
							resources: [snsTopicArn],
						}),
					],
				},
			),
		);
	}
}