in spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala [109:165]
/**
 * Normalises user supplied data source options into the connector's internal
 * "doris."-prefixed property map.
 *
 * Steps, in order:
 *  1. Rewrite '_' to '.' in every key, except keys that start with
 *     "sink.properties." or "doris.sink.properties.", which are kept verbatim.
 *  2. Remember the table identifier found under
 *     `ConfigurationOptions.DORIS_TABLE_IDENTIFIER`, falling back to
 *     `ConfigurationOptions.TABLE_IDENTIFIER`.
 *  3. Reject keys that are already equal to `DORIS_PASSWORD` / `DORIS_USER`,
 *     then prefix every remaining key with "doris." unless it already has it.
 *  4. Reject keys equal to `DORIS_REQUEST_AUTH_PASSWORD` /
 *     `DORIS_REQUEST_AUTH_USER`, and rename `DORIS_PASSWORD` / `DORIS_USER`
 *     (produced by the prefixing in step 3) to those request-auth keys.
 *  5. Store the remembered table identifier under `DORIS_TABLE_IDENTIFIER`.
 *
 * @param parameters raw options as provided by the caller
 * @param logger     receives one debug line with the chosen table identifier and
 *                   an error line before each rejection
 * @return the normalised option map; it always contains
 *         `ConfigurationOptions.DORIS_TABLE_IDENTIFIER`
 * @throws DorisException if a rejected key is present, or if no table
 *                        identifier ends up in the result
 */
def params(parameters: Map[String, String], logger: Logger) = {
  // Single failure path for options that must not be supplied directly:
  // log the problem, then abort with a hint naming the option to use instead.
  def reject(option: String, replacement: String): Nothing = {
    logger.error(s"$option cannot use in Doris Datasource.")
    throw new DorisException(
      s"$option cannot use in Doris Datasource, use '$replacement' option to set $replacement.")
  }

  // '.' seems to be problematic when specifying options, so '_' in a key is
  // turned back into '.'. Sink property keys are passed through untouched.
  val normalized = parameters.map { case (key, value) =>
    val isSinkProperty =
      key.startsWith("sink.properties.") || key.startsWith("doris.sink.properties.")
    val normalizedKey = if (isSinkProperty) key else key.replace('_', '.')
    normalizedKey -> value
  }

  // The fully qualified identifier key wins over the short one.
  val preferredTable = normalized.get(ConfigurationOptions.DORIS_TABLE_IDENTIFIER) match {
    case found @ Some(_) => found
    case None => normalized.get(ConfigurationOptions.TABLE_IDENTIFIER)
  }
  logger.debug(s"preferred Table Identifier is '$preferredTable'.")

  // First pass: refuse the internal user/password keys when given as-is and
  // make sure every other key carries the "doris." prefix.
  val prefixed = normalized.map { case (key, value) =>
    if (key == ConfigurationOptions.DORIS_PASSWORD) {
      reject(ConfigurationOptions.DORIS_PASSWORD, "password")
    } else if (key == ConfigurationOptions.DORIS_USER) {
      reject(ConfigurationOptions.DORIS_USER, "user")
    } else if (key.startsWith("doris.")) {
      key -> value
    } else {
      s"doris.$key" -> value
    }
  }

  // Second pass: refuse request-auth keys given directly, and move the
  // credentials produced by the first pass onto the request-auth keys.
  // Kept as a separate pass so that every first-pass rejection fires before
  // any second-pass rejection.
  val remapped = prefixed.map { case (key, value) =>
    key match {
      case ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD =>
        reject(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, "password")
      case ConfigurationOptions.DORIS_REQUEST_AUTH_USER =>
        reject(ConfigurationOptions.DORIS_REQUEST_AUTH_USER, "user")
      case ConfigurationOptions.DORIS_PASSWORD =>
        ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD -> value
      case ConfigurationOptions.DORIS_USER =>
        ConfigurationOptions.DORIS_REQUEST_AUTH_USER -> value
      case _ =>
        key -> value
    }
  }

  // Re-apply the table identifier chosen before prefixing, if there was one.
  val merged = preferredTable.fold(remapped) { identifier =>
    remapped.updated(ConfigurationOptions.DORIS_TABLE_IDENTIFIER, identifier)
  }

  // A table identifier is mandatory.
  if (!merged.contains(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)) {
    throw new DorisException("table identifier must be specified for doris table identifier.")
  }
  merged
}