in flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala [75:118]
def startServiceOnPort[T](
startPort: Int,
startService: Int => T,
maxRetries: Int = 128,
serviceName: String = ""): T = {
if (startPort != 0 && (startPort < 1024 || startPort > 65536)) {
throw new Exception("startPort should be between 1024 and 65535 (inclusive), " +
"or 0 for a random free port.")
}
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
startPort
} else {
// If the new port wraps around, do not try a privilege port
((startPort + offset - 1024) % (65536 - 1024)) + 1024
}
try {
val result = startService(tryPort)
logger.info(s"Successfully started service$serviceString, result:$result.")
return result
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
"port or increasing spark.port.maxRetries."
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
throw exception
}
logger.error(s"Service$serviceString could not bind on port $tryPort. " +
s"Attempting port ${tryPort + 1}.")
}
}
// Should never happen
throw new Exception(s"Failed to start service$serviceString on port $startPort")
}