func()

in session.go [601:744]


func (s *Session) routingKeyInfo(ctx context.Context, stmt string, keyspace string) (*routingKeyInfo, error) {
	if keyspace == "" {
		keyspace = s.cfg.Keyspace
	}

	routingKeyInfoCacheKey := keyspace + stmt

	s.routingKeyInfoCache.mu.Lock()

	// Using here keyspace + stmt as a cache key because
	// the query keyspace could be overridden via SetKeyspace
	entry, cached := s.routingKeyInfoCache.lru.Get(routingKeyInfoCacheKey)
	if cached {
		// done accessing the cache
		s.routingKeyInfoCache.mu.Unlock()
		// the entry is an inflight struct similar to that used by
		// Conn to prepare statements
		inflight := entry.(*inflightCachedEntry)

		// wait for any inflight work
		inflight.wg.Wait()

		if inflight.err != nil {
			return nil, inflight.err
		}

		key, _ := inflight.value.(*routingKeyInfo)

		return key, nil
	}

	// create a new inflight entry while the data is created
	inflight := new(inflightCachedEntry)
	inflight.wg.Add(1)
	defer inflight.wg.Done()
	s.routingKeyInfoCache.lru.Add(routingKeyInfoCacheKey, inflight)
	s.routingKeyInfoCache.mu.Unlock()

	var (
		info         *preparedStatment
		partitionKey []*ColumnMetadata
	)

	conn := s.getConn()
	if conn == nil {
		// TODO: better error?
		inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available")
		return nil, inflight.err
	}

	// get the query info for the statement
	info, inflight.err = conn.prepareStatement(ctx, stmt, nil, keyspace)
	if inflight.err != nil {
		// don't cache this error
		s.routingKeyInfoCache.Remove(stmt)
		return nil, inflight.err
	}

	// TODO: it would be nice to mark hosts here but as we are not using the policies
	// to fetch hosts we cant

	if info.request.colCount == 0 {
		// no arguments, no routing key, and no error
		return nil, nil
	}

	table := info.request.table
	if info.request.keyspace != "" {
		keyspace = info.request.keyspace
	}

	if len(info.request.pkeyColumns) > 0 {
		// proto v4 dont need to calculate primary key columns
		types := make([]TypeInfo, len(info.request.pkeyColumns))
		for i, col := range info.request.pkeyColumns {
			types[i] = info.request.columns[col].TypeInfo
		}

		routingKeyInfo := &routingKeyInfo{
			indexes:  info.request.pkeyColumns,
			types:    types,
			keyspace: keyspace,
			table:    table,
		}

		inflight.value = routingKeyInfo
		return routingKeyInfo, nil
	}

	var keyspaceMetadata *KeyspaceMetadata
	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace)
	if inflight.err != nil {
		// don't cache this error
		s.routingKeyInfoCache.Remove(stmt)
		return nil, inflight.err
	}

	tableMetadata, found := keyspaceMetadata.Tables[table]
	if !found {
		// unlikely that the statement could be prepared and the metadata for
		// the table couldn't be found, but this may indicate either a bug
		// in the metadata code, or that the table was just dropped.
		inflight.err = ErrNoMetadata
		// don't cache this error
		s.routingKeyInfoCache.Remove(stmt)
		return nil, inflight.err
	}

	partitionKey = tableMetadata.PartitionKey

	size := len(partitionKey)
	routingKeyInfo := &routingKeyInfo{
		indexes:  make([]int, size),
		types:    make([]TypeInfo, size),
		keyspace: keyspace,
		table:    table,
	}

	for keyIndex, keyColumn := range partitionKey {
		// set an indicator for checking if the mapping is missing
		routingKeyInfo.indexes[keyIndex] = -1

		// find the column in the query info
		for argIndex, boundColumn := range info.request.columns {
			if keyColumn.Name == boundColumn.Name {
				// there may be many such bound columns, pick the first
				routingKeyInfo.indexes[keyIndex] = argIndex
				routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo
				break
			}
		}

		if routingKeyInfo.indexes[keyIndex] == -1 {
			// missing a routing key column mapping
			// no routing key, and no error
			return nil, nil
		}
	}

	// cache this result
	inflight.value = routingKeyInfo

	return routingKeyInfo, nil
}