in elastictransport/discovery.go [53:126]
func (c *Client) DiscoverNodes() error {
var conns []*Connection
nodes, err := c.getNodesInfo()
if err != nil {
if debugLogger != nil {
debugLogger.Logf("Error getting nodes info: %s\n", err)
}
return fmt.Errorf("discovery: get nodes: %s", err)
}
for _, node := range nodes {
var (
isMasterOnlyNode bool
)
roles := append(node.Roles[:0:0], node.Roles...)
sort.Strings(roles)
if len(roles) == 1 && roles[0] == "master" {
isMasterOnlyNode = true
}
if debugLogger != nil {
var skip string
if isMasterOnlyNode {
skip = "; [SKIP]"
}
debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip)
}
// Skip master only nodes
// TODO(karmi): Move logic to Selector?
if isMasterOnlyNode {
continue
}
conns = append(conns, &Connection{
URL: node.URL,
ID: node.ID,
Name: node.Name,
Roles: node.Roles,
Attributes: node.Attributes,
})
}
c.Lock()
defer c.Unlock()
if lockable, ok := c.pool.(sync.Locker); ok {
lockable.Lock()
defer lockable.Unlock()
}
if c.poolFunc != nil {
c.pool = c.poolFunc(conns, c.selector)
} else {
if p, ok := c.pool.(UpdatableConnectionPool); ok {
err = p.Update(conns)
if err != nil {
if debugLogger != nil {
debugLogger.Logf("Error updating pool: %s\n", err)
}
}
} else {
c.pool, err = NewConnectionPool(conns, c.selector)
if err != nil {
return err
}
}
}
return nil
}