def getRMWebAppURL()

in streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala [133:235]


  /**
   * Resolves the YARN ResourceManager web application URL and caches it in `rmHttpURL`.
   *
   * Resolution order:
   *  - `yarn.web-proxy.address`, when configured, is appended to the selected schema constant.
   *  - Without RM HA, the RM webapp address key is used (the HTTPS key with default port 8090
   *    when `YarnConfiguration.useHttps` is true, otherwise the HTTP key with default port 8088).
   *  - With RM HA, the id of the active RM is looked up through `RMHAUtils.findActiveRMHAId`;
   *    when that yields null, the web address of every configured RM id is probed with
   *    `httpTestYarnRMUrl` and the probe result is mapped back to an RM id.
   *
   * @param getLatest when true the cached value is ignored and the URL is resolved again
   * @return the cached or freshly resolved URL
   * @throws IllegalArgumentException (raised by `require`) when RM HA is enabled and no active
   *                                  RM id could be determined
   */
  def getRMWebAppURL(getLatest: Boolean = false): String = {
    if (rmHttpURL == null || getLatest) {
      synchronized {
        // Re-check under the lock: a thread that queued here while another one was resolving the
        // URL must not repeat the (potentially slow, HTTP-probing) resolution. An explicit
        // `getLatest` request still always refreshes.
        // NOTE(review): the unlocked first check presumably relies on `rmHttpURL` being declared
        // @volatile for safe publication - confirm at the field declaration.
        if (rmHttpURL == null || getLatest) {
          val conf = HadoopUtils.hadoopConf
          val (addressPrefix, defaultPort, protocol) =
            if (YarnConfiguration.useHttps(conf)) {
              (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", Constants.HTTPS_SCHEMA)
            } else {
              (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", Constants.HTTP_SCHEMA)
            }

          rmHttpURL = Option(conf.get("yarn.web-proxy.address", null)) match {
            // A configured web proxy address takes precedence over any RM address.
            case Some(proxy) => s"$protocol$proxy"
            case _ =>
              // `name` is the configuration key whose value holds the RM web address.
              val name =
                if (!HAUtil.isHAEnabled(conf)) addressPrefix
                else {
                  val yarnConf = new YarnConfiguration(conf)
                  val activeRMId = {
                    Option(RMHAUtils.findActiveRMHAId(yarnConf)) match {
                      case Some(x) =>
                        logInfo("'findActiveRMHAId' successful")
                        x
                      case None =>
                        // if you don't know why, don't modify it
                        val aclEnabled = yarnConf.get("yarn.acl.enable")
                        logWarn(
                          s"'findActiveRMHAId' is null,config yarn.acl.enable:$aclEnabled,now http try it.")
                        // url ==> rmId
                        val idUrlMap = new JavaHashMap[String, String]
                        val rmIds = HAUtil.getRMHAIds(conf)
                        rmIds.foreach(id => {
                          // Prefer the per-RM web address; otherwise build it from the per-RM
                          // hostname and the default port of the selected schema.
                          val address = conf.get(HAUtil.addSuffix(addressPrefix, id)) match {
                            case null =>
                              val hostname =
                                conf.get(HAUtil.addSuffix("yarn.resourcemanager.hostname", id))
                              s"$hostname:$defaultPort"
                            case x => x
                          }
                          idUrlMap.put(s"$protocol$address", id)
                        })
                        var rmId: String = null
                        val rpcTimeoutForChecks = yarnConf.getInt(
                          CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
                          CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT)
                        breakable(
                          idUrlMap.foreach(x => {
                            // test yarn url
                            val activeUrl =
                              httpTestYarnRMUrl(x._1, rpcTimeoutForChecks)
                            if (activeUrl != null) {
                              // The probe result is looked up as a key, not assumed to equal the
                              // probed url. java.util.HashMap#get returns null for an unknown
                              // key, so an unexpected result moves on to the next candidate
                              // instead of failing with an opaque NoSuchElementException.
                              val candidateId = idUrlMap.get(activeUrl)
                              if (candidateId != null) {
                                rmId = candidateId
                                break
                              } else {
                                logWarn(
                                  s"'$activeUrl' returned for '${x._1}' matches no configured rm id, try next.")
                              }
                            }
                          }))
                        rmId
                    }
                  }
                  require(
                    activeRMId != null,
                    "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn active node")
                  logInfo(s"Current activeRMHAId: $activeRMId")
                  val appActiveRMKey = HAUtil.addSuffix(addressPrefix, activeRMId)
                  val hostnameActiveRMKey =
                    HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, activeRMId)
                  // Fall back to the hostname key only when the webapp address of the active RM
                  // is not configured while its hostname is.
                  if (null == HAUtil.getConfValueForRMInstance(
                      appActiveRMKey,
                      yarnConf) && null != HAUtil.getConfValueForRMInstance(
                      hostnameActiveRMKey,
                      yarnConf)) {
                    logInfo(s"Find rm web address by : $hostnameActiveRMKey")
                    hostnameActiveRMKey
                  } else {
                    logInfo(s"Find rm web address by : $appActiveRMKey")
                    appActiveRMKey
                  }
                }

              val inetSocketAddress =
                conf.getSocketAddr(name, s"0.0.0.0:$defaultPort", defaultPort.toInt)

              val address = NetUtils.getConnectAddress(inetSocketAddress)

              val buffer = new mutable.StringBuilder(protocol)
              val resolved = address.getAddress
              if (resolved != null && !resolved.isAnyLocalAddress && !resolved.isLoopbackAddress) {
                buffer.append(address.getHostName)
              } else {
                // Wildcard, loopback or unresolved address: use the canonical name of the local
                // host, keeping the configured host name if that lookup fails.
                Try(InetAddress.getLocalHost.getCanonicalHostName) match {
                  case Success(value) => buffer.append(value)
                  case _ => buffer.append(address.getHostName)
                }
              }
              buffer
                .append(":")
                .append(address.getPort)
                .toString()
          }
          logInfo(s"Yarn resourceManager webapp url:$rmHttpURL")
        }
      }
    }
    rmHttpURL
  }