in source/lib/stage-two-orchestrator.ts [44:315]
/**
 * Builds the Stage Two Orchestrator: a Step Functions state machine, the task
 * graph it executes, and the IAM role it runs under.
 *
 * Flow, as wired in the graph section below:
 *  1. Two Athena queries run in parallel: archive count and total size of the
 *     inventory table, and archive count of the partitioned inventory table.
 *  2. The execution fails if the inventory count is "0", or if the partitioned
 *     count is neither "0" nor equal to the inventory count.
 *  3. If the two counts are equal, partitioning is skipped. Otherwise the Glue
 *     partitioning job runs in parallel with the metric updates (two DynamoDB
 *     puts followed by the anonymous statistics Lambda, whose errors are
 *     ignored).
 *  4. The max partition is queried, and the `requestArchives` Lambda is
 *     invoked in a loop (with a wait of `$.timeout` seconds between
 *     invocations) until `$.nextPartition` is greater than `$.maxPartition`.
 *
 * @param scope - Parent construct.
 * @param id - Construct id.
 * @param props - Supplies the Glue data catalog (database, inventory tables,
 *   Athena workgroup), the Glue job name, the DynamoDB metric table, the
 *   staging bucket, and the `requestArchives` / `sendAnonymousStats` Lambdas.
 */
constructor(scope: cdk.Construct, id: string, props: StageTwoOrchestratorProps) {
    super(scope, id);

    // -------------------------------------------------------------------------------------------
    // Stage Two Orchestrator :: Tasks

    // Archive count and total size of the (unpartitioned) inventory table.
    const taskStartInventoryQuery = new tasks.AthenaStartQueryExecution(this, 'Start Inventory Query', {
        integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        queryString:
            `SELECT COUNT(1) AS archiveCount, ` +
            `COALESCE(SUM(size),0) AS vaultSize ` +
            `FROM "${props.glueDataCatalog.inventoryTable.tableName}";`,
        queryExecutionContext: {
            databaseName: props.glueDataCatalog.inventoryDatabase.databaseName
        },
        workGroup: props.glueDataCatalog.athenaWorkgroup.name,
        outputPath: '$.QueryExecution'
    });

    // Rows[0] is the header row returned by Athena; Rows[1] is the single data row.
    const taskGetInventoryResults = new tasks.AthenaGetQueryResults(this, 'Get Inventory Results', {
        queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
        outputPath: '$.ResultSet.Rows[1]'
    });

    // Archive count of the partitioned inventory table.
    const taskStartPartitionedQuery = new tasks.AthenaStartQueryExecution(this, 'Start Partitioned Query', {
        integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        queryString:
            `SELECT COUNT(1) AS archiveCount ` +
            `FROM "${props.glueDataCatalog.partitionedInventoryTable.tableName}";`,
        queryExecutionContext: {
            databaseName: props.glueDataCatalog.inventoryDatabase.databaseName
        },
        workGroup: props.glueDataCatalog.athenaWorkgroup.name,
        outputPath: '$.QueryExecution'
    });

    const taskGetPartitionedResults = new tasks.AthenaGetQueryResults(this, 'Get Partitioned Results', {
        queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
        outputPath: '$.ResultSet.Rows[1]'
    });

    // Merges the two branch outputs ($[0] = inventory, $[1] = partitioned) into one object.
    // The values are Athena VarCharValue strings, hence stringAt: every downstream consumer
    // (the string conditions and DynamoAttributeValue.numberFromString) treats them as strings.
    const parallelQueries = new sfn.Parallel(this, 'Parallel Queries', {
        resultSelector: {
            inventoryTable: {
                archiveCount: sfn.JsonPath.stringAt('$[0].Data[0].VarCharValue'),
                vaultSize: sfn.JsonPath.stringAt('$[0].Data[1].VarCharValue')
            },
            partitionedTable: {
                archiveCount: sfn.JsonPath.stringAt('$[1].Data[0].VarCharValue')
            }
        }
    });

    const taskGluePartitioningJob = new tasks.GlueStartJobRun(this, 'Run Glue Partitioning', {
        integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        glueJobName: props.glueJobName,
        arguments: sfn.TaskInput.fromObject({
            '--ARCHIVE_COUNT.$': '$.inventoryTable.archiveCount',
            '--VAULT_SIZE.$': '$.inventoryTable.vaultSize'
        })
    });

    // resultPath keeps the state input intact so the following states can still read $.inventoryTable.
    const taskUpdateMetricCount = new tasks.DynamoPutItem(this, 'Update Count Metric', {
        item: {
            pk: tasks.DynamoAttributeValue.fromString('count'),
            total: tasks.DynamoAttributeValue.numberFromString(sfn.JsonPath.stringAt('$.inventoryTable.archiveCount'))
        },
        table: props.dynamoDataCatalog.metricTable,
        resultPath: '$.putItemResult'
    });

    const taskUpdateMetricSize = new tasks.DynamoPutItem(this, 'Update Size Metric', {
        item: {
            pk: tasks.DynamoAttributeValue.fromString('volume'),
            total: tasks.DynamoAttributeValue.numberFromString(sfn.JsonPath.stringAt('$.inventoryTable.vaultSize'))
        },
        table: props.dynamoDataCatalog.metricTable,
        resultPath: '$.putItemResult'
    });

    const taskSubmitAnonymousStatistics = new tasks.LambdaInvoke(this, 'Send Anonymous Statistics', {
        lambdaFunction: props.sendAnonymousStats,
        inputPath: '$.inventoryTable'
    });
    taskSubmitAnonymousStatistics.addRetry({
        interval: cdk.Duration.seconds(2),
        maxAttempts: 6,
        backoffRate: 2
    });
    // Ignore even unrecoverable errors to avoid interfering with the main process
    taskSubmitAnonymousStatistics.addCatch(
        new sfn.Pass(this, 'Ignore SendStats Errors', {}),
        { errors: ['States.ALL'] });

    // Produces a single JSON string cell: { "nextPartition": 0, "maxPartition": <MAX(part)> }.
    // The trailing space after '}' separates the literal from the FROM keyword.
    const taskStartMaxPartitionQuery = new tasks.AthenaStartQueryExecution(this, 'Start Max Partition Query', {
        integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        queryString:
            `SELECT '{ "nextPartition": 0, "maxPartition" :' || CAST(MAX(part) AS VARCHAR) || '}' ` +
            `FROM "${props.glueDataCatalog.partitionedInventoryTable.tableName}";`,
        queryExecutionContext: {
            databaseName: props.glueDataCatalog.inventoryDatabase.databaseName
        },
        workGroup: props.glueDataCatalog.athenaWorkgroup.name,
        outputPath: '$.QueryExecution'
    });

    // Parses the JSON string cell into an object and makes it the state output.
    const taskGetMaxPartitionResult = new tasks.AthenaGetQueryResults(this, 'Get Max Partition Result', {
        queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
        resultSelector: {
            'result.$': 'States.StringToJson($.ResultSet.Rows[1].Data[0].VarCharValue)'
        },
        outputPath: '$.result'
    });

    const taskRequestArchives = new tasks.LambdaInvoke(this, 'Request Archives Retrieval', {
        lambdaFunction: props.requestArchives,
        outputPath: '$.Payload'
    });
    // Fixed 15 second interval (backoffRate 1 = no backoff), up to 10000 attempts.
    taskRequestArchives.addRetry({
        maxAttempts: 10000,
        backoffRate: 1,
        interval: cdk.Duration.seconds(15)
    });

    // Wait duration is read from the state input ($.timeout).
    const taskWaitX = new sfn.Wait(this, 'Wait X Seconds', {
        time: sfn.WaitTime.secondsPath('$.timeout')
    });

    // failures
    const failOnEmptyInventory = new sfn.Fail(this, 'FAIL: Inventory Empty', {
        error: 'Vault Inventory Table is empty. Has it been downloaded?'
    });
    const failOnInventoryMismatch = new sfn.Fail(this, 'FAIL: Inventory-Partitioned Mismatch', {
        error: 'Inventory and Partitioned table counts are greater than 0 and do not match. Cannot proceed.'
    });

    // conditionals (the counts are compared as strings)
    const isInventoryEmpty = sfn.Condition.stringEquals('$.inventoryTable.archiveCount', '0');
    const equalsPartitionedCountInventory =
        sfn.Condition.stringEqualsJsonPath(
            '$.inventoryTable.archiveCount',
            '$.partitionedTable.archiveCount');
    const notConsistentPartitionedTable =
        sfn.Condition.and(
            sfn.Condition.not(sfn.Condition.stringEquals('$.partitionedTable.archiveCount', '0')),
            sfn.Condition.not(equalsPartitionedCountInventory));
    const isComplete = sfn.Condition.numberGreaterThanJsonPath('$.nextPartition', '$.maxPartition');

    // branching
    const parallelPartitioning = new sfn.Parallel(this, 'Parallel Partitioning and Stats update', {});
    const checkPartitionStatus = new sfn.Choice(this, 'Partitioning Required ?');
    const checkInventory = new sfn.Choice(this, 'Check Inventory State');
    const success = new sfn.Succeed(this, 'Success');

    // -------------------------------------------------------------------------------------------
    // Stage Two Orchestrator :: Graph
    const graphDefinition = parallelQueries
        .branch(taskStartInventoryQuery.next(taskGetInventoryResults))
        .branch(taskStartPartitionedQuery.next(taskGetPartitionedResults))
        .next(checkInventory);

    checkInventory
        .when(isInventoryEmpty, failOnEmptyInventory)
        .when(notConsistentPartitionedTable, failOnInventoryMismatch)
        .otherwise(checkPartitionStatus);

    // Equal counts: partitioning is skipped and the flow goes straight to the max partition query.
    checkPartitionStatus
        .when(equalsPartitionedCountInventory, taskStartMaxPartitionQuery)
        .otherwise(parallelPartitioning);

    taskUpdateMetricCount
        .next(taskUpdateMetricSize)
        .next(taskSubmitAnonymousStatistics);

    parallelPartitioning
        .branch(taskGluePartitioningJob)
        .branch(taskUpdateMetricCount)
        .next(taskStartMaxPartitionQuery);

    taskStartMaxPartitionQuery
        .next(taskGetMaxPartitionResult)
        .next(taskRequestArchives);

    taskRequestArchives
        .next(new sfn.Choice(this, 'Is Complete?')
            .when(isComplete, success)
            .otherwise(taskWaitX.next(taskRequestArchives))); // loop

    // -------------------------------------------------------------------------------------------
    // Stage Two Orchestrator
    // The log group is referenced by name, not created by this construct.
    const stageTwoOrchestratorLogGroup = logs.LogGroup.fromLogGroupName(
        this,
        'Orchestrator',
        `/aws/vendedlogs/states/${cdk.Aws.STACK_NAME}-stageTwoOrchestrator`);

    // Stage Two Orchestrator :: IAM
    const stageTwoOrchestratorRole = new iam.Role(this, 'OrchestratorRole', {
        assumedBy: new iam.ServicePrincipal('states.amazonaws.com')
    });

    props.stagingBucket.grantReadWrite(stageTwoOrchestratorRole);
    props.requestArchives.grantInvoke(stageTwoOrchestratorRole);
    props.sendAnonymousStats.grantInvoke(stageTwoOrchestratorRole);
    props.dynamoDataCatalog.metricTable.grantWriteData(stageTwoOrchestratorRole);
    stageTwoOrchestratorLogGroup.grantWrite(stageTwoOrchestratorRole);

    // ARNs are built with the AWS::Partition pseudo parameter instead of a literal "aws",
    // so the statements also match in partitions such as aws-cn and aws-us-gov.
    stageTwoOrchestratorRole.addToPrincipalPolicy(iamSec.IamPermissions.athena([
        props.glueDataCatalog.inventoryDatabase.catalogArn,
        props.glueDataCatalog.inventoryDatabase.databaseArn,
        `arn:${cdk.Aws.PARTITION}:athena:*:${cdk.Aws.ACCOUNT_ID}:workgroup/${props.glueDataCatalog.athenaWorkgroup.name}`,
        props.glueDataCatalog.inventoryTable.tableArn,
        props.glueDataCatalog.partitionedInventoryTable.tableArn
    ]));

    stageTwoOrchestratorRole.addToPrincipalPolicy(new iam.PolicyStatement({
        sid: 'allowGlueJobRun',
        effect: iam.Effect.ALLOW,
        actions: [
            'glue:StartJobRun',
            'glue:GetJobRun',
            'glue:GetJobRuns'
        ],
        resources: [`arn:${cdk.Aws.PARTITION}:glue:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:job/${props.glueJobName}`]
    }));

    stageTwoOrchestratorRole.addToPrincipalPolicy(new iam.PolicyStatement({
        sid: 'allowLogDelivery',
        effect: iam.Effect.ALLOW,
        actions: [
            'logs:CreateLogDelivery',
            'logs:GetLogDelivery',
            'logs:UpdateLogDelivery',
            'logs:DeleteLogDelivery',
            'logs:ListLogDeliveries',
            'logs:PutResourcePolicy',
            'logs:DescribeResourcePolicies',
            'logs:DescribeLogGroups'
        ],
        resources: [
            '*'
        ]
    }));

    // cfn_nag suppressions are attached to the role's generated default policy resource.
    const defaultOrchestratorPolicy =
        stageTwoOrchestratorRole.node.findChild('DefaultPolicy').node.defaultChild as cdk.CfnResource;
    defaultOrchestratorPolicy.addMetadata('cfn_nag', {
        rules_to_suppress: [
            {
                id: 'W12',
                reason: '[*] Access granted as per documentation: https://docs.aws.amazon.com/step-functions/latest/dg/cw-logs.html'
            },
            {
                id: 'W76',
                reason: 'SPCM complexity greater then 25 is appropriate for the logic implemented'
            }
        ]
    });

    // Stage Two Orchestrator :: StepFunction
    // withoutPolicyUpdates(): the state machine is handed an immutable view of the role, so all
    // permissions must come from the statements above. The explicit dependency on the role is
    // presumably there so the role and its policy are deployed before the state machine.
    this.stateMachine = new sfn.StateMachine(this, 'StageTwoOrchestrator', {
        stateMachineName: `${cdk.Aws.STACK_NAME}-stageTwoOrchestrator`,
        stateMachineType: sfn.StateMachineType.STANDARD,
        definition: graphDefinition,
        role: stageTwoOrchestratorRole.withoutPolicyUpdates(),
        logs: {
            destination: stageTwoOrchestratorLogGroup,
            level: sfn.LogLevel.ALL
        }
    });

    (this.stateMachine.node.defaultChild as sfn.CfnStateMachine).overrideLogicalId('stageTwoOrchestrator');
    this.stateMachine.node.addDependency(stageTwoOrchestratorRole);
}