in spark/client/channel/channel.go [151:235]
func NewBuilder(connection string) (*BaseBuilder, error) {
u, err := url.Parse(connection)
if err != nil {
return nil, err
}
if u.Hostname() == "" {
return nil, sparkerrors.WithType(errors.New("URL must contain a hostname"), sparkerrors.InvalidInputError)
}
if u.Scheme != "sc" {
return nil, sparkerrors.WithType(errors.New("URL schema must be set to `sc`"), sparkerrors.InvalidInputError)
}
port := 15002
host := u.Host
// Check if the host part of the URL contains a port and extract.
if strings.Contains(u.Host, ":") {
// We can ignore the error here already since the url parsing
// raises the error about invalid port.
hostStr, portStr, _ := net.SplitHostPort(u.Host)
host = hostStr
if len(portStr) != 0 {
port, err = strconv.Atoi(portStr)
if err != nil {
return nil, err
}
}
}
// Validate that the URL path is empty or follows the right format.
if u.Path != "" && !strings.HasPrefix(u.Path, "/;") {
return nil, sparkerrors.WithType(
fmt.Errorf("the URL path (%v) must be empty or have a proper parameter syntax", u.Path),
sparkerrors.InvalidInputError)
}
cb := &BaseBuilder{
host: host,
port: port,
headers: map[string]string{},
sessionId: uuid.NewString(),
userAgent: "",
}
elements := strings.Split(u.Path, ";")
for _, e := range elements {
props := strings.Split(e, "=")
if len(props) == 2 {
if props[0] == "token" {
cb.token = props[1]
} else if props[0] == "user_id" {
cb.user = props[1]
} else if props[0] == "session_id" {
cb.sessionId = props[1]
} else if props[0] == "user_agent" {
cb.userAgent = props[1]
} else {
cb.headers[props[0]] = props[1]
}
}
}
// Set default user ID if not set.
if cb.user == "" {
cb.user = os.Getenv("USER")
if cb.user == "" {
cb.user = "na"
}
}
// Update the user agent if it is not set or set to a custom value.
val := os.Getenv("SPARK_CONNECT_USER_AGENT")
if cb.userAgent == "" && val != "" {
cb.userAgent = os.Getenv("SPARK_CONNECT_USER_AGENT")
} else if cb.userAgent == "" {
cb.userAgent = "_SPARK_CONNECT_GO"
}
// In addition, to the specified user agent, we need to append information about the
// host encoded as user agent components.
cb.userAgent = fmt.Sprintf("%s spark/%s os/%s go/%s", cb.userAgent, spark.Version(), runtime.GOOS, runtime.Version())
return cb, nil
}