in zuul-core/src/main/java/com/netflix/zuul/netty/filter/BaseZuulFilterRunner.java [192:283]
/**
 * Runs a single filter against the given message and records how the run ended.
 *
 * <p>The filter is not applied, and its default output is returned, when any of these hold: it is an
 * inbound filter and the context reports that an error response should be sent, {@code shouldSkipFilter}
 * returns {@code true}, or the filter is disabled. If the message body is not yet ready for the filter,
 * the message is flagged as awaiting its body and {@code null} is returned. Otherwise a sync filter is
 * applied inline, while an async filter is subscribed to with a {@code FilterChainResumer} and
 * {@code null} is returned.
 *
 * <p>Any {@link Throwable} raised along the way is passed to {@code handleFilterException} and the run is
 * recorded as {@code FAILED}; nothing is rethrown to the caller.
 *
 * @param filter the filter to run
 * @param inMesg the message to pass through the filter
 * @return the message produced by the filter; the filter's default output when it was skipped, disabled,
 *     or a sync filter returned {@code null}; the result of {@code handleFilterException} on failure; or
 *     {@code null} when the filter is still waiting for the body or is running asynchronously
 */
protected final O filter(ZuulFilter<I, O> filter, I inMesg) {
    long startTime = System.nanoTime();
    // A copy of the incoming message is only taken when routing debug is enabled on the context.
    ZuulMessage snapshot = inMesg.getContext().debugRouting() ? inMesg.clone() : null;
    FilterChainResumer resumer = null;
    try (TaskCloseable ignored = PerfMark.traceTask(filter, f -> f.filterName() + ".filter")) {
        addPerfMarkTags(inMesg);
        ExecutionStatus filterRunStatus = null;
        if (filter.filterType() == FilterType.INBOUND && inMesg.getContext().shouldSendErrorResponse()) {
            // Pass request down the pipeline, all the way to error endpoint if error response needs to be generated
            filterRunStatus = ExecutionStatus.SKIPPED;
        }
        // shouldSkipFilter is always evaluated, even if the status was already set to SKIPPED above.
        try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".shouldSkipFilter")) {
            if (shouldSkipFilter(inMesg, filter)) {
                filterRunStatus = ExecutionStatus.SKIPPED;
            }
        }
        // Checked last, so DISABLED overrides SKIPPED in the recorded status.
        if (filter.isDisabled()) {
            filterRunStatus = ExecutionStatus.DISABLED;
        }
        if (filterRunStatus != null) {
            recordFilterCompletion(filterRunStatus, filter, startTime, inMesg, snapshot);
            return filter.getDefaultOutput(inMesg);
        }
        if (!isMessageBodyReadyForFilter(filter, inMesg)) {
            setFilterAwaitingBody(inMesg, true);
            logger.debug(
                    "Filter {} waiting for body, UUID {}",
                    filter.filterName(),
                    inMesg.getContext().getUUID());
            return null; // wait for whole body to be buffered
        }
        setFilterAwaitingBody(inMesg, false);
        if (snapshot != null) {
            // snapshot is non-null exactly when routing debug was enabled at the start of this call
            Debug.addRoutingDebug(
                    inMesg.getContext(),
                    "Filter " + filter.filterType().toString() + " " + filter.filterOrder() + " "
                            + filter.filterName());
        }
        // run body contents accumulated so far through this filter
        inMesg.runBufferedBodyContentThroughFilter(filter);
        if (filter.getSyncType() == FilterSyncType.SYNC) {
            // Parameterized cast: avoids the raw type and the unchecked conversion it would cause.
            SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter<I, O>) filter;
            O outMesg;
            try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".apply")) {
                addPerfMarkTags(inMesg);
                outMesg = syncFilter.apply(inMesg);
            }
            recordFilterCompletion(ExecutionStatus.SUCCESS, filter, startTime, inMesg, snapshot);
            // A sync filter returning null falls back to the filter's default output.
            return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
        }
        // async filter
        try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".applyAsync")) {
            Link nettyToSchedulerLink = PerfMark.linkOut();
            // The resumer is created only after incrementConcurrency() has returned; the catch block
            // below relies on that ordering to decide whether a decrement is due.
            filter.incrementConcurrency();
            resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
            filter.applyAsync(inMesg)
                    .doOnSubscribe(() -> {
                        try (TaskCloseable ignored3 =
                                PerfMark.traceTask(filter, f -> f.filterName() + ".onSubscribeAsync")) {
                            PerfMark.linkIn(nettyToSchedulerLink);
                        }
                    })
                    .doOnNext(resumer.onNextStarted(nettyToSchedulerLink))
                    .doOnError(resumer.onErrorStarted(nettyToSchedulerLink))
                    .doOnCompleted(resumer.onCompletedStarted(nettyToSchedulerLink))
                    // Notifications below this point are observed on the executor of the channel handler
                    // context associated with the message.
                    .observeOn(
                            Schedulers.from(getChannelHandlerContext(inMesg).executor()))
                    .doOnUnsubscribe(resumer::decrementConcurrency)
                    .subscribe(resumer);
        }
        return null; // wait for the async filter to finish
    } catch (Throwable t) {
        // A non-null resumer means incrementConcurrency() completed before the failure.
        if (resumer != null) {
            resumer.decrementConcurrency();
        }
        O outMesg = handleFilterException(inMesg, filter, t);
        outMesg.finishBufferedBodyIfIncomplete();
        recordFilterCompletion(ExecutionStatus.FAILED, filter, startTime, inMesg, snapshot);
        return outMesg;
    }
}