void ClientConnection::handleIncomingCommand()

in lib/ClientConnection.cc [833:946]


/**
 * Route one decoded command to the handler that matches the current connection state.
 *
 * The state is read once from state_ and decides what is acceptable:
 *  - Pending:      nothing is expected yet; the command is logged as an error and dropped.
 *  - TcpConnected: only CONNECTED is valid. Any other command is logged and the connection is
 *                  closed.
 *  - Disconnected: the command is logged as an error and dropped.
 *  - Ready:        the command is dispatched on its type to the matching handle*() method. A type
 *                  with no handler is logged as a warning and the connection is closed.
 *
 * @param incomingCmd the command to dispatch; only the sub-message matching its type() is read
 */
void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
    LOG_DEBUG(cnxString_ << "Handling incoming command: " << Commands::messageType(incomingCmd.type()));

    switch (state_.load()) {
        case Pending: {
            LOG_ERROR(cnxString_ << "Connection is not ready yet");
            break;
        }

        case TcpConnected: {
            // The only command accepted in this state is CONNECTED.
            if (incomingCmd.type() != BaseCommand::CONNECTED) {
                // Report what was actually received before dropping the connection, so that a failed
                // handshake can be diagnosed from the logs.
                LOG_ERROR(cnxString_ << "Unexpected command " << Commands::messageType(incomingCmd.type())
                                     << " received before CONNECTED, closing connection");
                close();
            } else {
                handlePulsarConnected(incomingCmd.connected());
            }
            break;
        }

        case Disconnected: {
            LOG_ERROR(cnxString_ << "Connection already disconnected");
            break;
        }

        case Ready: {
            // Since we are receiving data from the connection, we are assuming that for now the connection is
            // still working well. This is done for every command type, not only PONG.
            havePendingPingRequest_ = false;

            // Handle normal commands
            switch (incomingCmd.type()) {
                case BaseCommand::SEND_RECEIPT:
                    handleSendReceipt(incomingCmd.send_receipt());
                    break;

                case BaseCommand::SEND_ERROR:
                    handleSendError(incomingCmd.send_error());
                    break;

                case BaseCommand::SUCCESS:
                    handleSuccess(incomingCmd.success());
                    break;

                case BaseCommand::PARTITIONED_METADATA_RESPONSE:
                    handlePartitionedMetadataResponse(incomingCmd.partitionmetadataresponse());
                    break;

                case BaseCommand::CONSUMER_STATS_RESPONSE:
                    handleConsumerStatsResponse(incomingCmd.consumerstatsresponse());
                    break;

                case BaseCommand::LOOKUP_RESPONSE:
                    handleLookupTopicRespose(incomingCmd.lookuptopicresponse());
                    break;

                case BaseCommand::PRODUCER_SUCCESS:
                    handleProducerSuccess(incomingCmd.producer_success());
                    break;

                case BaseCommand::ERROR:
                    handleError(incomingCmd.error());
                    break;

                case BaseCommand::CLOSE_PRODUCER:
                    handleCloseProducer(incomingCmd.close_producer());
                    break;

                case BaseCommand::CLOSE_CONSUMER:
                    handleCloseConsumer(incomingCmd.close_consumer());
                    break;

                case BaseCommand::PING:
                    // Respond to ping request
                    LOG_DEBUG(cnxString_ << "Replying to ping command");
                    sendCommand(Commands::newPong());
                    break;

                case BaseCommand::PONG:
                    // Nothing else to do: the pending-ping flag was already cleared above.
                    LOG_DEBUG(cnxString_ << "Received response to ping message");
                    break;

                case BaseCommand::AUTH_CHALLENGE:
                    // The handler takes no argument; the challenge payload is not forwarded from here.
                    handleAuthChallenge();
                    break;

                case BaseCommand::ACTIVE_CONSUMER_CHANGE:
                    handleActiveConsumerChange(incomingCmd.active_consumer_change());
                    break;

                case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
                    handleGetLastMessageIdResponse(incomingCmd.getlastmessageidresponse());
                    break;

                case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
                    handleGetTopicOfNamespaceResponse(incomingCmd.gettopicsofnamespaceresponse());
                    break;

                case BaseCommand::GET_SCHEMA_RESPONSE:
                    handleGetSchemaResponse(incomingCmd.getschemaresponse());
                    break;

                case BaseCommand::ACK_RESPONSE:
                    handleAckResponse(incomingCmd.ackresponse());
                    break;

                default:
                    // A command type with no handler here is treated as a protocol violation.
                    LOG_WARN(cnxString_ << "Received invalid message from server");
                    close();
                    break;
            }
            // Explicit break so a state case added after Ready cannot be fallen into by accident.
            break;
        }
    }
}