def getRMWebAppURL()

in streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala [124:227]


  /**
   * Resolves the base URL of the YARN ResourceManager web application and caches it in
   * `rmHttpURL`.
   *
   * Resolution order:
   *  1. If `yarn.web-proxy.address` is set, the URL is the protocol followed by that value.
   *  1. If ResourceManager HA is disabled, the RM webapp address key (http or https variant) is
   *     read from the Hadoop configuration.
   *  1. If HA is enabled, the active RM id is taken from `RMHAUtils.findActiveRMHAId`; when that
   *     returns null, every configured RM is probed over HTTP instead.
   *
   * In cases 2 and 3, a wildcard or loopback address is replaced by the canonical name of the
   * local host when that name can be determined.
   *
   * @param getLatest
   *   when true, the cached value is ignored and the URL is resolved again
   * @return
   *   the ResourceManager web application URL, e.g. `http://host:8088`
   * @throws IllegalArgumentException
   *   if HA is enabled and no active ResourceManager id could be determined
   */
  def getRMWebAppURL(getLatest: Boolean = false): String = {
    if (rmHttpURL == null || getLatest) {
      synchronized {
        // Re-check under the lock: another caller may have filled the cache while this one was
        // waiting, in which case repeating the lookup (and the HTTP probing) is wasted work.
        if (rmHttpURL == null || getLatest) {
          val conf = HadoopUtils.hadoopConf
          val (addressPrefix, defaultPort, protocol) =
            if (YarnConfiguration.useHttps(conf)) {
              (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", "https://")
            } else {
              (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", "http://")
            }

          // Probes each configured RM web address over HTTP and returns the id of the RM the
          // probe reports as active, or null when none could be identified.
          def probeActiveRMId(yarnConf: YarnConfiguration): String = {
            // url ==> rmId
            val idUrlMap = new JavaHashMap[String, String]
            val rmIds = HAUtil.getRMHAIds(conf)
            rmIds.foreach(
              id => {
                // Prefer the explicit web address; otherwise derive it from the RM hostname.
                val candidate = Option(conf.get(HAUtil.addSuffix(addressPrefix, id)))
                  .orElse(
                    Option(conf.get(HAUtil.addSuffix("yarn.resourcemanager.hostname", id)))
                      .map(hostname => s"$hostname:$defaultPort"))
                candidate match {
                  case Some(address) => idUrlMap.put(s"$protocol$address", id)
                  case None =>
                    // Nothing to probe: avoid building a bogus "null:<port>" URL.
                    logWarn(s"no web address or hostname configured for rm id: $id, skip it.")
                }
              })
            var rmId: String = null
            val rpcTimeoutForChecks = yarnConf.getInt(
              CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
              CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT)
            breakable(
              idUrlMap.foreach(
                entry => {
                  // test yarn url
                  // NOTE(review): httpTestYarnRMUrl is defined outside this block; it presumably
                  // returns the URL that finally served the request, or null on failure - confirm.
                  val activeUrl = httpTestYarnRMUrl(entry._1, rpcTimeoutForChecks)
                  if (activeUrl != null) {
                    // java.util.Map#get returns null for an unknown key, whereas the Scala view's
                    // apply would throw NoSuchElementException and abort the whole lookup.
                    val matchedId = idUrlMap.get(activeUrl)
                    if (matchedId != null) {
                      rmId = matchedId
                      break
                    } else {
                      logWarn(
                        s"yarn url $activeUrl reported by ${entry._1} matches no configured rm id, try next.")
                    }
                  }
                }))
            rmId
          }

          // Determines the active RM in an HA setup and returns the configuration key from which
          // its web address should be read.
          def activeRMAddressKey(): String = {
            val yarnConf = new YarnConfiguration(conf)
            val activeRMId = Option(RMHAUtils.findActiveRMHAId(yarnConf)) match {
              case Some(id) =>
                logInfo("findActiveRMHAId successful")
                id
              case None =>
                // if you don't know why, don't modify it
                logWarn(
                  s"findActiveRMHAId is null,config yarn.acl.enable:${yarnConf.get("yarn.acl.enable")},now http try it.")
                probeActiveRMId(yarnConf)
            }
            require(
              activeRMId != null,
              "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn active node")
            logInfo(s"current activeRMHAId: $activeRMId")
            val appActiveRMKey = HAUtil.addSuffix(addressPrefix, activeRMId)
            val hostnameActiveRMKey = HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, activeRMId)
            // Use the hostname key only when the web address is absent and the hostname is set.
            val useHostnameKey =
              null == HAUtil.getConfValueForRMInstance(appActiveRMKey, yarnConf) &&
                null != HAUtil.getConfValueForRMInstance(hostnameActiveRMKey, yarnConf)
            if (useHostnameKey) {
              logInfo(s"Find rm web address by : $hostnameActiveRMKey")
              hostnameActiveRMKey
            } else {
              logInfo(s"Find rm web address by : $appActiveRMKey")
              appActiveRMKey
            }
          }

          // Builds the final URL from the socket address stored under the given configuration key.
          def buildURL(addressKey: String): String = {
            val inetSocketAddress =
              conf.getSocketAddr(addressKey, s"0.0.0.0:$defaultPort", defaultPort.toInt)
            val address = NetUtils.getConnectAddress(inetSocketAddress)
            val resolved = address.getAddress
            val host =
              if (resolved != null && !resolved.isAnyLocalAddress && !resolved.isLoopbackAddress) {
                address.getHostName
              } else {
                // A wildcard/loopback address is of no use to remote clients, so advertise the
                // local canonical host name when it can be determined.
                Try(InetAddress.getLocalHost.getCanonicalHostName).getOrElse(address.getHostName)
              }
            s"$protocol$host:${address.getPort}"
          }

          rmHttpURL = Option(conf.get("yarn.web-proxy.address", null)) match {
            case Some(proxy) => s"$protocol$proxy"
            case _ =>
              val addressKey =
                if (!HAUtil.isHAEnabled(conf)) addressPrefix else activeRMAddressKey()
              buildURL(addressKey)
          }
          logInfo(s"yarn resourceManager webapp url:$rmHttpURL")
        }
      }
    }
    rmHttpURL
  }