in conn.go [1740:1852]
// executeBatch sends batch to the server over this connection and returns an
// Iter describing the outcome.
//
// Entries that carry arguments or a binding function are prepared first and
// sent by prepared ID together with their marshalled values; entries with
// neither are sent as plain statement text. On protocol version 1 the call
// fails with ErrUnsupported.
//
// ctx governs both statement preparation and the batch request itself. If ctx
// is nil, the batch's own context is used instead.
//
// When the server reports that a prepared statement is unknown, the matching
// cache entry is evicted (if the reported ID belongs to this batch) and the
// whole batch is executed again.
func (c *Conn) executeBatch(ctx context.Context, batch *Batch) *Iter {
	if c.version == protoVersion1 {
		return &Iter{err: ErrUnsupported}
	}

	// Honour the caller-supplied context for every network operation below;
	// fall back to the batch's context so nil-passing callers keep working.
	if ctx == nil {
		ctx = batch.Context()
	}

	n := len(batch.Entries)
	req := &writeBatchFrame{
		typ:                   batch.Type,
		statements:            make([]batchStatment, n),
		consistency:           batch.Cons,
		serialConsistency:     batch.serialCons,
		defaultTimestamp:      batch.defaultTimestamp,
		defaultTimestampValue: batch.defaultTimestampValue,
		customPayload:         batch.CustomPayload,
	}

	// The per-request keyspace and nowInSeconds are only put on the frame for
	// protocol versions above 4.
	if c.version > protoVersion4 {
		req.keyspace = batch.keyspace
		req.nowInSeconds = batch.nowInSeconds
	}

	// Keyspace used for preparing statements and for building cache keys: the
	// batch's keyspace when set, otherwise the connection's current keyspace.
	usedKeyspace := c.currentKeyspace
	if batch.keyspace != "" {
		usedKeyspace = batch.keyspace
	}

	// stmts maps each prepared ID used by this batch back to its statement
	// text so an "unprepared" response can be translated into a cache key.
	stmts := make(map[string]string, n)

	for i := 0; i < n; i++ {
		entry := &batch.Entries[i]
		b := &req.statements[i]

		if len(entry.Args) == 0 && entry.binding == nil {
			// Nothing to bind: send the statement text as is.
			b.statement = entry.Stmt
			continue
		}

		info, err := c.prepareStatement(ctx, entry.Stmt, batch.trace, usedKeyspace)
		if err != nil {
			return &Iter{err: err}
		}

		// Values come either straight from the entry or from its binding
		// callback, which receives the prepared statement's metadata.
		values := entry.Args
		if entry.binding != nil {
			values, err = entry.binding(&QueryInfo{
				Id:          info.id,
				Args:        info.request.columns,
				Rval:        info.response.columns,
				PKeyColumns: info.request.pkeyColumns,
			})
			if err != nil {
				return &Iter{err: err}
			}
		}

		// Also guards the indexing of values in the marshalling loop below.
		if len(values) != info.request.actualColCount {
			return &Iter{err: fmt.Errorf("gocql: batch statement %d expected %d values send got %d", i, info.request.actualColCount, len(values))}
		}

		b.preparedID = info.id
		stmts[string(info.id)] = entry.Stmt

		b.values = make([]queryValues, info.request.actualColCount)
		for j := 0; j < info.request.actualColCount; j++ {
			typ := info.request.columns[j].TypeInfo
			if err := marshalQueryValue(typ, values[j], &b.values[j]); err != nil {
				return &Iter{err: err}
			}
		}
	}

	framer, err := c.exec(ctx, req, batch.trace)
	if err != nil {
		return &Iter{err: err}
	}

	resp, err := framer.parseFrame()
	if err != nil {
		return &Iter{err: err, framer: framer}
	}

	if len(framer.traceID) > 0 && batch.trace != nil {
		batch.trace.Trace(framer.traceID)
	}

	switch x := resp.(type) {
	case *resultVoidFrame:
		return &Iter{}
	case *RequestErrUnprepared:
		// Drop the stale cache entry (when the ID is one of ours) so the
		// retry prepares the statement again instead of reusing the old ID.
		if stmt, found := stmts[string(x.StatementId)]; found {
			key := c.session.stmtsLRU.keyFor(c.host.HostID(), usedKeyspace, stmt)
			c.session.stmtsLRU.evictPreparedID(key, x.StatementId)
		}
		return c.executeBatch(ctx, batch)
	case *resultRowsFrame:
		return &Iter{
			meta:    x.meta,
			framer:  framer,
			numRows: x.numRows,
		}
	case error:
		return &Iter{err: x, framer: framer}
	default:
		// %v rather than %s: the frame is not guaranteed to be a Stringer.
		return &Iter{err: NewErrProtocol("Unknown type in response to batch statement: %v", x), framer: framer}
	}
}