std::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport()

in src/transport/TcpRemotingClient.cpp [298:361]


std::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needResponse) {
  std::shared_ptr<TcpTransport> tts;

  {
    // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
    // long time, if could not get m_tcpLock, return NULL
    std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
    if (!lock.owns_lock()) {
      if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
        LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
        std::shared_ptr<TcpTransport> pTcp;
        return pTcp;
      }
    }

    // check for reuse
    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
      std::shared_ptr<TcpTransport> tcp = m_tcpTable[addr];

      if (tcp) {
        TcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
        if (connectStatus == TCP_CONNECT_STATUS_SUCCESS) {
          return tcp;
        } else if (connectStatus == TCP_CONNECT_STATUS_WAIT) {
          std::shared_ptr<TcpTransport> pTcp;
          return pTcp;
        } else if (connectStatus == TCP_CONNECT_STATUS_FAILED) {
          LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
          tcp->disconnect(addr);  // avoid coredump when connection with broker was broken
          m_tcpTable.erase(addr);
        } else {
          LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str());
          m_tcpTable.erase(addr);
        }
      }
    }

    //<!callback;
    TcpTransportReadCallback callback = needResponse ? &TcpRemotingClient::static_messageReceived : nullptr;

    tts = TcpTransport::CreateTransport(this, m_enableSsl, m_sslPropertyFile, callback);
    TcpConnectStatus connectStatus = tts->connect(addr, 0);  // use non-block
    if (connectStatus != TCP_CONNECT_STATUS_WAIT) {
      LOG_WARN("can not connect to:%s", addr.c_str());
      tts->disconnect(addr);
      std::shared_ptr<TcpTransport> pTcp;
      return pTcp;
    } else {
      // even if connecting failed finally, this server transport will be erased by next CreateTransport
      m_tcpTable[addr] = tts;
    }
  }

  TcpConnectStatus connectStatus = tts->waitTcpConnectEvent(static_cast<int>(m_tcpConnectTimeout));
  if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
    LOG_WARN("can not connect to server:%s", addr.c_str());
    tts->disconnect(addr);
    std::shared_ptr<TcpTransport> pTcp;
    return pTcp;
  } else {
    LOG_INFO("connect server with addr:%s success", addr.c_str());
    return tts;
  }
}