constructor()

in core/src/data-generator/data-generator.ts [75:367]


  constructor(scope: Construct, id: string, props: DataGeneratorProps) {
    super(scope, id);

    const stack = Stack.of(this);
    this.sinkArn = props.sinkArn;
    this.dataset = props.dataset;
    this.frequency = props.frequency || 1800;

    // AWS Glue Database to store source Tables
    const datageneratorDB = SingletonGlueDatabase.getOrCreate(this, DataGenerator.DATA_GENERATOR_DATABASE);

    // Singleton Amazon S3 bucket for Amazon Athena Query logs
    const logBucket = SingletonBucket.getOrCreate(this, 'log');

    // Source table creation in Amazon Athena
    const createSourceTable = new SynchronousAthenaQuery(this, 'createSourceTable', {
      statement: this.dataset.parseCreateSourceQuery(
        datageneratorDB.databaseName,
        this.dataset.tableName+'_source',
        this.dataset.location.bucketName,
        this.dataset.location.objectKey),
      resultPath: {
        bucketName: logBucket.bucketName,
        objectKey: `data_generator/${this.dataset.tableName}/create_source`,
      },
      executionRoleStatements: [
        new PolicyStatement({
          resources: [
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'glue',
              resource: 'table',
              resourceName: DataGenerator.DATA_GENERATOR_DATABASE + '/' + this.dataset.tableName + '_source',
            }),
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'glue',
              resource: 'catalog',
            }),
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'glue',
              resource: 'database',
              resourceName: DataGenerator.DATA_GENERATOR_DATABASE,
            }),
          ],
          actions: [
            'glue:CreateTable',
            'glue:GetTable',
          ],
        }),
      ],
    });
    createSourceTable.node.addDependency(datageneratorDB);

    // Parse the sinkArn into ArnComponents and raise an error if it's not an Amazon S3 Sink
    const arn = Arn.split(this.sinkArn, ArnFormat.SLASH_RESOURCE_NAME);
    Bucket.fromBucketArn(this, 'Sink', this.sinkArn);

    // Target table creation in Amazon Athena
    const createTargetTable = new SynchronousAthenaQuery(this, 'createTargetTable', {
      statement: this.dataset.parseCreateTargetQuery(
        datageneratorDB.databaseName,
        this.dataset.tableName+'_target',
        arn.resource,
        this.dataset.tableName),
      resultPath: {
        bucketName: logBucket.bucketName,
        objectKey: `data_generator/${this.dataset.tableName}/create_target`,
      },
      executionRoleStatements: [
        new PolicyStatement({
          resources: [
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'glue',
              resource: 'table',
              resourceName: DataGenerator.DATA_GENERATOR_DATABASE + '/' + this.dataset.tableName + '_target',
            }),
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'glue',
              resource: 'catalog',
            }),
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'glue',
              resource: 'database',
              resourceName: DataGenerator.DATA_GENERATOR_DATABASE,
            }),
          ],
          actions: [
            'glue:CreateTable',
            'glue:GetTable',
          ],
        }),
      ],
    });
    createTargetTable.node.addDependency(datageneratorDB);

    // AWS Custom Resource to store the datetime offset only on creation
    const offsetCreate = new AwsCustomResource(this, 'offsetCreate', {
      onCreate: {
        service: 'SSM',
        action: 'putParameter',
        physicalResourceId: PhysicalResourceId.of(this.dataset.tableName + '_offset'),
        parameters: {
          Name: this.dataset.tableName + '_offset',
          Value: this.dataset.offset.toString(),
          Type: 'String',
        },
      },
      onDelete: {
        service: 'SSM',
        action: 'deleteParameter',
        physicalResourceId: PhysicalResourceId.of(this.dataset.tableName + '_offset'),
        parameters: {
          Name: this.dataset.tableName + '_offset',
        },
      },
      policy: AwsCustomResourcePolicy.fromStatements([
        new PolicyStatement({
          resources: [
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'ssm',
              resource: 'parameter',
              resourceName: this.dataset.tableName + '_offset',
            }),
          ],
          actions: [
            'ssm:PutParameter',
            'ssm:DeleteParameter',
            'ssm:GetParameter',
          ],
        }),
      ]),
      logRetention: RetentionDays.ONE_DAY,
    });

    // AWS Custom Resource to get the datetime offset from AWS SSM
    const offsetGet = new AwsCustomResource(this, 'offsetGet', {
      onCreate: {
        service: 'SSM',
        action: 'getParameter',
        physicalResourceId: PhysicalResourceId.of(Date.now().toString()),
        parameters: {
          Name: this.dataset.tableName + '_offset',
        },
      },
      onUpdate: {
        service: 'SSM',
        action: 'getParameter',
        physicalResourceId: PhysicalResourceId.of(Date.now().toString()),
        parameters: {
          Name: this.dataset.tableName + '_offset',
        },
      },
      policy: AwsCustomResourcePolicy.fromStatements([
        new PolicyStatement({
          resources: [
            stack.formatArn({
              account: Aws.ACCOUNT_ID,
              region: Aws.REGION,
              service: 'ssm',
              resource: 'parameter',
              resourceName: this.dataset.tableName + '_offset',
            }),
          ],
          actions: [
            'ssm:GetParameter',
          ],
        }),
      ]),
      logRetention: RetentionDays.ONE_DAY,
    });
    offsetGet.node.addDependency(offsetCreate);

    // AWS Lambda function to prepare data generation
    const querySetupFn = new PreBundledFunction(this, 'querySetupFn', {
      runtime: Runtime.PYTHON_3_8,
      codePath: 'data-generator/resources/lambdas/setup',
      handler: 'lambda.handler',
      logRetention: RetentionDays.ONE_DAY,
      timeout: Duration.seconds(30),
    });

    // AWS Step Functions task to prepare data generation
    const querySetupTask = new LambdaInvoke(this, 'querySetupTask', {
      lambdaFunction: querySetupFn,
      payload: TaskInput.fromObject({
        Offset: offsetGet.getResponseField('Parameter.Value'),
        Frequency: this.frequency,
        Statement: this.dataset.parseGenerateQuery(
          DataGenerator.DATA_GENERATOR_DATABASE,
          this.dataset.tableName+'_source',
          this.dataset.tableName+'_target',
        ),
      },
      ),
      outputPath: '$.Payload',
    });

    // AWS Step Functions Task to run an Amazon Athena Query for data generation
    const athenaQueryTask = new AthenaStartQueryExecution(this, 'dataGeneratorQuery', {
      queryString: JsonPath.stringAt('$'),
      timeout: Duration.minutes(5),
      workGroup: 'primary',
      integrationPattern: IntegrationPattern.RUN_JOB,
      resultConfiguration: {
        outputLocation: {
          bucketName: logBucket.bucketName,
          objectKey: `data_generator/${this.dataset.tableName}/generate`,
        },
      },
      queryExecutionContext: {
        databaseName: DataGenerator.DATA_GENERATOR_DATABASE,
      },
    });

    // AWS Step Functions State Machine to generate data
    const generatorStepFunctions = new StateMachine(this, 'dataGenerator', {
      definition: querySetupTask.next(athenaQueryTask),
      timeout: Duration.minutes(7),
    });

    // Add permissions for executing the INSERT INTO SELECT query
    generatorStepFunctions.addToRolePolicy(new PolicyStatement({
      resources: [
        stack.formatArn({
          account: Aws.ACCOUNT_ID,
          region: Aws.REGION,
          service: 'glue',
          resource: 'table',
          resourceName: DataGenerator.DATA_GENERATOR_DATABASE + '/' + this.dataset.tableName + '_target',
        }),
        stack.formatArn({
          account: Aws.ACCOUNT_ID,
          region: Aws.REGION,
          service: 'glue',
          resource: 'table',
          resourceName: DataGenerator.DATA_GENERATOR_DATABASE + '/' + this.dataset.tableName + '_source',
        }),
        stack.formatArn({
          account: Aws.ACCOUNT_ID,
          region: Aws.REGION,
          service: 'glue',
          resource: 'catalog',
        }),
        stack.formatArn({
          account: Aws.ACCOUNT_ID,
          region: Aws.REGION,
          service: 'glue',
          resource: 'database',
          resourceName: DataGenerator.DATA_GENERATOR_DATABASE,
        }),
      ],
      actions: [
        'glue:GetDatabase',
        'glue:GetTable',
      ],
    }));

    generatorStepFunctions.addToRolePolicy(new PolicyStatement({
      resources: [
        stack.formatArn({
          account: '',
          region: '',
          service: 's3',
          resource: arn.resource,
          resourceName: this.dataset.tableName + '/*',
        }),
      ],
      actions: [
        's3:AbortMultipartUpload',
        's3:ListBucketMultipartUploads',
        's3:ListMultipartUploadParts',
        's3:PutObject',
      ],
    }));

    // Amazon EventBridge Rule to trigger the AWS Step Functions
    new Rule(this, 'dataGeneratorTrigger', {
      schedule: Schedule.cron({ minute: `0/${Math.ceil(this.frequency/60)}` }),
      targets: [new SfnStateMachine(generatorStepFunctions, {})],
    });