in activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java [237:366]
/**
 * Handles an incoming MQTT CONNECT frame.
 * <p>
 * The frame is validated (protocol level, user name / password combination and
 * client identifier) before the connection request is forwarded to the broker
 * through {@code sendToActiveMQ}. Early rejections are reported to the transport
 * via {@code onException}; where a CONNACK refusal code applies, it is written to
 * the client first. Once the broker accepts the connection, a session and a
 * producer are registered; if the producer registration also succeeds, a CONNACK
 * carrying {@code CONNECTION_ACCEPTED} is sent and the subscription strategy is
 * notified.
 *
 * @param connect the CONNECT frame received from the client
 * @throws MQTTProtocolException if this converter is already marked as connected
 */
void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
    if (connected.get()) {
        throw new MQTTProtocolException("Already connected.");
    }
    this.connect = connect;

    // [MQTT-3.1.2-2]: an unsupported Protocol Level MUST be answered with CONNACK
    // return code 0x01 (unacceptable protocol level), after which the client is
    // disconnected.
    final boolean supportedVersion = connect.version() >= 3 && connect.version() <= 4;
    if (!supportedVersion) {
        final CONNACK refusal = new CONNACK();
        refusal.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION);
        try {
            getMQTTTransport().sendToMQTT(refusal.encode());
            getMQTTTransport().onException(IOExceptionSupport.create("Unsupported or invalid protocol version", null));
        } catch (IOException sendFailure) {
            getMQTTTransport().onException(IOExceptionSupport.create(sendFailure));
        }
        return;
    }

    // Pull the optional identity fields out of the frame; a missing client id is
    // treated as the empty string, missing credentials stay null.
    final String suppliedClientId = connect.clientId() != null ? connect.clientId().toString() : "";
    final String suppliedUserName = connect.userName() != null ? connect.userName().toString() : null;

    final boolean passwordSupplied = connect.password() != null;
    if (passwordSupplied && suppliedUserName == null && connect.version() != V3_1) {
        // [MQTT-3.1.2-22]: a password is only allowed together with a user name.
        // [MQTT-3.1.4-1] would seem to imply that no CONNACK is sent in this case,
        // so the failure is only handed to the transport.
        getMQTTTransport().onException(IOExceptionSupport.create("Password given without a user name", null));
        return;
    }
    final String suppliedPassword = passwordSupplied ? connect.password().toString() : null;

    version = connect.version();
    configureInactivityMonitor(connect.keepAlive());

    connectionInfo.setConnectionId(connectionId);
    if (suppliedClientId == null || suppliedClientId.isEmpty()) {
        // A zero-length client id is only acceptable when Clean Session is set.
        if (!connect.cleanSession()) {
            final CONNACK refusal = new CONNACK();
            refusal.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            try {
                getMQTTTransport().sendToMQTT(refusal.encode());
                getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
            } catch (IOException sendFailure) {
                getMQTTTransport().onException(IOExceptionSupport.create(sendFailure));
            }
            return;
        }
        // No id supplied by the client: fall back to the connection id's text form.
        final String generatedClientId = "" + connectionInfo.getConnectionId().toString();
        connectionInfo.setClientId(generatedClientId);
    } else {
        connectionInfo.setClientId(suppliedClientId);
    }

    connectionInfo.setResponseRequired(true);
    connectionInfo.setUserName(suppliedUserName);
    connectionInfo.setPassword(suppliedPassword);
    connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());

    // Invoked with the broker's answer to the connection request.
    final ResponseHandler connectionHandler = new ResponseHandler() {
        @Override
        public void onResponse(MQTTProtocolConverter converter, Response connectionResponse) throws IOException {
            if (connectionResponse.isException()) {
                // The broker refused the connection: tell the client why, then
                // hand the failure to the transport.
                final Throwable cause = ((ExceptionResponse) connectionResponse).getException();
                final CONNACK refusal = new CONNACK();
                if (cause instanceof InvalidClientIDException) {
                    refusal.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                } else if (cause instanceof SecurityException) {
                    refusal.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
                } else if (cause instanceof CredentialException) {
                    refusal.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                } else {
                    refusal.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                }
                getMQTTTransport().sendToMQTT(refusal.encode());
                getMQTTTransport().onException(IOExceptionSupport.create(cause));
                return;
            }

            // Connection accepted: register the session (no reply handler) ...
            sendToActiveMQ(new SessionInfo(sessionId), null);

            // ... then the producer, whose reply decides the final CONNACK.
            final ResponseHandler producerHandler = new ResponseHandler() {
                @Override
                public void onResponse(MQTTProtocolConverter converter, Response producerResponse) throws IOException {
                    if (producerResponse.isException()) {
                        // A failed producer registration is always reported to the
                        // client as CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.
                        final Throwable cause = ((ExceptionResponse) producerResponse).getException();
                        final CONNACK refusal = new CONNACK();
                        refusal.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                        getMQTTTransport().sendToMQTT(refusal.encode());
                        getMQTTTransport().onException(IOExceptionSupport.create(cause));
                        return;
                    }

                    final CONNACK accepted = new CONNACK();
                    accepted.code(CONNACK.Code.CONNECTION_ACCEPTED);
                    // Mark the converter connected before the CONNACK is written.
                    connected.set(true);
                    getMQTTTransport().sendToMQTT(accepted.encode());

                    // Packet-id session tracking is started only for non-clean
                    // sessions and stopped for clean ones.
                    if (!connect.cleanSession()) {
                        packetIdGenerator.startClientSession(getClientId());
                    } else {
                        packetIdGenerator.stopClientSession(getClientId());
                    }
                    findSubscriptionStrategy().onConnect(connect);
                }
            };
            sendToActiveMQ(new ProducerInfo(producerId), producerHandler);
        }
    };
    sendToActiveMQ(connectionInfo, connectionHandler);
}