public void execFlatClient()

in ratis-experiments/src/main/java/org/apache/ratis/experiments/flatbuffers/client/ClientFlat.java [64:119]


  public void execFlatClient(int reps) throws Exception{
    System.out.println("Starting streaming with Flatbuffers");
    StreamObserver<TransferMsg> requestObserver = asyncStubFlat.sendData(new StreamObserver<TransferReply>(){
      @Override
      public void onNext(TransferReply msg) {
        recv[0]++;
        available.release();
      }

      @Override
      public void onError(Throwable t) {
        Status status = Status.fromThrowable(t);
        System.out.println(status);
        System.out.println("Finished streaming with errors");
      }

      @Override
      public void onCompleted() {
        System.out.println("Finished streaming");
      }
    });
    try{
      int i = 0;
      // allocate a byte buffer containing message data.
      ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
      if(bf.hasArray()){
        Arrays.fill(bf.array(), (byte) 'a');
      }
      while(i < reps) {
        partId++;
        available.acquire();
        // start a builder and reset position of databuffer to 0.
        FlatBufferBuilder builder = new FlatBufferBuilder();
        bf.position(0).limit(bf.capacity());
        // create string of the message data
        // the datacopy happens here, significant CPU time spent.
        int dataOff = builder.createString(bf);
        // Use flatbuffers generated message builder.
        int off = TransferMsg.createTransferMsg(builder, partId, dataOff);
        builder.finish(off);
        TransferMsg msg = TransferMsg.getRootAsTransferMsg(builder.dataBuffer());
        requestObserver.onNext(msg);
        i++;
      }

    } catch (Exception e){
      System.out.println(e);
    }
    requestObserver.onCompleted();
    Thread.sleep(1000*100);
    if(recv[0] == partId){
      System.out.println("Transfer Successfull....");
    } else{
      System.out.println("Some error occurred...");
    }
  }