protected Response processRequest()

in server/openejb-client/src/main/java/org/apache/openejb/client/Client.java [146:447]


    /**
     * Sends a single request to the given server and reads the reply into {@code res}.
     * <p>
     * The exchange happens in this order: a connection is obtained from the
     * {@code ConnectionManager}; the protocol metadata, the server metadata, a cluster request,
     * the request type code and the request itself are written; then the server's protocol
     * metadata, the cluster response and finally the response are read back.
     * <p>
     * A cluster response with code {@code UPDATE} replaces the cluster metadata stored for
     * {@code server}. When an {@code IOException} that is not a {@code RemoteException} occurs,
     * the connection URI is added to the failed set, the connection is discarded and, if the
     * failure is a {@code RetryException} or {@code getRetry()} is true, the request is
     * attempted again by calling this method recursively.
     *
     * @param req    the request to send
     * @param res    the response instance that is populated from the server's reply
     * @param server the server to contact; must not be {@code null}
     * @return {@code res}, or the response carried by the {@code RetryException} when a retry
     *         triggered by that exception fails with a {@code RemoteException}
     * @throws RemoteException          if no connection can be obtained, or the exchange fails and is
     *                                  not recovered by a retry (a {@code RemoteFailoverException} when
     *                                  a retry was attempted and failed as well)
     * @throws IllegalArgumentException if {@code server} is {@code null}
     */
    protected Response processRequest(final Request req, final Response res, final ServerMetaData server) throws RemoteException {

        if (server == null) {
            throw new IllegalArgumentException("Server instance cannot be null");
        }

        final long start = System.nanoTime();
        final ClusterMetaData cluster = getClusterMetaData(server);

        //Determine which protocol to use for request writes
        final ProtocolMetaData protocolRequest = (null != COMPATIBLE_META_DATA ? COMPATIBLE_META_DATA : PROTOCOL_META_DATA);

        /*----------------------------*/
        /* Get a connection to server */
        /*----------------------------*/

        final Connection conn;
        try {
            conn = ConnectionManager.getConnection(cluster, server, req);
        } catch (final IOException e) {
            throw new RemoteException("Unable to connect", e);
        }

        OutputStream out = null;
        InputStream in = null;

        try {


            /*----------------------------------*/
            /* Get output streams */
            /*----------------------------------*/
            try {

                out = conn.getOutputStream();

            } catch (final IOException e) {
                throw newIOException("Cannot open output stream to server: ", e);
            }

            /*----------------------------------*/
            /* Write the protocol magic         */
            /*----------------------------------*/
            try {
                protocolRequest.writeExternal(out);
                out.flush();
            } catch (final IOException e) {
                throw newIOException("Cannot write the protocol metadata to the server: ", e);
            }

            /*----------------------------------*/
            /* Get object output stream */
            /*----------------------------------*/
            final ObjectOutput objectOut;
            try {
                objectOut = new ObjectOutputStream(out);
            } catch (final IOException e) {
                throw newIOException("Cannot open object output stream to server: ", e);
            }

            /*----------------------------------*/
            /* Write ServerMetaData */
            /*----------------------------------*/
            try {
                server.setMetaData(protocolRequest);
                server.writeExternal(objectOut);
            } catch (final IOException e) {
                throw newIOException("Cannot write the ServerMetaData to the server: ", e);
            }

            /*----------------------------------*/
            /* Write ClusterMetaData */
            /*----------------------------------*/
            try {

                final ClusterRequest clusterRequest = new ClusterRequest(cluster);
                clusterRequest.setMetaData(protocolRequest);
                objectOut.write(clusterRequest.getRequestType().getCode());
                clusterRequest.writeExternal(objectOut);
            } catch (final Throwable e) {
                throw newIOException("Cannot write the ClusterMetaData to the server: ", e);
            }

            /*----------------------------------*/
            /* Write request type */
            /*----------------------------------*/
            try {
                objectOut.write(req.getRequestType().getCode());
            } catch (final IOException e) {
                throw newIOException("Cannot write the request type to the server: ", e);
            }

            /*----------------------------------*/
            /* Write request */
            /*----------------------------------*/
            try {

                req.setMetaData(protocolRequest);
                req.writeExternal(objectOut);
                objectOut.flush();
                out.flush();

            } catch (final java.io.NotSerializableException e) {

                // Keep the original exception as the cause so its stack trace is not lost.
                // Note: this is raised inside the outer try, so the outer catch (Throwable)
                // wraps it in a RemoteException before it reaches the caller.
                throw new IllegalArgumentException("Object is not serializable: " + e.getMessage(), e);

            } catch (final IOException e) {

                throw newIOException("Cannot write the request to the server: " + e.getMessage(), e);
            }

            /*----------------------------------*/
            /* Get input streams               */
            /*----------------------------------*/

            try {

                in = conn.getInputStream();

            } catch (final IOException e) {
                // Surface an authentication failure itself rather than a generic I/O error
                if (AuthenticationException.class.isInstance(e.getCause())) {
                    throw e.getCause();
                }
                throw newIOException("Cannot open input stream to server: ", e);
            }

            //Determine the server response protocol for reading
            final ProtocolMetaData protocolResponse = new ProtocolMetaData();
            try {

                protocolResponse.readExternal(in);

            } catch (final EOFException e) {

                throw newIOException("Prematurely reached the end of the stream.  " + protocolResponse.getSpec() + " : " + e.getMessage(), e);

            } catch (final IOException e) {

                throw newIOException("Cannot determine server protocol version: Received " + protocolResponse.getSpec() + " : " + e.getMessage(), e);
            }

            final ObjectInput objectIn;
            try {

                objectIn = new EjbObjectInputStream(in);

            } catch (final IOException e) {
                throw newIOException("Cannot open object input stream to server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
            }

            /*----------------------------------*/
            /* Read cluster response */
            /*----------------------------------*/
            try {
                final ClusterResponse clusterResponse = new ClusterResponse();
                clusterResponse.setMetaData(protocolResponse);
                clusterResponse.readExternal(objectIn);
                switch (clusterResponse.getResponseCode()) {
                    case UPDATE: {
                        setClusterMetaData(server, clusterResponse.getUpdatedMetaData());
                    }
                    break;
                    case FAILURE: {
                        // Caught by the catch (Throwable) below and wrapped in a RemoteException
                        throw clusterResponse.getFailure();
                    }
                }
            } catch (final ClassNotFoundException e) {
                throw new RemoteException("Cannot read the cluster response from the server.  The class for an object being returned is not located in this system:", e);

            } catch (final IOException e) {
                throw newIOException("Cannot read the cluster response from the server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);

            } catch (final Throwable e) {
                throw new RemoteException("Error reading cluster response from server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
            }

            /*----------------------------------*/
            /* Read response */
            /*----------------------------------*/
            try {
                res.setMetaData(protocolResponse);
                res.readExternal(objectIn);
            } catch (final ClassNotFoundException e) {
                throw new RemoteException("Cannot read the response from the server.  The class for an object being returned is not located in this system:", e);

            } catch (final IOException e) {
                throw newIOException("Cannot read the response from the server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);

            } catch (final Throwable e) {
                throw new RemoteException("Error reading response from server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
            }

            // If the server answered with a throwable whose class is registered as a retry
            // condition, signal a retry; the RetryException is handled by the IOException catch below.
            if (retryConditions.size() > 0) {
                if (res instanceof EJBResponse) {
                    final EJBResponse ejbResponse = (EJBResponse) res;
                    if (ejbResponse.getResult() instanceof ThrowableArtifact) {
                        final ThrowableArtifact artifact = (ThrowableArtifact) ejbResponse.getResult();
                        //noinspection ThrowableResultOfMethodCallIgnored
                        if (retryConditions.contains(artifact.getThrowable().getClass())) {

                            throw new RetryException(res);

                            //                            if (? < maxConditionRetry) {
                            //                                throw new RetryException(res);
                            //                            } else {
                            //                                if (FINER) {
                            //                                    logger.log(Level.FINER, "Giving up on " + artifact.getThrowable().getClass().getName().toString());
                            //                                }
                            //                            }
                        }
                    }
                }
            }

            if (FINEST) {
                final long time = System.nanoTime() - start;
                final String message = String.format("Invocation %sns - %s - Request(%s) - Response(%s)", time, conn.getURI(), req, res);
                logger.log(Level.FINEST, message);
            }

        } catch (final RemoteException e) {
            // RemoteExceptions are final answers: rethrow untouched so they are never retried
            throw e;
        } catch (final IOException e) {
            final URI uri = conn.getURI();
            final Set<URI> failed = getFailed();

            Client.fireEvent(new RequestFailed(uri, req, e));

            if (FINER) {
                logger.log(Level.FINER, "Add Failed " + uri.toString());
            }
            failed.add(uri);
            conn.discard();

            if (e instanceof RetryException || getRetry()) {
                try {

                    Client.fireEvent(new RetryingRequest(req, server));

                    // The retry reads into the same 'res' instance, which is returned at the
                    // end of this method, so the return value is not needed here.
                    processRequest(req, res, server);
                } catch (final RemoteFailoverException re) {
                    throw re;
                } catch (final RemoteException re) {
                    // A failed retry of a RetryException falls back to the response that triggered it
                    if (e instanceof RetryException) {
                        return ((RetryException) e).getResponse();
                    }
                    throw new RemoteFailoverException("Cannot complete request.  Retry attempted on " + failed.size() + " servers", e);
                }
            } else {
                throw new RemoteException("Cannot read the response from the server (" + protocolRequest.getSpec() + ") : " + e.getMessage(), e);
            }

        } catch (final Throwable error) {
            throw new RemoteException("Error while communicating with server: ", error);

        } finally {

            if (null != out) {
                try {
                    out.close();
                } catch (final Throwable e) {
                    //Ignore
                }
            }

            if (null != in) {
                // Without consuming anything that is left in the buffer at the end of the call, we will cause the HttpUrlConnection to
                // be closed, and a new one created every time. This is wildly inefficient, particularly for clients using HTTPS,
                // as they'll need to handshake everytime. The JDK does do an element of connection pooling on HTTP connections, so we should use it.
                // Not consuming the remainder of the response buffer breaks this and forces a new connection. Please do not revert this
                // without some discussion on the mailing list, and testing that connections are not reset.

                // consume anything left in the buffer if we're running in http(s) mode
                if (HttpConnectionFactory.HttpConnection.class.isInstance(conn)) {
                    try {
                        // Drain in chunks instead of one byte per read() call; still reads to EOF
                        final byte[] drain = new byte[4096];
                        //noinspection StatementWithEmptyBody
                        while (in.read(drain) > -1) {
                            // discard
                        }
                    } catch (final Throwable ignored) {
                        // ignore
                    }
                }

                try {
                    in.close();
                } catch (final Throwable e) {
                    //Ignore
                }
            }

            if (null != conn) {
                try {
                    conn.close();
                } catch (final Throwable t) {
                    logger.log(Level.WARNING, "Error closing connection with server: " + t.getMessage(), t);
                }
            }

        }
        return res;
    }