packages/aws-cdk-lib/aws-stepfunctions/lib/states/state.ts (427 lines of code) (raw):

import { IConstruct, Construct, Node } from 'constructs'; import { Token } from '../../../core'; import { Condition } from '../condition'; import { FieldUtils } from '../fields'; import { StateGraph } from '../state-graph'; import { CatchProps, Errors, IChainable, INextable, ProcessorConfig, ProcessorMode, QueryLanguage, RetryProps } from '../types'; /** * Properties shared by all states */ export interface StateBaseProps { /** * The name of the query language used by the state. * If the state does not contain a `queryLanguage` field, * then it will use the query language specified in the top-level `queryLanguage` field. * * @default - JSONPath */ readonly queryLanguage?: QueryLanguage; /** * Optional name for this state * * @default - The construct ID will be used as state name */ readonly stateName?: string; /** * A comment describing this state * * @default No comment */ readonly comment?: string; } /** * Option properties for JSONPath state. */ export interface JsonPathCommonOptions { /** * JSONPath expression to select part of the state to be the input to this state. * * May also be the special value JsonPath.DISCARD, which will cause the effective * input to be the empty object {}. * * @default $ */ readonly inputPath?: string; /** * JSONPath expression to select part of the state to be the output to this state. * * May also be the special value JsonPath.DISCARD, which will cause the effective * output to be the empty object {}. * * @default $ */ readonly outputPath?: string; } interface JsonPathStateOptions extends JsonPathCommonOptions { /** * JSONPath expression to indicate where to inject the state's output * * May also be the special value JsonPath.DISCARD, which will cause the state's * input to become its output. * * @default $ */ readonly resultPath?: string; /** * The JSON that will replace the state's raw result and become the effective * result before ResultPath is applied. * * You can use ResultSelector to create a payload with values that are static * or selected from the state's raw result. * * @see * https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector * * @default - None */ readonly resultSelector?: { [key: string]: any }; /** * Parameters pass a collection of key-value pairs, either static values or JSONPath expressions that select from the input. * * @see * https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-parameters * * @default No parameters */ readonly parameters?: { [name: string]: any }; } /** * Option properties for JSONata state. */ export interface JsonataCommonOptions { /** * Used to specify and transform output from the state. * When specified, the value overrides the state output default. * The output field accepts any JSON value (object, array, string, number, boolean, null). * Any string value, including those inside objects or arrays, * will be evaluated as JSONata if surrounded by {% %} characters. * Output also accepts a JSONata expression directly. * * @see https://docs.aws.amazon.com/step-functions/latest/dg/concepts-input-output-filtering.html * * @default - $states.result or $states.errorOutput */ readonly outputs?: any; } /** * Option properties for JSONata task state. */ export interface JsonataStateOptions extends JsonataCommonOptions { /** * Parameters pass a collection of key-value pairs, either static values or JSONata expressions that select from the input. * * @see * https://docs.aws.amazon.com/step-functions/latest/dg/transforming-data.html * * @default - No arguments */ readonly arguments?: any; } /** * Option properties for state that can assign variables. */ export interface AssignableStateOptions { /** * Workflow variables to store in this step. * Using workflow variables, you can store data in a step and retrieve that data in future steps. * * @see * https://docs.aws.amazon.com/step-functions/latest/dg/workflow-variables.html * * @default - Not assign variables */ readonly assign?: { [name: string]: any }; } /** * Properties shared by all states that use JSONPath */ export interface JsonPathStateProps extends StateBaseProps, JsonPathStateOptions, AssignableStateOptions {} /** * Properties shared by all states that use JSONata */ export interface JsonataStateProps extends StateBaseProps, JsonataStateOptions, AssignableStateOptions {} /** * Properties shared by all states */ export interface StateProps extends StateBaseProps, JsonPathStateOptions, JsonataStateOptions, AssignableStateOptions {} /** * Base class for all other state classes */ export abstract class State extends Construct implements IChainable { /** * Add a prefix to the stateId of all States found in a construct tree */ public static prefixStates(root: IConstruct, prefix: string) { const queue = [root]; while (queue.length > 0) { const el = queue.splice(0, 1)[0]!; if (isPrefixable(el)) { el.addPrefix(prefix); } queue.push(...Node.of(el).children); } } /** * Find the set of states reachable through transitions from the given start state. * This does not retrieve states from within sub-graphs, such as states within a Parallel state's branch. */ public static findReachableStates(start: State, options: FindStateOptions = {}): State[] { const visited = new Set<State>(); const ret = new Set<State>(); const queue = [start]; while (queue.length > 0) { const state = queue.splice(0, 1)[0]!; if (visited.has(state)) { continue; } visited.add(state); const outgoing = state.outgoingTransitions(options); queue.push(...outgoing); ret.add(state); } return Array.from(ret); } /** * Find the set of end states states reachable through transitions from the given start state */ public static findReachableEndStates(start: State, options: FindStateOptions = {}): State[] { const visited = new Set<State>(); const ret = new Set<State>(); const queue = [start]; while (queue.length > 0) { const state = queue.splice(0, 1)[0]!; if (visited.has(state)) { continue; } visited.add(state); const outgoing = state.outgoingTransitions(options); if (outgoing.length > 0) { // We can continue queue.push(...outgoing); } else { // Terminal state ret.add(state); } } return Array.from(ret); } /** * Return only the states that allow chaining from an array of states */ public static filterNextables(states: State[]): INextable[] { return states.filter(isNextable) as any; } /** * First state of this Chainable */ public readonly startState: State; /** * Continuable states of this Chainable */ public abstract readonly endStates: INextable[]; // This class has a superset of most of the features of the other states, // and the subclasses decide which part of the features to expose. Most // features are shared by a couple of states, and it becomes cumbersome to // slice it out across all states. This is not great design, but it is // pragmatic! protected readonly stateName?: string; protected readonly comment?: string; protected readonly inputPath?: string; protected readonly parameters?: object; protected readonly outputPath?: string; protected readonly resultPath?: string; protected readonly resultSelector?: object; protected readonly branches: StateGraph[] = []; protected readonly queryLanguage?: QueryLanguage; protected readonly outputs?: object; protected readonly arguments?: object; protected readonly assign?: object; protected iteration?: StateGraph; protected processorMode?: ProcessorMode = ProcessorMode.INLINE; protected processor?: StateGraph; protected processorConfig?: ProcessorConfig; protected defaultChoice?: State; /** * @internal */ protected _next?: State; private readonly retries: RetryProps[] = []; private readonly catches: CatchTransition[] = []; private readonly choices: ChoiceTransition[] = []; private readonly prefixes: string[] = []; /** * The graph that this state is part of. * * Used for guaranteeing consistency between graphs and graph components. */ private containingGraph?: StateGraph; /** * States with references to this state. * * Used for finding complete connected graph that a state is part of. */ private readonly incomingStates: State[] = []; constructor(scope: Construct, id: string, props: StateProps) { super(scope, id); this.startState = this; this.stateName = props.stateName; this.queryLanguage = props.queryLanguage; this.comment = props.comment; this.inputPath = props.inputPath; this.parameters = props.parameters; this.outputPath = props.outputPath; this.resultPath = props.resultPath; this.resultSelector = props.resultSelector; this.outputs = props.outputs; this.arguments = props.arguments; this.assign = props.assign; this.node.addValidation({ validate: () => this.validateState() }); } /** * Allows the state to validate itself. */ protected validateState(): string[] { return []; } public get id() { return this.node.id; } /** * Tokenized string that evaluates to the state's ID */ public get stateId(): string { return this.prefixes.concat(this.stateName? this.stateName: this.id).join(''); } /** * Add a prefix to the stateId of this state */ public addPrefix(x: string) { if (x !== '') { this.prefixes.splice(0, 0, x); } } /** * Register this state as part of the given graph * * Don't call this. It will be called automatically when you work * with states normally. */ public bindToGraph(graph: StateGraph) { if (this.containingGraph === graph) { return; } if (this.containingGraph) { // eslint-disable-next-line max-len throw new Error(`Trying to use state '${this.stateId}' in ${graph}, but is already in ${this.containingGraph}. Every state can only be used in one graph.`); } this.containingGraph = graph; this.whenBoundToGraph(graph); for (const incoming of this.incomingStates) { incoming.bindToGraph(graph); } for (const outgoing of this.outgoingTransitions({ includeErrorHandlers: true })) { outgoing.bindToGraph(graph); } for (const branch of this.branches) { branch.registerSuperGraph(this.containingGraph); } if (!!this.iteration) { this.iteration.registerSuperGraph(this.containingGraph); } if (!!this.processor) { this.processor.registerSuperGraph(this.containingGraph); } } /** * Render the state as JSON */ public abstract toStateJson(stateMachineQueryLanguage?: QueryLanguage): object; /** * Add a retrier to the retry list of this state * @internal */ protected _addRetry(props: RetryProps = {}) { validateErrors(props.errors); this.retries.push({ ...props, errors: props.errors ?? [Errors.ALL], }); } /** * Add an error handler to the catch list of this state * @internal */ protected _addCatch(handler: State, props: CatchProps = {}) { validateErrors(props.errors); this.catches.push({ next: handler, props: { errors: props.errors ?? [Errors.ALL], resultPath: props.resultPath, outputs: props.outputs, assign: props.assign, }, }); handler.addIncoming(this); if (this.containingGraph) { handler.bindToGraph(this.containingGraph); } } /** * Make the indicated state the default transition of this state */ protected makeNext(next: State) { // Can't be called 'setNext' because of JSII if (this._next) { throw new Error(`State '${this.id}' already has a next state`); } this._next = next; next.addIncoming(this); if (this.containingGraph) { next.bindToGraph(this.containingGraph); } } /** * Add a choice branch to this state */ protected addChoice(condition: Condition, next: State, options?: ChoiceTransitionOptions) { this.choices.push({ condition, next, ...options }); next.startState.addIncoming(this); if (this.containingGraph) { next.startState.bindToGraph(this.containingGraph); } } /** * Add a parallel branch to this state */ protected addBranch(branch: StateGraph) { this.branches.push(branch); if (this.containingGraph) { branch.registerSuperGraph(this.containingGraph); } } /** * Add a map iterator to this state */ protected addIterator(iteration: StateGraph) { this.iteration = iteration; if (this.containingGraph) { iteration.registerSuperGraph(this.containingGraph); } } /** * Add a item processor to this state */ protected addItemProcessor(processor: StateGraph, config: ProcessorConfig = {}) { this.processor = processor; this.processorConfig = config; if (this.containingGraph) { processor.registerSuperGraph(this.containingGraph); } } /** * Make the indicated state the default choice transition of this state */ protected makeDefault(def: State) { // Can't be called 'setDefault' because of JSII if (this.defaultChoice) { throw new Error(`Choice '${this.id}' already has a default next state`); } this.defaultChoice = def; } /** * Render the default next state in ASL JSON format */ protected renderNextEnd(): any { if (this._next) { return { Next: this._next.stateId }; } else { return { End: true }; } } /** * Render the choices in ASL JSON format */ protected renderChoices(topLevelQueryLanguage?: QueryLanguage): any { const queryLanguage = _getActualQueryLanguage(topLevelQueryLanguage, this.queryLanguage); return { Choices: renderList(this.choices, (x) => renderChoice(x, queryLanguage)), Default: this.defaultChoice?.stateId, }; } /** * Render InputPath/Parameters/OutputPath/Arguments/Output in ASL JSON format */ protected renderInputOutput(): any { return { InputPath: renderJsonPath(this.inputPath), Parameters: this.parameters, OutputPath: renderJsonPath(this.outputPath), Arguments: this.arguments, Output: this.outputs, }; } /** * Render parallel branches in ASL JSON format */ protected renderBranches(): any { return { Branches: this.branches.map(b => b.toGraphJson()), }; } /** * Render map iterator in ASL JSON format */ protected renderIterator(): any { if (!this.iteration) return undefined; return { Iterator: this.iteration.toGraphJson(), }; } /** * Render error recovery options in ASL JSON format */ protected renderRetryCatch(topLevelQueryLanguage?: QueryLanguage): any { const queryLanguage = _getActualQueryLanguage(topLevelQueryLanguage, this.queryLanguage); return { Retry: renderList(this.retries, renderRetry, (a, b) => compareErrors(a.errors, b.errors)), Catch: renderList(this.catches, (x) => renderCatch(x, queryLanguage), (a, b) => compareErrors(a.props.errors, b.props.errors)), }; } /** * Render ResultSelector in ASL JSON format */ protected renderResultSelector(): any { return FieldUtils.renderObject({ ResultSelector: this.resultSelector, }); } /** * Render ItemProcessor in ASL JSON format */ protected renderItemProcessor(): any { if (!this.processor) return undefined; return { ItemProcessor: { ...this.renderProcessorConfig(), ...this.processor.toGraphJson(), }, }; } /** * Render ProcessorConfig in ASL JSON format */ private renderProcessorConfig() { const mode = this.processorConfig?.mode?.toString() ?? this.processorMode; if (mode === ProcessorMode.INLINE) { return { ProcessorConfig: { Mode: mode, }, }; } const executionType = this.processorConfig?.executionType?.toString(); return { ProcessorConfig: { Mode: mode, ExecutionType: executionType, }, }; } /** * Render QueryLanguage in ASL JSON format if needed. */ protected renderQueryLanguage(topLevelQueryLanguage?: QueryLanguage): any { topLevelQueryLanguage = topLevelQueryLanguage ?? QueryLanguage.JSON_PATH; if (topLevelQueryLanguage === QueryLanguage.JSONATA && this.queryLanguage === QueryLanguage.JSON_PATH) { throw new Error(`'queryLanguage' can not be 'JSONPath' if set to 'JSONata' for whole state machine ${this.node.path}`); } const queryLanguage = topLevelQueryLanguage === QueryLanguage.JSON_PATH && this.queryLanguage === QueryLanguage.JSONATA ? QueryLanguage.JSONATA : undefined; return { QueryLanguage: queryLanguage, }; } /** * Render the assign in ASL JSON format */ protected renderAssign(topLevelQueryLanguage?: QueryLanguage): any { const queryLanguage = _getActualQueryLanguage(topLevelQueryLanguage, this.queryLanguage); return { Assign: queryLanguage === QueryLanguage.JSON_PATH ? FieldUtils.renderObject(this.assign) : this.assign, }; } /** * Called whenever this state is bound to a graph * * Can be overridden by subclasses. */ protected whenBoundToGraph(graph: StateGraph) { graph.registerState(this); } /** * Add a state to the incoming list */ private addIncoming(source: State) { this.incomingStates.push(source); } /** * Return all states this state can transition to */ private outgoingTransitions(options: FindStateOptions): State[] { const ret = new Array<State>(); if (this._next) { ret.push(this._next); } if (this.defaultChoice) { ret.push(this.defaultChoice); } for (const c of this.choices) { ret.push(c.next); } if (options.includeErrorHandlers) { for (const c of this.catches) { ret.push(c.next); } } return ret; } } /** * Options for finding reachable states */ export interface FindStateOptions { /** * Whether or not to follow error-handling transitions * * @default false */ readonly includeErrorHandlers?: boolean; } /** * A Choice Transition */ interface ChoiceTransition extends ChoiceTransitionOptions { /** * State to transition to */ next: State; /** * Condition for this transition */ condition: Condition; } /** * Options for Choice Transition */ export interface ChoiceTransitionOptions extends AssignableStateOptions { /** * An optional description for the choice transition * * @default No comment */ readonly comment?: string; /** * This option for JSONata only. When you use JSONPath, then the state ignores this property. * Used to specify and transform output from the state. * When specified, the value overrides the state output default. * The output field accepts any JSON value (object, array, string, number, boolean, null). * Any string value, including those inside objects or arrays, * will be evaluated as JSONata if surrounded by {% %} characters. * Output also accepts a JSONata expression directly. * * @see https://docs.aws.amazon.com/step-functions/latest/dg/concepts-input-output-filtering.html * * @default - $states.result or $states.errorOutput */ readonly outputs?: any; } /** * Render a choice transition */ function renderChoice(c: ChoiceTransition, queryLanguage: QueryLanguage) { const optionsByLanguage = queryLanguage === QueryLanguage.JSONATA ? { Output: c.outputs, Assign: c.assign, } : { Assign: FieldUtils.renderObject(c.assign), }; return { ...c.condition.renderCondition(), ...optionsByLanguage, Next: c.next.stateId, Comment: c.comment, }; } /** * A Catch Transition */ interface CatchTransition { /** * State to transition to */ next: State; /** * Additional properties for this transition */ props: CatchProps; } /** * Render a Retry object to ASL */ function renderRetry(retry: RetryProps) { return { ErrorEquals: retry.errors, IntervalSeconds: retry.interval && retry.interval.toSeconds(), MaxAttempts: retry.maxAttempts, BackoffRate: retry.backoffRate, MaxDelaySeconds: retry.maxDelay && retry.maxDelay.toSeconds(), JitterStrategy: retry.jitterStrategy, }; } /** * Render a Catch object to ASL */ function renderCatch(c: CatchTransition, queryLanguage: QueryLanguage) { const optionsByLanguage = queryLanguage === QueryLanguage.JSONATA ? { Output: c.props.outputs, Assign: c.props.assign, } : { Assign: FieldUtils.renderObject(c.props.assign), }; return { ...optionsByLanguage, ErrorEquals: c.props.errors, ResultPath: renderJsonPath(c.props.resultPath), Next: c.next.stateId, }; } /** * Compares a list of Errors to move Errors.ALL last in a sort function */ function compareErrors(a?: string[], b?: string[]) { if (a?.includes(Errors.ALL)) { return 1; } if (b?.includes(Errors.ALL)) { return -1; } return 0; } /** * Validates an errors list */ function validateErrors(errors?: string[]) { if (errors?.includes(Errors.ALL) && errors.length > 1) { throw new Error(`${Errors.ALL} must appear alone in an error list`); } } /** * Render a list or return undefined for an empty list */ export function renderList<T>(xs: T[], mapFn: (x: T) => any, sortFn?: (a: T, b: T) => number): any { if (xs.length === 0) { return undefined; } let list = xs; if (sortFn) { list = xs.sort(sortFn); } return list.map(mapFn); } /** * Render JSON path, respecting the special value JsonPath.DISCARD */ export function renderJsonPath(jsonPath?: string): undefined | null | string { if (jsonPath === undefined) { return undefined; } if (!Token.isUnresolved(jsonPath) && !jsonPath.startsWith('$')) { throw new Error(`Expected JSON path to start with '$', got: ${jsonPath}`); } return jsonPath; } /** * Interface for structural feature testing (to make TypeScript happy) */ interface Prefixable { addPrefix(x: string): void; } /** * Whether an object is a Prefixable */ function isPrefixable(x: any): x is Prefixable { return typeof(x) === 'object' && x.addPrefix; } /** * Whether an object is INextable */ function isNextable(x: any): x is INextable { return typeof(x) === 'object' && x.next; } /** * @internal */ export function _getActualQueryLanguage(topLevelQueryLanguage?: QueryLanguage, stateLevelQueryLanguage?: QueryLanguage) { return stateLevelQueryLanguage ?? topLevelQueryLanguage ?? QueryLanguage.JSON_PATH; }