in infra/stack/mlops/mlops-pipeline-stack.ts [108:246]
/**
 * Builds the Step Functions state machine for the MLOps pipeline.
 *
 * Flow:
 *   Start -> Glue ETL -> Train Model -> Create Model
 *         -> [Query Accuracy -> accuracy Choice]   (only when `props.queryAccuracyLambda` is set)
 *         -> Config Endpoint -> Query Endpoint -> Endpoint Existent?
 *         -> Update Endpoint | Create Endpoint -> Finish
 *
 * The states read their dynamic settings from the state input:
 *   - `$.PreprocessGlue`   : arguments passed to the Glue job
 *   - `$.TrainSageMaker.*` : TrainingJobName, TrainData, ValidateData, TrainOutput
 *   - `$.ServeSageMaker.*` : ModelName, EndpointConfigName, EndpointName
 * Every task except "Update Endpoint" writes its result to `$.Result`, so each
 * step sees the result of the step immediately before it at that path.
 *
 * @param props Resources and settings (Glue job, roles, Lambdas, instance types, thresholds).
 * @returns The state machine, whose definition starts at the `Start` pass state.
 * @throws Error if `props.glueJob.name` is not set.
 */
private createStateMachine(props: StateMachineProps): sfn.StateMachine {
    // Fail fast at synth time instead of silencing the optional name with `!`,
    // which would synthesize a Glue task without a job name.
    const glueJobName = props.glueJob.name;
    if (glueJobName == null) {
        throw new Error('Glue job name is undefined; set an explicit name on the Glue job before building the state machine');
    }

    const startState = new sfn.Pass(this, 'Start');
    const finishState = new sfn.Pass(this, 'Finish');

    const etlState = new tasks.GlueStartJobRun(this, 'Glue ETL', {
        glueJobName,
        arguments: sfn.TaskInput.fromJsonPathAt('$.PreprocessGlue'),
        timeout: cdk.Duration.minutes(props.glueJobTimeoutInMin),
        notifyDelayAfter: cdk.Duration.minutes(1),
        integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        resultPath: '$.Result',
    });

    const trainingState = new tasks.SageMakerCreateTrainingJob(this, 'Train Model', {
        // The job name is taken from the state input rather than hard-coded.
        trainingJobName: sfn.JsonPath.stringAt('$.TrainSageMaker.TrainingJobName'),
        role: props.trainJobRole,
        algorithmSpecification: {
            trainingImage: tasks.DockerImage.fromRegistry(props.trainContainerImage),
            trainingInputMode: tasks.InputMode.FILE,
        },
        hyperparameters: props.trainParameters,
        inputDataConfig: [
            {
                channelName: 'train',
                contentType: props.trainInputContent,
                dataSource: {
                    s3DataSource: {
                        // TODO: change (carried over from the original code).
                        s3Location: tasks.S3Location.fromJsonExpression('$.TrainSageMaker.TrainData'),
                        s3DataDistributionType: tasks.S3DataDistributionType.SHARDED_BY_S3_KEY,
                    },
                },
            },
            {
                channelName: 'validation',
                contentType: props.trainInputContent,
                dataSource: {
                    s3DataSource: {
                        // TODO: change (carried over from the original code).
                        s3Location: tasks.S3Location.fromJsonExpression('$.TrainSageMaker.ValidateData'),
                        s3DataDistributionType: tasks.S3DataDistributionType.SHARDED_BY_S3_KEY,
                    },
                },
            },
        ],
        outputDataConfig: {
            s3OutputLocation: tasks.S3Location.fromJsonExpression('$.TrainSageMaker.TrainOutput'),
        },
        resourceConfig: {
            instanceCount: 1,
            instanceType: props.trainInstanceType,
            volumeSize: cdk.Size.gibibytes(50),
        },
        stoppingCondition: {
            maxRuntime: cdk.Duration.hours(1),
        },
        integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        resultPath: '$.Result',
    });

    // The image URI and model artifact location are read from `$.Result`,
    // which at this point holds the result written by the training task.
    const createModelState = new tasks.SageMakerCreateModel(this, 'Create Model', {
        modelName: sfn.JsonPath.stringAt('$.ServeSageMaker.ModelName'),
        primaryContainer: new tasks.ContainerDefinition({
            image: tasks.DockerImage.fromJsonExpression(sfn.JsonPath.stringAt('$.Result.AlgorithmSpecification.TrainingImage')),
            mode: tasks.Mode.SINGLE_MODEL,
            modelS3Location: tasks.S3Location.fromJsonExpression('$.Result.ModelArtifacts.S3ModelArtifacts'),
        }),
        resultPath: '$.Result',
    });

    // The accuracy gate is optional: both of its states exist only when a Lambda is supplied.
    const hasAccuracyCheck = props.queryAccuracyLambda != null;

    const queryAccuracyState = props.queryAccuracyLambda != null
        ? new tasks.LambdaInvoke(this, 'Query Accuracy', {
            lambdaFunction: props.queryAccuracyLambda,
            resultPath: '$.Result',
        })
        : undefined;

    const configEndpointState = new tasks.SageMakerCreateEndpointConfig(this, 'Config Endpoint', {
        endpointConfigName: sfn.JsonPath.stringAt('$.ServeSageMaker.EndpointConfigName'),
        productionVariants: [{
            initialInstanceCount: props.endpointInstanceCount,
            instanceType: props.endpointInstanceType,
            modelName: sfn.JsonPath.stringAt('$.ServeSageMaker.ModelName'),
            variantName: 'variant1',
        }],
        resultPath: '$.Result',
    });

    // Deployment continues only when the first reported metric is below the
    // configured threshold; otherwise the execution goes straight to Finish.
    const accuracyChoiceState = hasAccuracyCheck
        ? new sfn.Choice(this, `Accuracy higher(${props.modelErrorThreshold})?`)
            .when(sfn.Condition.numberLessThan('$.Result.Payload.Metrics[0].Value', props.modelErrorThreshold), configEndpointState)
            .otherwise(finishState)
        : undefined;

    const queryEndpointState = new tasks.LambdaInvoke(this, 'Query Endpoint', {
        lambdaFunction: props.queryEndpointLambda,
        resultPath: '$.Result',
    });

    const createEndpointState = new tasks.SageMakerCreateEndpoint(this, 'Create Endpoint', {
        endpointName: sfn.JsonPath.stringAt('$.ServeSageMaker.EndpointName'),
        endpointConfigName: sfn.JsonPath.stringAt('$.ServeSageMaker.EndpointConfigName'),
        resultPath: '$.Result',
    });

    // NOTE(review): unlike "Create Endpoint", this task sets no resultPath, so its
    // result is not written to `$.Result` - confirm whether that is intentional.
    const updateEndpointState = new tasks.SageMakerUpdateEndpoint(this, 'Update Endpoint', {
        endpointName: sfn.JsonPath.stringAt('$.ServeSageMaker.EndpointName'),
        endpointConfigName: sfn.JsonPath.stringAt('$.ServeSageMaker.EndpointConfigName'),
    });

    // The "Query Endpoint" Lambda result decides between updating and creating the endpoint.
    const endpointExistsChoiceState = new sfn.Choice(this, 'Endpoint Existent?')
        .when(sfn.Condition.stringEquals('$.Result.Payload.Existent', 'TRUE'), updateEndpointState)
        .otherwise(createEndpointState);

    startState.next(etlState);
    etlState.next(trainingState);
    trainingState.next(createModelState);

    // Only the hop after "Create Model" depends on the optional accuracy gate.
    if (queryAccuracyState !== undefined && accuracyChoiceState !== undefined) {
        createModelState.next(queryAccuracyState);
        queryAccuracyState.next(accuracyChoiceState);
    } else {
        createModelState.next(configEndpointState);
    }

    // Shared tail of the pipeline, identical with or without the accuracy gate.
    configEndpointState.next(queryEndpointState);
    queryEndpointState.next(endpointExistsChoiceState);
    endpointExistsChoiceState.afterwards().next(finishState);

    return new sfn.StateMachine(this, `StateMachine-${props.statemachineName}`, {
        stateMachineName: `${this.projectPrefix}-${props.statemachineName}`,
        definition: startState,
        role: props.statemachineRole,
    });
}