in core/src/data-generator/data-generator.ts [75:367]
constructor(scope: Construct, id: string, props: DataGeneratorProps) {
// Initialise the base construct before touching `this`
super(scope, id);
// Enclosing stack, used to format resource ARNs
const stack = Stack.of(this);
// Keep the caller-supplied dataset and sink ARN on the instance
this.dataset = props.dataset;
this.sinkArn = props.sinkArn;
// Fall back to 1800 when the frequency is missing or falsy
// (presumably expressed in seconds, since it is divided by 60 to build the cron minute step — confirm against the props documentation)
this.frequency = props.frequency ? props.frequency : 1800;
// AWS Glue database in which the generator tables are created
const datageneratorDB = SingletonGlueDatabase.getOrCreate(this, DataGenerator.DATA_GENERATOR_DATABASE);
// Singleton Amazon S3 bucket receiving the Amazon Athena query results
const logBucket = SingletonBucket.getOrCreate(this, 'log');
// AWS Glue resources the source table DDL is allowed to touch: the table, the catalog and the database
const sourceTableGlueArns = [
  stack.formatArn({
    service: 'glue',
    resource: 'table',
    resourceName: `${DataGenerator.DATA_GENERATOR_DATABASE}/${this.dataset.tableName}_source`,
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
  stack.formatArn({
    service: 'glue',
    resource: 'catalog',
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
  stack.formatArn({
    service: 'glue',
    resource: 'database',
    resourceName: DataGenerator.DATA_GENERATOR_DATABASE,
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
];
// Create the `<tableName>_source` table with an Amazon Athena query built by the dataset
const createSourceTable = new SynchronousAthenaQuery(this, 'createSourceTable', {
  statement: this.dataset.parseCreateSourceQuery(
    datageneratorDB.databaseName,
    `${this.dataset.tableName}_source`,
    this.dataset.location.bucketName,
    this.dataset.location.objectKey,
  ),
  resultPath: {
    bucketName: logBucket.bucketName,
    objectKey: `data_generator/${this.dataset.tableName}/create_source`,
  },
  executionRoleStatements: [
    new PolicyStatement({
      actions: ['glue:CreateTable', 'glue:GetTable'],
      resources: sourceTableGlueArns,
    }),
  ],
});
// Make sure the database is created before the query runs
createSourceTable.node.addDependency(datageneratorDB);
// Split the sink ARN into its components (resource and resource name separated by a slash)
// NOTE(review): no explicit check that the ARN belongs to Amazon S3 is visible here — confirm whether Bucket.fromBucketArn actually rejects non-S3 ARNs
const arn = Arn.split(this.sinkArn, ArnFormat.SLASH_RESOURCE_NAME);
// Import the sink as a bucket; the returned reference is not used afterwards
Bucket.fromBucketArn(this, 'Sink', this.sinkArn);
// AWS Glue resources the target table DDL is allowed to touch: the table, the catalog and the database
const targetTableGlueArns = [
  stack.formatArn({
    service: 'glue',
    resource: 'table',
    resourceName: `${DataGenerator.DATA_GENERATOR_DATABASE}/${this.dataset.tableName}_target`,
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
  stack.formatArn({
    service: 'glue',
    resource: 'catalog',
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
  stack.formatArn({
    service: 'glue',
    resource: 'database',
    resourceName: DataGenerator.DATA_GENERATOR_DATABASE,
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
];
// Create the `<tableName>_target` table with an Amazon Athena query built by the dataset
// (arn.resource is presumably the sink bucket name — confirm against the expected sink ARN format)
const createTargetTable = new SynchronousAthenaQuery(this, 'createTargetTable', {
  statement: this.dataset.parseCreateTargetQuery(
    datageneratorDB.databaseName,
    `${this.dataset.tableName}_target`,
    arn.resource,
    this.dataset.tableName,
  ),
  resultPath: {
    bucketName: logBucket.bucketName,
    objectKey: `data_generator/${this.dataset.tableName}/create_target`,
  },
  executionRoleStatements: [
    new PolicyStatement({
      actions: ['glue:CreateTable', 'glue:GetTable'],
      resources: targetTableGlueArns,
    }),
  ],
});
// Make sure the database is created before the query runs
createTargetTable.node.addDependency(datageneratorDB);
// Name of the AWS SSM parameter holding the datetime offset of this dataset
const offsetParameterName = `${this.dataset.tableName}_offset`;
// AWS Custom Resource writing the offset when it is created and removing it when it is deleted;
// no onUpdate call is configured for it
const offsetCreate = new AwsCustomResource(this, 'offsetCreate', {
  onCreate: {
    service: 'SSM',
    action: 'putParameter',
    physicalResourceId: PhysicalResourceId.of(offsetParameterName),
    parameters: {
      Name: offsetParameterName,
      Value: this.dataset.offset.toString(),
      Type: 'String',
    },
  },
  onDelete: {
    service: 'SSM',
    action: 'deleteParameter',
    physicalResourceId: PhysicalResourceId.of(offsetParameterName),
    parameters: {
      Name: offsetParameterName,
    },
  },
  // Permissions are limited to this single parameter
  policy: AwsCustomResourcePolicy.fromStatements([
    new PolicyStatement({
      actions: ['ssm:PutParameter', 'ssm:DeleteParameter', 'ssm:GetParameter'],
      resources: [
        stack.formatArn({
          service: 'ssm',
          resource: 'parameter',
          resourceName: offsetParameterName,
          account: Aws.ACCOUNT_ID,
          region: Aws.REGION,
        }),
      ],
    }),
  ]),
  logRetention: RetentionDays.ONE_DAY,
});
// Request parameters shared by the create and update lookups of the offset
const offsetReadParameters = { Name: `${this.dataset.tableName}_offset` };
// AWS Custom Resource reading the datetime offset back from AWS SSM.
// The physical resource id is taken from Date.now() at synthesis time, so it differs between syntheses
// (presumably to have the value read again on every deployment — confirm)
const offsetGet = new AwsCustomResource(this, 'offsetGet', {
  onCreate: {
    service: 'SSM',
    action: 'getParameter',
    physicalResourceId: PhysicalResourceId.of(Date.now().toString()),
    parameters: offsetReadParameters,
  },
  onUpdate: {
    service: 'SSM',
    action: 'getParameter',
    physicalResourceId: PhysicalResourceId.of(Date.now().toString()),
    parameters: offsetReadParameters,
  },
  // Read-only access, limited to the offset parameter
  policy: AwsCustomResourcePolicy.fromStatements([
    new PolicyStatement({
      actions: ['ssm:GetParameter'],
      resources: [
        stack.formatArn({
          service: 'ssm',
          resource: 'parameter',
          resourceName: `${this.dataset.tableName}_offset`,
          account: Aws.ACCOUNT_ID,
          region: Aws.REGION,
        }),
      ],
    }),
  ]),
  logRetention: RetentionDays.ONE_DAY,
});
// The parameter has to be written before it can be read
offsetGet.node.addDependency(offsetCreate);
// AWS Lambda function (Python 3.8, 30 seconds timeout) preparing each data generation run
const querySetupFn = new PreBundledFunction(this, 'querySetupFn', {
  handler: 'lambda.handler',
  codePath: 'data-generator/resources/lambdas/setup',
  runtime: Runtime.PYTHON_3_8,
  timeout: Duration.seconds(30),
  logRetention: RetentionDays.ONE_DAY,
});
// Input handed to the function: the stored offset, the generation frequency and the
// generation statement built by the dataset for the source and target tables
const querySetupPayload = TaskInput.fromObject({
  Offset: offsetGet.getResponseField('Parameter.Value'),
  Frequency: this.frequency,
  Statement: this.dataset.parseGenerateQuery(
    DataGenerator.DATA_GENERATOR_DATABASE,
    `${this.dataset.tableName}_source`,
    `${this.dataset.tableName}_target`,
  ),
});
// AWS Step Functions task invoking the function; only the `Payload` part of the result is passed on
const querySetupTask = new LambdaInvoke(this, 'querySetupTask', {
  lambdaFunction: querySetupFn,
  payload: querySetupPayload,
  outputPath: '$.Payload',
});
// Location in the log bucket where the results of the generation query are written
const generateResultLocation = {
  bucketName: logBucket.bucketName,
  objectKey: `data_generator/${this.dataset.tableName}/generate`,
};
// AWS Step Functions task running the data generation query in Amazon Athena.
// The whole task input (`$`) is used as the query string, and the RUN_JOB pattern is used with a 5 minutes timeout
const athenaQueryTask = new AthenaStartQueryExecution(this, 'dataGeneratorQuery', {
  queryString: JsonPath.stringAt('$'),
  queryExecutionContext: {
    databaseName: DataGenerator.DATA_GENERATOR_DATABASE,
  },
  resultConfiguration: {
    outputLocation: generateResultLocation,
  },
  workGroup: 'primary',
  integrationPattern: IntegrationPattern.RUN_JOB,
  timeout: Duration.minutes(5),
});
// AWS Step Functions state machine running the setup task followed by the Amazon Athena query
const generatorStepFunctions = new StateMachine(this, 'dataGenerator', {
  timeout: Duration.minutes(7),
  definition: querySetupTask.next(athenaQueryTask),
});
// AWS Glue resources read by the INSERT INTO SELECT query: both tables, the catalog and the database
const generatorGlueArns = [
  stack.formatArn({
    service: 'glue',
    resource: 'table',
    resourceName: `${DataGenerator.DATA_GENERATOR_DATABASE}/${this.dataset.tableName}_target`,
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
  stack.formatArn({
    service: 'glue',
    resource: 'table',
    resourceName: `${DataGenerator.DATA_GENERATOR_DATABASE}/${this.dataset.tableName}_source`,
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
  stack.formatArn({
    service: 'glue',
    resource: 'catalog',
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
  stack.formatArn({
    service: 'glue',
    resource: 'database',
    resourceName: DataGenerator.DATA_GENERATOR_DATABASE,
    account: Aws.ACCOUNT_ID,
    region: Aws.REGION,
  }),
];
// Grant the state machine role read access to the AWS Glue metadata
generatorStepFunctions.addToRolePolicy(new PolicyStatement({
  actions: ['glue:GetDatabase', 'glue:GetTable'],
  resources: generatorGlueArns,
}));
// Objects under the `<tableName>/` prefix of the sink; account and region are left empty in this ARN
const sinkObjectsArn = stack.formatArn({
  service: 's3',
  account: '',
  region: '',
  resource: arn.resource,
  resourceName: `${this.dataset.tableName}/*`,
});
// Grant the state machine role the permissions to write the generated data into the sink
generatorStepFunctions.addToRolePolicy(new PolicyStatement({
  actions: [
    's3:AbortMultipartUpload',
    's3:ListBucketMultipartUploads',
    's3:ListMultipartUploadParts',
    's3:PutObject',
  ],
  resources: [sinkObjectsArn],
}));
// Step of the cron minute field: the frequency divided by 60 and rounded up
// NOTE(review): a frequency above 3540 gives a step of 60 or more, and steps that do not divide 60 fire unevenly across the hour — confirm the supported range
const triggerMinuteStep = Math.ceil(this.frequency / 60);
// Amazon EventBridge rule starting the state machine on that schedule
new Rule(this, 'dataGeneratorTrigger', {
  targets: [new SfnStateMachine(generatorStepFunctions, {})],
  schedule: Schedule.cron({ minute: `0/${triggerMinuteStep}` }),
});