in spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala [368:506]
/**
 * Creates a [[HiveClient]] used to communicate with the Hive metastore.
 *
 * The jars backing the client are chosen by the metastore-jars setting:
 *  - "builtin": reuse the jars already on Spark's classpath (only valid when
 *    the configured metastore version equals the builtin Hive version);
 *  - "maven": let [[IsolatedClientLoader]] download matching artifacts;
 *  - "path": resolve the configured path list (supports Hadoop globbing and,
 *    on Windows, backslash-separated local paths);
 *  - anything else: treat the setting itself as a local classpath string.
 *
 * @param conf Spark configuration; its SQL properties seed a fresh [[SQLConf]].
 * @param hadoopConf Hadoop configuration passed through to the client loader.
 * @param configurations additional Hive config entries for the new client.
 * @return the client produced by the resolved [[IsolatedClientLoader]].
 */
protected[hive] def newClientForMetadata(
    conf: SparkConf,
    hadoopConf: Configuration,
    configurations: Map[String, String]): HiveClient = {
  val sqlConf = new SQLConf
  sqlConf.setConf(SQLContext.getSQLProperties(conf))

  val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
  val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
  val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
  val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
  val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

  // Expands one local classpath entry into jar URLs. A trailing "*" component
  // means "every *.jar in the parent directory"; any other entry is taken as a
  // single file. A missing directory is logged and yields no jars.
  def expandLocalJarPath(file: File): Seq[URL] = {
    if (file.getName == "*") {
      val siblings = file.getParentFile.listFiles()
      if (siblings == null) {
        logWarning(s"Hive jar path '${file.getPath}' does not exist.")
        Nil
      } else {
        siblings
          .filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar"))
          .map(_.toURI.toURL)
          .toSeq
      }
    } else {
      Seq(file.toURI.toURL)
    }
  }

  val isolatedLoader = hiveMetastoreJars match {
    case "builtin" =>
      if (builtinHiveVersion != hiveMetastoreVersion) {
        throw new IllegalArgumentException(
          "Builtin jars can only be used when hive execution version == hive metastore version. " +
            s"Execution: $builtinHiveVersion != Metastore: $hiveMetastoreVersion. " +
            s"Specify a valid path to the correct hive jars using ${HIVE_METASTORE_JARS.key} " +
            s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
      }

      // Walks the classloader chain from the given loader, gathering every URL
      // exposed by URLClassLoader instances along the way.
      def collectJars(loader: ClassLoader): Array[URL] = loader match {
        case null => Array.empty[URL]
        case childFirst: ChildFirstURLClassLoader =>
          childFirst.getURLs() ++ collectJars(Utils.getSparkClassLoader)
        case urlLoader: URLClassLoader =>
          urlLoader.getURLs ++ collectJars(urlLoader.getParent)
        case other => collectJars(other.getParent)
      }

      val jars: Array[URL] =
        if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
          // On Java 9+ the system classloader is no longer a URLClassLoader and
          // does not expose the system classpath, so collectJars would find
          // nothing — skip the walk entirely.
          Array.empty[URL]
        } else {
          val found = collectJars(Utils.getContextOrSparkClassLoader)
          // Fail fast if the walk produced no jars at all.
          if (found.length == 0) {
            throw new IllegalArgumentException(
              "Unable to locate hive jars to connect to metastore. " +
                s"Please set ${HIVE_METASTORE_JARS.key}.")
          }
          found
        }

      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
      new IsolatedClientLoader(
        version = metaVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        execJars = jars.toSeq,
        config = configurations,
        isolationOn = !isCliSessionState(),
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)

    case "maven" =>
      // TODO: Support for loading the jars from an already downloaded location.
      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
      IsolatedClientLoader.forVersion(
        hiveMetastoreVersion = hiveMetastoreVersion,
        hadoopVersion = VersionInfo.getVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        config = configurations,
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)

    case "path" =>
      // Resolve each configured entry: Windows backslash paths are expanded
      // locally; everything else goes through Hadoop glob resolution.
      val jars = HiveUtils.hiveMetastoreJarsPath(sqlConf).flatMap { path =>
        if (path.contains("\\") && Utils.isWindows) {
          expandLocalJarPath(new File(path))
        } else {
          DataSource.checkAndGlobPathIfNecessary(
            pathStrings = Seq(path),
            hadoopConf = hadoopConf,
            checkEmptyGlobPath = true,
            checkFilesExist = false,
            enableGlobbing = true
          ).map(_.toUri.toURL)
        }
      }
      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
          s"using path: ${jars.mkString(";")}")
      new IsolatedClientLoader(
        version = metaVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        execJars = jars,
        config = configurations,
        isolationOn = true,
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)

    case classpath =>
      // Any other value is treated as a platform classpath string; expand each
      // entry into local jar files.
      val jars = classpath.split(File.pathSeparator).flatMap { entry =>
        expandLocalJarPath(new File(entry))
      }
      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
          s"using ${jars.mkString(":")}")
      new IsolatedClientLoader(
        version = metaVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        execJars = jars.toSeq,
        config = configurations,
        isolationOn = true,
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)
  }

  isolatedLoader.createClient()
}