in lib/ClientConnection.cc [833:946]
void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
LOG_DEBUG(cnxString_ << "Handling incoming command: " << Commands::messageType(incomingCmd.type()));
switch (state_.load()) {
case Pending: {
LOG_ERROR(cnxString_ << "Connection is not ready yet");
break;
}
case TcpConnected: {
// Handle Pulsar Connected
if (incomingCmd.type() != BaseCommand::CONNECTED) {
// Wrong cmd
close();
} else {
handlePulsarConnected(incomingCmd.connected());
}
break;
}
case Disconnected: {
LOG_ERROR(cnxString_ << "Connection already disconnected");
break;
}
case Ready: {
// Since we are receiving data from the connection, we are assuming that for now the connection is
// still working well.
havePendingPingRequest_ = false;
// Handle normal commands
switch (incomingCmd.type()) {
case BaseCommand::SEND_RECEIPT:
handleSendReceipt(incomingCmd.send_receipt());
break;
case BaseCommand::SEND_ERROR:
handleSendError(incomingCmd.send_error());
break;
case BaseCommand::SUCCESS:
handleSuccess(incomingCmd.success());
break;
case BaseCommand::PARTITIONED_METADATA_RESPONSE:
handlePartitionedMetadataResponse(incomingCmd.partitionmetadataresponse());
break;
case BaseCommand::CONSUMER_STATS_RESPONSE:
handleConsumerStatsResponse(incomingCmd.consumerstatsresponse());
break;
case BaseCommand::LOOKUP_RESPONSE:
handleLookupTopicRespose(incomingCmd.lookuptopicresponse());
break;
case BaseCommand::PRODUCER_SUCCESS:
handleProducerSuccess(incomingCmd.producer_success());
break;
case BaseCommand::ERROR:
handleError(incomingCmd.error());
break;
case BaseCommand::CLOSE_PRODUCER:
handleCloseProducer(incomingCmd.close_producer());
break;
case BaseCommand::CLOSE_CONSUMER:
handleCloseConsumer(incomingCmd.close_consumer());
break;
case BaseCommand::PING:
// Respond to ping request
LOG_DEBUG(cnxString_ << "Replying to ping command");
sendCommand(Commands::newPong());
break;
case BaseCommand::PONG:
LOG_DEBUG(cnxString_ << "Received response to ping message");
break;
case BaseCommand::AUTH_CHALLENGE:
handleAuthChallenge();
break;
case BaseCommand::ACTIVE_CONSUMER_CHANGE:
handleActiveConsumerChange(incomingCmd.active_consumer_change());
break;
case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
handleGetLastMessageIdResponse(incomingCmd.getlastmessageidresponse());
break;
case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
handleGetTopicOfNamespaceResponse(incomingCmd.gettopicsofnamespaceresponse());
break;
case BaseCommand::GET_SCHEMA_RESPONSE:
handleGetSchemaResponse(incomingCmd.getschemaresponse());
break;
case BaseCommand::ACK_RESPONSE:
handleAckResponse(incomingCmd.ackresponse());
break;
default:
LOG_WARN(cnxString_ << "Received invalid message from server");
close();
break;
}
}
}
}