in session.go [218:371]
// init performs the one-time start-up of the session. It converts the
// configured addresses into hosts, optionally sets up the control connection
// (discovering the protocol version when none is configured) and fetches the
// host list through it, starts adding hosts to the connection pool, registers
// them with the host selection policy and finally marks the session as
// initialized.
//
// It returns the error from address conversion, protocol discovery, the
// control connection or the host lookup, and ErrNoConnectionsStarted when the
// pool reports a size of zero once the hosts have been processed.
func (s *Session) init() error {
	hosts, err := addrsToHosts(s.cfg.Hosts, s.cfg.Port, s.logger)
	if err != nil {
		return err
	}
	s.ring.endpoints = hosts

	if !s.cfg.disableControlConn {
		s.control = createControlConn(s)
		if s.cfg.ProtoVersion == 0 {
			proto, err := s.control.discoverProtocol(hosts)
			if err != nil {
				// Wrap with %w so callers can still inspect the cause.
				return fmt.Errorf("unable to discover protocol version: %w", err)
			}
			if proto == 0 {
				return errors.New("unable to discovery protocol version")
			}

			// TODO(zariel): we really only need this in 1 place
			s.cfg.ProtoVersion = proto
			s.connCfg.ProtoVersion = proto
		}

		if err := s.control.connect(hosts); err != nil {
			return err
		}

		if !s.cfg.DisableInitialHostLookup {
			newHosts, partitioner, err := s.hostSource.GetHosts()
			if err != nil {
				return err
			}
			s.policy.SetPartitioner(partitioner)

			// Keep only the hosts that the configured filter does not reject.
			filteredHosts := make([]*HostInfo, 0, len(newHosts))
			for _, host := range newHosts {
				if !s.cfg.filterHost(host) {
					filteredHosts = append(filteredHosts, host)
				}
			}
			hosts = filteredHosts
		}
	}

	// Index the hosts by host ID; hosts sharing an ID collapse into one entry.
	hostMap := make(map[string]*HostInfo, len(hosts))
	for _, host := range hosts {
		// In case when host lookup is disabled and when we are in unit tests,
		// hosts are not discovered, and we are missing host ID information used
		// by internal logic.
		// Associate random UUIDs here with all hosts missing this information.
		if len(host.HostID()) == 0 {
			host.SetHostID(MustRandomUUID().String())
		}
		hostMap[host.HostID()] = host
	}

	// Collect the hosts we connect to in a fresh slice. hosts may still share
	// its backing array with s.ring.endpoints (assigned above), so truncating
	// and appending to it in place would overwrite the stored endpoints.
	hosts = make([]*HostInfo, 0, len(hostMap))

	// each host will increment left and decrement it after connecting and once
	// there's none left, we'll close connectedCh
	var left int64
	// we will receive up to len(hostMap) of messages so create a buffer so we
	// don't end up stuck in a goroutine if we stopped listening
	connectedCh := make(chan struct{}, len(hostMap))
	// we add one here because we don't want to end up closing connectedCh until
	// we're done looping and the decrement code might be reached before we've
	// looped again
	atomic.AddInt64(&left, 1)
	for _, host := range hostMap {
		// Declare a per-iteration variable so the goroutine below captures
		// the host returned by the ring for this iteration.
		host := s.ring.addOrUpdate(host)
		if s.cfg.filterHost(host) {
			continue
		}

		atomic.AddInt64(&left, 1)
		go func() {
			s.pool.addHost(host)
			connectedCh <- struct{}{}

			// if there are no hosts left, then close connectedCh to unblock the
			// loop below if it's still waiting
			if atomic.AddInt64(&left, -1) == 0 {
				close(connectedCh)
			}
		}()

		hosts = append(hosts, host)
	}
	// once we're done looping we subtract the one we initially added and check
	// to see if we should close
	if atomic.AddInt64(&left, -1) == 0 {
		close(connectedCh)
	}

	// before waiting for them to connect, add them all to the policy so we can
	// utilize efficiencies by calling AddHosts if the policy supports it
	type bulkAddHosts interface {
		AddHosts([]*HostInfo)
	}
	if v, ok := s.policy.(bulkAddHosts); ok {
		v.AddHosts(hosts)
	} else {
		for _, host := range hosts {
			s.policy.AddHost(host)
		}
	}

	readyPolicy, _ := s.policy.(ReadyPolicy)
	// now loop over connectedCh until it's closed (meaning we've connected to all)
	// or until the policy says we're ready
	for range connectedCh {
		if readyPolicy != nil && readyPolicy.Ready() {
			break
		}
	}

	// TODO(zariel): we probably dont need this any more as we verify that we
	// can connect to one of the endpoints supplied by using the control conn.
	// See if there are any connections in the pool
	if s.cfg.ReconnectInterval > 0 {
		go s.reconnectDownedHosts(s.cfg.ReconnectInterval)
	}

	// If we disable the initial host lookup, we need to still check if the
	// cluster is using the newer system schema or not... however, if control
	// connection is disabled, we really have no choice, so we just make our
	// best guess...
	if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup {
		// Best effort: on error the returned flag is used as-is.
		newer, _ := checkSystemSchema(s.control)
		s.useSystemSchema = newer
	} else if host := s.ring.rrHost(); host != nil {
		version := host.Version()
		s.useSystemSchema = version.AtLeast(3, 0, 0)
		s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0)
	}
	// NOTE(review): the nil guard above assumes rrHost returns nil when the
	// ring holds no hosts - confirm against its implementation. In that case
	// both flags keep their current values instead of dereferencing nil.

	if s.pool.Size() == 0 {
		return ErrNoConnectionsStarted
	}

	// Invoke KeyspaceChanged to let the policy cache the session keyspace
	// parameters. This is used by tokenAwareHostPolicy to discover replicas.
	if !s.cfg.disableControlConn && s.cfg.Keyspace != "" {
		s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: s.cfg.Keyspace})
	}

	s.sessionStateMu.Lock()
	s.isInitialized = true
	s.sessionStateMu.Unlock()

	return nil
}