in core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java [411:709]
/**
 * Resolves {@code this.value} towards the requested type {@code typeT} and returns the outcome as a {@link Maybe}.
 * <p>
 * Outline of what the body does, in order:
 * <ol>
 * <li>enforces single use via the {@code started} flag, and returns absent straight away if {@code expired} is set;
 * <li>picks an execution context: {@code this.exec}, or else whatever
 *     {@code BasicExecutionContext.getCurrentExecutionContext()} returns (which may leave it null);
 * <li>rejects a non-recursive resolver whose {@code typeT} is not raw {@code Object};
 * <li>sets up a countdown timer from {@code parentTimer}, or from {@code timeout} if there is no parent timer;
 * <li>short-circuits when the value is null, or (unless {@code forceDeep}) is already a raw instance of a raw
 *     {@code typeT} and is not a {@code Future}, {@code DeferredSupplier} or {@code TaskFactory};
 * <li>otherwise unwraps one level: task factories are turned into tasks, immediate evaluation is attempted when
 *     requested, tasks/futures are submitted and waited on, deferred suppliers are invoked (optionally inside a task),
 *     and any other value is optionally resolved deeply (maps and iterables) and then coerced to {@code typeT};
 * <li>for the task/future/supplier paths, the unwrapped value is resolved again with a nested resolver when
 *     {@code recursive} is set, otherwise it is returned as-is.
 * </ol>
 *
 * @return the resolved value, or an absent {@link Maybe} describing why no value could be produced
 *         (expiry, missing execution context, immediate value not available, an absent result from a timed wait
 *         or nested resolution, or a swallowed exception)
 * @throws IllegalStateException if this resolver has already been used, or if it is non-recursive and
 *         {@code typeT} is not {@code Object}
 * @throws IllegalArgumentException wrapping any exception caught during resolution when {@code swallowExceptions}
 *         is false (after {@code Exceptions.propagateIfFatal} has been given the chance to rethrow it)
 */
protected Maybe<T> getMaybeInternal() {
// single-use guard: set the started flag and reject a second invocation
if (started.getAndSet(true))
throw new IllegalStateException("ValueResolver can only be used once");
// an already-expired resolver yields absent without attempting anything
if (expired) return Maybe.absent("Nested resolution of "+getOriginalValue()+" did not complete within "+timeout);
// work on a local copy so the fallback below does not modify the field
ExecutionContext exec = this.exec;
if (exec==null) {
// if execution context not specified, take it from the current task if present
exec = BasicExecutionContext.getCurrentExecutionContext();
}
if (!recursive && !TypeTokens.equalsRaw(Object.class, typeT)) {
throw new IllegalStateException("When non-recursive resolver requested the return type must be Object " +
"as the immediately resolved value could be a number of (deferred) types.");
}
// prefer the timer handed down from a parent resolver; otherwise create one from our own timeout (if any)
CountdownTimer timerU = parentTimer;
if (timerU==null && timeout!=null)
timerU = timeout.countdownTimer();
final CountdownTimer timer = timerU;
// start the timer unless isNotPaused() already reports true
if (timer!=null && !timer.isNotPaused())
timer.start();
// NOTE(review): presumably fails if typeT is null -- confirm against checkTypeNotNull()
checkTypeNotNull();
Object v = this.value;
// if the expected type is what we have, we're done (or if it's null);
// but not allowed to return a future or DeferredSupplier as the resolved value,
// and not if generics signatures might be different
if (v==null || (!forceDeep && TypeTokens.isRaw(typeT) && TypeTokens.isInstanceRaw(typeT, v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v) && !TaskFactory.class.isInstance(v)))
return Maybe.of((T) v);
try {
// allowImmediateExecution: whether v is of a kind we are willing to hand to exec.getImmediately
// bailOutAfterImmediateExecution: whether an "immediate unsupported" failure must be rethrown rather than
// falling through to the blocking paths below
boolean allowImmediateExecution = false;
boolean bailOutAfterImmediateExecution = false;
if (v instanceof ImmediateSupplier || v instanceof DeferredSupplier) {
allowImmediateExecution = true;
} else {
if (v instanceof TaskFactory<?>) {
// a factory gives us a fresh task of our own, so it may be tried immediately; it is marked transient
v = ((TaskFactory<?>)v).newTask();
allowImmediateExecution = true;
bailOutAfterImmediateExecution = true;
BrooklynTaskTags.setTransient(((TaskAdaptable<?>)v).asTask());
}
// if it's a task or a future, we wait for the task to complete
if (v instanceof TaskAdaptable<?>) {
v = ((TaskAdaptable<?>) v).asTask();
}
}
if (allowImmediateExecution && isEvaluatingImmediately()) {
// Feb 2017 - many things now we try to run immediate; notable exceptions are:
// * where the target isn't safe to run again (such as a Task which someone else might need),
// * or where it can't be run in an "interrupting" mode even if non-blocking (eg Future.get(), some other tasks)
// (the latter could be tried here, with bailOut false, but in most cases it will just throw so we still need to
// have the timings as in SHORT_WAIT etc as a fallback)
// TODO svet suggested at https://github.com/apache/brooklyn-server/pull/565#pullrequestreview-27124074
// that we might flip the immediately bit if interrupted -- or maybe instead (alex's idea)
// enter this block
// if (allowImmediateExecution && (isEvaluatingImmediately() || Thread.isInterrupted())
// -- feels right, and would help with some recursive immediate values but no compelling
// use case yet and needs some deep thought which we're deferring for now
Maybe<T> result = null;
try {
if (exec==null) {
return Maybe.absent("Immediate resolution requested for '"+getDescription()+"' but no execution context available");
}
result = exec.getImmediately(v);
// a present result is resolved again by a nested resolver when recursive;
// an absent result, or a present one when non-recursive, is returned unchanged
return (result.isPresent())
? recursive
? new ValueResolver<T>(result.get(), typeT, this).getMaybe()
: result
: result;
} catch (ImmediateSupplier.ImmediateUnsupportedException e) {
if (bailOutAfterImmediateExecution) {
// set only for tasks created from a TaskFactory above: no fallback to blocking execution for these
throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v);
}
// ignore, continue below
if (log.isTraceEnabled()) {
log.trace("Unable to resolve-immediately for "+description+" ("+v+", unsupported, type "+v.getClass()+"); falling back to executing with timeout: "+e);
}
} catch (ImmediateSupplier.ImmediateValueNotAvailableException e) {
// definitively not available
return Maybe.absent(e);
}
}
if (v instanceof Task) {
// if it's a task, we make sure it is submitted
Task<?> task = (Task<?>) v;
if (!task.isSubmitted()) {
if (exec==null) {
return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available");
}
if (!task.getTags().contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
// mark this non-transient, because this value is usually something set e.g. in config
// (should discourage this in favour of task factories which can be transiently interrupted?)
BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
}
if (timer==null && !Thread.currentThread().isInterrupted() && !isEvaluatingImmediately()) {
// if all conditions above: no timeout, not immediate, and not interrupted,
// then we can run in this thread
exec.get(task);
} else {
// otherwise just submit it; the Future handling below does the (timed or immediate) wait
exec.submit(task);
}
}
}
if (v instanceof Future) {
final Future<?> vfuture = (Future<?>) v;
// including tasks, above
if (!vfuture.isDone()) {
Maybe vm;
if (isEvaluatingImmediately()) {
if (vfuture instanceof Task<?> && ((Task<?>)vfuture).isSubmitted()) {
try {
// not sure if this ever gives a better result than the below - DST's still aren't supported - but might do for some tasks
vm = exec.getImmediately(vfuture);
} catch (ImmediateSupplier.ImmediateValueNotAvailableException e) {
return Maybe.absent(e);
} catch (Exception e) {
// any other failure here is reported as "not available immediately", keeping e as the cause
return Maybe.absent(() -> new ImmediateSupplier.ImmediateValueNotAvailableException("Future " + vfuture + " cannot be resolved in immediate context", e));
}
} else {
// a plain future (or unsubmitted task) that is not done cannot be answered without blocking
return ImmediateSupplier.ImmediateValueNotAvailableException.newAbsentWithExceptionSupplier(() -> "Future " + vfuture + " cannot be resolved in immediate context");
}
} else {
// wait on the future using the timer (which may be null), with blocking details set for the wait
Callable<Maybe> callable = new Callable<Maybe>() {
@Override
public Maybe call() throws Exception {
return Durations.get(vfuture, timer);
}
};
String description = getDescription();
vm = Tasks.withBlockingDetails("Waiting for " + description, callable);
}
// an absent result from either path above is returned to the caller unchanged
if (vm.isAbsent()) return vm;
v = vm.get();
} else {
// already done: take the result directly (any exception is handled by the enclosing catch)
v = vfuture.get();
}
} else if (v instanceof DeferredSupplier<?>) {
final DeferredSupplier<?> ds = (DeferredSupplier<?>) v;
// embed the get() in a task when explicitly requested (TRUE), or by default (null) whenever an
// execution context or a timeout is in play; an explicit FALSE always takes the direct call below
if ((!Boolean.FALSE.equals(embedResolutionInTask) && (exec!=null || timeout!=null)) || Boolean.TRUE.equals(embedResolutionInTask)) {
if (exec==null)
return Maybe.absent("Embedding in task needed for '"+getDescription()+"' but no execution context available");
Callable<Object> callable = new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Tasks.setBlockingDetails("Retrieving "+ds);
return ds.get();
} finally {
Tasks.resetBlockingDetails();
}
} };
String description = getDescription();
TaskBuilder<Object> tb = Tasks.<Object>builder()
.body(callable)
.displayName("Resolving dependent value of deferred supplier")
.description(description);
if (isTransientTask) tb.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
// immediate resolution is handled above
Task<Object> vt = exec.submit(tb.build());
Maybe<Object> vm = Durations.get(vt, timer);
// always cancel after the wait; NOTE(review): presumably a no-op if the task already completed and
// stops it if the wait gave up first -- confirm against Task.cancel semantics
vt.cancel(true);
if (vm.isAbsent()) return (Maybe<T>)vm;
v = vm.get();
} else {
// no task embedding: call the supplier directly in this thread, with blocking details set meanwhile
try {
Tasks.setBlockingDetails("Retrieving (non-task) "+ds);
v = ((DeferredSupplier<?>) ds).get();
} finally {
Tasks.resetBlockingDetails();
}
}
} else {
// not a future and not a deferred supplier: optionally resolve inside containers, then coerce and return
if (allowDeepResolution && supportsDeepResolution(v, typeT)) {
// allows user to resolveValue(map, String) with the effect
// that things _in_ the collection would be resolved as string.
// alternatively use generics.
boolean applyThisTypeToContents = Boolean.TRUE.equals(ignoreGenericsAndApplyThisTypeToContents);
// restrict deep resolution to the same set of types as calling code;
// in particular need to avoid for "interesting iterables" such as PortRange
if (v instanceof Map) {
TypeToken<?> keyT;
TypeToken<?> valT;
if (applyThisTypeToContents) {
keyT = typeT;
valT = typeT;
} else {
// take key/value types from typeT's generic parameters when viewed as a Map; fall back to Object
TypeToken<?>[] innerTypes = TypeTokens.getGenericParameterTypeTokensWhenUpcastToClassRaw(typeT, Map.class); // innerTypes = Reflections.getGenericParameterTypeTokens( typeT );
if (innerTypes.length==2) {
keyT = innerTypes[0];
valT = innerTypes[1];
} else {
keyT = valT = TypeToken.of(Object.class);
}
}
// and if a map or list we look inside
Map result = Maps.newLinkedHashMap();
for (Map.Entry<?,?> entry : ((Map<?,?>)v).entrySet()) {
// resolve key then value with nested resolvers; the first absent result aborts the whole resolution
Maybe<?> kk = new ValueResolver(entry.getKey(), keyT, this)
.description( (description!=null ? description+", " : "") + "map key "+entry.getKey() )
.getMaybe();
if (kk.isAbsent()) return (Maybe<T>)kk;
Maybe<?> vv = new ValueResolver(entry.getValue(), valT, this)
.description( (description!=null ? description+", " : "") + "map value for key "+kk.get() )
.getMaybe();
if (vv.isAbsent()) return (Maybe<T>)vv;
result.put(kk.get(), vv.get());
}
v = result;
} else if (v instanceof Iterable) {
TypeToken<?> entryT;
if (applyThisTypeToContents) {
entryT = typeT;
} else {
// take the element type from typeT's generic parameter when viewed as an Iterable; fall back to Object
TypeToken<?>[] innerTypes = TypeTokens.getGenericParameterTypeTokensWhenUpcastToClassRaw(typeT, Iterable.class);
if (innerTypes.length==1) {
entryT = innerTypes[0];
} else {
entryT = TypeToken.of(Object.class);
}
}
// sets are rebuilt as a MutableSet, every other iterable as an ArrayList
Collection<Object> result = v instanceof Set ? MutableSet.of() : Lists.newArrayList();
int count = 0;
for (Object it : (Iterable)v) {
// count is only used to label the entry in the nested resolver's description
Maybe<?> vv = new ValueResolver(it, entryT, this)
.description( (description!=null ? description+", " : "") + "entry "+count )
.getMaybe();
if (vv.isAbsent()) return (Maybe<T>)vv;
result.add(vv.get());
count++;
}
v = result;
}
}
// this branch always returns here: the recursive step at the end of the method is not reached from it
if (exec!=null && (
((v instanceof Map) && !((Map)v).isEmpty())
||
((v instanceof List) && !((List)v).isEmpty())) ) {
// do type coercion in a task to allow registered types
Object vf = v;
Task<Maybe<T>> task = Tasks.create(TYPE_COERCION_TASK_NAME, () -> TypeCoercions.tryCoerce(vf, typeT));
BrooklynTaskTags.setTransient(task);
return exec.get(task);
} else {
return TypeCoercions.tryCoerce(v, typeT);
}
}
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
// wrap as IllegalArgumentException; the context prefix is skipped if the collapsed text already starts
// with it, so that nested resolvers do not repeat the same prefix
String msg = (description!=null ? "Error in resolution: "+description+"," : "Error resolving value") + " at "+exec;
String eTxt = Exceptions.collapseText(e);
IllegalArgumentException problem = eTxt.startsWith(msg) ? new IllegalArgumentException(e) : new IllegalArgumentException(msg+": "+eTxt, e);
if (swallowExceptions) {
// caller asked for failures to be reported as an absent value rather than thrown
if (log.isDebugEnabled())
log.debug("Resolution of "+this+" failed, swallowing and returning: "+e);
return Maybe.absent(problem);
}
if (log.isDebugEnabled())
log.debug("Resolution of "+this+" failed, throwing: "+e);
throw problem;
}
// reached only from the task/future/deferred-supplier paths, with v holding the value they produced
if (recursive) {
// the produced value may itself need resolving or coercing, so hand it to a nested resolver
return new ValueResolver(v, typeT, this).getMaybe();
} else {
return (Maybe<T>) Maybe.of(v);
}
}