in maestro-engine/src/main/java/com/netflix/maestro/engine/dao/MaestroWorkflowDao.java [226:302]
/**
 * Adds a new version of a workflow definition and returns the completed definition.
 *
 * <p>All work happens inside the callback handed to {@code withRetryableTransaction}, which is
 * itself wrapped by {@code withMetricLogError}. In order, the callback:
 *
 * <ol>
 *   <li>loads the current workflow info via {@code getWorkflowInfoForUpdate};
 *   <li>sets the metadata version id to the latest version id plus one and inserts the version;
 *   <li>applies {@code changes} through {@code updateWorkflowProps} and attaches either the
 *       returned snapshot or, when that is {@code null}, the previous snapshot;
 *   <li>upserts the workflow and copies the two returned values into the modify time and
 *       internal id of {@code workflowDef};
 *   <li>refreshes the next-active-workflow info, adds triggers when {@code withWorkflow()} is
 *       true, writes the timeline entry and publishes the resulting job event.
 * </ol>
 *
 * @param workflowDef the definition to add; it is mutated in place (properties snapshot,
 *     latest/default flags, modify time, internal id) and its metadata receives the version id
 * @param changes the property changes forwarded to {@code updateWorkflowProps}
 * @return the same {@code workflowDef} instance after it has been filled in
 */
public WorkflowDefinition addWorkflowDefinition(
    WorkflowDefinition workflowDef, Properties changes) {
  final Workflow workflow = workflowDef.getWorkflow();
  LOG.info("Adding a new workflow definition with an id [{}]", workflow.getId());
  final Metadata metadata = workflowDef.getMetadata();
  return withMetricLogError(
      () ->
          withRetryableTransaction(
              conn -> {
                WorkflowInfo info = getWorkflowInfoForUpdate(conn, workflow.getId());

                // Stamp the version id first; the metadata is complete once this is set.
                final long newVersionId = info.getLatestVersionId() + 1;
                metadata.setWorkflowVersionId(newVersionId);
                TriggerUuids uuids = insertMaestroWorkflowVersion(conn, metadata, workflow);

                PropertiesSnapshot newSnapshot =
                    updateWorkflowProps(
                        conn,
                        workflow.getId(),
                        metadata.getVersionAuthor(),
                        metadata.getCreateTime(),
                        info.getPrevPropertiesSnapshot(),
                        changes,
                        new PropertiesUpdate(Type.ADD_WORKFLOW_DEFINITION));
                // Attach the new snapshot, falling back to the previous one when none was made.
                workflowDef.setPropertiesSnapshot(
                    newSnapshot != null ? newSnapshot : info.getPrevPropertiesSnapshot());

                final long[] upsertResult = upsertMaestroWorkflow(conn, workflowDef);
                Checks.notNull(
                    upsertResult,
                    "the upsert result should not be null for workflow [%s]",
                    workflow.getId());

                // A new version will always be latest.
                workflowDef.setIsLatest(true);
                // Default flag, modify time and internal id complete the workflowDef.
                final boolean markDefault =
                    info.getPrevActiveVersionId() == Constants.INACTIVE_VERSION_ID
                        || workflowDef.getIsActive();
                workflowDef.setIsDefault(markDefault);
                workflowDef.setModifyTime(upsertResult[0]);
                workflowDef.setInternalId(upsertResult[1]);

                if (workflowDef.getIsActive()) {
                  MaestroWorkflowVersion activeVersion =
                      MaestroWorkflowVersion.builder()
                          .definition(workflow)
                          .triggerUuids(uuids)
                          .metadata(metadata)
                          .build();
                  info.setNextActiveWorkflow(activeVersion, workflowDef.getPropertiesSnapshot());
                } else if (Constants.INACTIVE_VERSION_ID != info.getPrevActiveVersionId()) {
                  // The new version is inactive but an older active version exists.
                  updateWorkflowInfoForNextActiveWorkflow(
                      conn,
                      workflow.getId(),
                      info.getPrevActiveVersionId(),
                      info,
                      workflowDef.getPropertiesSnapshot());
                }

                if (info.withWorkflow()) {
                  addWorkflowTriggersIfNeeded(conn, info);
                }

                MaestroJobEvent changeEvent =
                    logToTimeline(conn, workflowDef, newSnapshot, info.getPrevActiveVersionId());
                publisher.publishOrThrow(
                    changeEvent, "Failed to publish maestro definition change job event.");
                return workflowDef;
              }),
      "addWorkflowDefinition",
      "Failed creating a new workflow definition {}",
      workflow.getId());
}