packages/aws-cdk-lib/aws-stepfunctions/lib/state-machine.ts (426 lines of code) (raw):
import { Construct } from 'constructs';
import { CustomerManagedEncryptionConfiguration } from './customer-managed-key-encryption-configuration';
import { EncryptionConfiguration } from './encryption-configuration';
import { buildEncryptionConfiguration } from './private/util';
import { StateGraph } from './state-graph';
import { StatesMetrics } from './stepfunctions-canned-metrics.generated';
import { CfnStateMachine } from './stepfunctions.generated';
import { IChainable, QueryLanguage } from './types';
import * as cloudwatch from '../../aws-cloudwatch';
import * as iam from '../../aws-iam';
import * as logs from '../../aws-logs';
import * as s3_assets from '../../aws-s3-assets';
import { Arn, ArnFormat, Duration, IResource, RemovalPolicy, Resource, Stack, Token } from '../../core';
import { addConstructMetadata, MethodMetadata } from '../../core/lib/metadata-resource';
/**
* Two types of state machines are available in AWS Step Functions: EXPRESS AND STANDARD.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html
*
* @default STANDARD
*/
export enum StateMachineType {
/**
* Express Workflows are ideal for high-volume, event processing workloads.
*/
EXPRESS = 'EXPRESS',
/**
* Standard Workflows are ideal for long-running, durable, and auditable workflows.
*/
STANDARD = 'STANDARD',
}
/**
* Defines which category of execution history events are logged.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/cloudwatch-log-level.html
*
* @default ERROR
*/
export enum LogLevel {
/**
* No Logging
*/
OFF = 'OFF',
/**
* Log everything
*/
ALL = 'ALL',
/**
* Log all errors
*/
ERROR = 'ERROR',
/**
* Log fatal errors
*/
FATAL = 'FATAL',
}
/**
* Defines what execution history events are logged and where they are logged.
*/
export interface LogOptions {
/**
* The log group where the execution history events will be logged.
*
* @default No log group. Required if your log level is not set to OFF.
*/
readonly destination?: logs.ILogGroup;
/**
* Determines whether execution data is included in your log.
*
* @default false
*/
readonly includeExecutionData?: boolean;
/**
* Defines which category of execution history events are logged.
*
* @default ERROR
*/
readonly level?: LogLevel;
}
/**
* Properties for defining a State Machine
*/
export interface StateMachineProps {
/**
* A name for the state machine
*
* @default A name is automatically generated
*/
readonly stateMachineName?: string;
/**
* Definition for this state machine
* @deprecated use definitionBody: DefinitionBody.fromChainable()
*/
readonly definition?: IChainable;
/**
* Definition for this state machine
*/
readonly definitionBody?: DefinitionBody;
/**
* substitutions for the definition body as a key-value map
*/
readonly definitionSubstitutions?: { [key: string]: string };
/**
* The execution role for the state machine service
*
* @default A role is automatically created
*/
readonly role?: iam.IRole;
/**
* Maximum run time for this state machine
*
* @default No timeout
*/
readonly timeout?: Duration;
/**
* Comment that describes this state machine
*
* @default - No comment
*/
readonly comment?: string;
/**
* The name of the query language used by the state machine.
* If the state does not contain a `queryLanguage` field,
* then it will use the query language specified in this `queryLanguage` field.
*
* @default - JSON_PATH
*/
readonly queryLanguage?: QueryLanguage;
/**
* Type of the state machine
*
* @default StateMachineType.STANDARD
*/
readonly stateMachineType?: StateMachineType;
/**
* Defines what execution history events are logged and where they are logged.
*
* @default No logging
*/
readonly logs?: LogOptions;
/**
* Specifies whether Amazon X-Ray tracing is enabled for this state machine.
*
* @default false
*/
readonly tracingEnabled?: boolean;
/**
* The removal policy to apply to state machine
*
* @default RemovalPolicy.DESTROY
*/
readonly removalPolicy?: RemovalPolicy;
/**
* Configures server-side encryption of the state machine definition and execution history.
*
* @default - data is transparently encrypted using an AWS owned key
*/
readonly encryptionConfiguration?: EncryptionConfiguration;
}
/**
* A new or imported state machine.
*/
abstract class StateMachineBase extends Resource implements IStateMachine {
/**
* Import a state machine
*/
public static fromStateMachineArn(scope: Construct, id: string, stateMachineArn: string): IStateMachine {
class Import extends StateMachineBase {
public readonly stateMachineArn = stateMachineArn;
public readonly grantPrincipal = new iam.UnknownPrincipal({ resource: this });
}
return new Import(scope, id, {
environmentFromArn: stateMachineArn,
});
}
/**
* Import a state machine via resource name
*/
public static fromStateMachineName(scope: Construct, id: string, stateMachineName: string): IStateMachine {
const stateMachineArn = Stack.of(scope).formatArn({
service: 'states',
resource: 'stateMachine',
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
resourceName: stateMachineName,
});
return this.fromStateMachineArn(scope, id, stateMachineArn);
}
public abstract readonly stateMachineArn: string;
/**
* The principal this state machine is running as
*/
public abstract readonly grantPrincipal: iam.IPrincipal;
/**
* Grant the given identity permissions to start an execution of this state
* machine.
*/
public grantStartExecution(identity: iam.IGrantable): iam.Grant {
return iam.Grant.addToPrincipal({
grantee: identity,
actions: ['states:StartExecution'],
resourceArns: [this.stateMachineArn],
});
}
/**
* Grant the given identity permissions to start a synchronous execution of
* this state machine.
*/
public grantStartSyncExecution(identity: iam.IGrantable): iam.Grant {
return iam.Grant.addToPrincipal({
grantee: identity,
actions: ['states:StartSyncExecution'],
resourceArns: [this.stateMachineArn],
});
}
/**
* Grant the given identity permissions to read results from state
* machine.
*/
public grantRead(identity: iam.IGrantable): iam.Grant {
iam.Grant.addToPrincipal({
grantee: identity,
actions: [
'states:ListExecutions',
'states:ListStateMachines',
],
resourceArns: [this.stateMachineArn],
});
iam.Grant.addToPrincipal({
grantee: identity,
actions: [
'states:DescribeExecution',
'states:DescribeStateMachineForExecution',
'states:GetExecutionHistory',
],
resourceArns: [`${this.executionArn()}:*`],
});
return iam.Grant.addToPrincipal({
grantee: identity,
actions: [
'states:ListActivities',
'states:DescribeStateMachine',
'states:DescribeActivity',
],
resourceArns: ['*'],
});
}
/**
* Grant the given identity task response permissions on a state machine
*/
public grantTaskResponse(identity: iam.IGrantable): iam.Grant {
return iam.Grant.addToPrincipal({
grantee: identity,
actions: [
'states:SendTaskSuccess',
'states:SendTaskFailure',
'states:SendTaskHeartbeat',
],
resourceArns: [this.stateMachineArn],
});
}
/**
* Grant the given identity permissions on all executions of the state machine
*/
public grantExecution(identity: iam.IGrantable, ...actions: string[]) {
return iam.Grant.addToPrincipal({
grantee: identity,
actions,
resourceArns: [`${this.executionArn()}:*`],
});
}
/**
* Grant the given identity custom permissions
*/
public grant(identity: iam.IGrantable, ...actions: string[]): iam.Grant {
return iam.Grant.addToPrincipal({
grantee: identity,
actions,
resourceArns: [this.stateMachineArn],
});
}
/**
* Return the given named metric for this State Machine's executions
*
* @default - sum over 5 minutes
*/
public metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return new cloudwatch.Metric({
namespace: 'AWS/States',
metricName,
dimensionsMap: { StateMachineArn: this.stateMachineArn },
statistic: 'sum',
...props,
}).attachTo(this);
}
/**
* Metric for the number of executions that failed
*
* @default - sum over 5 minutes
*/
public metricFailed(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(StatesMetrics.executionsFailedSum, props);
}
/**
* Metric for the number of executions that were throttled
*
* @default - sum over 5 minutes
*/
public metricThrottled(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
// There's a typo in the "canned" version of this
return this.metric('ExecutionThrottled', props);
}
/**
* Metric for the number of executions that were aborted
*
* @default - sum over 5 minutes
*/
public metricAborted(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(StatesMetrics.executionsAbortedSum, props);
}
/**
* Metric for the number of executions that succeeded
*
* @default - sum over 5 minutes
*/
public metricSucceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(StatesMetrics.executionsSucceededSum, props);
}
/**
* Metric for the number of executions that timed out
*
* @default - sum over 5 minutes
*/
public metricTimedOut(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(StatesMetrics.executionsTimedOutSum, props);
}
/**
* Metric for the number of executions that were started
*
* @default - sum over 5 minutes
*/
public metricStarted(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.metric('ExecutionsStarted', props);
}
/**
* Metric for the interval, in milliseconds, between the time the execution starts and the time it closes
*
* @default - average over 5 minutes
*/
public metricTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(StatesMetrics.executionTimeAverage, props);
}
/**
* Returns the pattern for the execution ARN's of the state machine
*/
private executionArn(): string {
return Stack.of(this).formatArn({
resource: 'execution',
service: 'states',
resourceName: Arn.split(this.stateMachineArn, ArnFormat.COLON_RESOURCE_NAME).resourceName,
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
});
}
private cannedMetric(
fn: (dims: { StateMachineArn: string }) => cloudwatch.MetricProps,
props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return new cloudwatch.Metric({
...fn({ StateMachineArn: this.stateMachineArn }),
...props,
}).attachTo(this);
}
}
/**
* Define a StepFunctions State Machine
*/
export class StateMachine extends StateMachineBase {
/**
* Execution role of this state machine
*/
public readonly role: iam.IRole;
/**
* The name of the state machine
* @attribute
*/
public readonly stateMachineName: string;
/**
* The ARN of the state machine
*/
public readonly stateMachineArn: string;
/**
* Type of the state machine
* @attribute
*/
public readonly stateMachineType: StateMachineType;
/**
* Identifier for the state machine revision, which is an immutable, read-only snapshot of a state machine’s definition and configuration.
* @attribute
*/
public readonly stateMachineRevisionId: string;
constructor(scope: Construct, id: string, props: StateMachineProps) {
super(scope, id, {
physicalName: props.stateMachineName,
});
// Enhanced CDK Analytics Telemetry
addConstructMetadata(this, props);
if (props.definition && props.definitionBody) {
throw new Error('Cannot specify definition and definitionBody at the same time');
}
if (!props.definition && !props.definitionBody) {
throw new Error('You need to specify either definition or definitionBody');
}
if (props.stateMachineName !== undefined) {
this.validateStateMachineName(props.stateMachineName);
}
if (props.logs) {
this.validateLogOptions(props.logs);
}
this.role = props.role || new iam.Role(this, 'Role', {
assumedBy: new iam.ServicePrincipal('states.amazonaws.com'),
});
const definitionBody = props.definitionBody ?? DefinitionBody.fromChainable(props.definition!);
this.stateMachineType = props.stateMachineType ?? StateMachineType.STANDARD;
let graph: StateGraph | undefined = undefined;
if (definitionBody instanceof ChainDefinitionBody) {
graph = new StateGraph(definitionBody.chainable.startState, 'State Machine definition');
graph.timeout = props.timeout;
for (const statement of graph.policyStatements) {
this.addToRolePolicy(statement);
}
}
if (props.encryptionConfiguration instanceof CustomerManagedEncryptionConfiguration) {
this.role.addToPrincipalPolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'kms:Decrypt', 'kms:GenerateDataKey',
],
resources: [`${props.encryptionConfiguration.kmsKey.keyArn}`],
conditions: {
StringEquals: {
'kms:EncryptionContext:aws:states:stateMachineArn': Stack.of(this).formatArn({
service: 'states',
resource: 'stateMachine',
sep: ':',
resourceName: this.physicalName,
}),
},
},
}));
if (props.logs && props.logs.level !== LogLevel.OFF) {
this.role.addToPrincipalPolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'kms:GenerateDataKey',
],
resources: [`${props.encryptionConfiguration.kmsKey.keyArn}`],
conditions: {
StringEquals: {
'kms:EncryptionContext:SourceArn': Stack.of(this).formatArn({
service: 'logs',
resource: '*',
sep: ':',
}),
},
},
}));
props.encryptionConfiguration.kmsKey.addToResourcePolicy(new iam.PolicyStatement({
resources: ['*'],
actions: ['kms:Decrypt*'],
principals: [new iam.ServicePrincipal('delivery.logs.amazonaws.com')],
}));
}
}
const resource = new CfnStateMachine(this, 'Resource', {
stateMachineName: this.physicalName,
stateMachineType: props.stateMachineType ?? undefined,
roleArn: this.role.roleArn,
loggingConfiguration: props.logs ? this.buildLoggingConfiguration(props.logs) : undefined,
tracingConfiguration: this.buildTracingConfiguration(props.tracingEnabled),
...definitionBody.bind(this, this.role, props, graph),
definitionSubstitutions: props.definitionSubstitutions,
encryptionConfiguration: buildEncryptionConfiguration(props.encryptionConfiguration),
});
resource.applyRemovalPolicy(props.removalPolicy, { default: RemovalPolicy.DESTROY });
resource.node.addDependency(this.role);
this.stateMachineName = this.getResourceNameAttribute(resource.attrName);
this.stateMachineArn = this.getResourceArnAttribute(resource.ref, {
service: 'states',
resource: 'stateMachine',
resourceName: this.physicalName,
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
});
if (definitionBody instanceof ChainDefinitionBody) {
graph!.bind(this);
}
this.stateMachineRevisionId = resource.attrStateMachineRevisionId;
}
/**
* The principal this state machine is running as
*/
public get grantPrincipal() {
return this.role.grantPrincipal;
}
/**
* Add the given statement to the role's policy
*/
@MethodMetadata()
public addToRolePolicy(statement: iam.PolicyStatement) {
this.role.addToPrincipalPolicy(statement);
}
private validateStateMachineName(stateMachineName: string) {
if (!Token.isUnresolved(stateMachineName)) {
if (stateMachineName.length < 1 || stateMachineName.length > 80) {
throw new Error(`State Machine name must be between 1 and 80 characters. Received: ${stateMachineName}`);
}
if (!stateMachineName.match(/^[a-z0-9\+\!\@\.\(\)\-\=\_\']+$/i)) {
throw new Error(`State Machine name must match "^[a-z0-9+!@.()-=_']+$/i". Received: ${stateMachineName}`);
}
}
}
private validateLogOptions(logOptions: LogOptions) {
if (logOptions.level !== LogLevel.OFF && !logOptions.destination) {
throw new Error('Logs destination is required when level is not OFF.');
}
}
private buildLoggingConfiguration(logOptions: LogOptions): CfnStateMachine.LoggingConfigurationProperty {
let destinations;
if (logOptions.destination) {
// https://docs.aws.amazon.com/step-functions/latest/dg/cw-logs.html#cloudwatch-iam-policy
this.addToRolePolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'logs:CreateLogDelivery',
'logs:GetLogDelivery',
'logs:UpdateLogDelivery',
'logs:DeleteLogDelivery',
'logs:ListLogDeliveries',
'logs:PutResourcePolicy',
'logs:DescribeResourcePolicies',
'logs:DescribeLogGroups',
],
resources: ['*'],
}));
destinations = [{
cloudWatchLogsLogGroup: { logGroupArn: logOptions.destination.logGroupArn },
}];
}
return {
destinations,
includeExecutionData: logOptions.includeExecutionData,
level: logOptions.level || 'ERROR',
};
}
private buildTracingConfiguration(isTracing?: boolean): CfnStateMachine.TracingConfigurationProperty | undefined {
if (isTracing === undefined) {
return undefined;
}
if (isTracing) {
this.addToRolePolicy(new iam.PolicyStatement({
// https://docs.aws.amazon.com/xray/latest/devguide/security_iam_id-based-policy-examples.html#xray-permissions-resources
// https://docs.aws.amazon.com/step-functions/latest/dg/xray-iam.html
actions: [
'xray:PutTraceSegments',
'xray:PutTelemetryRecords',
'xray:GetSamplingRules',
'xray:GetSamplingTargets',
],
resources: ['*'],
}));
}
return {
enabled: isTracing,
};
}
}
/**
* A State Machine
*/
export interface IStateMachine extends IResource, iam.IGrantable {
/**
* The ARN of the state machine
* @attribute
*/
readonly stateMachineArn: string;
/**
* Grant the given identity permissions to start an execution of this state
* machine.
*
* @param identity The principal
*/
grantStartExecution(identity: iam.IGrantable): iam.Grant;
/**
* Grant the given identity permissions to start a synchronous execution of
* this state machine.
*
* @param identity The principal
*/
grantStartSyncExecution(identity: iam.IGrantable): iam.Grant;
/**
* Grant the given identity read permissions for this state machine
*
* @param identity The principal
*/
grantRead(identity: iam.IGrantable): iam.Grant;
/**
* Grant the given identity read permissions for this state machine
*
* @param identity The principal
*/
grantTaskResponse(identity: iam.IGrantable): iam.Grant;
/**
* Grant the given identity permissions for all executions of a state machine
*
* @param identity The principal
* @param actions The list of desired actions
*/
grantExecution(identity: iam.IGrantable, ...actions: string[]): iam.Grant;
/**
* Grant the given identity custom permissions
*
* @param identity The principal
* @param actions The list of desired actions
*/
grant(identity: iam.IGrantable, ...actions: string[]): iam.Grant;
/**
* Return the given named metric for this State Machine's executions
*
* @default - sum over 5 minutes
*/
metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* Metric for the number of executions that failed
*
* @default - sum over 5 minutes
*/
metricFailed(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* Metric for the number of executions that were throttled
*
* @default sum over 5 minutes
*/
metricThrottled(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* Metric for the number of executions that were aborted
*
* @default - sum over 5 minutes
*/
metricAborted(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* Metric for the number of executions that succeeded
*
* @default - sum over 5 minutes
*/
metricSucceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* Metric for the number of executions that timed out
*
* @default - sum over 5 minutes
*/
metricTimedOut(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* Metric for the number of executions that were started
*
* @default - sum over 5 minutes
*/
metricStarted(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* Metric for the interval, in milliseconds, between the time the execution starts and the time it closes
*
* @default - sum over 5 minutes
*/
metricTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
}
/**
* Partial object from the StateMachine L1 construct properties containing definition information
*/
export interface DefinitionConfig {
readonly definition?: any;
readonly definitionString?: string;
readonly definitionS3Location?: CfnStateMachine.S3LocationProperty;
}
export abstract class DefinitionBody {
public static fromFile(path: string, options?: s3_assets.AssetOptions): DefinitionBody {
return new FileDefinitionBody(path, options);
}
public static fromString(definition: string): DefinitionBody {
return new StringDefinitionBody(definition);
}
public static fromChainable(chainable: IChainable): DefinitionBody {
return new ChainDefinitionBody(chainable);
}
public abstract bind(scope: Construct, sfnPrincipal: iam.IPrincipal, sfnProps: StateMachineProps, graph?: StateGraph): DefinitionConfig;
}
export class FileDefinitionBody extends DefinitionBody {
constructor(public readonly path: string, private readonly options: s3_assets.AssetOptions = {}) {
super();
}
public bind(scope: Construct, _sfnPrincipal: iam.IPrincipal, _sfnProps: StateMachineProps, _graph?: StateGraph): DefinitionConfig {
const asset = new s3_assets.Asset(scope, 'DefinitionBody', {
path: this.path,
...this.options,
});
return {
definitionS3Location: {
bucket: asset.s3BucketName,
key: asset.s3ObjectKey,
},
};
}
}
export class StringDefinitionBody extends DefinitionBody {
constructor(public readonly body: string) {
super();
}
public bind(_scope: Construct, _sfnPrincipal: iam.IPrincipal, _sfnProps: StateMachineProps, _graph?: StateGraph): DefinitionConfig {
return {
definitionString: this.body,
};
}
}
export class ChainDefinitionBody extends DefinitionBody {
constructor(public readonly chainable: IChainable) {
super();
}
public bind(scope: Construct, _sfnPrincipal: iam.IPrincipal, sfnProps: StateMachineProps, graph?: StateGraph): DefinitionConfig {
const graphJson = graph!.toGraphJson(sfnProps.queryLanguage);
return {
definitionString: Stack.of(scope).toJsonString({
...graphJson,
Comment: sfnProps.comment,
QueryLanguage: sfnProps.queryLanguage,
}),
};
}
}