packages/constructs/L3/dataops/dataops-workflow-l3-construct/lib/dataops-workflow-l3-construct.ts (210 lines of code) (raw):
/*!
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
import { DataOpsProjectUtils } from '@aws-mdaa/dataops-project-l3-construct';
import { EventBridgeHelper, EventBridgeProps } from '@aws-mdaa/eventbridge-helper';
import { MdaaRole } from '@aws-mdaa/iam-constructs';
import { IMdaaKmsKey, MdaaKmsKey } from '@aws-mdaa/kms-constructs';
import { MdaaL3Construct, MdaaL3ConstructProps } from '@aws-mdaa/l3-construct';
import { Duration, IResolvable } from 'aws-cdk-lib';
import { IRule, IRuleTarget, RuleTargetConfig, RuleTargetInput } from 'aws-cdk-lib/aws-events';
import { TargetBaseProps } from 'aws-cdk-lib/aws-events-targets';
import { CfnTrigger, CfnWorkflow } from 'aws-cdk-lib/aws-glue';
import { Effect, IRole, ManagedPolicy, PolicyStatement, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';
/**
* Loosely typed node of a raw workflow definition: string keys with values of unknown type.
* Consumers in this file cast each value to the shape they expect at the point of use.
*/
type PropsNode = { [key: string]: unknown };
/**
* Configuration of a single Glue workflow to be created by the construct.
*/
export interface WorkflowProps {
/**
* Workflow definition as generated by the AWS CLI
* (presumably the output of `aws glue get-workflow --include-graph` - TODO confirm).
* The construct reads `Workflow.Name`, `Workflow.Description`, `Workflow.DefaultRunProperties`
* and the nodes of type `TRIGGER` under `Workflow.Graph.Nodes`.
*/
readonly rawWorkflowDef: PropsNode;
/**
* Optional EventBridge props. When provided, EventBridge rules targeting this workflow are created.
*/
readonly eventBridge?: EventBridgeProps;
}
/**
* Properties accepted by GlueWorkflowL3Construct.
*/
export interface GlueWorkflowL3ConstructProps extends MdaaL3ConstructProps {
/**
* ARN of the KMS key which will be used to encrypt resources. The construct imports the key
* by this ARN and passes it to the helper which creates the EventBridge dead-letter queue.
*/
readonly kmsArn: string;
/**
* The workflow definitions. One Glue workflow is created per entry.
*/
readonly workflowDefinitions: WorkflowProps[];
/**
* The name of the security configuration to be used for the workflow.
* It is set on every action of every trigger created for the workflows.
*/
readonly securityConfigurationName: string;
/**
* The name of the dataops project. Passed to `DataOpsProjectUtils.createProjectSSMParam`
* when the name of each created workflow is registered.
*/
readonly projectName: string;
}
/**
* Customize the Glue workflow EventBridge rule target.
* Dead-letter queue and retry settings are inherited from TargetBaseProps.
*/
export interface GlueWorkflowTargetProps extends TargetBaseProps {
/**
* The input handed to the target when the rule matches.
* Returned unchanged as the `input` of the target configuration.
*/
readonly input?: RuleTargetInput;
/**
* The ARN of the workflow to trigger
*/
readonly workflowArn: string;
/**
* The role with which to trigger the workflow
*/
readonly role: IRole;
}
/**
 * EventBridge rule target which points a rule at a Glue workflow.
 *
 * The target configuration is derived purely from the props given at construction time;
 * binding creates no resources and has no side effects.
 */
export class GlueWorkflowTarget implements IRuleTarget {
  private readonly props: GlueWorkflowTargetProps;

  /**
   * @param props - workflow ARN, invocation role and optional input / DLQ / retry settings
   */
  constructor(props: GlueWorkflowTargetProps) {
    this.props = props;
  }

  /**
   * Produces the target configuration for a rule.
   *
   * @param _rule - the rule being bound (unused; the configuration does not depend on it)
   * @param _id - optional target id (unused)
   * @returns the target configuration. `deadLetterConfig` is only present when a dead-letter
   *          queue was supplied, and `retryPolicy` only when `maxEventAge` and/or
   *          `retryAttempts` was supplied.
   */
  bind(_rule: IRule, _id?: string): RuleTargetConfig {
    const { workflowArn, role, input, deadLetterQueue, maxEventAge, retryAttempts } = this.props;

    // `retryAttempts: 0` is a legitimate setting ("never retry"), so test for presence
    // rather than truthiness; otherwise an explicit 0 would silently drop the retry policy.
    const retryPolicy =
      maxEventAge != null || retryAttempts != null
        ? {
            maximumEventAgeInSeconds: maxEventAge?.toSeconds(),
            maximumRetryAttempts: retryAttempts,
          }
        : undefined;

    return {
      arn: workflowArn,
      role,
      input,
      // Omit the section entirely when there is no queue instead of emitting `{ arn: undefined }`.
      deadLetterConfig: deadLetterQueue ? { arn: deadLetterQueue.queueArn } : undefined,
      retryPolicy,
    };
  }
}
/**
 * L3 construct which turns raw Glue workflow definitions into Glue workflows and triggers
 * and, when a definition carries EventBridge props, into EventBridge rules which target
 * the workflow.
 */
export class GlueWorkflowL3Construct extends MdaaL3Construct {
  protected readonly props: GlueWorkflowL3ConstructProps;
  private readonly projectKmsKey: IMdaaKmsKey;
  private readonly projectName: string;
  /** Lazily created; shared by all workflows which have EventBridge rules. */
  private eventBridgePolicy?: ManagedPolicy;
  /** Lazily created; shared by all workflows which have EventBridge rules. */
  private eventBridgeRole?: IRole;

  /**
   * Creates one workflow (plus triggers and optional EventBridge rules) per entry of
   * `props.workflowDefinitions` and registers each workflow name via
   * `DataOpsProjectUtils.createProjectSSMParam` under `workflow/name/<Name from the definition>`.
   */
  constructor(scope: Construct, id: string, props: GlueWorkflowL3ConstructProps) {
    super(scope, id, props);
    this.props = props;
    this.projectKmsKey = MdaaKmsKey.fromKeyArn(this.scope, 'project-kms', this.props.kmsArn);
    this.projectName = this.props.projectName;

    // Build our workflows!
    this.props.workflowDefinitions?.forEach(workflowDefinition => {
      const workflow = this.createWorkflowFromDefinition(workflowDefinition, this.props.securityConfigurationName);
      if (workflow.name) {
        // The parameter path uses the name from the raw definition; the value is the generated resource name.
        const workflowName = (workflowDefinition.rawWorkflowDef.Workflow as PropsNode).Name;
        DataOpsProjectUtils.createProjectSSMParam(
          this.scope,
          this.props.naming,
          this.projectName,
          `workflow/name/${workflowName}`,
          workflow.name,
        );
      }
    });
  }

  /**
   * Returns the role assumed by `events.amazonaws.com`, creating it on first use.
   */
  private getEventBridgeRole(): IRole {
    if (!this.eventBridgeRole) {
      this.eventBridgeRole = new MdaaRole(this.scope, 'event-bridge-role', {
        naming: this.props.naming,
        roleName: 'event-bridge',
        assumedBy: new ServicePrincipal('events.amazonaws.com'),
      });
    }
    return this.eventBridgeRole;
  }

  /**
   * Returns the managed policy attached to the EventBridge role, creating it on first use.
   */
  private getEventBridgePolicy(): ManagedPolicy {
    if (!this.eventBridgePolicy) {
      this.eventBridgePolicy = new ManagedPolicy(this.scope, 'event-bridge-policy', {
        managedPolicyName: this.props.naming.resourceName('event-bridge-policy'),
        roles: [this.getEventBridgeRole()],
      });
    }
    return this.eventBridgePolicy;
  }

  /**
   * Creates the workflow, one trigger per `TRIGGER` node of the definition's graph and,
   * if configured, the EventBridge rules for the workflow.
   *
   * @param workflowProps - raw workflow definition and optional EventBridge props
   * @param securityConfigurationName - security configuration set on every trigger action
   * @returns the created workflow
   */
  private createWorkflowFromDefinition(workflowProps: WorkflowProps, securityConfigurationName: string): CfnWorkflow {
    const workflowDef = workflowProps.rawWorkflowDef.Workflow as PropsNode;
    const workflowName = workflowDef.Name as string;

    const workflow = new CfnWorkflow(this.scope, `workflow-${workflowName}`, {
      defaultRunProperties: workflowDef.DefaultRunProperties,
      description: workflowDef.Description as string,
      name: this.props.naming.resourceName(workflowName),
    });

    const graphNodes = (workflowDef.Graph as PropsNode).Nodes as PropsNode[];
    const triggerNodes = graphNodes.filter(node => node.Type === 'TRIGGER');

    let previousTrigger: CfnTrigger | undefined;
    for (const triggerNode of triggerNodes) {
      const triggerDetails = (triggerNode.TriggerDetails as PropsNode).Trigger as PropsNode;
      const triggerName = triggerDetails.Name;
      const actions = (triggerDetails.Actions as PropsNode[]).map(actionProps =>
        this.createActionFromProps(actionProps, securityConfigurationName),
      );
      const predicateProps = triggerDetails.Predicate as PropsNode;

      const trigger = new CfnTrigger(this.scope, `trigger-${workflowName}-${triggerName}`, {
        name: this.props.naming.resourceName(`${workflowName}-${triggerName}`),
        workflowName: workflow.name,
        actions: actions,
        type: triggerDetails.Type as string,
        // A trigger which was ACTIVATED in the source definition is started on creation;
        // otherwise any explicit StartOnCreation value from the definition is passed through.
        startOnCreation:
          triggerDetails.State === 'ACTIVATED' ? true : (triggerDetails.StartOnCreation as IResolvable),
        predicate: predicateProps ? this.createPredicateFromProps(predicateProps) : undefined,
      });

      // If Trigger Type is Scheduled, add cron schedule to the trigger
      if (trigger.type === 'SCHEDULED') {
        trigger.schedule = triggerDetails.Schedule as string;
      }
      trigger.addDependency(workflow);

      // Force sequential deployment of Triggers by workflow, otherwise large number
      // of triggers on a single workflow will cause ConcurrentModificationExceptions.
      if (previousTrigger) {
        trigger.addDependency(previousTrigger);
      }
      previousTrigger = trigger;
    }

    if (workflowProps.eventBridge) {
      this.createWorkflowEventBridgeRules(workflowProps.eventBridge, workflowName);
    }
    return workflow;
  }

  /**
   * Creates the EventBridge rules which target the given workflow, together with a
   * dead-letter queue for the workflow's events, and allows the shared EventBridge role
   * to notify the workflow.
   *
   * @param eventBridgeProps - rule definitions plus retry / max event age settings
   * @param workflowName - workflow name as it appears in the raw definition
   */
  private createWorkflowEventBridgeRules(eventBridgeProps: EventBridgeProps, workflowName: string) {
    const workflowResourceName = this.props.naming.resourceName(workflowName);
    const workflowArn = `arn:${this.partition}:glue:${this.region}:${this.account}:workflow/${workflowResourceName}`;

    this.getEventBridgePolicy().addStatements(
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: ['glue:notifyEvent'],
        resources: [workflowArn],
      }),
    );

    const dlq = EventBridgeHelper.createDlq(
      this.scope,
      this.props.naming,
      `${workflowName}-events`,
      this.projectKmsKey,
      this.getEventBridgeRole(),
    );

    const eventBridgeRuleProps = EventBridgeHelper.createNamedEventBridgeRuleProps(eventBridgeProps, workflowName);
    for (const [ruleName, ruleProps] of Object.entries(eventBridgeRuleProps)) {
      const targetProps: GlueWorkflowTargetProps = {
        workflowArn: workflowArn,
        role: this.getEventBridgeRole(),
        deadLetterQueue: dlq,
        retryAttempts: eventBridgeProps.retryAttempts,
        maxEventAge: eventBridgeProps.maxEventAgeSeconds
          ? Duration.seconds(eventBridgeProps.maxEventAgeSeconds)
          : undefined,
        input: RuleTargetInput.fromObject(ruleProps.input),
      };
      const target = new GlueWorkflowTarget(targetProps);
      EventBridgeHelper.createEventBridgeRuleForTarget(this.scope, this.props.naming, target, ruleName, ruleProps);
    }
  }

  /**
   * Converts a raw `Predicate` node into a trigger predicate.
   *
   * @param predicateProps - raw predicate with `Logical` and `Conditions`
   * @returns the predicate with every condition's fields copied over
   */
  protected createPredicateFromProps(predicateProps: PropsNode): CfnTrigger.PredicateProperty {
    const rawConditions = predicateProps.Conditions as PropsNode[];
    const conditions = rawConditions.map(
      rawCondition =>
        ({
          crawlerName: rawCondition.CrawlerName,
          crawlState: rawCondition.CrawlState,
          jobName: rawCondition.JobName,
          logicalOperator: rawCondition.LogicalOperator,
          state: rawCondition.State,
        }) as CfnTrigger.ConditionProperty,
    );
    return {
      logical: predicateProps.Logical as string,
      conditions: conditions,
    };
  }

  /**
   * Converts a raw `Action` node into a trigger action.
   *
   * @param actionProps - raw action as found in the workflow definition
   * @param securityConfigurationName - security configuration set on the action
   * @returns the action; `notifyDelayAfter` falls back to the action's `Timeout` when the
   *          definition carries no `NotificationProperty.NotifyDelayAfter`
   */
  protected createActionFromProps(
    actionProps: PropsNode,
    securityConfigurationName: string,
  ): CfnTrigger.ActionProperty {
    const notificationProp = actionProps.NotificationProperty as PropsNode;
    return {
      arguments: actionProps.Arguments,
      crawlerName: actionProps.CrawlerName as string,
      jobName: actionProps.JobName as string,
      notificationProperty: {
        notifyDelayAfter: (notificationProp?.NotifyDelayAfter || actionProps.Timeout) as number,
      },
      securityConfiguration: securityConfigurationName,
      timeout: actionProps.Timeout as number,
    };
  }
}