void onMQTTConnect()

in activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java [237:366]


    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
        if (connected.get()) {
            throw new MQTTProtocolException("Already connected.");
        }
        this.connect = connect;

        // The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01
        // (unacceptable protocol level) and then disconnect the Client if the Protocol Level
        // is not supported by the Server [MQTT-3.1.2-2].
        if (connect.version() < 3 || connect.version() > 4) {
            CONNACK ack = new CONNACK();
            ack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION);
            try {
                getMQTTTransport().sendToMQTT(ack.encode());
                getMQTTTransport().onException(IOExceptionSupport.create("Unsupported or invalid protocol version", null));
            } catch (IOException e) {
                getMQTTTransport().onException(IOExceptionSupport.create(e));
            }
            return;
        }

        String clientId = "";
        if (connect.clientId() != null) {
            clientId = connect.clientId().toString();
        }

        String userName = null;
        if (connect.userName() != null) {
            userName = connect.userName().toString();
        }
        String passswd = null;
        if (connect.password() != null) {

            if (userName == null && connect.version() != V3_1) {
                // [MQTT-3.1.2-22]: If the user name is not present then the
                // password must also be absent.
                // [MQTT-3.1.4-1]: would seem to imply we don't send a CONNACK here.
                getMQTTTransport().onException(IOExceptionSupport.create("Password given without a user name", null));
                return;
            }

            passswd = connect.password().toString();
        }

        version = connect.version();

        configureInactivityMonitor(connect.keepAlive());

        connectionInfo.setConnectionId(connectionId);
        if (clientId != null && !clientId.isEmpty()) {
            connectionInfo.setClientId(clientId);
        } else {
            // Clean Session MUST be set for 0 length Client Id
            if (!connect.cleanSession()) {
                CONNACK ack = new CONNACK();
                ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                try {
                    getMQTTTransport().sendToMQTT(ack.encode());
                    getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
                } catch (IOException e) {
                    getMQTTTransport().onException(IOExceptionSupport.create(e));
                }
                return;
            }
            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
        }

        connectionInfo.setResponseRequired(true);
        connectionInfo.setUserName(userName);
        connectionInfo.setPassword(passswd);
        connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());

        sendToActiveMQ(connectionInfo, new ResponseHandler() {
            @Override
            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {

                if (response.isException()) {
                    // If the connection attempt fails we close the socket.
                    Throwable exception = ((ExceptionResponse) response).getException();
                    //let the client know
                    CONNACK ack = new CONNACK();
                    if (exception instanceof InvalidClientIDException) {
                        ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                    } else if (exception instanceof SecurityException) {
                        ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
                    } else if (exception instanceof CredentialException) {
                        ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                    } else {
                        ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                    }
                    getMQTTTransport().sendToMQTT(ack.encode());
                    getMQTTTransport().onException(IOExceptionSupport.create(exception));
                    return;
                }

                final SessionInfo sessionInfo = new SessionInfo(sessionId);
                sendToActiveMQ(sessionInfo, null);

                final ProducerInfo producerInfo = new ProducerInfo(producerId);
                sendToActiveMQ(producerInfo, new ResponseHandler() {
                    @Override
                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {

                        if (response.isException()) {
                            // If the connection attempt fails we close the socket.
                            Throwable exception = ((ExceptionResponse) response).getException();
                            CONNACK ack = new CONNACK();
                            ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                            getMQTTTransport().sendToMQTT(ack.encode());
                            getMQTTTransport().onException(IOExceptionSupport.create(exception));
                            return;
                        }

                        CONNACK ack = new CONNACK();
                        ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
                        connected.set(true);
                        getMQTTTransport().sendToMQTT(ack.encode());

                        if (connect.cleanSession()) {
                            packetIdGenerator.stopClientSession(getClientId());
                        } else {
                            packetIdGenerator.startClientSession(getClientId());
                        }

                        findSubscriptionStrategy().onConnect(connect);
                    }
                });
            }
        });
    }