in src/index.ts [203:393]
/**
 * Wires up the resources needed to run Redshift statements through the Redshift Data API.
 *
 * Always provisions (or re-uses) a "starter" Lambda function together with a DynamoDB tracking
 * table and grants the function Redshift Data API and `redshift:GetClusterCredentials`
 * permissions. Unless `props.createCallbackInfra` is explicitly `false`, it additionally wires an
 * EventBridge rule (Redshift Data statement status changes) to an SQS queue that feeds a
 * "completer" Lambda function, which is allowed to call `states:SendTaskSuccess` and
 * `states:SendTaskFailure`.
 *
 * @param scope Parent construct.
 * @param id Construct id.
 * @param props Redshift target details plus optional overrides and pre-existing resources.
 * @throws When an expected key cannot be read as a string from the bundled Python files, when
 *   `createCallbackInfra` is `false` but no `existingTableObj` is supplied, or when queue related
 *   properties are supplied although `createCallbackInfra` is `false`.
 */
constructor(scope: cdk.Construct, id: string, props: SfnRedshiftTaskerProps) {
  super(scope, id);

  // Locations of the Python sources bundled with this construct.
  const lambdaP = path.join(__dirname, '../lambda');
  const pythonP = path.join(lambdaP, 'python');
  const rsIntegrationFunctionP = path.join(pythonP, 'rs_integration_function');
  const ddbP = path.join(rsIntegrationFunctionP, 'ddb');
  const ddbInitP = path.join(ddbP, '__init__.py');
  const rsIntegrationFunctionEnvVarP = path.join(rsIntegrationFunctionP, 'environment_labels.py');

  // eslint-disable-next-line @typescript-eslint/no-require-imports
  const PropertiesReader = require('properties-reader');

  // Values are Python string literals, so drop one pair of surrounding quotes.
  const stripSurroundingQuotes = (x: string) => x.replace(/^['"](.+)['"]$/, '$1');

  // Builds a getter for `KEY = 'value'` assignments in `file`. A missing or non-string value
  // fails with a message naming the key and file instead of an opaque TypeError on `.replace`.
  const makePropGetter = (file: string) => {
    const reader = new PropertiesReader(file);
    return (key: string): string => {
      const raw = reader.get(key);
      if (typeof raw !== 'string') {
        throw new Error(`Expected a string value for "${key}" in ${file}`);
      }
      return stripSurroundingQuotes(raw);
    };
  };

  /**
   * Make sure we set environment variable that our Lambda function expects.
   */
  const getDdbProp = makePropGetter(ddbInitP);
  const DDB_TTL = getDdbProp('DDB_TTL');
  const DDB_ID = getDdbProp('DDB_ID');
  const DDB_INVOCATION_ID = getDdbProp('DDB_INVOCATION_ID');

  const getRsProcedureStarterEnvProp = makePropGetter(rsIntegrationFunctionEnvVarP);
  const CLUSTER_IDENTIFIER = getRsProcedureStarterEnvProp('CLUSTER_IDENTIFIER');
  const DATABASE = getRsProcedureStarterEnvProp('DATABASE');
  const DB_USER = getRsProcedureStarterEnvProp('DB_USER');

  // Use the caller supplied Powertools layer ARN, otherwise deploy the layer application.
  if (props.powertoolsArn === undefined) {
    const powertools = new sam.CfnApplication(this, 'Powertools', {
      location: {
        applicationId: 'arn:aws:serverlessrepo:eu-west-1:057560766410:applications/aws-lambda-powertools-python-layer',
        semanticVersion: '1.11.0',
      },
    });
    this.powertoolsArn = powertools.getAtt('Outputs.LayerVersionArn').toString();
  } else {
    this.powertoolsArn = props.powertoolsArn;
  }

  // Defaults; caller supplied props are spread on top of these further below.
  const defaultDynamoTableProps = {
    partitionKey: { name: DDB_ID, type: dynamodb.AttributeType.STRING },
    sortKey: { name: DDB_INVOCATION_ID, type: dynamodb.AttributeType.STRING },
    removalPolicy: cdk.RemovalPolicy.DESTROY,
    timeToLiveAttribute: DDB_TTL,
  };
  const defaultLambdaFunctionProps = {
    code: new lambda.AssetCode(rsIntegrationFunctionP),
    handler: 'index.handler',
    runtime: lambda.Runtime.PYTHON_3_8,
    environment: {
      // DynamoDB table environment variable gets automatically added by LambdaToDynamoDB
      [CLUSTER_IDENTIFIER]: props.redshiftTargetProps.clusterIdentifier,
      [DATABASE]: props.redshiftTargetProps.dbName,
      [DB_USER]: props.redshiftTargetProps.dbUser,
      [DDB_TTL]: '1', //Default time to live is 1 day.
      LOG_LEVEL: props.logLevel || 'INFO',
    },
    layers: [lambda.LayerVersion.fromLayerVersionArn(this, 'powertoolsVersion', this.powertoolsArn)],
    logRetention: logs.RetentionDays.ONE_YEAR,
    timeout: cdk.Duration.seconds(29),
    reservedConcurrentExecutions: 1, // Limit to 1 concurrent execution to allow safe checking concurrent invocations
  };

  // Without callback infrastructure the caller has to bring the tracking table.
  const existingTableErr = 'Must pass existing helper table via "existingTableObj" if createCallbackInfra is set to false';
  assert(props.createCallbackInfra || props.createCallbackInfra === undefined || props.existingTableObj !== undefined, existingTableErr);

  // When an existing lambda function is provided re-use it otherwise create one using the provided properties
  const lambdaDetails = props.starterExistingLambdaObj === undefined
    ? { lambdaFunctionProps: { ...defaultLambdaFunctionProps, ...props.starterLambdaFunctionProps } }
    : { existingLambdaObj: props.starterExistingLambdaObj };

  // When an existing DDB table is provided re-use it otherwise create one using the provided properties
  const ddbDetails = props.existingTableObj === undefined
    ? { dynamoTableProps: { ...defaultDynamoTableProps, ...props.dynamoTableProps } }
    : { existingTableObj: props.existingTableObj };

  const lambdaDdb = new LambdaToDynamoDB(this, 'RSInvoker', {
    ...lambdaDetails,
    ...ddbDetails,
    tablePermissions: props.tablePermissions || 'ReadWrite',
  });
  this.lambdaFunction = lambdaDdb.lambdaFunction;
  this.trackingTable = lambdaDdb.dynamoTable;

  // Permissions the starter function needs to drive statements through the Redshift Data API.
  const allowRedshiftDataApiExecuteStatement = new iam.PolicyStatement({
    actions: ['redshift-data:ExecuteStatement', 'redshift-data:DescribeStatement',
      'redshift-data:GetStatementResult', 'redshift-data:CancelStatement', 'redshift-data:ListStatements'],
    effect: iam.Effect.ALLOW,
    resources: ['*'],
  });
  // Credentials are scoped to the configured database and database user of the target cluster.
  const allowRedshiftGetCredentials = new iam.PolicyStatement({
    actions: ['redshift:GetClusterCredentials'],
    effect: iam.Effect.ALLOW,
    resources: [
      cdk.Fn.sub(
        'arn:${AWS::Partition}:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${ID}/${DB}',
        {
          ID: props.redshiftTargetProps.clusterIdentifier,
          DB: props.redshiftTargetProps.dbName,
        },
      ),
      cdk.Fn.sub(
        'arn:${AWS::Partition}:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${ID}/${DB_USER}',
        {
          ID: props.redshiftTargetProps.clusterIdentifier,
          DB_USER: props.redshiftTargetProps.dbUser,
        },
      ),
    ],
  });
  this.lambdaFunction.addToRolePolicy(allowRedshiftDataApiExecuteStatement);
  this.lambdaFunction.addToRolePolicy(allowRedshiftGetCredentials);

  if (props.createCallbackInfra === undefined || props.createCallbackInfra) {
    const allowReportTaskOutcome = new iam.PolicyStatement({
      actions: ['states:SendTaskSuccess', 'states:SendTaskFailure'],
      effect: iam.Effect.ALLOW,
      resources: ['*'],
    });

    // Completer selection: an existing function wins, then custom function props;
    // with neither we fall back on re-using the function that starts execution.
    let completerLambdaDetails;
    if (props.completerExistingLambdaObj !== undefined) {
      completerLambdaDetails = { existingLambdaObj: props.completerExistingLambdaObj };
    } else if (props.completerLambdaFunctionProps !== undefined) {
      completerLambdaDetails = { lambdaFunctionProps: { ...defaultLambdaFunctionProps, ...props.completerLambdaFunctionProps } };
    } else {
      completerLambdaDetails = { existingLambdaObj: this.lambdaFunction };
    }
    const completerIntegration = new LambdaToDynamoDB(this, 'Completer', {
      ...completerLambdaDetails,
      existingTableObj: this.trackingTable,
    });

    // Queue up Redshift Data API status change events for the completer.
    const eventQueue = new EventbridgeToSqs(
      this,
      'QueryFinished',
      {
        eventRuleProps: {
          description: 'Monitor queries that have been issued by Redshift data API and that completed',
          enabled: true,
          eventPattern: {
            source: ['aws.redshift-data'],
            detailType: ['Redshift Data Statement Status Change'],
          },
        },
        existingQueueObj: props.existingQueueObj,
        queueProps: props.queueProps,
        enableQueuePurging: props.enableQueuePurging,
        deadLetterQueueProps: props.deadLetterQueueProps,
        deployDeadLetterQueue: props.deployDeadLetterQueue,
        maxReceiveCount: props.maxReceiveCount,
        enableEncryptionWithCustomerManagedKey: props.enableEncryptionWithCustomerManagedKey,
        encryptionKey: props.encryptionKey,
        encryptionKeyProps: props.encryptionKeyProps,
      },
    );
    new SqsToLambda(this, 'SqsToCompleter', {
      existingLambdaObj: completerIntegration.lambdaFunction,
      existingQueueObj: eventQueue.sqsQueue,
    });
    completerIntegration.lambdaFunction.addToRolePolicy(allowReportTaskOutcome);
  } else {
    // No callback infrastructure needed, so reject every queue related property.
    const noQueueErr = 'Queue is part of SFN callback infra so cannot be provided if createCallbackInfra is set to false';
    assert(props.existingQueueObj === undefined, noQueueErr);
    assert(props.queueProps === undefined, noQueueErr);
    assert(props.enableQueuePurging === undefined, noQueueErr);
    assert(props.deadLetterQueueProps === undefined, noQueueErr);
    assert(props.deployDeadLetterQueue === undefined, noQueueErr);
    assert(props.maxReceiveCount === undefined, noQueueErr);
    assert(props.enableEncryptionWithCustomerManagedKey === undefined, noQueueErr);
    assert(props.encryptionKey === undefined, noQueueErr);
    assert(props.encryptionKeyProps === undefined, noQueueErr);
  }
}