in conn.go [1493:1690]
func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
params := queryParams{
consistency: qry.cons,
}
// frame checks that it is not 0
params.serialConsistency = qry.serialCons
params.defaultTimestamp = qry.defaultTimestamp
params.defaultTimestampValue = qry.defaultTimestampValue
if len(qry.pageState) > 0 {
params.pagingState = qry.pageState
}
if qry.pageSize > 0 {
params.pageSize = qry.pageSize
}
if c.version > protoVersion4 {
params.keyspace = qry.keyspace
params.nowInSeconds = qry.nowInSecondsValue
}
// If a keyspace for the qry is overriden,
// then we should use it to create stmt cache key
usedKeyspace := c.currentKeyspace
if qry.keyspace != "" {
usedKeyspace = qry.keyspace
}
var (
frame frameBuilder
info *preparedStatment
)
if !qry.skipPrepare && qry.shouldPrepare() {
// Prepare all DML queries. Other queries can not be prepared.
var err error
info, err = c.prepareStatement(ctx, qry.stmt, qry.trace, usedKeyspace)
if err != nil {
return &Iter{err: err}
}
values := qry.values
if qry.binding != nil {
values, err = qry.binding(&QueryInfo{
Id: info.id,
Args: info.request.columns,
Rval: info.response.columns,
PKeyColumns: info.request.pkeyColumns,
})
if err != nil {
return &Iter{err: err}
}
}
if len(values) != info.request.actualColCount {
return &Iter{err: fmt.Errorf("gocql: expected %d values send got %d", info.request.actualColCount, len(values))}
}
params.values = make([]queryValues, len(values))
for i := 0; i < len(values); i++ {
v := ¶ms.values[i]
value := values[i]
typ := info.request.columns[i].TypeInfo
if err := marshalQueryValue(typ, value, v); err != nil {
return &Iter{err: err}
}
}
// if the metadata was not present in the response then we should not skip it
params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) && info != nil && info.response.flags&flagNoMetaData == 0
frame = &writeExecuteFrame{
preparedID: info.id,
params: params,
customPayload: qry.customPayload,
resultMetadataID: info.resultMetadataID,
}
// Set "keyspace" and "table" property in the query if it is present in preparedMetadata
qry.routingInfo.mu.Lock()
qry.routingInfo.keyspace = info.request.keyspace
if info.request.keyspace == "" {
qry.routingInfo.keyspace = usedKeyspace
}
qry.routingInfo.table = info.request.table
qry.routingInfo.mu.Unlock()
} else {
frame = &writeQueryFrame{
statement: qry.stmt,
params: params,
customPayload: qry.customPayload,
}
}
framer, err := c.exec(ctx, frame, qry.trace)
if err != nil {
return &Iter{err: err}
}
resp, err := framer.parseFrame()
if err != nil {
return &Iter{err: err}
}
if len(framer.traceID) > 0 && qry.trace != nil {
qry.trace.Trace(framer.traceID)
}
switch x := resp.(type) {
case *resultVoidFrame:
return &Iter{framer: framer}
case *resultRowsFrame:
if x.meta.newMetadataID != nil {
// If a RESULT/Rows message reports
// changed resultset metadata with the Metadata_changed flag, the reported new
// resultset metadata must be used in subsequent executions
stmtCacheKey := c.session.stmtsLRU.keyFor(c.host.HostID(), usedKeyspace, qry.stmt)
oldInflight, ok := c.session.stmtsLRU.get(stmtCacheKey)
if ok {
newInflight := &inflightPrepare{
done: make(chan struct{}),
preparedStatment: &preparedStatment{
id: oldInflight.preparedStatment.id,
resultMetadataID: x.meta.newMetadataID,
request: oldInflight.preparedStatment.request,
response: x.meta,
},
}
// The driver should close this done to avoid deadlocks of
// other subsequent requests
close(newInflight.done)
c.session.stmtsLRU.add(stmtCacheKey, newInflight)
// Updating info to ensure the code is looking at the updated
// version of the prepared statement
info = newInflight.preparedStatment
}
}
iter := &Iter{
meta: x.meta,
framer: framer,
numRows: x.numRows,
}
if x.meta.noMetaData() {
if info != nil {
iter.meta = info.response
iter.meta.pagingState = copyBytes(x.meta.pagingState)
} else {
return &Iter{framer: framer, err: errors.New("gocql: did not receive metadata but prepared info is nil")}
}
} else {
iter.meta = x.meta
}
if x.meta.morePages() && !qry.disableAutoPage {
newQry := new(Query)
*newQry = *qry
newQry.pageState = copyBytes(x.meta.pagingState)
newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
iter.next = &nextIter{
qry: newQry,
pos: int((1 - qry.prefetch) * float64(x.numRows)),
}
if iter.next.pos < 1 {
iter.next.pos = 1
}
}
return iter
case *resultKeyspaceFrame:
return &Iter{framer: framer}
case *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction, *schemaChangeAggregate, *schemaChangeType:
iter := &Iter{framer: framer}
if err := c.awaitSchemaAgreement(ctx); err != nil {
// TODO: should have this behind a flag
c.logger.Println(err)
}
// dont return an error from this, might be a good idea to give a warning
// though. The impact of this returning an error would be that the cluster
// is not consistent with regards to its schema.
return iter
case *RequestErrUnprepared:
stmtCacheKey := c.session.stmtsLRU.keyFor(c.host.HostID(), usedKeyspace, qry.stmt)
c.session.stmtsLRU.evictPreparedID(stmtCacheKey, x.StatementId)
return c.executeQuery(ctx, qry)
case error:
return &Iter{err: x, framer: framer}
default:
return &Iter{
err: NewErrProtocol("Unknown type in response to execute query (%T): %s", x, x),
framer: framer,
}
}
}