in src/mqtt/NetworkRead.cpp [106:190]
/**
 * @brief Network read loop: reads MQTT packets and dispatches them by packet type.
 *
 * Calls ReadPacketFromNetwork() repeatedly until the flag referenced by p_thread_continue_
 * becomes false. Every packet that is read successfully is routed to the matching Handle*()
 * member, selected by the upper four bits of its fixed header byte. A read failure that
 * occurs while the client state reports a live connection triggers a DISCONNECT action and
 * marks the client as requiring an auto-reconnect.
 *
 * @param p_network_connection Connection to read from; stored in p_network_connection_.
 * @param p_action_data Not used by this action.
 *
 * @return ResponseCode::NULL_VALUE_ERROR if p_network_connection is null; otherwise the
 *         response code left by the last loop iteration before the loop was asked to stop.
 */
ResponseCode NetworkReadActionRunner::PerformAction(std::shared_ptr<NetworkConnection> p_network_connection,
                                                    std::shared_ptr<ActionData> p_action_data) {
    // The action data argument is intentionally unused; reference it to avoid unused-parameter warnings.
    static_cast<void>(p_action_data);

    if (nullptr == p_network_connection) {
        return ResponseCode::NULL_VALUE_ERROR;
    }

    p_network_connection_ = p_network_connection;

    std::atomic_bool &thread_continue = *p_thread_continue_;
    const std::chrono::milliseconds thread_sleep_duration(DEFAULT_CORE_THREAD_SLEEP_DURATION_MS);

    // Declared outside the loop so the buffer's capacity is reused across iterations.
    util::Vector<unsigned char> read_buf;
    ResponseCode rc = ResponseCode::SUCCESS;

    // While the client is not connected, read failures must not trigger another disconnect.
    is_waiting_for_connack_ = !(p_client_state_->IsConnected());

    do {
        AWS_LOG_TRACE(NETWORK_READ_LOG_TAG,
                      " Network Read Thread, TLS Status : %d",
                      p_network_connection->IsConnected());

        // Start every iteration with a clean header byte and an empty payload buffer
        unsigned char fixed_header_byte = 0x00;
        read_buf.clear();

        rc = ReadPacketFromNetwork(fixed_header_byte, read_buf);
        if (ResponseCode::NETWORK_SSL_NOTHING_TO_READ == rc) {
            // Nothing available yet, wait before polling again
            std::this_thread::sleep_for(thread_sleep_duration);
            continue;
        } else if (ResponseCode::SUCCESS == rc) {
            // Packet type lives in the upper 4 bits of the fixed header byte
            const MessageTypes message_type = static_cast<MessageTypes>((fixed_header_byte >> 4) & 0x0F);
            switch (message_type) {
                case MessageTypes::CONNACK:
                    rc = HandleConnack(read_buf);
                    break;
                case MessageTypes::PUBLISH: {
                    // PUBLISH flags are carried in the lower 4 bits of the fixed header byte
                    bool is_retained = ((fixed_header_byte & 0x01) == 0x01);
                    bool is_duplicate = ((fixed_header_byte & 0x08) == 0x08);
                    // NOTE(review): only bit 1 is inspected, so a header with only bit 2 set
                    // (QoS2, unsupported here) is reported as QOS0.
                    QoS qos = ((fixed_header_byte & 0x02) == 0x02) ? QoS::QOS1 : QoS::QOS0;
                    rc = HandlePublish(read_buf, is_duplicate, is_retained, qos);
                    break;
                }
                case MessageTypes::PUBACK:
                    rc = HandlePuback(read_buf);
                    break;
                case MessageTypes::SUBACK:
                    rc = HandleSuback(read_buf);
                    break;
                case MessageTypes::UNSUBACK:
                    rc = HandleUnsuback(read_buf);
                    break;
                case MessageTypes::PINGRESP:
                    p_client_state_->SetPingreqPending(false);
                    rc = ResponseCode::SUCCESS;
                    break;
                default:
                    // Any type values other than above are either unsupported or invalid
                    // Packet types used for QoS2 are currently unsupported
                    break;
            }
        } else if (!is_waiting_for_connack_) {
            // First read failure since the last CONNACK wait began: set the flag before doing
            // anything else so that later failures do not trigger a second disconnect.
            is_waiting_for_connack_ = true;
            if (thread_continue && p_client_state_->IsConnected()) {
                AWS_LOG_ERROR(NETWORK_READ_LOG_TAG,
                              "Network Read attempt returned unhandled error. %s Requesting Network Reconnect.",
                              ResponseHelper::ToString(rc).c_str());
                rc = p_client_state_->PerformAction(ActionType::DISCONNECT,
                                                    DisconnectPacket::Create(),
                                                    p_client_state_->GetMqttCommandTimeout());
                if (ResponseCode::SUCCESS != rc) {
                    AWS_LOG_ERROR(NETWORK_READ_LOG_TAG,
                                  "Network Disconnect attempt returned unhandled error. %s",
                                  ResponseHelper::ToString(rc).c_str());
                    // No further action being taken. Assumption is that reconnect logic should bring SDK back to working state
                }
                p_client_state_->SetAutoReconnectRequired(true);
            }
        } else {
            // Read failed while already waiting for a CONNACK. Without a pause, a read that
            // keeps failing without blocking would turn this loop into a busy spin, so back
            // off for the same interval used when there is nothing to read.
            std::this_thread::sleep_for(thread_sleep_duration);
        }
    } while (thread_continue);

    return rc;
}