in source/cdk-infrastructure/lib/bi-reporting/etl/etl-construct.ts [60:446]
/**
 * Creates the BI reporting ETL resources:
 * - a Parquet output bucket and a metadata bucket (both logging to `props.s3LoggingBucket`),
 * - a Glue database and a Parquet Glue table backed by the Parquet bucket,
 * - a custom resource (served by `props.solutionHelperFunction`) that copies the Glue job
 *   scripts from the source code bucket and creates the initial CSV/manifest files,
 * - a Glue workflow chaining the configuration job, the Parquet conversion job, the crawler,
 *   and the crawler configuration update job.
 *
 * @param scope - Parent construct.
 * @param id - Construct ID.
 * @param props - ETL properties (source code location, logging bucket, DynamoDB tables,
 *   raw data bucket name, metadata prefixes, solution helper function, solution ID/version).
 * @throws Error when `props.solutionHelperFunction` exposes no execution role, because the
 *   ETL custom-resource policy has to be attached to that role.
 */
constructor(scope: Construct, id: string, props: IEtlProps) {
  super(scope, id);
  const sourceCodeBucket = Bucket.fromBucketName(this, 'sourceCodeBucket', props.sourceCodeBucketName);

  // Glue job script file names and the prefix (in the metadata bucket) the scripts are copied to
  const glueJobScriptsPrefix = 'glue-job-scripts';
  const glueConfigurationJobScriptName = 'configuration.py';
  const glueConvertParquetJobScriptName = 'convert_parquet.py';
  const glueUpdateCrawlerJobScriptName = 'update_crawler.py';
  const { csvPrefix, manifestPrefix, machineInformationPrefix, machineConfigInformationPrefix } = props.metadataConfiguration;

  // Default arguments shared verbatim by every Glue job below.
  const additionalPythonModules = 'botocore>=1.20.12,boto3>=1.17.12';
  const userAgentExtra = `{"user_agent_extra": "AwsSolution/${props.solutionId}/${props.solutionVersion}"}`;

  // The crawler update job is passed this trigger name and is allowed to call glue:StopTrigger
  // on it, and the trigger itself is created with this name. Defining it once keeps the IAM
  // resource, the job argument and the trigger name from drifting apart.
  const updateCrawlerJobTriggerName = `${Aws.STACK_NAME}-UpdateCrawlerJobTrigger`;

  this.glueBucket = SolutionsConstructsCore.buildS3Bucket(this, {
    bucketProps: {
      serverAccessLogsBucket: props.s3LoggingBucket,
      serverAccessLogsPrefix: 'parquet/'
    }
  }, 'Parquet')[0];
  this.glueMetadataBucket = SolutionsConstructsCore.buildS3Bucket(this, {
    bucketProps: {
      serverAccessLogsBucket: props.s3LoggingBucket,
      serverAccessLogsPrefix: 'metadata/'
    }
  }, 'Metadata')[0];
  this.glueDatabase = new Database(this, 'GlueDatabase', {
    databaseName: `${props.lowerCaseStackName}-database`
  });
  this.glueTable = new GlueTable(this, 'GlueTable', {
    columns: [
      { name: 'quality', type: Schema.STRING },
      { name: 'value', type: Schema.STRING },
      { name: 'timestamp', type: Schema.STRING },
      { name: 'tag', type: Schema.STRING },
      { name: 'id', type: Schema.STRING }
    ],
    dataFormat: DataFormat.PARQUET,
    database: this.glueDatabase,
    // Glue table names use underscores, so the dashes in the bucket name are replaced.
    tableName: Fn.join('_', Fn.split('-', this.glueBucket.bucketName)),
    bucket: this.glueBucket,
    description: `Table for ${Aws.STACK_NAME} CloudFormation stack`,
    partitionKeys: [
      { name: 'Partition_0', type: Schema.STRING, comment: 'Year' },
      { name: 'Partition_1', type: Schema.STRING, comment: 'Month' },
      { name: 'Partition_2', type: Schema.STRING, comment: 'Day' },
    ]
  });

  /**
   * Glue custom resource to copy Glue job scripts and create initial S3 configuration and manifest files.
   */
  const solutionHelperRole = props.solutionHelperFunction.role;
  if (solutionHelperRole === undefined) {
    throw new Error('The solution helper function must have an execution role to attach the ETL policy to.');
  }
  const solutionHelperEtlPolicy = new Policy(this, 'SolutionHelperEtlPolicy', {
    statements: [
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: ['s3:GetObject'],
        resources: [`${sourceCodeBucket.bucketArn}/*`]
      }),
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: ['s3:PutObject'],
        resources: [
          `${this.glueMetadataBucket.bucketArn}/${csvPrefix}/*`,
          `${this.glueMetadataBucket.bucketArn}/${manifestPrefix}/*`,
          `${this.glueMetadataBucket.bucketArn}/${glueJobScriptsPrefix}/*`
        ]
      })
    ],
    roles: [solutionHelperRole]
  });
  this.glueCustomResource = new CustomResource(this, 'GlueCustomResource', {
    serviceToken: props.solutionHelperFunction.functionArn,
    properties: {
      Action: 'CONFIGURE_ETL',
      SourceBucket: sourceCodeBucket.bucketName,
      SourcePrefix: props.sourceCodeKeyPrefix,
      GlueJobScriptsPrefix: glueJobScriptsPrefix,
      GlueJobScripts: [
        glueConfigurationJobScriptName,
        glueConvertParquetJobScriptName,
        glueUpdateCrawlerJobScriptName
      ],
      CsvPrefix: csvPrefix,
      ManifestPrefix: manifestPrefix,
      MachineInformationPrefix: machineInformationPrefix,
      MachineConfigInformationPrefix: machineConfigInformationPrefix,
      DestinationBucket: this.glueMetadataBucket.bucketName
    }
  });
  // The custom resource handler needs the S3 permissions above before it runs.
  this.glueCustomResource.node.addDependency(solutionHelperEtlPolicy);

  /**
   * Glue workflow to create machine configuration S3 files and convert the raw data to parquet.
   * The workflow starts with a scheduled trigger that fires daily at 01:00 UTC (cron(0 1 * * ? *)).
   * 1) Updates machine information and machine config information to S3.
   * 2) Converts the raw data to parquet and unnests arrays to JSON objects.
   * 3) Crawls the parquet data to the Glue table.
   * 4) (One time) Updates the Glue crawler configuration to crawl incrementally.
   * Steps 2-4 are CONDITIONAL triggers that each wait for the previous step to succeed.
   */
  const glueWorkflow = new CfnWorkflow(this, 'GlueWorkflow', {
    description: `Workflow for ${Aws.STACK_NAME} CloudFormation stack`
  });

  // Machine configuration job
  const glueConfigurationJobRole = new Role(this, 'GlueConfigurationJobRole', {
    assumedBy: new ServicePrincipal('glue.amazonaws.com'),
    path: '/service-role/'
  });
  glueConfigurationJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: [
      'dynamodb:GetItem',
      'dynamodb:Scan'
    ],
    resources: [
      props.configTable.tableArn,
      props.uiReferenceTable.tableArn
    ]
  }));
  // Scope writes to the configured prefixes, i.e. the same values passed to the job as
  // --csv_prefix / --manifest_prefix, instead of hard-coded 'csv' / 'manifest'. Otherwise a
  // non-default metadata configuration makes the job fail with AccessDenied.
  glueConfigurationJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['s3:PutObject'],
    resources: [
      `${this.glueMetadataBucket.bucketArn}/${csvPrefix}/*`,
      `${this.glueMetadataBucket.bucketArn}/${manifestPrefix}/*`
    ]
  }));
  const glueConfigurationJob = new CfnJob(this, 'GlueConfigurationJob', {
    command: {
      name: 'glueetl',
      pythonVersion: '3',
      scriptLocation: `s3://${this.glueMetadataBucket.bucketName}/${glueJobScriptsPrefix}/${glueConfigurationJobScriptName}`
    },
    role: glueConfigurationJobRole.roleName,
    defaultArguments: {
      '--config_table': props.configTable.tableName,
      '--ui_reference_table': props.uiReferenceTable.tableName,
      '--output_bucket': this.glueMetadataBucket.bucketName,
      '--csv_prefix': csvPrefix,
      '--manifest_prefix': manifestPrefix,
      '--machine_information_csv': `${machineInformationPrefix}.csv`,
      '--machine_config_information_csv': `${machineConfigInformationPrefix}.csv`,
      '--machine_information_manifest': `${machineInformationPrefix}_${manifestPrefix}.json`,
      '--machine_config_information_manifest': `${machineConfigInformationPrefix}_${manifestPrefix}.json`,
      '--additional-python-modules': additionalPythonModules,
      '--user_agent_extra': userAgentExtra
    },
    description: `Glue configuration job for ${Aws.STACK_NAME} CloudFormation stack`,
    glueVersion: '2.0'
  });
  // The job script is copied into the metadata bucket by the custom resource.
  glueConfigurationJob.node.addDependency(this.glueCustomResource);
  new CfnTrigger(this, 'GlueConfigurationJobTrigger', { // NOSONAR: typescript:S1848
    name: `${Aws.STACK_NAME}-ConfigurationJobTrigger`,
    actions: [{ jobName: glueConfigurationJob.ref }],
    type: 'SCHEDULED',
    description: `Glue configuration job scheduled trigger for ${Aws.STACK_NAME} CloudFormation stack`,
    schedule: 'cron(0 1 * * ? *)',
    startOnCreation: true,
    workflowName: glueWorkflow.ref
  });

  // Parquet conversion job
  const glueConvertParquetJobRole = new Role(this, 'GlueConvertParquetJobRole', {
    assumedBy: new ServicePrincipal('glue.amazonaws.com'),
    path: '/service-role/'
  });
  glueConvertParquetJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['dynamodb:GetItem'],
    resources: [props.configTable.tableArn]
  }));
  glueConvertParquetJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['s3:ListBucket'],
    resources: [
      `arn:${Aws.PARTITION}:s3:::${props.rawDataS3BucketName}`,
      `${this.glueBucket.bucketArn}`
    ]
  }));
  glueConvertParquetJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['s3:GetObject'],
    resources: [
      `arn:${Aws.PARTITION}:s3:::${props.rawDataS3BucketName}/*`,
      `${this.glueBucket.bucketArn}/*`
    ]
  }));
  glueConvertParquetJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: [
      's3:DeleteObject',
      's3:PutObject'
    ],
    resources: [
      `${this.glueBucket.bucketArn}/*`
    ]
  }));
  const glueConvertParquetJob = new CfnJob(this, 'GlueConvertParquetJob', {
    command: {
      name: 'glueetl',
      pythonVersion: '3',
      scriptLocation: `s3://${this.glueMetadataBucket.bucketName}/${glueJobScriptsPrefix}/${glueConvertParquetJobScriptName}`
    },
    role: glueConvertParquetJobRole.roleName,
    defaultArguments: {
      '--config_table': props.configTable.tableName,
      '--input_bucket': props.rawDataS3BucketName,
      '--output_bucket': this.glueBucket.bucketName,
      '--additional-python-modules': additionalPythonModules,
      '--user_agent_extra': userAgentExtra
    },
    description: `Glue parquet conversion job for ${Aws.STACK_NAME} CloudFormation stack`,
    glueVersion: '2.0'
  });
  glueConvertParquetJob.node.addDependency(this.glueCustomResource);
  new CfnTrigger(this, 'GlueConvertParquetJobTrigger', { // NOSONAR: typescript:S1848
    name: `${Aws.STACK_NAME}-ConvertParquetJobTrigger`,
    actions: [{ jobName: glueConvertParquetJob.ref }],
    type: 'CONDITIONAL',
    description: `Glue parquet conversion job scheduled trigger for ${Aws.STACK_NAME} CloudFormation stack`,
    predicate: {
      conditions: [{
        jobName: glueConfigurationJob.ref,
        logicalOperator: 'EQUALS',
        state: 'SUCCEEDED'
      }]
    },
    startOnCreation: true,
    workflowName: glueWorkflow.ref
  });

  // Glue crawler
  const glueCrawlerRole = new Role(this, 'GlueCrawlerRole', {
    assumedBy: new ServicePrincipal('glue.amazonaws.com'),
    path: '/service-role/'
  });
  glueCrawlerRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['s3:ListBucket'],
    resources: [`${this.glueBucket.bucketArn}`]
  }));
  glueCrawlerRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['s3:GetObject'],
    resources: [`${this.glueBucket.bucketArn}/*`]
  }));
  glueCrawlerRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['glue:GetDatabase'],
    resources: [
      `arn:${Aws.PARTITION}:glue:${Aws.REGION}:${Aws.ACCOUNT_ID}:catalog`,
      `${this.glueDatabase.databaseArn}`
    ]
  }));
  glueCrawlerRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: [
      'glue:GetTable',
      'glue:CreateTable',
      'glue:UpdateTable',
      'glue:BatchGetPartition',
      'glue:BatchCreatePartition'
    ],
    resources: [
      `arn:${Aws.PARTITION}:glue:${Aws.REGION}:${Aws.ACCOUNT_ID}:catalog`,
      `${this.glueDatabase.databaseArn}`,
      `arn:${Aws.PARTITION}:glue:${Aws.REGION}:${Aws.ACCOUNT_ID}:table/${this.glueDatabase.databaseName}/*`
    ]
  }));
  const glueCrawler = new CfnCrawler(this, 'GlueCrawler', {
    role: glueCrawlerRole.roleArn,
    targets: {
      s3Targets: [{
        path: `s3://${this.glueBucket.bucketName}/`
      }]
    },
    databaseName: this.glueDatabase.databaseName,
    description: `Glue crawler for ${Aws.STACK_NAME} CloudFormation stack`,
    schemaChangePolicy: {
      deleteBehavior: 'LOG',
      updateBehavior: 'UPDATE_IN_DATABASE'
    }
  });
  new CfnTrigger(this, 'GlueCrawlerTrigger', { // NOSONAR: typescript:S1848
    name: `${Aws.STACK_NAME}-CrawlerTrigger`,
    actions: [{ crawlerName: glueCrawler.ref }],
    type: 'CONDITIONAL',
    description: `Glue crawler trigger for ${Aws.STACK_NAME} CloudFormation stack`,
    predicate: {
      conditions: [{
        jobName: glueConvertParquetJob.ref,
        logicalOperator: 'EQUALS',
        state: 'SUCCEEDED'
      }]
    },
    startOnCreation: true,
    workflowName: glueWorkflow.ref
  });

  // Glue crawler configuration update job
  const glueUpdateCrawlerJobRole = new Role(this, 'GlueUpdateCrawlerJobRole', {
    assumedBy: new ServicePrincipal('glue.amazonaws.com'),
    path: '/service-role/'
  });
  glueUpdateCrawlerJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['glue:UpdateCrawler'],
    resources: [`arn:${Aws.PARTITION}:glue:${Aws.REGION}:${Aws.ACCOUNT_ID}:crawler/${glueCrawler.ref}`]
  }));
  glueUpdateCrawlerJobRole.addToPolicy(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['glue:StopTrigger'],
    resources: [`arn:${Aws.PARTITION}:glue:${Aws.REGION}:${Aws.ACCOUNT_ID}:trigger/${updateCrawlerJobTriggerName}`]
  }));
  const glueUpdateCrawlerJob = new CfnJob(this, 'GlueUpdateCrawlerJob', {
    command: {
      name: 'glueetl',
      pythonVersion: '3',
      scriptLocation: `s3://${this.glueMetadataBucket.bucketName}/${glueJobScriptsPrefix}/${glueUpdateCrawlerJobScriptName}`
    },
    role: glueUpdateCrawlerJobRole.roleName,
    defaultArguments: {
      '--glue_crawler': glueCrawler.ref,
      '--glue_trigger': updateCrawlerJobTriggerName,
      '--additional-python-modules': additionalPythonModules,
      '--user_agent_extra': userAgentExtra
    },
    description: `Glue crawler update job for ${Aws.STACK_NAME} CloudFormation stack`,
    glueVersion: '2.0'
  });
  glueUpdateCrawlerJob.node.addDependency(this.glueCustomResource);
  new CfnTrigger(this, 'GlueUpdateCrawlerJobTrigger', { // NOSONAR: typescript:S1848
    name: updateCrawlerJobTriggerName,
    actions: [{ jobName: glueUpdateCrawlerJob.ref }],
    type: 'CONDITIONAL',
    description: `Glue crawler update job scheduled trigger for ${Aws.STACK_NAME} CloudFormation stack`,
    predicate: {
      conditions: [{
        crawlerName: glueCrawler.ref,
        logicalOperator: 'EQUALS',
        crawlState: 'SUCCEEDED'
      }]
    },
    startOnCreation: true,
    workflowName: glueWorkflow.ref
  });

  // Permissions shared by all Glue roles: access to the job scripts prefix and Glue log groups.
  new Policy(this, 'GlueCommonPolicy', { // NOSONAR: typescript:S1848
    statements: [
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: [
          's3:GetObject',
          's3:PutObject',
          's3:DeleteObject'
        ],
        resources: [`${this.glueMetadataBucket.bucketArn}/${glueJobScriptsPrefix}/*`]
      }),
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: [
          'logs:CreateLogGroup',
          'logs:CreateLogStream',
          'logs:PutLogEvents',
        ],
        resources: [`arn:${Aws.PARTITION}:logs:${Aws.REGION}:${Aws.ACCOUNT_ID}:log-group:/aws-glue/*`]
      })
    ],
    roles: [
      glueConfigurationJobRole,
      glueConvertParquetJobRole,
      glueUpdateCrawlerJobRole,
      glueCrawlerRole
    ]
  });
}