in core/src/main/scala/org/apache/spark/eventhubs/ConnectionStringBuilder.scala [303:391]
/**
 * Parses `connectionString` and stores every recognised setting on this builder.
 *
 * The string is prefixed with `KeyValuePairDelimiter` (and a single trailing `;` is dropped)
 * so that every key, including the first one, is matched by `KeysWithDelimitersRegex`.
 * Splitting on that regex yields the values; scanning with the same regex yields the keys.
 * The n-th matched key (1-based) is paired with the n-th element of the split result,
 * element 0 being whatever text preceded the first key.
 *
 * Key names are compared case-insensitively. Recognised keys populate `endpoint`,
 * `sharedAccessKeyName`, `sharedAccessKey`, `sharedAccessSignature`, `eventHubName` and
 * `operationTimeout`.
 *
 * @param connectionString the raw connection string to parse
 * @throws IllegalConnectionStringFormatException if the string is null or blank, cannot be
 *         split into key/value pairs, has non-blank text before the first key, has a key
 *         without a value, defines the endpoint/hostname when `endpoint` is already set,
 *         carries an endpoint/hostname that is not a valid URI, carries an operation timeout
 *         that `Duration.parse` rejects, or contains an unrecognised key
 */
private def parseConnectionString(connectionString: String): Unit = {
  if (StringUtil.isNullOrWhiteSpace(connectionString)) {
    throw new IllegalConnectionStringFormatException("connectionString cannot be empty")
  }

  // Prefix with the pair delimiter and drop one trailing ";" if the input ended with it.
  val delimited = KeyValuePairDelimiter + connectionString
  val normalized = if (connectionString.endsWith(";")) delimited.dropRight(1) else delimited

  val keyPattern = Pattern.compile(KeysWithDelimitersRegex, Pattern.CASE_INSENSITIVE)
  val segments = keyPattern.split(normalized)
  val keyMatcher = keyPattern.matcher(normalized)

  val unparseable = segments == null || segments.length <= 1 || keyMatcher.groupCount() == 0
  if (unparseable) {
    throw new IllegalConnectionStringFormatException("Connection String cannot be parsed.")
  }

  // Anything in front of the first key is text that belongs to no key/value pair.
  val leadingText = segments(0)
  if (!StringUtil.isNullOrWhiteSpace(leadingText)) {
    throw new IllegalConnectionStringFormatException(
      String.format(Locale.US, "Cannot parse part of ConnectionString: %s", leadingText))
  }

  // An endpoint that is already set means the endpoint/hostname was configured more than once,
  // which is not allowed.
  def rejectSecondEndpoint(): Unit =
    if (this.endpoint != null) {
      throw new IllegalConnectionStringFormatException(
        s"Multiple $EndpointConfigName and/or $HostnameConfigName detected. Make sure only one is defined"
      )
    }

  // Lazily walk the matcher: each successful find() yields the next raw key match.
  val rawKeys = Iterator.continually(keyMatcher).takeWhile(_.find()).map(_.group())

  rawKeys.zip(Iterator.from(1)).foreach {
    case (rawKey, valueIndex) =>
      // Strip the first and last character of the match (presumably the pair delimiter and
      // the key/value separator - confirm against KeysWithDelimitersRegex).
      val key = rawKey.substring(1, rawKey.length - 1)

      // Pattern.split omits trailing empty strings, so a trailing key with an empty value
      // has no corresponding element.
      if (valueIndex >= segments.length) {
        throw new IllegalConnectionStringFormatException(
          s"Value for the connection string parameter name: $key, not found")
      }
      val value = segments(valueIndex)

      key match {
        case k if k.equalsIgnoreCase(EndpointConfigName) =>
          rejectSecondEndpoint()
          try {
            this.endpoint = new URI(value)
          } catch {
            case e: URISyntaxException =>
              throw new IllegalConnectionStringFormatException(
                s"$EndpointConfigName should be in format sb://<namespaceName>.<domainName>",
                e)
          }

        case k if k.equalsIgnoreCase(HostnameConfigName) =>
          rejectSecondEndpoint()
          try {
            // A bare host name is turned into a URI by prepending the default protocol.
            this.endpoint = new URI(s"$DefaultProtocol$value")
          } catch {
            case e: URISyntaxException =>
              throw new IllegalConnectionStringFormatException(
                s"$HostnameConfigName should be a fully quantified host name address",
                e)
          }

        case k if k.equalsIgnoreCase(SharedAccessKeyNameConfigName) =>
          this.sharedAccessKeyName = value

        case k if k.equalsIgnoreCase(SharedAccessKeyConfigName) =>
          this.sharedAccessKey = value

        case k if k.equalsIgnoreCase(SharedAccessSignatureConfigName) =>
          this.sharedAccessSignature = value

        case k if k.equalsIgnoreCase(EntityPathConfigName) =>
          this.eventHubName = value

        case k if k.equalsIgnoreCase(OperationTimeoutConfigName) =>
          try {
            this.operationTimeout = Duration.parse(value)
          } catch {
            case e: DateTimeParseException =>
              throw new IllegalConnectionStringFormatException(
                "Invalid value specified for property 'Duration' in the ConnectionString.",
                e)
          }

        case _ =>
          throw new IllegalConnectionStringFormatException(
            s"Illegal connection string parameter name: $key")
      }
  }
}