in brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java [100:205]
protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) {
if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>))
return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask());
Map properties = propertiesQ;
if (properties.get("tags")==null) properties.put("tags", new ArrayList());
Collection taskTags = (Collection)properties.get("tags");
// FIXME some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass;
// the issue is that we want to ensure that cross-entity calls switch execution contexts;
// previously it was all very messy how that was handled (and it didn't really handle it in many cases)
if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() );
Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY);
if (target!=null && !tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
// task is switching execution context boundaries
/*
* longer notes:
* you fall in to this block if the caller requests a target entity different to the current context
* (e.g. where entity X is invoking an effector on Y, it will start in X's context,
* but the effector should run in Y's context).
*
* if X is invoking an effector on himself in his own context, or a sensor or other task, it will not come in to this block.
*/
final ExecutionContext tc = ((EntityInternal)target).getExecutionContext();
if (log.isDebugEnabled())
log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")");
if (task instanceof Task<?>) {
final Task<T> t = (Task<T>)task;
if (!Tasks.isQueuedOrSubmitted(t) && (!(Tasks.current() instanceof HasTaskChildren) ||
!Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
// this task is switching execution context boundaries _and_ it is not a child and not yet queued,
// so wrap it in a task running in this context to keep a reference to the child
// (this matters when we are navigating in the GUI; without it we lose the reference to the child
// when browsing in the context of the parent)
return submit(Tasks.<T>builder().displayName("Cross-context execution: "+t.getDescription()).dynamic(true).body(new Callable<T>() {
public T call() {
return DynamicTasks.get(t);
}
}).build());
} else {
// if we are already tracked by parent, just submit it
return tc.submit(t);
}
} else {
// as above, but here we are definitely not a child (what we are submitting isn't even a task)
// (will only come here if properties defines tags including a target entity, which probably never happens)
submit(Tasks.<T>builder().displayName("Cross-context execution").dynamic(true).body(new Callable<T>() {
public T call() {
if (task instanceof Callable) {
return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build() ).getUnchecked();
} else if (task instanceof Runnable) {
return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Runnable)task).build() ).getUnchecked();
} else {
throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
}
}
}).build());
}
}
EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags);
if (entitlementContext==null)
entitlementContext = Entitlements.getEntitlementContext();
if (entitlementContext!=null) {
taskTags.add(BrooklynTaskTags.tagForEntitlement(entitlementContext));
}
taskTags.addAll(tags);
if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current())
&& !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
// tag as transient if submitter is transient, unless explicitly tagged as non-transient
taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
}
final Object startCallback = properties.get("newTaskStartCallback");
properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
public Void apply(Task<?> it) {
registerPerThreadExecutionContext();
if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
return null;
}});
final Object endCallback = properties.get("newTaskEndCallback");
properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
public Void apply(Task<?> it) {
try {
if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
} finally {
clearPerThreadExecutionContext();
}
return null;
}});
if (task instanceof Task) {
return executionManager.submit(properties, (Task)task);
} else if (task instanceof Callable) {
return executionManager.submit(properties, (Callable)task);
} else if (task instanceof Runnable) {
return (Task<T>) executionManager.submit(properties, (Runnable)task);
} else {
throw new IllegalArgumentException("Unhandled task type: task="+task+"; type="+(task!=null ? task.getClass() : "null"));
}
}