in ratis-experiments/src/main/java/org/apache/ratis/experiments/flatbuffers/client/ClientFlat.java [64:119]
/**
 * Streams {@code reps} FlatBuffers-encoded {@code TransferMsg} messages to the server and
 * reports on stdout whether the number of replies received matches the number of messages sent.
 *
 * <p>Each message carries a 1 MiB payload filled with the byte {@code 'a'}. Sending is throttled
 * through {@code available}: one permit is acquired before each message is sent and one is
 * released for each reply received. After the request stream is half-closed the method waits
 * until the response stream terminates (completion or error), for at most 100 seconds.
 *
 * @param reps number of messages to send
 * @throws Exception if the calling thread is interrupted while waiting for the response stream
 *         to terminate, or if closing the request stream fails
 */
public void execFlatClient(int reps) throws Exception{
  System.out.println("Starting streaming with Flatbuffers");
  // Counted down exactly once when the response stream terminates (completed or failed), so the
  // caller can stop waiting as soon as the transfer is over instead of sleeping a fixed time.
  final java.util.concurrent.CountDownLatch streamDone =
      new java.util.concurrent.CountDownLatch(1);
  // Set when the response stream reports an error or the send loop aborts.
  final java.util.concurrent.atomic.AtomicBoolean failed =
      new java.util.concurrent.atomic.AtomicBoolean(false);
  StreamObserver<TransferMsg> requestObserver = asyncStubFlat.sendData(new StreamObserver<TransferReply>(){
    @Override
    public void onNext(TransferReply msg) {
      recv[0]++;
      available.release();
    }
    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      System.out.println(status);
      System.out.println("Finished streaming with errors");
      failed.set(true);
      // No further replies will arrive; hand out one permit so a sender blocked in
      // available.acquire() wakes up and can observe the failure instead of hanging.
      available.release();
      streamDone.countDown();
    }
    @Override
    public void onCompleted() {
      System.out.println("Finished streaming");
      streamDone.countDown();
    }
  });
  try{
    // allocate a byte buffer containing message data.
    ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
    // A direct buffer does not expose a backing array here (hasArray() is false),
    // so fill it through the buffer API rather than via bf.array().
    while (bf.hasRemaining()) {
      bf.put((byte) 'a');
    }
    int i = 0;
    while(i < reps) {
      available.acquire();
      if (failed.get()) {
        // The stream already failed; do not count or send any further messages.
        break;
      }
      partId++;
      // start a builder and reset position of databuffer to 0.
      FlatBufferBuilder builder = new FlatBufferBuilder();
      bf.position(0).limit(bf.capacity());
      // create string of the message data
      // the datacopy happens here, significant CPU time spent.
      int dataOff = builder.createString(bf);
      // Use flatbuffers generated message builder.
      int off = TransferMsg.createTransferMsg(builder, partId, dataOff);
      builder.finish(off);
      TransferMsg msg = TransferMsg.getRootAsTransferMsg(builder.dataBuffer());
      requestObserver.onNext(msg);
      i++;
    }
  } catch (InterruptedException e){
    failed.set(true);
    System.out.println(e);
    // Restore the interrupt status so the wait below (and callers) can see it.
    Thread.currentThread().interrupt();
  } catch (Exception e){
    failed.set(true);
    System.out.println(e);
  }
  requestObserver.onCompleted();
  // Wait for the response stream to terminate; 100 seconds is the same upper bound the
  // previous fixed sleep used, but a finished transfer now returns immediately.
  streamDone.await(100, java.util.concurrent.TimeUnit.SECONDS);
  if(!failed.get() && recv[0] == partId){
    System.out.println("Transfer Successfull....");
  } else{
    System.out.println("Some error occurred...");
  }
}