func()

in conn.go [1493:1690]


func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
	params := queryParams{
		consistency: qry.cons,
	}

	// frame checks that it is not 0
	params.serialConsistency = qry.serialCons
	params.defaultTimestamp = qry.defaultTimestamp
	params.defaultTimestampValue = qry.defaultTimestampValue

	if len(qry.pageState) > 0 {
		params.pagingState = qry.pageState
	}
	if qry.pageSize > 0 {
		params.pageSize = qry.pageSize
	}
	if c.version > protoVersion4 {
		params.keyspace = qry.keyspace
		params.nowInSeconds = qry.nowInSecondsValue
	}

	// If a keyspace for the qry is overriden,
	// then we should use it to create stmt cache key
	usedKeyspace := c.currentKeyspace
	if qry.keyspace != "" {
		usedKeyspace = qry.keyspace
	}

	var (
		frame frameBuilder
		info  *preparedStatment
	)

	if !qry.skipPrepare && qry.shouldPrepare() {
		// Prepare all DML queries. Other queries can not be prepared.
		var err error
		info, err = c.prepareStatement(ctx, qry.stmt, qry.trace, usedKeyspace)
		if err != nil {
			return &Iter{err: err}
		}

		values := qry.values
		if qry.binding != nil {
			values, err = qry.binding(&QueryInfo{
				Id:          info.id,
				Args:        info.request.columns,
				Rval:        info.response.columns,
				PKeyColumns: info.request.pkeyColumns,
			})

			if err != nil {
				return &Iter{err: err}
			}
		}

		if len(values) != info.request.actualColCount {
			return &Iter{err: fmt.Errorf("gocql: expected %d values send got %d", info.request.actualColCount, len(values))}
		}

		params.values = make([]queryValues, len(values))
		for i := 0; i < len(values); i++ {
			v := &params.values[i]
			value := values[i]
			typ := info.request.columns[i].TypeInfo
			if err := marshalQueryValue(typ, value, v); err != nil {
				return &Iter{err: err}
			}
		}

		// if the metadata was not present in the response then we should not skip it
		params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) && info != nil && info.response.flags&flagNoMetaData == 0

		frame = &writeExecuteFrame{
			preparedID:       info.id,
			params:           params,
			customPayload:    qry.customPayload,
			resultMetadataID: info.resultMetadataID,
		}

		// Set "keyspace" and "table" property in the query if it is present in preparedMetadata
		qry.routingInfo.mu.Lock()
		qry.routingInfo.keyspace = info.request.keyspace
		if info.request.keyspace == "" {
			qry.routingInfo.keyspace = usedKeyspace
		}
		qry.routingInfo.table = info.request.table
		qry.routingInfo.mu.Unlock()
	} else {
		frame = &writeQueryFrame{
			statement:     qry.stmt,
			params:        params,
			customPayload: qry.customPayload,
		}
	}

	framer, err := c.exec(ctx, frame, qry.trace)
	if err != nil {
		return &Iter{err: err}
	}

	resp, err := framer.parseFrame()
	if err != nil {
		return &Iter{err: err}
	}

	if len(framer.traceID) > 0 && qry.trace != nil {
		qry.trace.Trace(framer.traceID)
	}

	switch x := resp.(type) {
	case *resultVoidFrame:
		return &Iter{framer: framer}
	case *resultRowsFrame:
		if x.meta.newMetadataID != nil {
			// If a RESULT/Rows message reports
			//      changed resultset metadata with the Metadata_changed flag, the reported new
			//      resultset metadata must be used in subsequent executions
			stmtCacheKey := c.session.stmtsLRU.keyFor(c.host.HostID(), usedKeyspace, qry.stmt)
			oldInflight, ok := c.session.stmtsLRU.get(stmtCacheKey)
			if ok {
				newInflight := &inflightPrepare{
					done: make(chan struct{}),
					preparedStatment: &preparedStatment{
						id:               oldInflight.preparedStatment.id,
						resultMetadataID: x.meta.newMetadataID,
						request:          oldInflight.preparedStatment.request,
						response:         x.meta,
					},
				}
				// The driver should close this done to avoid deadlocks of
				// other subsequent requests
				close(newInflight.done)
				c.session.stmtsLRU.add(stmtCacheKey, newInflight)
				// Updating info to ensure the code is looking at the updated
				// version of the prepared statement
				info = newInflight.preparedStatment
			}
		}

		iter := &Iter{
			meta:    x.meta,
			framer:  framer,
			numRows: x.numRows,
		}

		if x.meta.noMetaData() {
			if info != nil {
				iter.meta = info.response
				iter.meta.pagingState = copyBytes(x.meta.pagingState)
			} else {
				return &Iter{framer: framer, err: errors.New("gocql: did not receive metadata but prepared info is nil")}
			}
		} else {
			iter.meta = x.meta
		}

		if x.meta.morePages() && !qry.disableAutoPage {
			newQry := new(Query)
			*newQry = *qry
			newQry.pageState = copyBytes(x.meta.pagingState)
			newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}

			iter.next = &nextIter{
				qry: newQry,
				pos: int((1 - qry.prefetch) * float64(x.numRows)),
			}

			if iter.next.pos < 1 {
				iter.next.pos = 1
			}
		}

		return iter
	case *resultKeyspaceFrame:
		return &Iter{framer: framer}
	case *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction, *schemaChangeAggregate, *schemaChangeType:
		iter := &Iter{framer: framer}
		if err := c.awaitSchemaAgreement(ctx); err != nil {
			// TODO: should have this behind a flag
			c.logger.Println(err)
		}
		// dont return an error from this, might be a good idea to give a warning
		// though. The impact of this returning an error would be that the cluster
		// is not consistent with regards to its schema.
		return iter
	case *RequestErrUnprepared:
		stmtCacheKey := c.session.stmtsLRU.keyFor(c.host.HostID(), usedKeyspace, qry.stmt)
		c.session.stmtsLRU.evictPreparedID(stmtCacheKey, x.StatementId)
		return c.executeQuery(ctx, qry)
	case error:
		return &Iter{err: x, framer: framer}
	default:
		return &Iter{
			err:    NewErrProtocol("Unknown type in response to execute query (%T): %s", x, x),
			framer: framer,
		}
	}
}