in zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java [466:571]
private void receiveMessage(ByteBuf message) {
checkIsInEventLoop("receiveMessage");
try {
while (message.isReadable() && !throttled.get()) {
if (bb != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() > message.readableBytes()) {
int newLimit = bb.position() + message.readableBytes();
bb.limit(newLimit);
}
message.readBytes(bb);
bb.limit(bb.capacity());
if (LOG.isTraceEnabled()) {
LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("after readbytes 0x{} bb {}",
Long.toHexString(sessionId),
ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() == 0) {
bb.flip();
packetReceived(4 + bb.remaining());
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
throw new IOException("ZK down");
}
if (initialized) {
RequestHeader h = new RequestHeader();
ByteBufferInputStream.byteBuffer2Record(bb, h);
RequestRecord request = RequestRecord.fromBytes(bb.slice());
zks.processPacket(this, h, request);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
zks.processConnectRequest(this, request);
initialized = true;
}
bb = null;
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
ByteBuffer dat = bbLen.duplicate();
dat.flip();
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (message.readableBytes() < bbLen.remaining()) {
bbLen.limit(bbLen.position() + message.readableBytes());
}
message.readBytes(bbLen);
bbLen.limit(bbLen.capacity());
if (bbLen.remaining() == 0) {
bbLen.flip();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
}
int len = bbLen.getInt();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
}
bbLen.clear();
if (!initialized) {
if (checkFourLetterWord(channel, message, len)) {
return;
}
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
LOG.info("Closing connection to {} because the server is not ready (server state is: {})",
getRemoteSocketAddress(), zks == null ? "unknown" : zks.getState());
close(DisconnectReason.IO_EXCEPTION);
return;
}
// checkRequestSize will throw IOException if request is rejected
zks.checkRequestSizeWhenReceivingMessage(len);
bb = ByteBuffer.allocate(len);
}
}
}
} catch (IOException e) {
LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.IO_EXCEPTION);
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.CLIENT_RATE_LIMIT);
}
}