public Action inspect()

in rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java [111:241]


    /**
     * Inspects an incoming resource and, depending on its transport, either ignores it,
     * redirects its output to a previously suspended response, or decodes the request body
     * and dispatches it to the Atmosphere framework as a new request.
     * <ul>
     * <li>Transports other than WEBSOCKET, SSE and POLLING are skipped.</li>
     * <li>POLLING: a proxy writer is attached so that everything written to this response is
     *     forwarded to (and flushed on) the response registered in {@code suspendedResponses}
     *     under the request's suspended-resource uuid.</li>
     * <li>WEBSOCKET / SSE: a listener is added that registers the response on suspend and
     *     unregisters it on disconnect. Unless the request is already marked with
     *     {@code REQUEST_DISPATCHED}, its body is read; an empty body suspends the resource,
     *     a non-empty body is wrapped into a new request and handed to
     *     {@code framework.doCometSupport}.</li>
     * </ul>
     *
     * @param r the resource being intercepted
     * @return {@code Action.CONTINUE} for skipped transports, for POLLING, for requests already
     *         marked as dispatched, and when an {@code IOException} occurs during processing;
     *         {@code Action.SUSPEND} for a WEBSOCKET/SSE request with an empty body;
     *         {@code Action.CANCELLED} otherwise (including when dispatching failed and an
     *         error response was written)
     */
    public Action inspect(final AtmosphereResource r) {
        LOG.log(Level.FINE, "inspect");
        // Read the transport once; every branch below decides on the same value.
        final AtmosphereResource.TRANSPORT transport = r.transport();
        if (AtmosphereResource.TRANSPORT.WEBSOCKET != transport
            && AtmosphereResource.TRANSPORT.SSE != transport
            && AtmosphereResource.TRANSPORT.POLLING != transport) {
            LOG.fine("Skipping ignorable request");
            return Action.CONTINUE;
        }
        if (AtmosphereResource.TRANSPORT.POLLING == transport) {
            final String saruuid = (String)r.getRequest()
                .getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
            // NOTE(review): nothing here checks that a response is registered under this uuid;
            //   presumably onSuspend always ran first - confirm, otherwise the writes below fail.
            final AtmosphereResponse suspendedResponse = suspendedResponses.get(saruuid);
            LOG.fine("Attaching a proxy writer to suspended response");
            r.getResponse().asyncIOWriter(new AtmosphereInterceptorWriter() {
                @Override
                public AsyncIOWriter write(AtmosphereResponse response, String data) throws IOException {
                    suspendedResponse.write(data);
                    suspendedResponse.flushBuffer();
                    return this;
                }

                @Override
                public AsyncIOWriter write(AtmosphereResponse response, byte[] data) throws IOException {
                    suspendedResponse.write(data);
                    suspendedResponse.flushBuffer();
                    return this;
                }

                @Override
                public AsyncIOWriter write(AtmosphereResponse response, byte[] data, int offset, int length)
                    throws IOException {
                    suspendedResponse.write(data, offset, length);
                    suspendedResponse.flushBuffer();
                    return this;
                }

                @Override
                public void close(AtmosphereResponse response) throws IOException {
                    // intentionally empty: closing the proxy does nothing to the suspended response
                }
            });
            // REVISIT we need to keep this response's asyncwriter alive so that data can be written to the
            //   suspended response, but investigate if there is a better alternative.
            r.getResponse().destroyable(false);
            return Action.CONTINUE;
        }

        // WEBSOCKET or SSE from here on: track the suspended response for later POLLING requests.
        r.addEventListener(new AtmosphereResourceEventListenerAdapter() {
            @Override
            public void onSuspend(AtmosphereResourceEvent event) {
                final String srid = (String)event.getResource().getRequest()
                    .getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
                // java.util.logging substitutes indexed placeholders only ({0}, not {}).
                LOG.log(Level.FINE, "Registrering suspended resource: {0}", srid);
                suspendedResponses.put(srid, event.getResource().getResponse());

                AsyncIOWriter writer = event.getResource().getResponse().getAsyncIOWriter();
                if (writer instanceof AtmosphereInterceptorWriter) {
                    ((AtmosphereInterceptorWriter)writer).interceptor(interceptor);
                }
            }

            @Override
            public void onDisconnect(AtmosphereResourceEvent event) {
                super.onDisconnect(event);
                final String srid = (String)event.getResource().getRequest()
                    .getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
                LOG.log(Level.FINE, "Unregistrering suspended resource: {0}", srid);
                suspendedResponses.remove(srid);
            }

        });
        AtmosphereRequest request = r.getRequest();

        if (request.getAttribute(REQUEST_DISPATCHED) == null) {
            AtmosphereResponse response = null;

            AtmosphereFramework framework = r.getAtmosphereConfig().framework();
            try {
                byte[] data = WebSocketUtils.readBody(request.getInputStream());
                if (data.length == 0) {
                    // Nothing to dispatch: suspend the connection for WEBSOCKET/SSE.
                    if (AtmosphereResource.TRANSPORT.WEBSOCKET == transport
                        || AtmosphereResource.TRANSPORT.SSE == transport) {
                        r.suspend();
                        return Action.SUSPEND;
                    }
                    return Action.CANCELLED;
                }

                if (LOG.isLoggable(Level.FINE)) {
                    // Decode with a fixed charset so the debug output does not depend on the platform.
                    LOG.log(Level.FINE, "inspecting data {0}",
                            new String(data, java.nio.charset.StandardCharsets.UTF_8));
                }
                try {
                    AtmosphereRequest ar = createAtmosphereRequest(request, data);
                    response = new WrappedAtmosphereResponse(r.getResponse(), ar);
                    // Mark the new request so that it is not dispatched again when it passes through here.
                    ar.localAttributes().put(REQUEST_DISPATCHED, "true");
                    String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
                    if (refid != null) {
                        ar.localAttributes().put(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid);
                    }
                    // This is a new request, we must clean the Websocket AtmosphereResource.
                    request.removeAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE);
                    response.request(ar);
                    attachWriter(r);

                    Action action = framework.doCometSupport(ar, response);
                    if (action.type() == Action.TYPE.SUSPEND) {
                        ar.destroyable(false);
                        response.destroyable(false);
                    }
                } catch (Exception e) {
                    LOG.log(Level.WARNING, "Error during request dispatching", e);
                    if (response == null) {
                        // Failed before the wrapped response existed; build one on the original request.
                        response = new WrappedAtmosphereResponse(r.getResponse(), request);
                    }
                    if (e instanceof InvalidPathException) {
                        response.setIntHeader(WebSocketUtils.SC_KEY, 400);
                    } else {
                        response.setIntHeader(WebSocketUtils.SC_KEY, 500);
                    }
                    // Close the stream even when writing the error response fails.
                    try (OutputStream out = response.getOutputStream()) {
                        out.write(createResponse(response, null, true));
                    }
                }
                return Action.CANCELLED;
            } catch (IOException e) {
                LOG.log(Level.WARNING, "Error during protocol processing", e);
            }
        } else {
            request.destroyable(false);
        }
        return Action.CONTINUE;
    }