in pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java [110:487]
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Entry point for every inbound object on the channel.
    //
    // Two kinds of input are handled:
    //   * HAProxyMessage - remembered in this.proxyMessage and released; nothing else is done.
    //   * anything else  - cast to ByteBuf and treated as one frame: an unsigned 32-bit command
    //                      size followed by the serialized command, which is parsed into the
    //                      `cmd` field and dispatched to the handle* method matching its type.
    //
    // Dispatch conventions used by the switch below:
    //   * Each case first calls checkArgument(cmd.hasXxx()) so that a frame whose declared type
    //     does not carry the matching sub-message is rejected before the handler runs
    //     (presumably Guava's Preconditions.checkArgument - confirm against the file's imports).
    //   * Cases that call interceptCommand(cmd) catch InterceptException and answer the peer
    //     with an error response built from the exception's error code and message.
    //   * Cases that call safeInterceptCommand(cmd) have no catch here; that helper is assumed
    //     to deal with interception failures itself - TODO confirm.
    //   * Command types without a case fall through to `default` and are silently ignored.
    //
    // The frame buffer is released in the finally block on every path, including when a
    // handler or checkArgument throws.
    if (msg instanceof HAProxyMessage) {
        HAProxyMessage proxyMessage = (HAProxyMessage) msg;
        this.proxyMessage = proxyMessage;
        // NOTE(review): the message is released right after the reference is stored;
        // presumably later readers only use its already-decoded fields - confirm.
        proxyMessage.release();
        return;
    }

    // Get a buffer that contains the full frame
    ByteBuf buffer = (ByteBuf) msg;

    try {
        // De-serialize the command
        int cmdSize = (int) buffer.readUnsignedInt();
        cmd.parseFrom(buffer, cmdSize);

        if (log.isDebugEnabled()) {
            log.debug("[{}] Received cmd {}", ctx.channel(), cmd.getType());
        }
        // Per-frame hook, invoked before the command is dispatched.
        messageReceived();

        switch (cmd.getType()) {
        case PARTITIONED_METADATA:
            checkArgument(cmd.hasPartitionMetadata());
            try {
                interceptCommand(cmd);
                handlePartitionMetadataRequest(cmd.getPartitionMetadata());
            } catch (InterceptException e) {
                writeAndFlush(ctx,
                        Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
                                e.getMessage(), cmd.getPartitionMetadata().getRequestId()));
            }
            break;

        case PARTITIONED_METADATA_RESPONSE:
            checkArgument(cmd.hasPartitionMetadataResponse());
            handlePartitionResponse(cmd.getPartitionMetadataResponse());
            break;

        case LOOKUP:
            checkArgument(cmd.hasLookupTopic());
            handleLookup(cmd.getLookupTopic());
            break;

        case LOOKUP_RESPONSE:
            checkArgument(cmd.hasLookupTopicResponse());
            handleLookupResponse(cmd.getLookupTopicResponse());
            break;

        case ACK:
            checkArgument(cmd.hasAck());
            safeInterceptCommand(cmd);
            handleAck(cmd.getAck());
            break;

        case ACK_RESPONSE:
            checkArgument(cmd.hasAckResponse());
            handleAckResponse(cmd.getAckResponse());
            break;

        case CLOSE_CONSUMER:
            checkArgument(cmd.hasCloseConsumer());
            safeInterceptCommand(cmd);
            handleCloseConsumer(cmd.getCloseConsumer());
            break;

        case CLOSE_PRODUCER:
            checkArgument(cmd.hasCloseProducer());
            safeInterceptCommand(cmd);
            handleCloseProducer(cmd.getCloseProducer());
            break;

        case CONNECT:
            checkArgument(cmd.hasConnect());
            handleConnect(cmd.getConnect());
            break;

        case CONNECTED:
            checkArgument(cmd.hasConnected());
            handleConnected(cmd.getConnected());
            break;

        case ERROR:
            checkArgument(cmd.hasError());
            handleError(cmd.getError());
            break;

        case FLOW:
            checkArgument(cmd.hasFlow());
            handleFlow(cmd.getFlow());
            break;

        case MESSAGE: {
            checkArgument(cmd.hasMessage());
            // The handler also receives the frame buffer, whose reader index now sits
            // just past the parsed command.
            handleMessage(cmd.getMessage(), buffer);
            break;
        }

        case PRODUCER:
            checkArgument(cmd.hasProducer());
            try {
                interceptCommand(cmd);
                handleProducer(cmd.getProducer());
            } catch (InterceptException e) {
                writeAndFlush(ctx, Commands.newError(cmd.getProducer().getRequestId(),
                        getServerError(e.getErrorCode()), e.getMessage()));
            }
            break;

        case SEND: {
            checkArgument(cmd.hasSend());
            try {
                interceptCommand(cmd);
                // Store a buffer marking the content + headers.
                // markReaderIndex() returns the same ByteBuf instance, so the handler gets
                // the frame buffer itself with the current position marked.
                ByteBuf headersAndPayload = buffer.markReaderIndex();
                handleSend(cmd.getSend(), headersAndPayload);
            } catch (InterceptException e) {
                writeAndFlush(ctx, Commands.newSendError(cmd.getSend().getProducerId(),
                        cmd.getSend().getSequenceId(), getServerError(e.getErrorCode()), e.getMessage()));
            }
            break;
        }

        case SEND_ERROR:
            checkArgument(cmd.hasSendError());
            handleSendError(cmd.getSendError());
            break;

        case SEND_RECEIPT:
            checkArgument(cmd.hasSendReceipt());
            handleSendReceipt(cmd.getSendReceipt());
            break;

        case SUBSCRIBE:
            checkArgument(cmd.hasSubscribe());
            try {
                interceptCommand(cmd);
                handleSubscribe(cmd.getSubscribe());
            } catch (InterceptException e) {
                writeAndFlush(ctx, Commands.newError(cmd.getSubscribe().getRequestId(),
                        getServerError(e.getErrorCode()), e.getMessage()));
            }
            break;

        case SUCCESS:
            checkArgument(cmd.hasSuccess());
            handleSuccess(cmd.getSuccess());
            break;

        case PRODUCER_SUCCESS:
            checkArgument(cmd.hasProducerSuccess());
            handleProducerSuccess(cmd.getProducerSuccess());
            break;

        case UNSUBSCRIBE:
            checkArgument(cmd.hasUnsubscribe());
            safeInterceptCommand(cmd);
            handleUnsubscribe(cmd.getUnsubscribe());
            break;

        case SEEK:
            checkArgument(cmd.hasSeek());
            try {
                interceptCommand(cmd);
                handleSeek(cmd.getSeek());
            } catch (InterceptException e) {
                writeAndFlush(ctx,
                        Commands.newError(
                                cmd.getSeek().getRequestId(),
                                getServerError(e.getErrorCode()),
                                e.getMessage()
                        )
                );
            }
            break;

        case PING:
            checkArgument(cmd.hasPing());
            handlePing(cmd.getPing());
            break;

        case PONG:
            checkArgument(cmd.hasPong());
            handlePong(cmd.getPong());
            break;

        case REDELIVER_UNACKNOWLEDGED_MESSAGES:
            checkArgument(cmd.hasRedeliverUnacknowledgedMessages());
            safeInterceptCommand(cmd);
            handleRedeliverUnacknowledged(cmd.getRedeliverUnacknowledgedMessages());
            break;

        case CONSUMER_STATS:
            checkArgument(cmd.hasConsumerStats());
            handleConsumerStats(cmd.getConsumerStats());
            break;

        case CONSUMER_STATS_RESPONSE:
            checkArgument(cmd.hasConsumerStatsResponse());
            handleConsumerStatsResponse(cmd.getConsumerStatsResponse());
            break;

        case REACHED_END_OF_TOPIC:
            checkArgument(cmd.hasReachedEndOfTopic());
            handleReachedEndOfTopic(cmd.getReachedEndOfTopic());
            break;

        case TOPIC_MIGRATED:
            checkArgument(cmd.hasTopicMigrated());
            handleTopicMigrated(cmd.getTopicMigrated());
            break;

        case GET_LAST_MESSAGE_ID:
            checkArgument(cmd.hasGetLastMessageId());
            handleGetLastMessageId(cmd.getGetLastMessageId());
            break;

        case GET_LAST_MESSAGE_ID_RESPONSE:
            checkArgument(cmd.hasGetLastMessageIdResponse());
            handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse());
            break;

        case ACTIVE_CONSUMER_CHANGE:
            // Validate presence of the sub-message like every other case, so a malformed
            // frame of this type is rejected the same way as the rest.
            checkArgument(cmd.hasActiveConsumerChange());
            handleActiveConsumerChange(cmd.getActiveConsumerChange());
            break;

        case GET_TOPICS_OF_NAMESPACE:
            checkArgument(cmd.hasGetTopicsOfNamespace());
            try {
                interceptCommand(cmd);
                handleGetTopicsOfNamespace(cmd.getGetTopicsOfNamespace());
            } catch (InterceptException e) {
                writeAndFlush(ctx, Commands.newError(cmd.getGetTopicsOfNamespace().getRequestId(),
                        getServerError(e.getErrorCode()), e.getMessage()));
            }
            break;

        case GET_TOPICS_OF_NAMESPACE_RESPONSE:
            checkArgument(cmd.hasGetTopicsOfNamespaceResponse());
            handleGetTopicsOfNamespaceSuccess(cmd.getGetTopicsOfNamespaceResponse());
            break;

        case GET_SCHEMA:
            checkArgument(cmd.hasGetSchema());
            try {
                interceptCommand(cmd);
                handleGetSchema(cmd.getGetSchema());
            } catch (InterceptException e) {
                writeAndFlush(ctx, Commands.newGetSchemaResponseError(cmd.getGetSchema().getRequestId(),
                        getServerError(e.getErrorCode()), e.getMessage()));
            }
            break;

        case GET_SCHEMA_RESPONSE:
            checkArgument(cmd.hasGetSchemaResponse());
            handleGetSchemaResponse(cmd.getGetSchemaResponse());
            break;

        case GET_OR_CREATE_SCHEMA:
            checkArgument(cmd.hasGetOrCreateSchema());
            try {
                interceptCommand(cmd);
                handleGetOrCreateSchema(cmd.getGetOrCreateSchema());
            } catch (InterceptException e) {
                writeAndFlush(ctx, Commands.newGetOrCreateSchemaResponseError(
                        cmd.getGetOrCreateSchema().getRequestId(), getServerError(e.getErrorCode()),
                        e.getMessage()));
            }
            break;

        case GET_OR_CREATE_SCHEMA_RESPONSE:
            checkArgument(cmd.hasGetOrCreateSchemaResponse());
            handleGetOrCreateSchemaResponse(cmd.getGetOrCreateSchemaResponse());
            break;

        case AUTH_CHALLENGE:
            checkArgument(cmd.hasAuthChallenge());
            handleAuthChallenge(cmd.getAuthChallenge());
            break;

        case AUTH_RESPONSE:
            checkArgument(cmd.hasAuthResponse());
            handleAuthResponse(cmd.getAuthResponse());
            break;

        case TC_CLIENT_CONNECT_REQUEST:
            checkArgument(cmd.hasTcClientConnectRequest());
            handleTcClientConnectRequest(cmd.getTcClientConnectRequest());
            break;

        case TC_CLIENT_CONNECT_RESPONSE:
            checkArgument(cmd.hasTcClientConnectResponse());
            handleTcClientConnectResponse(cmd.getTcClientConnectResponse());
            break;

        case NEW_TXN:
            checkArgument(cmd.hasNewTxn());
            handleNewTxn(cmd.getNewTxn());
            break;

        case NEW_TXN_RESPONSE:
            checkArgument(cmd.hasNewTxnResponse());
            handleNewTxnResponse(cmd.getNewTxnResponse());
            break;

        case ADD_PARTITION_TO_TXN:
            checkArgument(cmd.hasAddPartitionToTxn());
            handleAddPartitionToTxn(cmd.getAddPartitionToTxn());
            break;

        case ADD_PARTITION_TO_TXN_RESPONSE:
            checkArgument(cmd.hasAddPartitionToTxnResponse());
            handleAddPartitionToTxnResponse(cmd.getAddPartitionToTxnResponse());
            break;

        case ADD_SUBSCRIPTION_TO_TXN:
            checkArgument(cmd.hasAddSubscriptionToTxn());
            handleAddSubscriptionToTxn(cmd.getAddSubscriptionToTxn());
            break;

        case ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
            checkArgument(cmd.hasAddSubscriptionToTxnResponse());
            handleAddSubscriptionToTxnResponse(cmd.getAddSubscriptionToTxnResponse());
            break;

        case END_TXN:
            checkArgument(cmd.hasEndTxn());
            handleEndTxn(cmd.getEndTxn());
            break;

        case END_TXN_RESPONSE:
            checkArgument(cmd.hasEndTxnResponse());
            handleEndTxnResponse(cmd.getEndTxnResponse());
            break;

        case END_TXN_ON_PARTITION:
            checkArgument(cmd.hasEndTxnOnPartition());
            handleEndTxnOnPartition(cmd.getEndTxnOnPartition());
            break;

        case END_TXN_ON_PARTITION_RESPONSE:
            checkArgument(cmd.hasEndTxnOnPartitionResponse());
            handleEndTxnOnPartitionResponse(cmd.getEndTxnOnPartitionResponse());
            break;

        case END_TXN_ON_SUBSCRIPTION:
            checkArgument(cmd.hasEndTxnOnSubscription());
            handleEndTxnOnSubscription(cmd.getEndTxnOnSubscription());
            break;

        case END_TXN_ON_SUBSCRIPTION_RESPONSE:
            checkArgument(cmd.hasEndTxnOnSubscriptionResponse());
            handleEndTxnOnSubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse());
            break;

        case WATCH_TOPIC_LIST:
            checkArgument(cmd.hasWatchTopicList());
            handleCommandWatchTopicList(cmd.getWatchTopicList());
            break;

        case WATCH_TOPIC_LIST_SUCCESS:
            checkArgument(cmd.hasWatchTopicListSuccess());
            handleCommandWatchTopicListSuccess(cmd.getWatchTopicListSuccess());
            break;

        case WATCH_TOPIC_UPDATE:
            checkArgument(cmd.hasWatchTopicUpdate());
            handleCommandWatchTopicUpdate(cmd.getWatchTopicUpdate());
            break;

        case WATCH_TOPIC_LIST_CLOSE:
            checkArgument(cmd.hasWatchTopicListClose());
            handleCommandWatchTopicListClose(cmd.getWatchTopicListClose());
            break;

        default:
            // Command types without a dedicated case are ignored.
            break;
        }
    } finally {
        // Always drop this method's reference to the frame, whether dispatch succeeded or threw.
        buffer.release();
    }
}