in spark/sql/sparksession.go [74:106]
func (s *SparkSessionBuilder) Build(ctx context.Context) (SparkSession, error) {
if s.channelBuilder == nil {
cb, err := channel.NewBuilder(s.connectionString)
if err != nil {
return nil, sparkerrors.WithType(fmt.Errorf(
"failed to connect to remote %s: %w", s.connectionString, err), sparkerrors.ConnectionError)
}
s.channelBuilder = cb
}
conn, err := s.channelBuilder.Build(ctx)
if err != nil {
return nil, sparkerrors.WithType(fmt.Errorf("failed to connect to remote %s: %w",
s.connectionString, err), sparkerrors.ConnectionError)
}
// Add metadata to the request.
meta := metadata.MD{}
for k, v := range s.channelBuilder.Headers() {
meta[k] = append(meta[k], v)
}
sessionId := uuid.NewString()
// Update the options according to the configuration.
opts := options.NewSparkClientOptions(options.DefaultSparkClientOptions.ReattachExecution)
opts.UserAgent = s.channelBuilder.UserAgent()
opts.UserId = s.channelBuilder.User()
return &sparkSessionImpl{
sessionId: sessionId,
client: client.NewSparkExecutor(conn, meta, sessionId, opts),
}, nil
}