in streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala [133:235]
/**
 * Resolves the base URL of the YARN ResourceManager web application and caches it in
 * `rmHttpURL`.
 *
 * Resolution order:
 *  1. `yarn.web-proxy.address`, when set, prefixed with the protocol.
 *  1. Without RM HA: the socket address configured under the RM webapp address key.
 *  1. With RM HA: the webapp address (or, failing that, hostname) key of the active RM. The
 *     active RM id comes from `RMHAUtils.findActiveRMHAId`; when that yields null, every
 *     configured RM web URL is probed over HTTP via `httpTestYarnRMUrl`.
 *
 * The protocol and default port (https/8090 or http/8088) follow `YarnConfiguration.useHttps`.
 *
 * @param getLatest
 *   when true, the cached value is ignored and the URL is resolved again
 * @return
 *   the ResourceManager web URL in the form `protocol + host:port` (or `protocol + proxy`)
 * @throws IllegalArgumentException
 *   (via `require`) when RM HA is enabled and no active RM id can be determined
 */
def getRMWebAppURL(getLatest: Boolean = false): String = {
  if (rmHttpURL == null || getLatest) {
    synchronized {
      // Re-check under the lock: a caller that did not ask for a refresh reuses the value
      // another thread resolved while this one was waiting, instead of probing YARN again.
      if (rmHttpURL == null || getLatest) {
        val conf = HadoopUtils.hadoopConf
        val (addressPrefix, defaultPort, protocol) =
          if (YarnConfiguration.useHttps(conf)) {
            (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", Constants.HTTPS_SCHEMA)
          } else {
            (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", Constants.HTTP_SCHEMA)
          }
        rmHttpURL = Option(conf.get("yarn.web-proxy.address", null)) match {
          case Some(proxy) => s"$protocol$proxy"
          case _ =>
            // Name of the configuration key that holds the RM web address to use.
            val name =
              if (!HAUtil.isHAEnabled(conf)) addressPrefix
              else {
                val yarnConf = new YarnConfiguration(conf)
                val activeRMId = Option(RMHAUtils.findActiveRMHAId(yarnConf)) match {
                  case Some(id) =>
                    logInfo("'findActiveRMHAId' successful")
                    id
                  case None =>
                    // if you don't know why, don't modify it
                    val aclEnabled = yarnConf.get("yarn.acl.enable")
                    logWarn(
                      s"'findActiveRMHAId' is null,config yarn.acl.enable:$aclEnabled,now http try it.")
                    // url ==> rmId
                    val idUrlMap = new JavaHashMap[String, String]
                    val rmIds = HAUtil.getRMHAIds(conf)
                    rmIds.foreach(id => {
                      // Prefer the explicit webapp address; otherwise build one from the
                      // per-RM hostname and the default port. Ids with neither configured are
                      // skipped rather than registered as "null:<port>".
                      val address = Option(conf.get(HAUtil.addSuffix(addressPrefix, id)))
                        .orElse(
                          Option(conf.get(HAUtil.addSuffix("yarn.resourcemanager.hostname", id)))
                            .map(hostname => s"$hostname:$defaultPort"))
                      address.foreach(addr => idUrlMap.put(s"$protocol$addr", id))
                    })
                    var rmId: String = null
                    val rpcTimeoutForChecks = yarnConf.getInt(
                      CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
                      CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT)
                    breakable(
                      idUrlMap.foreach(x => {
                        // test yarn url
                        // NOTE(review): httpTestYarnRMUrl presumably returns the URL of the
                        // active RM (or null on failure) - confirm against its implementation.
                        val activeUrl = httpTestYarnRMUrl(x._1, rpcTimeoutForChecks)
                        if (activeUrl != null) {
                          // java.util.HashMap#get yields null for an unknown URL, so a probe
                          // result that is not one of the configured URLs no longer aborts the
                          // lookup; the remaining candidates are still tried.
                          rmId = idUrlMap.get(activeUrl)
                          if (rmId != null) break
                        }
                      }))
                    rmId
                }
                require(
                  activeRMId != null,
                  "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn active node")
                logInfo(s"Current activeRMHAId: $activeRMId")
                val appActiveRMKey = HAUtil.addSuffix(addressPrefix, activeRMId)
                val hostnameActiveRMKey =
                  HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, activeRMId)
                // Fall back to the hostname key only when the webapp address key is unset
                // and the hostname key is set.
                val useHostnameKey =
                  null == HAUtil.getConfValueForRMInstance(appActiveRMKey, yarnConf) &&
                    null != HAUtil.getConfValueForRMInstance(hostnameActiveRMKey, yarnConf)
                val selectedKey = if (useHostnameKey) hostnameActiveRMKey else appActiveRMKey
                logInfo(s"Find rm web address by : $selectedKey")
                selectedKey
              }
            val inetSocketAddress =
              conf.getSocketAddr(name, s"0.0.0.0:$defaultPort", defaultPort.toInt)
            val address = NetUtils.getConnectAddress(inetSocketAddress)
            val resolved = address.getAddress
            // A wildcard, loopback or unresolved address is replaced by this machine's
            // canonical host name when that can be determined.
            val host =
              if (resolved != null && !resolved.isAnyLocalAddress && !resolved.isLoopbackAddress) {
                address.getHostName
              } else {
                Try(InetAddress.getLocalHost.getCanonicalHostName).getOrElse(address.getHostName)
              }
            s"$protocol$host:${address.getPort}"
        }
        logInfo(s"Yarn resourceManager webapp url:$rmHttpURL")
      }
    }
  }
  rmHttpURL
}