constructor()

in src/index.ts [203:393]


  constructor(scope: cdk.Construct, id: string, props: SfnRedshiftTaskerProps) {
    super(scope, id);

    let lambdaP = path.join(__dirname, '../lambda');
    let pythonP = path.join(lambdaP, 'python');
    let rsIntegrationFunctionP = path.join(pythonP, 'rs_integration_function');
    let ddbP = path.join(rsIntegrationFunctionP, 'ddb');
    let ddbInitP = path.join(ddbP, '__init__.py');
    let rsIntegrationFunctionEnvVarP = path.join(rsIntegrationFunctionP, 'environment_labels.py');
    // eslint-disable-next-line @typescript-eslint/no-require-imports
    const PropertiesReader = require('properties-reader');
    const stripSurroundingQuotes = (x: string) => x.replace(/^['"](.+)['"]$/, '$1');
    // Define helper function that return the string value from a Python string
    let ddbProps = new PropertiesReader(ddbInitP);
    const getDdbProp = (x: string) => stripSurroundingQuotes(ddbProps.get(x));

    /**
     * Make sure we set environment variable that our Lambda function expects.
     */
    let DDB_TTL = getDdbProp('DDB_TTL');
    let DDB_ID = getDdbProp('DDB_ID');
    let DDB_INVOCATION_ID = getDdbProp('DDB_INVOCATION_ID');

    let rsProcedureStarterEnvProps = new PropertiesReader(rsIntegrationFunctionEnvVarP);
    const getRsProcedureStarterEnvProp = (x: string) => stripSurroundingQuotes(rsProcedureStarterEnvProps.get(x));

    let CLUSTER_IDENTIFIER = getRsProcedureStarterEnvProp('CLUSTER_IDENTIFIER');
    let DATABASE = getRsProcedureStarterEnvProp('DATABASE');
    let DB_USER = getRsProcedureStarterEnvProp('DB_USER');

    if (props.powertoolsArn === undefined) {
      let powertools = new sam.CfnApplication(this, 'Powertools', {
        location: {
          applicationId: 'arn:aws:serverlessrepo:eu-west-1:057560766410:applications/aws-lambda-powertools-python-layer',
          semanticVersion: '1.11.0',
        },
      });
      this.powertoolsArn = powertools.getAtt('Outputs.LayerVersionArn').toString();
    } else {
      this.powertoolsArn = props.powertoolsArn;
    }

    let defaultDynamoTableProps = {
      partitionKey: { name: DDB_ID, type: dynamodb.AttributeType.STRING },
      sortKey: { name: DDB_INVOCATION_ID, type: dynamodb.AttributeType.STRING },
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      timeToLiveAttribute: DDB_TTL,
    };

    let defaultLambdaFunctionProps = {
      code: new lambda.AssetCode(rsIntegrationFunctionP),
      handler: 'index.handler',
      runtime: lambda.Runtime.PYTHON_3_8,
      environment: {
        // DynamoDB table environment variable gets automatically added by LambdaToDynamoDB
        [CLUSTER_IDENTIFIER]: props.redshiftTargetProps.clusterIdentifier,
        [DATABASE]: props.redshiftTargetProps.dbName,
        [DB_USER]: props.redshiftTargetProps.dbUser,
        [DDB_TTL]: '1', //Default time to live is 1 day.
        LOG_LEVEL: props.logLevel || 'INFO',
      },
      layers: [lambda.LayerVersion.fromLayerVersionArn(this, 'powertoolsVersion', this.powertoolsArn)],
      logRetention: logs.RetentionDays.ONE_YEAR,
      timeout: cdk.Duration.seconds(29),
      reservedConcurrentExecutions: 1, // Limit to 1 concurrent execution to allow safe checking concurrent invocations
    };
    const existingTableErr = 'Must pass existing helper table via "existingTableObj" if createCallBackInfra is set to false';
    assert(props.createCallbackInfra || props.createCallbackInfra === undefined || props.existingTableObj !== undefined, existingTableErr);

    // When an existing lambda function is provided re-use it otherwise create one using the provided properties
    let lambdaDetails;
    if (props.starterExistingLambdaObj === undefined) {
      lambdaDetails = { lambdaFunctionProps: { ...defaultLambdaFunctionProps, ...props.starterLambdaFunctionProps } };
    } else {
      lambdaDetails = { existingLambdaObj: props.starterExistingLambdaObj };
    }

    // When an existing DDB table is provided re-use it otherwise create one using the provided properties
    let ddbDetails;
    if (props.existingTableObj === undefined) {
      ddbDetails = { dynamoTableProps: { ...defaultDynamoTableProps, ...props.dynamoTableProps } };
    } else {
      ddbDetails = { existingTableObj: props.existingTableObj };
    }
    let lambda_ddb = new LambdaToDynamoDB(this, 'RSInvoker', {
      ...lambdaDetails,
      ...ddbDetails,
      tablePermissions: props.tablePermissions || 'ReadWrite',
    });
    this.lambdaFunction = lambda_ddb.lambdaFunction;

    this.trackingTable = lambda_ddb.dynamoTable;

    let allowRedshiftDataApiExecuteStatement = new iam.PolicyStatement({
      actions: ['redshift-data:ExecuteStatement', 'redshift-data:DescribeStatement',
        'redshift-data:GetStatementResult', 'redshift-data:CancelStatement', 'redshift-data:ListStatements'],
      effect: iam.Effect.ALLOW,
      resources: ['*'],
    });

    let allowRedshiftGetCredentials = new iam.PolicyStatement({
      actions: ['redshift:GetClusterCredentials'],
      effect: iam.Effect.ALLOW,
      resources: [
        cdk.Fn.sub(
          'arn:${AWS::Partition}:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${ID}/${DB}',
          {
            ID: props.redshiftTargetProps.clusterIdentifier,
            DB: props.redshiftTargetProps.dbName,
          },
        ),
        cdk.Fn.sub(
          'arn:${AWS::Partition}:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${ID}/${DB_USER}',
          {
            ID: props.redshiftTargetProps.clusterIdentifier,
            DB_USER: props.redshiftTargetProps.dbUser,
          },
        ),
      ],
    });

    this.lambdaFunction.addToRolePolicy(allowRedshiftDataApiExecuteStatement);
    this.lambdaFunction.addToRolePolicy(allowRedshiftGetCredentials);

    if (props.createCallbackInfra === undefined || props.createCallbackInfra) {
      let allowReportTaskOutcome = new iam.PolicyStatement({
        actions: ['states:SendTaskSuccess', 'states:SendTaskFailure'],
        effect: iam.Effect.ALLOW,
        resources: ['*'],
      });

      let completerLambdaDetails;
      if (props.completerExistingLambdaObj === undefined && props.completerLambdaFunctionProps === undefined ) {
        //We fall back on re-using the function that starts execution.
        completerLambdaDetails = { existingLambdaObj: this.lambdaFunction };
      } else {
        if (props.completerExistingLambdaObj === undefined) {
          completerLambdaDetails = { lambdaFunctionProps: { ...defaultLambdaFunctionProps, ...props.completerLambdaFunctionProps } };
        } else {
          completerLambdaDetails = { existingLambdaObj: props.completerExistingLambdaObj };
        }
      }
      let completerIntegration = new LambdaToDynamoDB(this, 'Completer', {
        ...completerLambdaDetails,
        existingTableObj: this.trackingTable,
      });

      let eventQueue = new EventbridgeToSqs(
        this,
        'QueryFinished',
        {
          eventRuleProps: {
            description: 'Monitor queries that have been issued by Redshift data API and that completed',
            enabled: true,
            eventPattern: {
              source: ['aws.redshift-data'],
              detailType: ['Redshift Data Statement Status Change'],
            },
          },
          existingQueueObj: props.existingQueueObj,
          queueProps: props.queueProps,
          enableQueuePurging: props.enableQueuePurging,
          deadLetterQueueProps: props.deadLetterQueueProps,
          deployDeadLetterQueue: props.deployDeadLetterQueue,
          maxReceiveCount: props.maxReceiveCount,
          enableEncryptionWithCustomerManagedKey: props.enableEncryptionWithCustomerManagedKey,
          encryptionKey: props.encryptionKey,
          encryptionKeyProps: props.encryptionKeyProps,
        },
      );

      new SqsToLambda(this, 'SqsToCompleter', {
        existingLambdaObj: completerIntegration.lambdaFunction,
        existingQueueObj: eventQueue.sqsQueue,
      });
      completerIntegration.lambdaFunction.addToRolePolicy(allowReportTaskOutcome);
    } else {
      // No callback infrastructure needed
      let no_queue_err = 'Queue is part of SFN callback infra so cannot be provided if sfnCallbackSupport == false';
      assert(props.existingQueueObj === undefined, no_queue_err);
      assert(props.queueProps === undefined, no_queue_err);
      assert(props.enableQueuePurging === undefined, no_queue_err);
      assert(props.deadLetterQueueProps === undefined, no_queue_err);
      assert(props.deployDeadLetterQueue === undefined, no_queue_err);
      assert(props.maxReceiveCount === undefined, no_queue_err);
      assert(props.enableEncryptionWithCustomerManagedKey === undefined, no_queue_err);
      assert(props.encryptionKey === undefined, no_queue_err);
      assert(props. encryptionKeyProps === undefined, no_queue_err);
    }

  }