in core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java [239:439]
/**
 * Runs (or resumes) the update-children step as a sequence of sub-workflow phases, each launched through
 * {@code runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted}:
 * <ol>
 *   <li>match every item against the parent's children (result is an existing child, or an identifier string);</li>
 *   <li>create a child for every item whose match came back as a string/primitive, setting the plan ID config to the match;</li>
 *   <li>invoke {@code on_create} on the created children (conditional on {@code ${child.effector.on_create}});</li>
 *   <li>invoke {@code on_update}, then the {@code on_update_child} workflow, on created and matched children;</li>
 *   <li>run the deletion check on the parent's remaining children, invoke {@code on_delete} on those selected,
 *       and unmanage them.</li>
 * </ol>
 * The parent, identifier expression and items are resolved once and stored in the step state; when a step state
 * already exists it is reused instead of re-reading the inputs.
 *
 * @param context execution context of this step instance
 * @param instructionsForResuming passed through to each phase unchanged (presumably null on a fresh run - confirm against caller)
 * @param subworkflowTargetForResuming passed through to each phase unchanged
 * @return {@code context.getPreviousStepOutput()}
 * @throws IllegalStateException if items cannot be resolved or are null, if neither blueprint nor type is
 *         supplied, or if a deletion check coerces to null
 */
protected Object doTaskBodyPossiblyResuming(WorkflowStepInstanceExecutionContext context, ReplayContinuationInstructions instructionsForResuming, WorkflowExecutionContext subworkflowTargetForResuming) {
    ManagementContext mgmt = context.getManagementContext();

    // ---- resolve inputs once, or reuse the previously stored step state
    UpdateChildrenStepState stepStateO = getStepState(context);
    UpdateChildrenStepState stepState;
    if (stepStateO==null) {
        stepState = new UpdateChildrenStepState();
        Object parentId = context.getInput(PARENT);
        stepState.parent = parentId!=null ? WorkflowStepResolution.findEntity(context, parentId).get() : context.getEntity();
        // read raw so the expression is kept as supplied; it is resolved later inside the match workflow
        stepState.identifier_expression = TypeCoercions.coerce(context.getInputRaw(IDENTIFIER_EXRPESSION.getName()), String.class);
        try {
            stepState.items = context.getInput(ITEMS);
        } catch (Exception e) {
            throw new IllegalStateException("Cannot resolve items as a list", e);
        }
        if (stepState.items==null) throw new IllegalStateException("Items cannot be null");
        setStepState(context, stepState);
    } else {
        stepState = stepStateO;
    }

    // blueprint is taken with interpolation disabled; the creation workflow resolves it via resolve_expression
    Object blueprintNotYetInterpolated = resolveBlueprint(context, () -> {
        String type = context.getInput(TYPE);
        if (Strings.isBlank(type)) throw new IllegalStateException("blueprint or type must be supplied"); // should've been caught earlier but check again for good measure
        return "type: " + StringEscapes.JavaStringEscapes.wrapJavaString(type);
    }, SetVariableWorkflowStep.InterpolationMode.DISABLED, TemplateProcessor.InterpolationErrorMode.FAIL);

    // wraps a per-item workflow in a foreach step carrying the shared inputs; the consumer sets the phase-specific target
    BiFunction<CustomWorkflowStep, Consumer<ForeachWorkflowStep>,ConfigBag> outerWorkflowCustomers = (checkWorkflow, foreachCustom) -> {
        ForeachWorkflowStep foreach = new ForeachWorkflowStep(checkWorkflow);
        foreach.getInput().put("parent", stepState.parent);
        foreach.getInput().put("identifier_expression", stepState.identifier_expression);
        foreach.getInput().put("blueprint", blueprintNotYetInterpolated);
        // TODO other vars?
        if (foreach.getIdempotent() == null) foreach.setIdempotent(idempotent);
        if (foreach.getConcurrency() == null) foreach.setConcurrency(concurrency);
        foreachCustom.accept(foreach);
        return ConfigBag.newInstance()
                .configure(WorkflowCommonConfig.STEPS,
                        MutableList.of(foreach))
                .configure(WorkflowCommonConfig.IDEMPOTENT, idempotent);
    };

    // ---- phase: match items against children; entity results are wrapped as TransientEntityReference
    List matchesReturned = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming,
            "Matching items against children", stepState.matchCheck, MATCH_CHECK_WORKFLOW,
            () -> new CustomWorkflowStep(MutableList.of(
                    "transform ${identifier_expression} | resolve_expression | set id",
                    MutableMap.of("step", "fail message identifier_expression should be a non-static expression including an interpolated reference to item, instead got ${identifier_expression}",
                            "condition", MutableMap.of("target", "${identifier_expression}", "equals", "${id}")),
                    "let child_or_id = ${parent.children[id]} ?? ${id}",
                    "transform child_tostring = ${child_or_id} | to_string",
                    "transform parent_tostring = ${parent} | to_string",
                    "return ${child_or_id}")),
            checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                    foreach -> {
                        foreach.setTarget(stepState.items);
                        foreach.setTargetVarName("item");
                    }),
            list -> (List) list.stream().map(m -> m instanceof Entity ? new TransientEntityReference((Entity)m) : m).collect(Collectors.toList()) );

    // split the match results: strings/primitives need a new child; anything else non-null is handled as an update.
    // each unhandled match is stored with the index of the first item that produced it, so the update phases
    // can be given the item that actually matched (the map keeps insertion order and de-duplicates like a set).
    List<Map<String,Object>> stringMatchesToCreate = MutableList.of();
    Map<Object,Integer> matchesUnhandled = MutableMap.of();
    for (int i=0; i<matchesReturned.size(); i++) {
        Object m = matchesReturned.get(i);
        if (Boxing.isPrimitiveOrStringOrBoxedObject(m)) {
            stringMatchesToCreate.add(MutableMap.of("match", m.toString(), "item", stepState.items.get(i), "index", i));
        } else if (m!=null) {
            matchesUnhandled.putIfAbsent(m, i);
        }
    }

    // ---- phase: create children for unmatched items; created entities are kept as TransientEntityReference
    List<Map> addedChildren = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming,
            "Creating new children ("+stringMatchesToCreate.size()+")", stepState.creationCheck, CREATION_CHECK_WORKFLOW,
            () -> new CustomWorkflowStep(MutableList.of(
                    "transform blueprint_resolved = ${blueprint} | resolve_expression",
                    MutableMap.of(
                            "step", "add-entity",
                            "parent", "${parent}",
                            "blueprint", "${blueprint_resolved}"
                    ),
                    "let result = ${output.entity}",
                    MutableMap.of("step", "set-config",
                            "config", MutableMap.of("entity", "${result}", "name", BrooklynConfigKeys.PLAN_ID.getName()),
                            "value", "${match}"),
                    "return ${result}")),
            checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                    foreach -> {
                        foreach.setTarget(stringMatchesToCreate);
                        foreach.setTargetVarName("{match,item,index}");
                        foreach.setWorkflowOutput(MutableMap.of("index", "${index}", "match", "${match}", "child", "${output}", "item", "${item}"));
                    }),
            list -> (List) list.stream().map(x -> MutableMap.copyOf( ((Map)x) ).add("child", new TransientEntityReference((Entity) ((Map)x).get("child")))).collect(Collectors.toList()) );

    // turn the stored references back into entities for use as foreach targets
    List<Map<String,Object>> onCreateTargets = (List) addedChildren.stream().map(x ->
            MutableMap.copyOf(x).add("child", ((TransientEntityReference) x.get("child")).getEntity(mgmt))
    ).collect(Collectors.toList());

    // ---- phase: on_create for the newly created children
    runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming,
            "Calling on_create on newly created children ("+stringMatchesToCreate.size()+")", stepState.onCreate, ON_CREATE_WORKFLOW,
            () -> new CustomWorkflowStep(MutableList.of(
                    MutableMap.of(
                            "step", "invoke-effector on_create",
                            "entity", "${child}",
                            "args", MutableMap.of("item", "${item}"),
                            "condition", MutableMap.of("target", "${child.effector.on_create}")
                    )) ),
            checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                    foreach -> {
                        foreach.setTarget(onCreateTargets);
                        foreach.setTargetVarName("{child,item,index}");
                    }),
            list -> list.size());

    // update targets = created children plus children matched to an item, each paired with its own item and index
    List<Map<String,Object>> onUpdateTargets = MutableList.copyOf(onCreateTargets);
    for (Map.Entry<Object,Integer> unhandled : matchesUnhandled.entrySet()) {
        Object m = unhandled.getKey();
        int itemIndex = unhandled.getValue();
        if (m instanceof TransientEntityReference) {
            m = ((TransientEntityReference)m).getEntity(mgmt);
        }
        if (m instanceof Entity) {
            onUpdateTargets.add(MutableMap.of("child", m, "item", stepState.items.get(itemIndex), "index", itemIndex));
        } else {
            DynamicTasks.queueIfPossible(Tasks.warning("Unexpected match check result ("+m+"); ignoring", null, true))
                    .orSubmitAsync(context.getEntity());
        }
    }

    // ---- phase: on_update effector on every update target
    runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming,
            "Calling on_update on item-matched children ("+onUpdateTargets.size()+")", stepState.onUpdate, ON_UPDATE_WORKFLOW,
            () -> new CustomWorkflowStep(MutableList.of(
                    MutableMap.of(
                            "step", "invoke-effector on_update",
                            "entity", "${child}",
                            "args", MutableMap.of("item", "${item}"),
                            "condition", MutableMap.of("target", "${child.effector.on_update}")
                    )) ),
            checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                    foreach -> {
                        foreach.setTarget(onUpdateTargets);
                        foreach.setTargetVarName("{child,item,index}");
                    }),
            list -> list.size());

    // ---- phase: on_update_child workflow; no default workflow is supplied here, and the target entity key is "child"
    runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming,
            "Calling on_update_child on item-matched children ("+onUpdateTargets.size()+")", stepState.onUpdateChild, ON_UPDATE_CHILD_WORKFLOW,
            () -> null,
            checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                    foreach -> {
                        foreach.setTarget(onUpdateTargets);
                        foreach.setTargetVarName("{child,item,index}");
                        foreach.setTargetEntityKey("child");
                    }),
            list -> list.size());

    // children of the parent that were neither created nor matched above are candidates for deletion
    Map<String,Entity> oldChildren = MutableMap.of();
    stepState.parent.getChildren().forEach(c -> oldChildren.put(c.getId(), c));
    onUpdateTargets.forEach(c -> oldChildren.remove( ((Entity)c.get("child")).getId()) );
    List<Entity> entitiesToPossiblyDelete = MutableList.copyOf(oldChildren.values());

    // ---- phase: deletion check; the default workflow returns true for every candidate
    List<TransientEntityReference> deletionChecks = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming,
            "Checking old children ("+entitiesToPossiblyDelete.size()+") for deletion", stepState.deletionCheck, DELETION_CHECK_WORKFLOW,
            () -> new CustomWorkflowStep(MutableList.of("return true")),
            checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                    foreach -> {
                        foreach.setTarget(entitiesToPossiblyDelete);
                        foreach.setTargetVarName("child");
                        foreach.setWorkflowOutput(MutableMap.of("delete", "${output}", "child", "${child}"));
                    }),
            list -> (List) list.stream().map(x -> {
                Object check = ((Map) x).get("delete");
                Entity child = (Entity) ((Map) x).get("child");
                check = TypeCoercions.coerce(check, Boolean.class);
                if (check==null) throw new IllegalStateException("Invalid deletion check result for "+child+": "+check);
                // only results that coerce to true select the child for deletion
                if (!Boolean.TRUE.equals(check)) return null;
                return new TransientEntityReference(child);
            }).filter(x -> x!=null).collect(Collectors.toList()) );

    // references that no longer resolve to an entity are dropped before on_delete
    List<Map<String,Object>> onDeleteTargets = (List) deletionChecks.stream().map(t -> t.getEntity(mgmt)).filter(x -> x!=null).map(x -> MutableMap.of("child", x)).collect(Collectors.toList());

    // ---- phase: on_delete effector on the children selected for deletion
    runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming,
            "Calling on_delete on children to delete ("+onDeleteTargets.size()+")", stepState.onDelete, ON_DELETE_WORKFLOW,
            () -> new CustomWorkflowStep(MutableList.of(
                    MutableMap.of(
                            "step", "invoke-effector on_delete",
                            "entity", "${child}",
                            "condition", MutableMap.of("target", "${child.effector.on_delete}")
                    )) ),
            checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                    foreach -> {
                        foreach.setTarget(onDeleteTargets);
                        foreach.setTargetVarName("{child}");
                    }),
            list -> list.size());

    // finally unmanage the selected children that still resolve and are managed (active or coming up)
    for (TransientEntityReference entityToDelete: deletionChecks) {
        Entity entity = entityToDelete.getEntity(mgmt);
        if (entity!=null && Entities.isManagedActiveOrComingUp(entity)) Entities.unmanage(entity);
    }

    return context.getPreviousStepOutput();
}