protected Task submitInternal()

in brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java [100:205]


    protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) {
        if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) 
            return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask());
        
        Map properties = propertiesQ;
        if (properties.get("tags")==null) properties.put("tags", new ArrayList()); 
        Collection taskTags = (Collection)properties.get("tags");
        
        // FIXME some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass;
        // the issue is that we want to ensure that cross-entity calls switch execution contexts;
        // previously it was all very messy how that was handled (and it didn't really handle it in many cases)
        if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() ); 
        Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY);
        
        if (target!=null && !tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
            // task is switching execution context boundaries
            /* 
             * longer notes:
             * you fall in to this block if the caller requests a target entity different to the current context 
             * (e.g. where entity X is invoking an effector on Y, it will start in X's context, 
             * but the effector should run in Y's context).
             * 
             * if X is invoking an effector on himself in his own context, or a sensor or other task, it will not come in to this block.
             */
            final ExecutionContext tc = ((EntityInternal)target).getExecutionContext();
            if (log.isDebugEnabled())
                log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")");
            
            if (task instanceof Task<?>) {
                final Task<T> t = (Task<T>)task;
                if (!Tasks.isQueuedOrSubmitted(t) && (!(Tasks.current() instanceof HasTaskChildren) || 
                        !Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
                    // this task is switching execution context boundaries _and_ it is not a child and not yet queued,
                    // so wrap it in a task running in this context to keep a reference to the child
                    // (this matters when we are navigating in the GUI; without it we lose the reference to the child 
                    // when browsing in the context of the parent)
                    return submit(Tasks.<T>builder().displayName("Cross-context execution: "+t.getDescription()).dynamic(true).body(new Callable<T>() {
                        public T call() { 
                            return DynamicTasks.get(t); 
                        }
                    }).build());
                } else {
                    // if we are already tracked by parent, just submit it 
                    return tc.submit(t);
                }
            } else {
                // as above, but here we are definitely not a child (what we are submitting isn't even a task)
                // (will only come here if properties defines tags including a target entity, which probably never happens) 
                submit(Tasks.<T>builder().displayName("Cross-context execution").dynamic(true).body(new Callable<T>() {
                    public T call() {
                        if (task instanceof Callable) {
                            return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build() ).getUnchecked();
                        } else if (task instanceof Runnable) {
                            return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Runnable)task).build() ).getUnchecked();
                        } else {
                            throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
                        }
                    }
                }).build());
            }
        }
        
        EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags);
        if (entitlementContext==null)
        entitlementContext = Entitlements.getEntitlementContext();
        if (entitlementContext!=null) {
            taskTags.add(BrooklynTaskTags.tagForEntitlement(entitlementContext));
        }

        taskTags.addAll(tags);
        
        if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current()) 
                && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
            // tag as transient if submitter is transient, unless explicitly tagged as non-transient
            taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
        }
        
        final Object startCallback = properties.get("newTaskStartCallback");
        properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
            public Void apply(Task<?> it) {
                registerPerThreadExecutionContext();
                if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
                return null;
            }});
        
        final Object endCallback = properties.get("newTaskEndCallback");
        properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
            public Void apply(Task<?> it) {
                try {
                    if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
                } finally {
                    clearPerThreadExecutionContext();
                }
                return null;
            }});
        
        if (task instanceof Task) {
            return executionManager.submit(properties, (Task)task);
        } else if (task instanceof Callable) {
            return executionManager.submit(properties, (Callable)task);
        } else if (task instanceof Runnable) {
            return (Task<T>) executionManager.submit(properties, (Runnable)task);
        } else {
            throw new IllegalArgumentException("Unhandled task type: task="+task+"; type="+(task!=null ? task.getClass() : "null"));
        }
    }