void TcpTransport::readNextMessageIntCallback()

in src/transport/TcpTransport.cpp [227:290]


void TcpTransport::readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport) {
  /* This callback is invoked when there is data to read on bev. */

  // protocol:  <length> <header length> <header data> <body data>
  //               1            2               3           4
  // rocketmq protocol contains 4 parts as following:
  //     1, big endian 4 bytes int, its length is sum of 2,3 and 4
  //     2, big endian 4 bytes int, its length is 3
  //     3, use json to serialization data
  //     4, application could self-defined binary data

  struct evbuffer* input = event->getInput();
  while (1) {
    struct evbuffer_iovec v[4];
    int n = evbuffer_peek(input, 4, NULL, v, sizeof(v) / sizeof(v[0]));

    char hdr[4];
    char* p = hdr;
    size_t needed = 4;

    for (int idx = 0; idx < n; idx++) {
      if (needed > 0) {
        size_t tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
        memcpy(p, v[idx].iov_base, tmp);
        p += tmp;
        needed -= tmp;
      } else {
        break;
      }
    }

    if (needed > 0) {
      LOG_DEBUG("too little data received with sum = %d", 4 - needed);
      /**
       * reset read water mark to 4
       */
      event->setWatermark(EV_READ, 4, 0);
      return;
    }

    uint32 totalLenOfOneMsg = *(uint32*)hdr;  // first 4 bytes, which indicates 1st part of protocol
    uint32 msgLen = ntohl(totalLenOfOneMsg);
    size_t recvLen = evbuffer_get_length(input);
    if (recvLen >= msgLen + 4) {
      LOG_DEBUG("had received all data. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
    } else {
      LOG_DEBUG("didn't received whole. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
      /**
       * set read water mark to msgLen + 4,wait for receiving whole data
       */
      event->setWatermark(EV_READ, msgLen + 4, 0);
      return;  // consider large data which was not received completely by now
    }

    if (msgLen > 0) {
      MemoryBlock msg(msgLen, true);

      event->read(hdr, 4);  // skip length field
      event->read(msg.getData(), msgLen);

      transport->messageReceived(msg, event->getPeerAddrPort());
    }
  }
}