in src/transport/TcpTransport.cpp [227:290]
/**
 * Read callback: invoked when there is data to read on the buffer event.
 *
 * Drains every complete frame currently buffered on `event` and passes each
 * non-empty payload to `transport->messageReceived()`.
 *
 * RocketMQ wire format: <length> <header length> <header data> <body data>
 *                           1            2              3            4
 *   1. big-endian 4-byte int: combined size of parts 2, 3 and 4
 *   2. big-endian 4-byte int: size of part 3
 *   3. JSON-serialized header data
 *   4. application-defined binary body
 *
 * Only part 1 is interpreted here; parts 2-4 are forwarded as one opaque block.
 *
 * When a frame is incomplete, the read low-watermark is raised to the number
 * of bytes still required and the function returns without consuming anything.
 *
 * @param event      buffer event whose input buffer is parsed and drained
 * @param transport  receiver of each complete message
 */
void TcpTransport::readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport) {
  // Size in bytes of the leading length prefix (part 1 of the protocol).
  const int kLengthFieldSize = 4;

  struct evbuffer* input = event->getInput();

  while (1) {
    // Peek (without consuming) the length prefix. The buffered bytes may be
    // spread across several extents, so gather them into a contiguous array.
    struct evbuffer_iovec v[4];
    int n = evbuffer_peek(input, kLengthFieldSize, NULL, v, sizeof(v) / sizeof(v[0]));

    char hdr[kLengthFieldSize];
    char* p = hdr;
    size_t needed = sizeof(hdr);
    for (int idx = 0; idx < n && needed > 0; idx++) {
      size_t chunk = needed < v[idx].iov_len ? needed : v[idx].iov_len;
      memcpy(p, v[idx].iov_base, chunk);
      p += chunk;
      needed -= chunk;
    }

    if (needed > 0) {
      // Explicit cast: a size_t must not be passed where "%d" expects an int.
      LOG_DEBUG("too little data received with sum = %d", static_cast<int>(sizeof(hdr) - needed));
      /**
       * reset read water mark to 4
       */
      event->setWatermark(EV_READ, kLengthFieldSize, 0);
      return;
    }

    // Decode the big-endian length. memcpy is used instead of casting `hdr`
    // to uint32* because a char array carries no alignment guarantee for
    // uint32 and such a cast breaks the aliasing rules.
    uint32 totalLenOfOneMsg = 0;
    memcpy(&totalLenOfOneMsg, hdr, sizeof(totalLenOfOneMsg));
    uint32 msgLen = ntohl(totalLenOfOneMsg);

    // The peek above yielded the full prefix, so recvLen >= sizeof(hdr) and
    // the subtraction cannot underflow. Comparing "bytes after the prefix"
    // against msgLen avoids computing msgLen + 4 in 32 bits, which wraps for
    // lengths close to UINT32_MAX and would make a bogus frame look complete.
    size_t recvLen = evbuffer_get_length(input);
    size_t payloadAvailable = recvLen - sizeof(hdr);

    if (payloadAvailable < msgLen) {
      LOG_DEBUG("didn't received whole. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(),
                static_cast<int>(recvLen));
      /**
       * set read water mark to msgLen + 4, wait for receiving whole data.
       * The sum is widened to size_t first; if size_t is only 32 bits it may
       * still wrap, which merely causes extra wake-ups because the check
       * above does not depend on it.
       */
      event->setWatermark(EV_READ, static_cast<size_t>(msgLen) + sizeof(hdr), 0);
      return;  // consider large data which was not received completely by now
    }

    LOG_DEBUG("had received all data. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(),
              static_cast<int>(recvLen));

    // Always consume the length prefix, even for an empty frame. Leaving it
    // in the buffer would make the next iteration peek the same zero length
    // again and spin in this loop forever.
    event->read(hdr, kLengthFieldSize);

    if (msgLen > 0) {
      // NOTE(review): msgLen comes straight from the peer and is not capped
      // here; consider enforcing a maximum frame size before allocating.
      MemoryBlock msg(msgLen, true);
      event->read(msg.getData(), msgLen);
      transport->messageReceived(msg, event->getPeerAddrPort());
    }
  }
}