public void execProto()

in ratis-experiments/src/main/java/org/apache/ratis/experiments/flatbuffers/client/ClientProto.java [64:117]


  public void execProto(int reps) throws Exception{
    System.out.println("Starting streaming with Protobuffers");

    StreamObserver<TransferMsgProto> requestObserver = asyncStubProto.sendData(new StreamObserver<TransferReplyProto>(){

      @Override
      public void onNext(TransferReplyProto msg) {
        available.release();
        recv[0]++;
      }

      @Override
      public void onError(Throwable t) {
        Status status = Status.fromThrowable(t);
        System.out.println(status);
        System.out.println("Finished streaming with errors");
      }

      @Override
      public void onCompleted() {
        System.out.println("Finished streaming");
      }
    });
    try{
      int i = 0;
      // allocate a byte buffer containing message data.
      ByteBuffer bf = ByteBuffer.allocate(1024*1024);
      if(bf.hasArray()){
        Arrays.fill(bf.array(), (byte) 'a');
      }
      while(i < reps) {
        partId++;
        available.acquire();
        // using unsafewrap operations
        // creates a ByteString refrencing buffer data. Avoids Copying.
        // Something similar is missing in flatbuffers.
        TransferMsgProto msg = TransferMsgProto.newBuilder().
            setPartId(partId).
            setData(UnsafeByteOperations.unsafeWrap(bf)).build();
        requestObserver.onNext(msg);
        i++;
      }

    } catch (Exception e){
      System.out.println(e);
    }
    requestObserver.onCompleted();
    Thread.sleep(1000*100);
    if(recv[0] == partId){
      System.out.println("Transfer Successfull....");
    } else{
      System.out.println("Some error occurred...");
    }
  }