func()

in conn.go [1898:1970]


// awaitSchemaAgreement polls until the schema versions reported by the peers
// and by the local node collapse to at most one distinct value, or until
// c.session.cfg.MaxWaitSchemaAgreement has elapsed.
//
// Polling rounds are separated by a 200ms pause. The function returns:
//   - nil as soon as a round completes without error and sees at most one
//     schema version;
//   - ctx.Err() if ctx is done while waiting between rounds;
//   - the error of the last round if that round failed;
//   - otherwise an error listing the differing schema versions of the last
//     round.
func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
	var versions map[string]struct{}

	endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)

	for time.Now().Before(endDeadline) {
		// Assign (not :=) so the outcome of the most recent round is what the
		// checks after the loop observe; a successful round resets err to nil.
		versions, err = c.collectSchemaVersions(ctx)
		if err == nil && len(versions) <= 1 {
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(200 * time.Millisecond):
		}
	}

	// The deadline passed: prefer reporting why the last round failed over a
	// (possibly empty) list of versions.
	if err != nil {
		return err
	}

	schemas := make([]string, 0, len(versions))
	for schema := range versions {
		schemas = append(schemas, schema)
	}

	// not exported
	return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas)
}

// collectSchemaVersions performs a single polling round for
// awaitSchemaAgreement. It returns the set of distinct schema versions found
// in the rows from querySystemPeers plus the schema_version of system.local,
// or the first error encountered while reading either of them.
func (c *Conn) collectSchemaVersions(ctx context.Context) (map[string]struct{}, error) {
	const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"

	versions := make(map[string]struct{})

	iter := c.querySystemPeers(ctx, c.host.version)
	rows, err := iter.SliceMap()
	if err != nil {
		return nil, err
	}

	for _, row := range rows {
		// A fresh HostInfo is built for every row because it is handed to
		// hostInfoFromMap together with that row.
		h, err := NewHostInfo(c.host.ConnectAddress(), c.session.cfg.Port)
		if err != nil {
			return nil, err
		}
		host, err := c.session.hostInfoFromMap(row, h)
		if err != nil {
			return nil, err
		}
		// Peers that are invalid or report no schema version are logged and
		// left out of the comparison instead of failing the round.
		if !isValidPeer(host) || host.schemaVersion == "" {
			c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
			continue
		}

		versions[host.schemaVersion] = struct{}{}
	}

	if err := iter.Close(); err != nil {
		return nil, err
	}

	var schemaVersion string
	iter = c.query(ctx, localSchemas)
	for iter.Scan(&schemaVersion) {
		versions[schemaVersion] = struct{}{}
		schemaVersion = ""
	}

	if err := iter.Close(); err != nil {
		return nil, err
	}

	return versions, nil
}