packages/constructs/L3/dataops/dataops-workflow-l3-construct/lib/dataops-workflow-l3-construct.ts (210 lines of code) (raw):

/*! * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ import { DataOpsProjectUtils } from '@aws-mdaa/dataops-project-l3-construct'; import { EventBridgeHelper, EventBridgeProps } from '@aws-mdaa/eventbridge-helper'; import { MdaaRole } from '@aws-mdaa/iam-constructs'; import { IMdaaKmsKey, MdaaKmsKey } from '@aws-mdaa/kms-constructs'; import { MdaaL3Construct, MdaaL3ConstructProps } from '@aws-mdaa/l3-construct'; import { Duration, IResolvable } from 'aws-cdk-lib'; import { IRule, IRuleTarget, RuleTargetConfig, RuleTargetInput } from 'aws-cdk-lib/aws-events'; import { TargetBaseProps } from 'aws-cdk-lib/aws-events-targets'; import { CfnTrigger, CfnWorkflow } from 'aws-cdk-lib/aws-glue'; import { Effect, IRole, ManagedPolicy, PolicyStatement, ServicePrincipal } from 'aws-cdk-lib/aws-iam'; import { Construct } from 'constructs'; type PropsNode = { [key: string]: unknown }; export interface WorkflowProps { /** * Workflow definition as generated by aws cli */ readonly rawWorkflowDef: PropsNode; /** * EventBridge props */ readonly eventBridge?: EventBridgeProps; } export interface GlueWorkflowL3ConstructProps extends MdaaL3ConstructProps { /** * The Kms key which will be used to encrypt resources */ readonly kmsArn: string; /** * The Workflow Definition. */ readonly workflowDefinitions: WorkflowProps[]; /** * The name of the security configuration to be used for the workflow */ readonly securityConfigurationName: string; /** * The name of the dataops project */ readonly projectName: string; } /** * Customize the Lambda Event Target */ export interface GlueWorkflowTargetProps extends TargetBaseProps { /** * The triggering event */ readonly input?: RuleTargetInput; /** * The workflow to trigger */ readonly workflowArn: string; /** * The role with which to trigger the workflow */ readonly role: IRole; } export class GlueWorkflowTarget implements IRuleTarget { private props: GlueWorkflowTargetProps; constructor(props: GlueWorkflowTargetProps) { this.props = props; } bind(_rule: IRule, _id?: string): RuleTargetConfig { console.log(`Rule: ${_rule}, id: ${_id}`); const retryPolicy = this.props.maxEventAge || this.props.retryAttempts ? { maximumEventAgeInSeconds: this.props.maxEventAge ? this.props.maxEventAge.toSeconds() : undefined, maximumRetryAttempts: this.props.retryAttempts, } : undefined; return { arn: this.props.workflowArn, role: this.props.role, input: this.props.input, deadLetterConfig: { arn: this.props.deadLetterQueue?.queueArn, }, retryPolicy: retryPolicy, }; } } export class GlueWorkflowL3Construct extends MdaaL3Construct { protected readonly props: GlueWorkflowL3ConstructProps; private readonly projectKmsKey: IMdaaKmsKey; private readonly projectName: string; private eventBridgePolicy?: ManagedPolicy; private eventBridgeRole?: IRole; constructor(scope: Construct, id: string, props: GlueWorkflowL3ConstructProps) { super(scope, id, props); this.props = props; this.projectKmsKey = MdaaKmsKey.fromKeyArn(this.scope, 'project-kms', this.props.kmsArn); this.projectName = this.props.projectName; // Build our workflows! this.props.workflowDefinitions?.map(workflowDefinition => { const workflow = this.createWorkflowFromDefinition(workflowDefinition, this.props.securityConfigurationName); if (workflow.name) { const workflowName = (workflowDefinition.rawWorkflowDef.Workflow as PropsNode).Name; DataOpsProjectUtils.createProjectSSMParam( this.scope, this.props.naming, this.projectName, `workflow/name/${workflowName}`, workflow.name, ); } return workflow; }); } private getEventBridgeRole(): IRole { if (!this.eventBridgeRole) { this.eventBridgeRole = new MdaaRole(this.scope, 'event-bridge-role', { naming: this.props.naming, roleName: 'event-bridge', assumedBy: new ServicePrincipal('events.amazonaws.com'), }); } return this.eventBridgeRole; } private getEventBridgePolicy(): ManagedPolicy { if (!this.eventBridgePolicy) { this.eventBridgePolicy = new ManagedPolicy(this.scope, 'event-bridge-policy', { managedPolicyName: this.props.naming.resourceName('event-bridge-policy'), roles: [this.getEventBridgeRole()], }); } return this.eventBridgePolicy; } private createWorkflowFromDefinition(workflowProps: WorkflowProps, securityConfigurationName: string): CfnWorkflow { const workflowName = (workflowProps.rawWorkflowDef.Workflow as PropsNode).Name as string; const workflow = new CfnWorkflow(this.scope, `workflow-${workflowName}`, { defaultRunProperties: (workflowProps.rawWorkflowDef.Workflow as PropsNode).DefaultRunProperties, description: (workflowProps.rawWorkflowDef.Workflow as PropsNode).Description as string, name: this.props.naming.resourceName(workflowName), }); const graphNodes = ((workflowProps.rawWorkflowDef.Workflow as PropsNode).Graph as PropsNode).Nodes as PropsNode[]; const triggerProps = graphNodes.filter(node => node.Type === 'TRIGGER'); const previousTriggers: { [key: string]: CfnTrigger } = {}; triggerProps.forEach(triggerProps => { const triggerDetails = (triggerProps.TriggerDetails as PropsNode).Trigger as PropsNode; const triggerName = triggerDetails.Name; const actionsProps = triggerDetails.Actions as PropsNode[]; const actions = actionsProps.map(actionProps => this.createActionFromProps(actionProps, securityConfigurationName), ); const predicateProps = triggerDetails.Predicate as PropsNode; const trigger = new CfnTrigger(this.scope, `trigger-${workflowName}-${triggerName}`, { name: this.props.naming.resourceName(`${workflowName}-${triggerName}`), workflowName: workflow.name, actions: actions, type: triggerDetails.Type as string, startOnCreation: triggerDetails.State && triggerDetails.State == 'ACTIVATED' ? true : (triggerDetails.StartOnCreation as IResolvable), predicate: predicateProps ? this.createPredicateFromProps(predicateProps) : undefined, }); // If Trigger Type is Scheduled, add cron schedule to the trigger if (trigger.type == 'SCHEDULED') { trigger.schedule = triggerDetails.Schedule as string; } trigger.addDependency(workflow); // Force sequential deployment of Triggers by workflow, otherwise large number // of triggers on a single workflow will cause ConcurrentModificationExceptions. if (previousTriggers[workflowName]) { trigger.addDependency(previousTriggers[workflowName]); } previousTriggers[workflowName] = trigger; }); if (workflowProps.eventBridge) { this.createWorkflowEventBridgeRules(workflowProps.eventBridge, workflowName); } return workflow; } private createWorkflowEventBridgeRules(eventBridgeProps: EventBridgeProps, workflowName: string) { const workflowResourceName = this.props.naming.resourceName(workflowName); const workflowArn = `arn:${this.partition}:glue:${this.region}:${this.account}:workflow/${workflowResourceName}`; const triggerFunctionStatement = new PolicyStatement({ effect: Effect.ALLOW, actions: ['glue:notifyEvent'], resources: [workflowArn], }); this.getEventBridgePolicy().addStatements(triggerFunctionStatement); const dlq = EventBridgeHelper.createDlq( this.scope, this.props.naming, `${workflowName}-events`, this.projectKmsKey, this.getEventBridgeRole(), ); const eventBridgeRuleProps = EventBridgeHelper.createNamedEventBridgeRuleProps(eventBridgeProps, workflowName); Object.entries(eventBridgeRuleProps).forEach(propsEntry => { const ruleName = propsEntry[0]; const ruleProps = propsEntry[1]; const targetProps: GlueWorkflowTargetProps = { workflowArn: workflowArn, role: this.getEventBridgeRole(), deadLetterQueue: dlq, retryAttempts: eventBridgeProps.retryAttempts, maxEventAge: eventBridgeProps.maxEventAgeSeconds ? Duration.seconds(eventBridgeProps.maxEventAgeSeconds) : undefined, input: RuleTargetInput.fromObject(ruleProps.input), }; const target = new GlueWorkflowTarget(targetProps); EventBridgeHelper.createEventBridgeRuleForTarget(this.scope, this.props.naming, target, ruleName, ruleProps); }); } protected createPredicateFromProps(predicateProps: PropsNode): CfnTrigger.PredicateProperty { const conditionProps = predicateProps.Conditions as PropsNode[]; const conditions = conditionProps.map(conditionProps => { return { crawlerName: conditionProps.CrawlerName, crawlState: conditionProps.CrawlState, jobName: conditionProps.JobName, logicalOperator: conditionProps.LogicalOperator, state: conditionProps.State, } as CfnTrigger.ConditionProperty; }); return { logical: predicateProps.Logical as string, conditions: conditions, }; } protected createActionFromProps( actionProps: PropsNode, securityConfigurationName: string, ): CfnTrigger.ActionProperty { const notificationProp = actionProps.NotificationProperty as PropsNode; return { arguments: actionProps.Arguments, crawlerName: actionProps.CrawlerName as string, jobName: actionProps.JobName as string, notificationProperty: { notifyDelayAfter: notificationProp && notificationProp.NotifyDelayAfter ? (notificationProp.NotifyDelayAfter as number) : (actionProps.Timeout as number), }, securityConfiguration: securityConfigurationName, timeout: actionProps.Timeout as number, }; } }