in main/cdk/process-plate-stack.ts [58:493]
/**
 * Builds the plate-processing resources for this stack:
 *
 *  - the image-inspector Lambda,
 *  - the "Upload Source Plate" state machine,
 *  - the "Process Plate" state machine,
 *  - the process-plate Lambda, which receives both state machine ARNs
 *    through its environment.
 *
 * Each state machine logs at level ALL to its own log group.
 *
 * @param app   CDK app used as the construct scope.
 * @param id    Construct id of this stack.
 * @param props Stack properties. The fields read here are `messageLambda`,
 *              `imageManagementLambda`, `artifactLambda`,
 *              `defaultArtifactLambda`, `trainingConfigurationLambda`,
 *              `batchOnDemandQueue` and `dataBucket`.
 *
 * NOTE(review): no IAM grants are made in this constructor for the Lambda /
 * state machine ARNs that are placed in the Lambda environments below;
 * presumably they are attached elsewhere - confirm.
 */
constructor(app: cdk.App, id: string, props: ProcessPlateStackProps) {
  super(app, id, props);
  this.messageLambda = props.messageLambda

  // Region handed to the Batch containers. Use the region of the stack that
  // owns these constructs rather than process.env.CDK_DEFAULT_REGION: the env
  // var reflects the CLI's default profile, may be unset at synth time (which
  // would silently drop the `region` parameter), and can differ from the
  // region this stack is actually deployed to.
  const stackRegion = cdk.Stack.of(this).region;

  // NOTE(review): PYTHON_3_8 looks like a runtime AWS Lambda has deprecated -
  // confirm and plan an upgrade together with the handler code.
  this.imageInspectorLambda = new lambda.Function(
    this,
    "imageInspectorFunction",
    {
      code: lambda.Code.fromAsset("src/image-inspector/build"),
      handler: "image-inspector.handler",
      runtime: lambda.Runtime.PYTHON_3_8,
      memorySize: 3008,
      timeout: cdk.Duration.minutes(15),
      environment: {
        MESSAGE_LAMBDA_ARN: props.messageLambda.functionArn,
        IMAGE_MANAGEMENT_LAMBDA_ARN: props.imageManagementLambda.functionArn,
      },
    }
  );

  ///////////////////////////////////////////
  //
  // UploadSourcePlate State Machine
  //
  // Flow: Plate Validator Input -> Plate Format 1 -> Plate To Images 1
  //       -> Inspector Map -> Plate Validator -> ArtifactChoice
  //
  ///////////////////////////////////////////

  // Builds the "getImagesByPlateId" request under $.getImagesInput.
  const plateFormat1 = new sfn.Pass(this, "Plate Format 1", {
    parameters: {
      method: "getImagesByPlateId",
      plateId: sfn.JsonPath.stringAt("$.plateId"),
    },
    resultPath: '$.getImagesInput',
  });

  // Sends that request to the image-management Lambda; the invoke result is
  // stored under $.imageList.
  const plateToImages1 = new tasks.LambdaInvoke(this, "Plate To Images 1", {
    lambdaFunction: props.imageManagementLambda,
    inputPath: '$.getImagesInput',
    resultPath: '$.imageList'
  });

  // Per-image task: invokes the image-inspector Lambda and keeps only its Payload.
  const imageInspector = new tasks.LambdaInvoke(this, "Image Inspector", {
    lambdaFunction: this.imageInspectorLambda,
    outputPath: '$.Payload'
  });

  // Runs the inspector once per entry of $.imageList.Payload.body.
  // maxConcurrency 0 means Step Functions applies no concurrency limit.
  const inspectorMap = new sfn.Map(this, "Inspector Map", {
    maxConcurrency: 0,
    itemsPath: '$.imageList.Payload.body',
    resultPath: '$.inspectorMapResult',
  });
  inspectorMap.iterator(imageInspector);

  // Builds the "validatePlate" request under $.validatorInput.
  const plateValidatorInput = new sfn.Pass(this, "Plate Validator Input", {
    parameters: {
      method: "validatePlate",
      plateId: sfn.JsonPath.stringAt("$.plateId")
    },
    resultPath: '$.validatorInput',
  });

  // Terminal states of the upload state machine.
  const validationFailure1 = new sfn.Fail(this, 'Validation Failed 1', {
    cause: 'Validation Failed',
    error: 'Validation did not return VALIDATED',
  });
  const validationSuccess1 = new sfn.Succeed(this, "Validation Succeeded 1", {
    comment: "Validation Suceeded"
  });

  // Sends the validation request to the image-management Lambda; the result
  // is stored under $.validatorOutput and inspected by ArtifactChoice below.
  const plateValidator = new tasks.LambdaInvoke(this, "Plate Validator", {
    lambdaFunction: props.imageManagementLambda,
    inputPath: '$.validatorInput',
    resultPath: '$.validatorOutput'
  });

  // Builds the "createDescribeStacksArtifact" request (context = plate id,
  // trainId fixed to "origin") under $.describeStacksInput.
  const describeStacksInput1 = new sfn.Pass(this, "DescribeStacksInput1", {
    parameters: {
      method: "createDescribeStacksArtifact",
      contextId: sfn.JsonPath.stringAt("$.plateId"),
      trainId: "origin"
    },
    resultPath: '$.describeStacksInput'
  });

  // Sends that request to the artifact Lambda; result lands in $.describeStacks.
  const describeStacksInjector1 = new tasks.LambdaInvoke(this, "DescribeStacksInjector1", {
    lambdaFunction: props.artifactLambda,
    inputPath: '$.describeStacksInput',
    resultPath: '$.describeStacks'
  });

  // Per-image task: invokes the default-artifact Lambda and keeps only its Payload.
  const artifactFunction = new tasks.LambdaInvoke(this, "Image Artifacts", {
    lambdaFunction: props.defaultArtifactLambda,
    outputPath: '$.Payload'
  });

  // Runs the artifact task once per entry of $.imageList.Payload.body. Each
  // iteration receives the item's Item.imageId (read from the Map context
  // object) plus the $.describeStacks value produced above.
  const artifactMap = new sfn.Map(this, "Artifact Map", {
    maxConcurrency: 0,
    parameters: {
      'imageId.$' : "$$.Map.Item.Value.Item.imageId",
      describeStacks: sfn.JsonPath.stringAt("$.describeStacks")
    },
    itemsPath: '$.imageList.Payload.body',
    resultPath: '$.artifactMapResult',
  });
  artifactMap.iterator(artifactFunction);

  // Artifact generation path, ending in the success state.
  const artifactSequence = describeStacksInput1.next(describeStacksInjector1).next(artifactMap).next(validationSuccess1)

  // Branch on the validator result:
  //   "VALIDATED" -> generate artifacts, then succeed
  //   "READY"     -> succeed immediately
  //   otherwise   -> fail
  const artifactChoice = new sfn.Choice(this, "ArtifactChoice")
    .when(sfn.Condition.stringMatches('$.validatorOutput.Payload.body', "VALIDATED"), artifactSequence)
    .when(sfn.Condition.stringMatches('$.validatorOutput.Payload.body', "READY"), validationSuccess1)
    .otherwise(validationFailure1);

  const uploadSourcePlateStepFunctionDef = plateValidatorInput
    .next(plateFormat1)
    .next(plateToImages1)
    .next(inspectorMap)
    .next(plateValidator)
    .next(artifactChoice)

  const uploadSourcePlateGroup = new logs.LogGroup(this, "UploadSourcePlateLogGroup");

  this.uploadSourcePlateStateMachine = new sfn.StateMachine(
    this,
    "Upload Source Plate StateMachine",
    {
      definition: uploadSourcePlateStepFunctionDef,
      timeout: cdk.Duration.minutes(60),
      logs: {
        destination: uploadSourcePlateGroup,
        level: sfn.LogLevel.ALL,
      },
    }
  );

  ///////////////////////////////////////////
  //
  // Process Plate State Machine
  //
  // The states are declared first and wired together in
  // processPlateStepFunctionDef further down.
  //
  ///////////////////////////////////////////

  // Same describe-stacks request/invoke pair as in the upload state machine,
  // declared again with distinct construct ids for this state machine.
  const describeStacksInput2 = new sfn.Pass(this, "DescribeStacksInput2", {
    parameters: {
      method: "createDescribeStacksArtifact",
      contextId: sfn.JsonPath.stringAt("$.plateId"),
      trainId: "origin"
    },
    resultPath: '$.describeStacksInput'
  });
  const describeStacksInjector2 = new tasks.LambdaInvoke(this, "DescribeStacksInjector2", {
    lambdaFunction: props.artifactLambda,
    inputPath: '$.describeStacksInput',
    resultPath: '$.describeStacks'
  });

  // Builds the "getPlateMessageId" request under $.plateMessageInput.
  const plateMessageInput = new sfn.Pass(this, "Plate Message Input", {
    parameters: {
      method: "getPlateMessageId",
      plateId: sfn.JsonPath.stringAt("$.plateId"),
    },
    resultPath: '$.plateMessageInput',
  });

  // Sends that request to the image-management Lambda; the invoke result is
  // stored under $.plateMessageId.
  const getPlateMessage = new tasks.LambdaInvoke(this, "Get Plate Message", {
    lambdaFunction: props.imageManagementLambda,
    inputPath: '$.plateMessageInput',
    resultPath: '$.plateMessageId'
  });

  // No resultPath is given, so the state output is replaced by exactly these
  // three fields; $.plateMessageId is flattened from the invoke result to its
  // Payload.body value.
  const plateMessagePass = new sfn.Pass(this, "PlateMessagePass", {
    parameters: {
      plateId: sfn.JsonPath.stringAt("$.plateId"),
      embeddingName: sfn.JsonPath.stringAt("$.embeddingName"),
      plateMessageId: sfn.JsonPath.stringAt("$.plateMessageId.Payload.body")
    }
  });

  // createSfnMessage is defined elsewhere in this class. It returns a
  // chainable state (it is used with .next() and as a Choice target below);
  // presumably it posts the given text via the message Lambda - confirm.
  const startMessage = this.createSfnMessage("StartMessage", "Process Plate Start");

  // Builds the "validatePlate" request under $.validatorInput.
  const plateValidatorInputPP = new sfn.Pass(this, "Plate Validator Input PP", {
    parameters: {
      method: "validatePlate",
      plateId: sfn.JsonPath.stringAt("$.plateId")
    },
    resultPath: '$.validatorInput',
  });

  // Sends the validation request to the image-management Lambda; the result
  // is stored under $.plateStatus.
  const plateValidatorPP = new tasks.LambdaInvoke(this, "Plate Validator PP", {
    lambdaFunction: props.imageManagementLambda,
    inputPath: '$.validatorInput',
    resultPath: '$.plateStatus'
  });

  // Failure path for validation: message state followed by a Fail state.
  const validationFailureMessage = this.createSfnMessage("PlateValidationFailureMessage", "Plate Validation Failed");
  const plateValidationFailed = new sfn.Fail(this, 'Plate Validation Failed', {
    cause: 'Plate Validation Failed',
    error: 'Plate Validation Failed',
  });
  const validationFailure = validationFailureMessage.next(plateValidationFailed);
  const validationSucceededMessage = this.createSfnMessage("ValidationSucceeded", "Plate Validation Succeeded");

  // Builds the "getEmbeddingInfo" request under $.embeddingInfoRequest.
  const embeddingInfoRequest = new sfn.Pass(this, "Embedding Info Request", {
    parameters: {
      method: "getEmbeddingInfo",
      embeddingName: sfn.JsonPath.stringAt('$.embeddingName')
    },
    resultPath: '$.embeddingInfoRequest'
  })

  // Sends that request to the training-configuration Lambda; the result is
  // stored under $.embeddingInfo. Its Payload.body.Item.{plate,well,image}MethodArn
  // fields are read by the states below.
  const embeddingInfo = new tasks.LambdaInvoke(this, "Embedding Info", {
    lambdaFunction: props.trainingConfigurationLambda,
    resultPath: '$.embeddingInfo',
    inputPath: '$.embeddingInfoRequest',
  });

  // Plate-level processing targets. The Lambda variant is a placeholder
  // message state (see its message text).
  const processPlateLambda = this.createSfnMessage("PlateLambda", "Placeholder Plate Lambda Task");
  const skippingPlateMessage = this.createSfnMessage("SkippingPlate", "No valid Arn for Plate processing - skipping");

  // Submits a Batch job whose job definition is the plateMethodArn taken from
  // the embedding info. The payload alternates flag names ("--region", ...)
  // with their values.
  const processPlateBatch = new tasks.BatchSubmitJob (this, "PlateBatchJob", {
    jobDefinitionArn: sfn.JsonPath.stringAt('$.embeddingInfo.Payload.body.Item.plateMethodArn'),
    jobName: sfn.JsonPath.stringAt('$.plateId'),
    // jobQueue: props.batchSpotQueue,
    jobQueueArn: props.batchOnDemandQueue.jobQueueArn,
    payload: {
      type: sfn.InputType.OBJECT,
      value: {
        regionArg: '--region',
        region: stackRegion,
        bucketArg: '--bucket',
        bucket: props.dataBucket.bucketName,
        plateIdArg: '--plateId',
        plateId: sfn.JsonPath.stringAt('$.plateId'),
        embeddingNameArg: '--embeddingName',
        embeddingName: sfn.JsonPath.stringAt('$.embeddingName')
      }
    },
    resultPath: '$.plateBatchOutput'
  });

  // Builds the "getWellsByPlateId" request and sends it to the
  // image-management Lambda; the result is stored under $.wellList.
  const listWellsInput = new sfn.Pass(this, "ListWellsInput", {
    parameters: {
      method: "getWellsByPlateId",
      plateId: sfn.JsonPath.stringAt('$.plateId'),
    },
    resultPath: '$.listWellsInput'
  });
  const listWellsFunction = new tasks.LambdaInvoke(this, "ListWellsFunction", {
    lambdaFunction: props.imageManagementLambda,
    resultPath: '$.wellList',
    inputPath: '$.listWellsInput',
  });
  const listWells = listWellsInput.next(listWellsFunction);

  // Well-level processing targets; both the Lambda and the Batch variants are
  // placeholder message states (see their message texts).
  const processWellLambda = this.createSfnMessage("WellLambda", "Placeholder Well Lambda Task");
  const processWellBatch = this.createSfnMessage("WellBatch", "Placeholder Well Batch Task");
  const skippingWellMessage = this.createSfnMessage("SkippingWell", "No valid Arn for Well processing - skipping");

  // Example Arns
  // arn:aws:lambda:us-east-1:580829821648:function:BioimageSearchLabelStack-labelFunction58A4020A-1TEZ4YLWTTXDH
  // arn:aws:batch:us-east-1:580829821648:job-definition/platepreprocessingjobde-614a2d2923fd2c7:1

  // Final state of each well iteration: reduces the output to { wellId }.
  const wellPass = new sfn.Pass(this, "WellPass", {
    parameters: {
      wellId: sfn.JsonPath.stringAt('$.wellId')
    }
  });

  // Dispatch on the ARN prefix of $.wellMethodArn.
  const wellChoice = new sfn.Choice(this, "Well Arn Service")
    .when(sfn.Condition.stringMatches('$.wellMethodArn', "arn:aws:lambda:*"), processWellLambda)
    .when(sfn.Condition.stringMatches('$.wellMethodArn', "arn:aws:batch:*"), processWellBatch)
    .otherwise(skippingWellMessage)

  // afterwards() joins the open ends of all three branches before WellPass.
  const wellProcessor = wellChoice.afterwards().next(wellPass)

  // Runs the well processor once per entry of $.wellList.Payload.body. Each
  // iteration receives the well method ARN, the current item as wellId, and
  // the plate message id.
  const wellMap = new sfn.Map(this, "Well Map", {
    maxConcurrency: 0,
    parameters: {
      wellMethodArn: sfn.JsonPath.stringAt('$.embeddingInfo.Payload.body.Item.wellMethodArn'),
      'wellId.$' : "$$.Map.Item.Value",
      plateMessageId: sfn.JsonPath.stringAt("$.plateMessageId")
    },
    itemsPath: '$.wellList.Payload.body',
    resultPath: '$.wellMapResult',
  });
  wellMap.iterator(wellProcessor);

  // Builds the "getImageIdsByPlateId" request and sends it to the
  // image-management Lambda; the result is stored under $.imageList.
  const listImagesInput = new sfn.Pass(this, "ListImagesInput", {
    parameters: {
      method: "getImageIdsByPlateId",
      plateId: sfn.JsonPath.stringAt('$.plateId'),
    },
    resultPath: '$.listImagesInput'
  });
  const listImagesFunction = new tasks.LambdaInvoke(this, "ListImagesFunction", {
    lambdaFunction: props.imageManagementLambda,
    resultPath: '$.imageList',
    inputPath: '$.listImagesInput',
  });
  const listImages = listImagesInput.next(listImagesFunction);

  // Unlike the plate and well levels, an unrecognised image method ARN is a
  // hard failure: message state followed by a Fail state.
  const imageArnFailureMessage = this.createSfnMessage("ImageArnFailureMessage", "Invalid Image Arn");
  const imageArnFail = new sfn.Fail(this, 'Image Arn Selection Failed', {
    cause: 'Invalid Image Arn',
    error: 'Invalid Image Arn',
  });
  const imageArnFailure = imageArnFailureMessage.next(imageArnFail);

  // Image-level Lambda variant is a placeholder message state.
  const processImageLambda = this.createSfnMessage("ImageLambda", "Placeholder Image Lambda Task");

  // Submits a Batch job per image, using $.imageMethodArn as the job
  // definition. Paths here are relative to the per-iteration input built by
  // the "Image Map" parameters below.
  const processImageBatch = new tasks.BatchSubmitJob (this, "ImageBatchJob", {
    jobDefinitionArn: sfn.JsonPath.stringAt('$.imageMethodArn'),
    jobName: sfn.JsonPath.stringAt('$.imageId'),
    // jobQueue: props.batchSpotQueue,
    jobQueueArn: props.batchOnDemandQueue.jobQueueArn,
    payload: {
      type: sfn.InputType.OBJECT,
      value: {
        regionArg: '--region',
        region: stackRegion,
        bucketArg: '--bucket',
        bucket: props.dataBucket.bucketName,
        imageIdArg: '--imageId',
        imageId: sfn.JsonPath.stringAt('$.imageId'),
        embeddingNameArg: '--embeddingName',
        embeddingName: sfn.JsonPath.stringAt('$.embeddingName'),
        describeStacksArg: '--describeStacks',
        describeStacks: sfn.JsonPath.stringAt('$.describeStacks.Payload.body.key')
      }
    },
    resultPath: '$.imageBatchOutput',
  });

  // Dispatch on the ARN prefix of $.imageMethodArn.
  const imageChoice = new sfn.Choice(this, "Image Arn Service")
    .when(sfn.Condition.stringMatches('$.imageMethodArn', "arn:aws:lambda:*"), processImageLambda)
    .when(sfn.Condition.stringMatches('$.imageMethodArn', "arn:aws:batch:*"), processImageBatch)
    .otherwise(imageArnFailure);

  // Final state of each image iteration: reduces the output to { imageId }.
  const imagePass = new sfn.Pass(this, "ImagePass", {
    parameters: {
      imageId: sfn.JsonPath.stringAt('$.imageId')
    }
  });
  const imageProcessor = imageChoice.afterwards().next(imagePass);

  // Runs the image processor once per entry of $.imageList.Payload.body. Each
  // iteration receives the image method ARN, the current item as imageId, the
  // plate message id, the embedding name and the describe-stacks result.
  const imageMap = new sfn.Map(this, "Image Map", {
    maxConcurrency: 0,
    parameters: {
      imageMethodArn: sfn.JsonPath.stringAt('$.embeddingInfo.Payload.body.Item.imageMethodArn'),
      'imageId.$' : "$$.Map.Item.Value",
      plateMessageId: sfn.JsonPath.stringAt("$.plateMessageId"),
      embeddingName: sfn.JsonPath.stringAt('$.embeddingName'),
      describeStacks: sfn.JsonPath.stringAt("$.describeStacks")
    },
    itemsPath: '$.imageList.Payload.body',
    resultPath: '$.imageMapResult',
  });
  imageMap.iterator(imageProcessor);

  const processPlateSuccess = new sfn.Succeed(this, "Process Plate Success", {
    comment: "Process Plate Suceeded"
  });

  // Overall flow:
  //   1. look up the plate message id and post the start message
  //   2. validate the plate; anything other than "VALIDATED" fails
  //   3. fetch embedding info and the describe-stacks artifact
  //   4. plate-level processing, dispatched on plateMethodArn
  //   5. per-well processing, then per-image processing
  const processPlateStepFunctionDef = plateMessageInput
    .next(getPlateMessage)
    .next(plateMessagePass)
    .next(startMessage)
    .next(plateValidatorInputPP)
    .next(plateValidatorPP)
    .next(new sfn.Choice(this, 'Validation Check')
      .when(sfn.Condition.not(sfn.Condition.stringEquals('$.plateStatus.Payload.body', 'VALIDATED')), validationFailure)
      .otherwise(validationSucceededMessage)
      .afterwards())
    .next(embeddingInfoRequest)
    .next(embeddingInfo)
    .next(describeStacksInput2)
    .next(describeStacksInjector2)
    .next(new sfn.Choice(this, "Plate Arn Service")
      .when(sfn.Condition.stringMatches('$.embeddingInfo.Payload.body.Item.plateMethodArn', "arn:aws:lambda:*"), processPlateLambda)
      .when(sfn.Condition.stringMatches('$.embeddingInfo.Payload.body.Item.plateMethodArn', "arn:aws:batch:*"), processPlateBatch)
      .otherwise(skippingPlateMessage)
      .afterwards())
    .next(listWells)
    .next(wellMap)
    .next(listImages)
    .next(imageMap)
    .next(processPlateSuccess)

  const logGroup = new logs.LogGroup(this, "ProcessPlateLogGroup");

  this.processPlateStateMachine = new sfn.StateMachine(
    this,
    "Process Plate StateMachine",
    {
      definition: processPlateStepFunctionDef,
      timeout: cdk.Duration.minutes(360),
      logs: {
        destination: logGroup,
        level: sfn.LogLevel.ALL,
      },
    }
  );

  //////////////////////////////////////////

  // Lambda that is given the ARNs of both state machines, the message Lambda
  // and the image-management Lambda through its environment.
  // NOTE(review): NODEJS_12_X looks like a runtime AWS Lambda has deprecated -
  // confirm and plan an upgrade together with the handler code.
  this.processPlateLambda = new lambda.Function(
    this,
    "processPlateFunction",
    {
      code: lambda.Code.fromAsset("src/process-plate/build"),
      handler: "process-plate.handler",
      runtime: lambda.Runtime.NODEJS_12_X,
      memorySize: 512,
      timeout: cdk.Duration.minutes(15),
      environment: {
        MESSAGE_LAMBDA_ARN: props.messageLambda.functionArn,
        IMAGE_MANAGEMENT_LAMBDA_ARN: props.imageManagementLambda.functionArn,
        PROCESS_PLATE_SFN_ARN: this.processPlateStateMachine.stateMachineArn,
        UPLOAD_SOURCE_PLATE_SFN_ARN: this.uploadSourcePlateStateMachine.stateMachineArn,
      },
    }
  );
}