in zuul-core/src/main/java/com/netflix/zuul/netty/filter/ZuulFilterChainRunner.java [67:133]
public void filter(T inMesg, HttpContent chunk) {
String filterName = "-";
try (TaskCloseable ignored = PerfMark.traceTask(this, s -> s.getClass().getSimpleName() + ".filterChunk")) {
addPerfMarkTags(inMesg);
Preconditions.checkNotNull(inMesg, "input message");
AtomicInteger runningFilterIdx = getRunningFilterIndex(inMesg);
int limit = runningFilterIdx.get();
for (int i = 0; i < limit; i++) {
ZuulFilter<T, T> filter = filters[i];
filterName = filter.filterName();
if (!filter.isDisabled() && !shouldSkipFilter(inMesg, filter)) {
ByteBufUtil.touch(chunk, "Filter runner processing chunk, filter: ", filterName);
HttpContent newChunk = filter.processContentChunk(inMesg, chunk);
if (newChunk == null) {
// Filter wants to break the chain and stop propagating this chunk any further
return;
}
// deallocate original chunk if necessary
if ((newChunk != chunk) && (chunk.refCnt() > 0)) {
ByteBufUtil.touch(chunk, "Filter runner processing newChunk, filter: ", filterName);
chunk.release(chunk.refCnt());
}
chunk = newChunk;
}
}
if (limit >= filters.length) {
// Filter chain has run to end, pass down the channel pipeline
ByteBufUtil.touch(chunk, "Filter runner chain complete, message: ", inMesg);
invokeNextStage(inMesg, chunk);
} else {
ByteBufUtil.touch(chunk, "Filter runner buffering chunk, message: ", inMesg);
inMesg.bufferBodyContents(chunk);
boolean isAwaitingBody = isFilterAwaitingBody(inMesg.getContext());
// Record passport states for start and end of buffering bodies.
if (isAwaitingBody) {
CurrentPassport passport = CurrentPassport.fromSessionContext(inMesg.getContext());
if (inMesg.hasCompleteBody()) {
if (inMesg instanceof HttpRequestMessage) {
passport.addIfNotAlready(PassportState.FILTERS_INBOUND_BUF_END);
} else if (inMesg instanceof HttpResponseMessage) {
passport.addIfNotAlready(PassportState.FILTERS_OUTBOUND_BUF_END);
}
} else {
if (inMesg instanceof HttpRequestMessage) {
passport.addIfNotAlready(PassportState.FILTERS_INBOUND_BUF_START);
} else if (inMesg instanceof HttpResponseMessage) {
passport.addIfNotAlready(PassportState.FILTERS_OUTBOUND_BUF_START);
}
}
}
if (isAwaitingBody && inMesg.hasCompleteBody()) {
// whole body has arrived, resume filter chain
ByteBufUtil.touch(chunk, "Filter body complete, resume chain, ZuulMessage: ", inMesg);
runFilters(inMesg, runningFilterIdx);
}
}
} catch (Exception ex) {
ReferenceCountUtil.safeRelease(chunk);
handleException(inMesg, filterName, ex);
}
}