in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java [309:412]
public void processRequest(Object msg, BookieRequestHandler requestHandler) {
Channel channel = requestHandler.ctx().channel();
// If we can decode this packet as a Request protobuf packet, process
// it as a version 3 packet. Else, just use the old protocol.
if (msg instanceof BookkeeperProtocol.Request) {
BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg;
restoreMdcContextFromRequest(r);
try {
BookkeeperProtocol.BKPacketHeader header = r.getHeader();
switch (header.getOperation()) {
case ADD_ENTRY:
processAddRequestV3(r, requestHandler);
break;
case READ_ENTRY:
processReadRequestV3(r, requestHandler);
break;
case FORCE_LEDGER:
processForceLedgerRequestV3(r, requestHandler);
break;
case AUTH:
LOG.info("Ignoring auth operation from client {}", channel.remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();
final BookkeeperProtocol.Response authResponse = BookkeeperProtocol.Response
.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EOK)
.setAuthResponse(message)
.build();
writeAndFlush(channel, authResponse);
break;
case WRITE_LAC:
processWriteLacRequestV3(r, requestHandler);
break;
case READ_LAC:
processReadLacRequestV3(r, requestHandler);
break;
case GET_BOOKIE_INFO:
processGetBookieInfoRequestV3(r, requestHandler);
break;
case START_TLS:
processStartTLSRequestV3(r, requestHandler);
break;
case GET_LIST_OF_ENTRIES_OF_LEDGER:
processGetListOfEntriesOfLedgerProcessorV3(r, requestHandler);
break;
default:
LOG.info("Unknown operation type {}", header.getOperation());
final BookkeeperProtocol.Response response =
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EBADREQ)
.build();
writeAndFlush(channel, response);
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
break;
}
} finally {
MDC.clear();
}
} else {
BookieProtocol.Request r = (BookieProtocol.Request) msg;
// process packet
switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
checkArgument(r instanceof BookieProtocol.ParsedAddRequest);
processAddRequest((BookieProtocol.ParsedAddRequest) r, requestHandler);
break;
case BookieProtocol.READENTRY:
checkArgument(r instanceof BookieProtocol.ReadRequest);
processReadRequest((BookieProtocol.ReadRequest) r, requestHandler);
break;
case BookieProtocol.BATCH_READ_ENTRY:
checkArgument(r instanceof BookieProtocol.BatchedReadRequest);
processReadRequest((BookieProtocol.BatchedReadRequest) r, requestHandler);
break;
case BookieProtocol.AUTH:
LOG.info("Ignoring auth operation from client {}",
requestHandler.ctx().channel().remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();
final BookieProtocol.AuthResponse response = new BookieProtocol.AuthResponse(
BookieProtocol.CURRENT_PROTOCOL_VERSION, message);
writeAndFlush(channel, response);
break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
final BookieProtocol.Response errResponse = ResponseBuilder
.buildErrorResponse(BookieProtocol.EBADREQ, r);
writeAndFlush(channel, errResponse);
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
break;
}
}
}