in query_executor.go [150:229]
// do executes qry against the hosts yielded by hostIter, consulting the
// query's retry policy after every failed attempt.
//
// A candidate host is skipped (and the next one requested from hostIter) when
// its Info() is nil or reports that it is not up, when the executor holds no
// pool for it, or when that pool hands back no connection.
//
// The Iter of the latest attempt is returned when:
//   - the attempt succeeded;
//   - the attempt failed with context.Canceled, context.DeadlineExceeded or
//     ErrNotFound (the host is marked with nil in that case);
//   - the query is not idempotent or has no retry policy;
//   - the policy answers Ignore (the Iter's error is cleared first) or Rethrow;
//   - the policy's Attempt reports that no further attempt is allowed.
//
// A fresh Iter carrying ErrUnknownRetryType is returned when the policy
// answers with a retry type this function does not recognise. When hostIter
// runs dry, a fresh Iter is returned carrying the error of the last retried
// attempt, or ErrNoConnections if no attempt was ever retried.
func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter {
	target := hostIter()
	policy := qry.retryPolicy()

	// prevErr remembers the error of the most recent attempt that was retried.
	var prevErr error

	for target != nil {
		info := target.Info()
		if usable := info != nil && info.IsUp(); !usable {
			target = hostIter()
			continue
		}

		hostPool, found := q.pool.getPool(info)
		if !found {
			target = hostIter()
			continue
		}

		c := hostPool.Pick()
		if c == nil {
			target = hostIter()
			continue
		}

		iter := q.attemptQuery(ctx, qry, c)
		iter.host = target.Info()

		// Report the outcome to the selected host. These three errors are
		// treated as logical errors that should not count toward removing a
		// node from the pool, so the host is marked with nil and the result
		// is handed straight back.
		if e := iter.err; e == context.Canceled || e == context.DeadlineExceeded || e == ErrNotFound {
			target.Mark(nil)
			return iter
		}
		target.Mark(iter.err)

		// A retry is only considered for a failed, idempotent query that has
		// a retry policy; everything else ends here.
		if retryable := iter.err != nil && qry.IsIdempotent() && policy != nil; !retryable {
			return iter
		}

		// Attempt is consulted before the retry type so that the attempt is
		// accounted for whatever the policy decides next.
		exhausted := !policy.Attempt(qry)

		switch policy.GetRetryType(iter.err) {
		case Retry:
			// Stay on the current host.
		case RetryNextHost:
			// Advance to the next host; this happens even if the attempts
			// turn out to be exhausted below.
			target = hostIter()
		case Ignore:
			iter.err = nil
			return iter
		case Rethrow:
			return iter
		default:
			// Unrecognised retry type: surface it as an error on a new Iter.
			return &Iter{err: ErrUnknownRetryType}
		}

		if exhausted {
			return iter
		}
		prevErr = iter.err
	}

	// No host left to try.
	finalErr := prevErr
	if finalErr == nil {
		finalErr = ErrNoConnections
	}
	return &Iter{err: finalErr}
}