in provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java [223:274]
/**
 * Encodes a remote method invocation, hands it to the transport pool for
 * {@code address} on the dispatch queue, and blocks until the response future
 * completes or {@code timeout} milliseconds have elapsed.
 *
 * <p>The frame written here is: a 4-byte length (patched in once the full size
 * is known), the var-long correlation id, the service name, the method
 * signature, and finally whatever the method's invocation strategy appends
 * for the arguments.
 *
 * @param handler     proxy handler issuing the call; its {@code lastRequestSize}
 *                    is read to presize the output buffer and updated with the
 *                    size of this request
 * @param address     key used to look up (or lazily create) the transport pool
 * @param service     service identifier written into the frame
 * @param classLoader class loader passed through to the invocation strategy
 * @param method      method being invoked
 * @param args        invocation arguments, passed through to the invocation strategy
 * @return the value produced by the response future
 * @throws IllegalStateException if the client is not running
 * @throws Exception if encoding fails, or whatever the response future's
 *                   {@code get} throws (send failures reported through
 *                   {@code future.fail} presumably surface here as well)
 */
protected Object request(ProxyInvocationHandler handler, final String address, final UTF8Buffer service, final ClassLoader classLoader, final Method method, final Object[] args) throws Exception {
    if (!running.get()) {
        throw new IllegalStateException("DOSGi Client stopped");
    }

    final long correlation = correlationGenerator.incrementAndGet();

    // Encode the request before handing it to the IO layers so that
    // #1 encoding errors are reported back to the caller, and
    // #2 less CPU work is done in the execution queue, which is
    //    serially executed.
    // The buffer is presized to the previous request size plus 10%.
    DataByteArrayOutputStream baos = new DataByteArrayOutputStream((int) (handler.lastRequestSize * 1.10));
    baos.writeInt(0); // placeholder: the frame size is not known yet
    baos.writeVarLong(correlation);
    writeBuffer(baos, service);

    MethodData methodData = getMethodData(method);
    writeBuffer(baos, methodData.signature);

    final ResponseFuture future = methodData.invocationStrategy.request(methodData.serializationStrategy, classLoader, method, args, baos);

    // toBuffer() is better than toByteArray() since it avoids an
    // array copy.
    final Buffer command = baos.toBuffer();

    // Patch the size placeholder now that the full length is known.
    BufferEditor editor = command.buffer().bigEndianEditor();
    editor.writeInt(command.length);
    handler.lastRequestSize = command.length;

    queue().execute(new Runnable() {
        public void run() {
            try {
                TransportPool pool = transports.get(address);
                if (pool == null) {
                    pool = new InvokerTransportPool(address, queue());
                    transports.put(address, pool);
                    pool.start();
                }
                // Register the future before offering the command so that it
                // is already in place when the command is handed to the pool.
                requests.put(correlation, future);
                pool.offer(command, correlation);
            } catch (Exception e) {
                LOGGER.info("Error while sending request", e);
                // The request never made it out: drop the registration so the
                // failed future does not linger in the map.
                requests.remove(correlation);
                future.fail(e);
            }
        }
    });

    // TODO: make the timeout configurable; the current value is only meant for tests
    try {
        return future.get(timeout, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        // The caller is giving up on this invocation (timeout, interruption or
        // a failed request). Forget the pending future so it cannot accumulate.
        // The removal is scheduled on the dispatch queue because that is where
        // this method registers entries; it is a no-op when the entry is
        // already gone.
        // NOTE(review): assumes the response handler tolerates a late response
        // for an unknown correlation id - confirm against the response path.
        queue().execute(new Runnable() {
            public void run() {
                requests.remove(correlation);
            }
        });
        throw e;
    }
}