in conn.go [1898:1970]
func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
var versions map[string]struct{}
var schemaVersion string
endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
for time.Now().Before(endDeadline) {
iter := c.querySystemPeers(ctx, c.host.version)
versions = make(map[string]struct{})
rows, err := iter.SliceMap()
if err != nil {
goto cont
}
for _, row := range rows {
h, err := NewHostInfo(c.host.ConnectAddress(), c.session.cfg.Port)
if err != nil {
goto cont
}
host, err := c.session.hostInfoFromMap(row, h)
if err != nil {
goto cont
}
if !isValidPeer(host) || host.schemaVersion == "" {
c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
continue
}
versions[host.schemaVersion] = struct{}{}
}
if err = iter.Close(); err != nil {
goto cont
}
iter = c.query(ctx, localSchemas)
for iter.Scan(&schemaVersion) {
versions[schemaVersion] = struct{}{}
schemaVersion = ""
}
if err = iter.Close(); err != nil {
goto cont
}
if len(versions) <= 1 {
return nil
}
cont:
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(200 * time.Millisecond):
}
}
if err != nil {
return err
}
schemas := make([]string, 0, len(versions))
for schema := range versions {
schemas = append(schemas, schema)
}
// not exported
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas)
}