in server/openejb-client/src/main/java/org/apache/openejb/client/Client.java [146:447]
/**
 * Sends {@code req} to a server and reads the reply into {@code res}.
 * <p>
 * The exchange, in the order performed below, is:
 * <ol>
 * <li>obtain a {@link Connection} from the {@code ConnectionManager};</li>
 * <li>write the protocol metadata, the {@code ServerMetaData}, a {@code ClusterRequest},
 * the request type code and finally the request itself;</li>
 * <li>read the server's protocol metadata, a {@code ClusterResponse} and then the response.</li>
 * </ol>
 * When an {@link IOException} (other than a {@link RemoteException}) escapes the exchange, the
 * connection URI is added to the failed set, a {@code RequestFailed} event is fired, the
 * connection is discarded and, if the failure is a {@code RetryException} or {@code getRetry()}
 * is true, the request is attempted again by calling this method recursively.
 * <p>
 * The streams and the connection are always closed in the {@code finally} block.
 *
 * @param req    the request to write to the server
 * @param res    the response object that is populated from the server's reply
 * @param server the server to contact; must not be {@code null}
 * @return {@code res}, or the response carried by a {@code RetryException} when the retry
 *         triggered by that exception fails with a {@link RemoteException}
 * @throws IllegalArgumentException if {@code server} is {@code null}
 * @throws RemoteException          if no connection can be obtained, if a {@code RemoteException}
 *                                  is raised during the exchange, if an I/O failure occurs and no
 *                                  retry is attempted, if a retry fails (reported as
 *                                  {@code RemoteFailoverException}), or if any other
 *                                  {@link Throwable} is raised while communicating
 */
protected Response processRequest(final Request req, final Response res, final ServerMetaData server) throws RemoteException {
    if (server == null) {
        throw new IllegalArgumentException("Server instance cannot be null");
    }

    // Only used for the FINEST timing log at the end of a successful exchange
    final long start = System.nanoTime();
    final ClusterMetaData cluster = getClusterMetaData(server);

    //Determine which protocol to use for request writes
    final ProtocolMetaData protocolRequest = (null != COMPATIBLE_META_DATA ? COMPATIBLE_META_DATA : PROTOCOL_META_DATA);

    /*----------------------------*/
    /* Get a connection to server */
    /*----------------------------*/
    final Connection conn;
    try {
        conn = ConnectionManager.getConnection(cluster, server, req);
    } catch (final IOException e) {
        throw new RemoteException("Unable to connect", e);
    }

    OutputStream out = null;
    InputStream in = null;

    try {

        /*----------------------------------*/
        /* Get output streams               */
        /*----------------------------------*/
        try {
            out = conn.getOutputStream();
        } catch (final IOException e) {
            throw newIOException("Cannot open output stream to server: ", e);
        }

        /*----------------------------------*/
        /* Write the protocol magic         */
        /*----------------------------------*/
        try {
            // Written to the raw stream, before it is wrapped in an ObjectOutputStream
            protocolRequest.writeExternal(out);
            out.flush();
        } catch (final IOException e) {
            throw newIOException("Cannot write the protocol metadata to the server: ", e);
        }

        /*----------------------------------*/
        /* Get object output stream         */
        /*----------------------------------*/
        final ObjectOutput objectOut;
        try {
            objectOut = new ObjectOutputStream(out);
        } catch (final IOException e) {
            throw newIOException("Cannot open object output stream to server: ", e);
        }

        /*----------------------------------*/
        /* Write ServerMetaData             */
        /*----------------------------------*/
        try {
            server.setMetaData(protocolRequest);
            server.writeExternal(objectOut);
        } catch (final IOException e) {
            throw newIOException("Cannot write the ServerMetaData to the server: ", e);
        }

        /*----------------------------------*/
        /* Write ClusterMetaData            */
        /*----------------------------------*/
        try {
            final ClusterRequest clusterRequest = new ClusterRequest(cluster);
            clusterRequest.setMetaData(protocolRequest);
            // The request type code precedes the request body on the wire
            objectOut.write(clusterRequest.getRequestType().getCode());
            clusterRequest.writeExternal(objectOut);
        } catch (final Throwable e) {
            throw newIOException("Cannot write the ClusterMetaData to the server: ", e);
        }

        /*----------------------------------*/
        /* Write request type               */
        /*----------------------------------*/
        try {
            objectOut.write(req.getRequestType().getCode());
        } catch (final IOException e) {
            throw newIOException("Cannot write the request type to the server: ", e);
        }

        /*----------------------------------*/
        /* Write request                    */
        /*----------------------------------*/
        try {
            req.setMetaData(protocolRequest);
            req.writeExternal(objectOut);
            objectOut.flush();
            out.flush();
        } catch (final java.io.NotSerializableException e) {
            // Keep the original exception as the cause so the stack trace that identifies
            // the offending object is not lost when this is wrapped further up.
            throw new IllegalArgumentException("Object is not serializable: " + e.getMessage(), e);
        } catch (final IOException e) {
            throw newIOException("Cannot write the request to the server: " + e.getMessage(), e);
        }

        /*----------------------------------*/
        /* Get input streams                */
        /*----------------------------------*/
        try {
            in = conn.getInputStream();
        } catch (final IOException e) {
            // Surface an authentication failure itself rather than the IOException wrapping it
            if (AuthenticationException.class.isInstance(e.getCause())) {
                throw e.getCause();
            }
            throw newIOException("Cannot open input stream to server: ", e);
        }

        //Determine the server response protocol for reading
        final ProtocolMetaData protocolResponse = new ProtocolMetaData();
        try {
            protocolResponse.readExternal(in);
        } catch (final EOFException e) {
            throw newIOException("Prematurely reached the end of the stream.  " + protocolResponse.getSpec() + " : " + e.getMessage(), e);
        } catch (final IOException e) {
            throw newIOException("Cannot determine server protocol version: Received " + protocolResponse.getSpec() + " : " + e.getMessage(), e);
        }

        final ObjectInput objectIn;
        try {
            objectIn = new EjbObjectInputStream(in);
        } catch (final IOException e) {
            throw newIOException("Cannot open object input stream to server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
        }

        /*----------------------------------*/
        /* Read cluster response            */
        /*----------------------------------*/
        try {
            final ClusterResponse clusterResponse = new ClusterResponse();
            clusterResponse.setMetaData(protocolResponse);
            clusterResponse.readExternal(objectIn);
            switch (clusterResponse.getResponseCode()) {
                case UPDATE: {
                    setClusterMetaData(server, clusterResponse.getUpdatedMetaData());
                }
                break;
                case FAILURE: {
                    // Caught by the handlers directly below and rewrapped
                    throw clusterResponse.getFailure();
                }
            }
        } catch (final ClassNotFoundException e) {
            throw new RemoteException("Cannot read the cluster response from the server.  The class for an object being returned is not located in this system:", e);
        } catch (final IOException e) {
            throw newIOException("Cannot read the cluster response from the server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
        } catch (final Throwable e) {
            throw new RemoteException("Error reading cluster response from server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
        }

        /*----------------------------------*/
        /* Read response                    */
        /*----------------------------------*/
        try {
            res.setMetaData(protocolResponse);
            res.readExternal(objectIn);
        } catch (final ClassNotFoundException e) {
            throw new RemoteException("Cannot read the response from the server.  The class for an object being returned is not located in this system:", e);
        } catch (final IOException e) {
            throw newIOException("Cannot read the response from the server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
        } catch (final Throwable e) {
            throw new RemoteException("Error reading response from server (" + protocolResponse.getSpec() + ") : " + e.getMessage(), e);
        }

        // A successfully read EJB response may still carry a throwable whose class is
        // registered as a retry condition; that is signalled with a RetryException,
        // which the IOException handler below tests for with instanceof.
        if (retryConditions.size() > 0) {
            if (res instanceof EJBResponse) {
                final EJBResponse ejbResponse = (EJBResponse) res;
                if (ejbResponse.getResult() instanceof ThrowableArtifact) {
                    final ThrowableArtifact artifact = (ThrowableArtifact) ejbResponse.getResult();
                    //noinspection ThrowableResultOfMethodCallIgnored
                    if (retryConditions.contains(artifact.getThrowable().getClass())) {

                        throw new RetryException(res);

                        //                            if (? < maxConditionRetry) {
                        //                                throw new RetryException(res);
                        //                            } else {
                        //                                if (FINER) {
                        //                                    logger.log(Level.FINER, "Giving up on " + artifact.getThrowable().getClass().getName().toString());
                        //                                }
                        //                            }
                    }
                }
            }
        }

        if (FINEST) {
            final long time = System.nanoTime() - start;
            final String message = String.format("Invocation %sns - %s - Request(%s) - Response(%s)", time, conn.getURI(), req, res);
            logger.log(Level.FINEST, message);
        }

    } catch (final RemoteException e) {
        // Already in the form callers expect; no failover bookkeeping is done for these
        throw e;
    } catch (final IOException e) {
        final URI uri = conn.getURI();
        final Set<URI> failed = getFailed();

        Client.fireEvent(new RequestFailed(uri, req, e));

        if (FINER) {
            logger.log(Level.FINER, "Add Failed " + uri.toString());
        }
        // Record the URI as failed and drop the connection before any retry
        failed.add(uri);
        conn.discard();

        if (e instanceof RetryException || getRetry()) {
            try {

                Client.fireEvent(new RetryingRequest(req, server));

                // The retry populates the same 'res' instance, which is returned at the end of this method
                processRequest(req, res, server);
            } catch (final RemoteFailoverException re) {
                throw re;
            } catch (final RemoteException re) {
                // The retry failed: for a condition-driven retry hand back the response
                // that was originally read, otherwise report the failover failure.
                if (e instanceof RetryException) {
                    return ((RetryException) e).getResponse();
                }
                throw new RemoteFailoverException("Cannot complete request.  Retry attempted on " + failed.size() + " servers", e);
            }
        } else {
            throw new RemoteException("Cannot read the response from the server (" + protocolRequest.getSpec() + ") : " + e.getMessage(), e);
        }

    } catch (final Throwable error) {
        throw new RemoteException("Error while communicating with server: ", error);

    } finally {

        // Best-effort clean-up: failures while closing the streams are deliberately ignored
        if (null != out) {
            try {
                out.close();
            } catch (final Throwable e) {
                //Ignore
            }
        }

        if (null != in) {
            // Without consuming anything that is left in the buffer at the end of the call, we will cause the HttpUrlConnection to
            // be closed, and a new one created every time. This is wildly inefficient, particularly for clients using HTTPS,
            // as they'll need to handshake everytime. The JDK does do an element of connection pooling on HTTP connections, so we should use it.
            // Not consuming the remainder of the response buffer breaks this and forces a new connection. Please do not revert this
            // without some discussion on the mailing list, and testing that connections are not reset.

            // consume anything left in the buffer if we're running in http(s) mode
            if (HttpConnectionFactory.HttpConnection.class.isInstance(conn)) {
                try {
                    int read = 0;
                    while (read > -1) {
                        read = in.read();
                    }
                } catch (Throwable e) {
                    // ignore
                }
            }

            try {
                in.close();
            } catch (final Throwable e) {
                //Ignore
            }
        }

        if (null != conn) {
            try {
                conn.close();
            } catch (final Throwable t) {
                logger.log(Level.WARNING, "Error closing connection with server: " + t.getMessage(), t);
            }
        }
    }
    return res;
}