protected final O filter()

in zuul-core/src/main/java/com/netflix/zuul/netty/filter/BaseZuulFilterRunner.java [192:283]


    /**
     * Runs a single filter against the given message and reports how the caller should proceed.
     *
     * <p>The method resolves in one of the following ways:
     * <ul>
     *   <li><b>Skipped / disabled</b> - completion is recorded with the matching status and the
     *       filter's default output for {@code inMesg} is returned. The disabled check runs last,
     *       so {@code DISABLED} takes precedence over {@code SKIPPED} when both apply.</li>
     *   <li><b>Body not ready</b> - the message is flagged as awaiting its body and {@code null}
     *       is returned.</li>
     *   <li><b>Synchronous filter</b> - {@code apply} is invoked inline; its result is returned, or
     *       the filter's default output when the result is {@code null}.</li>
     *   <li><b>Asynchronous filter</b> - {@code applyAsync} is subscribed with a
     *       {@link FilterChainResumer} and {@code null} is returned.</li>
     *   <li><b>Failure</b> - any {@link Throwable} raised above is passed to
     *       {@code handleFilterException}; the resulting message has its buffered body finished if
     *       incomplete, completion is recorded as {@code FAILED}, and that message is returned.</li>
     * </ul>
     *
     * @param filter the filter to execute
     * @param inMesg the message to run through the filter
     * @return the filter output, or {@code null} when the filter is waiting for the message body or
     *     for an asynchronous filter to finish
     */
    protected final O filter(ZuulFilter<I, O> filter, I inMesg) {
        long startTime = System.nanoTime();
        // Only clone the message when routing debug is enabled; the snapshot is handed to
        // recordFilterCompletion and to the async resumer.
        ZuulMessage snapshot = inMesg.getContext().debugRouting() ? inMesg.clone() : null;
        FilterChainResumer resumer = null;

        try (TaskCloseable ignored = PerfMark.traceTask(filter, f -> f.filterName() + ".filter")) {
            addPerfMarkTags(inMesg);
            ExecutionStatus filterRunStatus = null;
            if (filter.filterType() == FilterType.INBOUND && inMesg.getContext().shouldSendErrorResponse()) {
                // Pass request down the pipeline, all the way to error endpoint if error response needs to be generated
                filterRunStatus = ExecutionStatus.SKIPPED;
            }

            // shouldSkipFilter is evaluated even when the status is already SKIPPED, so it is always traced.
            try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".shouldSkipFilter")) {
                if (shouldSkipFilter(inMesg, filter)) {
                    filterRunStatus = ExecutionStatus.SKIPPED;
                }
            }

            // Checked last so that DISABLED overrides any SKIPPED status set above.
            if (filter.isDisabled()) {
                filterRunStatus = ExecutionStatus.DISABLED;
            }

            if (filterRunStatus != null) {
                recordFilterCompletion(filterRunStatus, filter, startTime, inMesg, snapshot);
                return filter.getDefaultOutput(inMesg);
            }

            if (!isMessageBodyReadyForFilter(filter, inMesg)) {
                setFilterAwaitingBody(inMesg, true);
                logger.debug(
                        "Filter {} waiting for body, UUID {}",
                        filter.filterName(),
                        inMesg.getContext().getUUID());
                return null; // wait for whole body to be buffered
            }
            setFilterAwaitingBody(inMesg, false);

            // A non-null snapshot means routing debug is enabled for this message.
            if (snapshot != null) {
                Debug.addRoutingDebug(
                        inMesg.getContext(),
                        "Filter " + filter.filterType().toString() + " " + filter.filterOrder() + " "
                                + filter.filterName());
            }

            // run body contents accumulated so far through this filter
            inMesg.runBufferedBodyContentThroughFilter(filter);

            if (filter.getSyncType() == FilterSyncType.SYNC) {
                SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter<I, O>) filter;
                O outMesg;
                try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".apply")) {
                    addPerfMarkTags(inMesg);
                    outMesg = syncFilter.apply(inMesg);
                }
                recordFilterCompletion(ExecutionStatus.SUCCESS, filter, startTime, inMesg, snapshot);
                // A null result from apply() falls back to the filter's default output.
                return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
            }

            // async filter
            try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".applyAsync")) {
                Link nettyToSchedulerLink = PerfMark.linkOut();
                filter.incrementConcurrency();
                // Assigned to the outer variable so the catch block below can decrement concurrency
                // if anything after this point throws.
                resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
                filter.applyAsync(inMesg)
                        .doOnSubscribe(() -> {
                            try (TaskCloseable ignored3 =
                                    PerfMark.traceTask(filter, f -> f.filterName() + ".onSubscribeAsync")) {
                                PerfMark.linkIn(nettyToSchedulerLink);
                            }
                        })
                        .doOnNext(resumer.onNextStarted(nettyToSchedulerLink))
                        .doOnError(resumer.onErrorStarted(nettyToSchedulerLink))
                        .doOnCompleted(resumer.onCompletedStarted(nettyToSchedulerLink))
                        // Deliver downstream notifications on the executor of this message's channel handler context.
                        .observeOn(
                                Schedulers.from(getChannelHandlerContext(inMesg).executor()))
                        .doOnUnsubscribe(resumer::decrementConcurrency)
                        .subscribe(resumer);
            }

            return null; // wait for the async filter to finish
        } catch (Throwable t) {
            // resumer is only non-null once the async path has incremented the filter's concurrency.
            if (resumer != null) {
                resumer.decrementConcurrency();
            }
            // NOTE(review): assumes handleFilterException never returns null - confirm against its implementation.
            O outMesg = handleFilterException(inMesg, filter, t);
            outMesg.finishBufferedBodyIfIncomplete();
            recordFilterCompletion(ExecutionStatus.FAILED, filter, startTime, inMesg, snapshot);
            return outMesg;
        }
    }