packages/constructs/L3/dataops/dataops-job-l3-construct/lib/dataops-job-l3-construct.ts

/*!
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */
import { DataOpsProjectUtils } from '@aws-mdaa/dataops-project-l3-construct';
import { EventBridgeHelper } from '@aws-mdaa/eventbridge-helper';
import { MdaaCfnJob } from '@aws-mdaa/glue-constructs';
import { MdaaRole } from '@aws-mdaa/iam-constructs';
import { MdaaBucket } from '@aws-mdaa/s3-constructs';
import { MdaaL3Construct, MdaaL3ConstructProps } from '@aws-mdaa/l3-construct';
import { CfnJob } from 'aws-cdk-lib/aws-glue';
import { BucketDeployment, Source } from 'aws-cdk-lib/aws-s3-deployment';
import { MdaaNagSuppressions } from '@aws-mdaa/construct'; //NOSONAR
import { Construct } from 'constructs';
import * as path from 'path';
import { SnsTopic } from 'aws-cdk-lib/aws-events-targets';
import { MdaaSnsTopic } from '@aws-mdaa/sns-constructs';
import { Rule } from 'aws-cdk-lib/aws-events';
import { Fn } from 'aws-cdk-lib';
import { ConfigurationElement } from '@aws-mdaa/config';

export type JobCommandPythonVersion = '2' | '3' | undefined;
export type JobCommandName = 'glueetl' | 'pythonshell';

export interface JobCommand {
  /**
   * "glueetl" | "pythonshell"
   */
  readonly name: JobCommandName;
  /**
   * "2" | "3" | undefined
   */
  readonly pythonVersion?: JobCommandPythonVersion;
  /**
   * Relative path to the Glue script
   */
  readonly scriptLocation: string;
}

export type JobWorkerType = 'Standard' | 'G.1X' | 'G.2X';

export interface JobConfig {
  /**
   * The ARN of the role with which the job will be executed
   */
  readonly executionRoleArn: string;
  /**
   * The job command configuration
   */
  readonly command: JobCommand;
  /**
   * Reference to a template defined elsewhere in the config (in the template section)
   */
  readonly template?: string;
  /**
   * The number of capacity units that are allocated to this job
   */
  readonly allocatedCapacity?: number;
  /**
   * List of names of connections to be used by the job
   */
  readonly connections?: string[];
  /**
   * Default arguments which will be supplied to the job
   */
  readonly defaultArguments?: { [key: string]: string };
  /**
   * Description of the job
   */
  readonly description: string;
  /**
   * Execution properties of the job, including max concurrent executions
   */
  readonly executionProperty?: CfnJob.ExecutionPropertyProperty;
  /**
   * Version of Glue
   */
  readonly glueVersion?: string;
  /**
   * Maximum number of DPUs allocated to the job
   */
  readonly maxCapacity?: number;
  /**
   * Max number of retries of the job before job failure occurs
   */
  readonly maxRetries?: number;
  /**
   * Notification properties of the job, including notification delay
   */
  readonly notificationProperty?: CfnJob.NotificationPropertyProperty;
  /**
   * Number of workers assigned to the job
   */
  readonly numberOfWorkers?: number;
  /**
   * The maximum execution time of the job
   */
  readonly timeout?: number;
  /**
   * "Standard" | "G.1X" | "G.2X"
   */
  readonly workerType?: JobWorkerType;
  /**
   * Additional ETL scripts referenced by the main Glue ETL script.
   * Relative paths to the additional Glue scripts.
   */
  readonly additionalScripts?: string[];
}
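
/*
 * Illustrative sketch (not from the original source; all values below are hypothetical):
 * a JobConfig for a Glue ETL job with one additional library script. The role ARN and
 * script paths are placeholders for demonstration only.
 *
 * const exampleJobConfig: JobConfig = {
 *   executionRoleArn: 'arn:aws:iam::123456789012:role/example-glue-execution-role',
 *   description: 'Example nightly transform job',
 *   command: {
 *     name: 'glueetl',
 *     pythonVersion: '3',
 *     scriptLocation: './src/jobs/example_job.py',
 *   },
 *   glueVersion: '3.0',
 *   workerType: 'G.1X',
 *   numberOfWorkers: 2,
 *   additionalScripts: ['./src/lib/helpers.py'],
 * };
 */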
export interface GlueJobL3ConstructProps extends MdaaL3ConstructProps {
  /**
   * Role which will be used to deploy the job code.
   * Should be obtained from the DataOps Project.
   */
  readonly deploymentRoleArn: string;
  /**
   * The name of the DataOps project bucket where job resources will be deployed
   * and which will be used as a temporary job location
   */
  readonly projectBucketName: string;
  /**
   * Map of job names to job configurations
   */
  readonly jobConfigs: { [key: string]: JobConfig };
  /**
   * Name of the Glue security configuration to be used for all jobs. Likely supplied by the DataOps Project.
   */
  readonly securityConfigurationName: string;
  /**
   * Name of the DataOps project to which the job will be associated
   */
  readonly projectName: string;
  /**
   * Notification topic ARN
   */
  readonly notificationTopicArn: string;
}

export class GlueJobL3Construct extends MdaaL3Construct {
  protected readonly props: GlueJobL3ConstructProps;

  constructor(scope: Construct, id: string, props: GlueJobL3ConstructProps) {
    super(scope, id, props);
    this.props = props;

    const deploymentRole = MdaaRole.fromRoleArn(this.scope, `deployment-role`, this.props.deploymentRoleArn);
    const projectBucket = MdaaBucket.fromBucketName(this.scope, `project-bucket`, this.props.projectBucketName);

    // Build our jobs!
    const allJobs = this.props.jobConfigs;
    Object.keys(allJobs).forEach(jobName => {
      const jobConfig = allJobs[jobName];
      const scriptPath = path.dirname(jobConfig.command.scriptLocation.trim());
      const scriptName = path.basename(jobConfig.command.scriptLocation.trim());
      const scriptSource = Source.asset(scriptPath, { exclude: ['**', `!${scriptName}`] });
      const defaultArguments = jobConfig.defaultArguments ? jobConfig.defaultArguments : {};

      new BucketDeployment(this.scope, `job-deployment-${jobName}`, {
        sources: [scriptSource],
        destinationBucket: projectBucket,
        destinationKeyPrefix: `deployment/jobs/${jobName}`,
        role: deploymentRole,
        extract: true,
      });

      if (jobConfig.additionalScripts) {
        /**
         * Group all scripts at parent directory level. This allows creating zip lib assets at
         * various directory levels.
         * ex. '/main/script1.py', '/util/script2.py', '/util/script3.py' will create 2 zip files
         * representing 'main' and 'util'.
         */
        const directoryToScript: { [scriptPath: string]: string[] } = {};
        jobConfig.additionalScripts.forEach(scriptLocation => {
          const scriptPath = path.dirname(scriptLocation.trim());
          if (scriptPath in directoryToScript) {
            directoryToScript[scriptPath].push(`!${path.basename(scriptLocation.trim())}`);
          } else {
            directoryToScript[scriptPath] = [`!${path.basename(scriptLocation.trim())}`];
          }
        });

        // Create a Source asset for each directory
        const additionalScriptsSources = Object.entries(directoryToScript).map(([scriptPath, scriptNames]) => {
          return Source.asset(scriptPath, { exclude: ['**', ...scriptNames] });
        });

        // Deploy Source asset(s) to the deployment/libs/<job> location.
        const additionalScriptDeployment = new BucketDeployment(
          this.scope,
          `job-deployment-${jobName}-additional-script`,
          {
            sources: additionalScriptsSources,
            destinationBucket: projectBucket,
            destinationKeyPrefix: `deployment/libs/${jobName}`,
            role: deploymentRole,
            extract: false, // Glue expects a zip of additional scripts, so extraction is disabled
          },
        );

        // Extract the zip name for each source and build a comma-separated list of S3 locations
        const libraryZipNames: string[] = [];
        for (let i = 0; i < additionalScriptsSources.length; i++) {
          // File name of the zip containing the additional scripts
          const libName = Fn.select(i, additionalScriptDeployment.objectKeys);
          libraryZipNames.push(`s3://${this.props.projectBucketName}/deployment/libs/${jobName}/${libName}`);
        }

        // Add the comma-separated list of zip file names to the default arguments.
        if (defaultArguments['--extra-py-files']) {
          defaultArguments['--extra-py-files'] += ',' + libraryZipNames.join(',');
        } else {
          defaultArguments['--extra-py-files'] = libraryZipNames.join(',');
        }
      }

      MdaaNagSuppressions.addCodeResourceSuppressions(
        this.scope,
        [
          { id: 'AwsSolutions-L1', reason: 'Function is used only as custom resource during CDK deployment.' },
          {
            id: 'NIST.800.53.R5-LambdaConcurrency',
            reason: 'Function is used only as custom resource during CDK deployment.',
          },
          {
            id: 'NIST.800.53.R5-LambdaInsideVPC',
            reason: 'Function is used only as custom resource during CDK deployment and interacts only with S3.',
          },
          {
            id: 'NIST.800.53.R5-LambdaDLQ',
            reason:
              'Function is used only as custom resource during CDK deployment. Errors will be handled by CloudFormation.',
          },
          {
            id: 'HIPAA.Security-LambdaConcurrency',
            reason: 'Function is used only as custom resource during CDK deployment.',
          },
          {
            id: 'PCI.DSS.321-LambdaConcurrency',
            reason: 'Function is used only as custom resource during CDK deployment.',
          },
          {
            id: 'HIPAA.Security-LambdaInsideVPC',
            reason: 'Function is used only as custom resource during CDK deployment and interacts only with S3.',
          },
          {
            id: 'PCI.DSS.321-LambdaInsideVPC',
            reason: 'Function is used only as custom resource during CDK deployment and interacts only with S3.',
          },
          {
            id: 'HIPAA.Security-LambdaDLQ',
            reason:
              'Function is used only as custom resource during CDK deployment. Errors will be handled by CloudFormation.',
          },
          {
            id: 'PCI.DSS.321-LambdaDLQ',
            reason:
              'Function is used only as custom resource during CDK deployment. Errors will be handled by CloudFormation.',
          },
        ],
        true,
      );

      // Glue expects connections as an object wrapping the list of connection names
      let connectionsConfigured: ConfigurationElement | undefined;
      if (jobConfig.connections) {
        connectionsConfigured = {
          connections: jobConfig.connections,
        };
      }

      defaultArguments['--TempDir'] = `s3://${this.props.projectBucketName}/temp/jobs/${jobName}`;

      const job = new MdaaCfnJob(this.scope, `${jobName}-job`, {
        command: {
          name: jobConfig.command.name,
          pythonVersion: jobConfig.command.pythonVersion,
          scriptLocation: `s3://${this.props.projectBucketName}/deployment/jobs/${jobName}/${scriptName}`,
        },
        role: jobConfig.executionRoleArn,
        allocatedCapacity: jobConfig.allocatedCapacity,
        connections: connectionsConfigured,
        defaultArguments: defaultArguments,
        description: jobConfig.description,
        executionProperty: jobConfig.executionProperty,
        glueVersion: jobConfig.glueVersion,
        maxCapacity: jobConfig.maxCapacity,
        maxRetries: jobConfig.maxRetries,
        name: jobName,
        notificationProperty: jobConfig.notificationProperty,
        numberOfWorkers: jobConfig.numberOfWorkers,
        securityConfiguration: this.props.securityConfigurationName,
        timeout: jobConfig.timeout,
        workerType: jobConfig.workerType,
        naming: this.props.naming,
      });

      if (job.name) {
        DataOpsProjectUtils.createProjectSSMParam(
          this.scope,
          this.props.naming,
          this.props.projectName,
          `job/name/${jobName}`,
          job.name,
        );
        const eventRule = this.createJobMonitoringEventRule(`${jobName}-monitor`, [job.name]);
        eventRule.addTarget(
          new SnsTopic(MdaaSnsTopic.fromTopicArn(this.scope, `${jobName}-topic`, this.props.notificationTopicArn)),
        );
      }
    });

    // CDK S3 Deployment automatically adds an inline policy to the project deployment role.
    this.scope.node.children.forEach(child => {
      if (child.node.id.startsWith('deployment-role')) {
        MdaaNagSuppressions.addCodeResourceSuppressions(
          child,
          [
            { id: 'AwsSolutions-IAM5', reason: 'Inline policy used only for deployment.' },
            { id: 'NIST.800.53.R5-IAMNoInlinePolicy', reason: 'Policy used only for deployment.' },
            { id: 'HIPAA.Security-IAMNoInlinePolicy', reason: 'Policy used only for deployment.' },
            { id: 'PCI.DSS.321-IAMNoInlinePolicy', reason: 'Policy used only for deployment.' },
          ],
          true,
        );
      }
    });
  }

  private createJobMonitoringEventRule(ruleName: string, jobNames: string[]): Rule {
    return EventBridgeHelper.createGlueMonitoringEventRule(
      this.scope,
      this.props.naming,
      ruleName,
      'Workflow Job failure events',
      {
        jobName: jobNames,
        state: ['FAILED', 'TIMEOUT', 'STOPPED'],
      },
    );
  }
}
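
/*
 * Illustrative usage sketch (hypothetical values, not from the original source). Assumes this
 * runs inside a Construct/Stack scope and that `l3ConstructProps` already carries the shared
 * MdaaL3ConstructProps values (naming, etc.); the ARNs, bucket and project names are placeholders,
 * and `exampleJobConfig` refers to the JobConfig sketch above.
 *
 * new GlueJobL3Construct(this, 'jobs', {
 *   ...l3ConstructProps,
 *   deploymentRoleArn: 'arn:aws:iam::123456789012:role/example-deployment-role',
 *   projectBucketName: 'example-project-bucket',
 *   securityConfigurationName: 'example-security-config',
 *   projectName: 'example-project',
 *   notificationTopicArn: 'arn:aws:sns:us-east-1:123456789012:example-notifications',
 *   jobConfigs: {
 *     'example-job': exampleJobConfig,
 *   },
 * });
 */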