in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala [229:259]
def select[A](
logPrefix: String)(statement: Connection => Statement, mapRow: Row => A): Future[immutable.IndexedSeq[A]] = {
getConnection(logPrefix).flatMap { connection =>
val startTime = nanoTime()
val mappedRows =
try {
val boundStmt = statement(connection)
selectInTx(boundStmt, mapRow)
} catch {
case NonFatal(exc) =>
// thrown from statement function
Future.failed(exc)
}
mappedRows.failed.foreach { exc =>
log.debug("{} - Select failed: {}", logPrefix: Any, exc: Any)
connection.close().asFutureDone()
}
mappedRows.flatMap { r =>
connection.close().asFutureDone().map { _ =>
val durationMicros = durationInMicros(startTime)
if (durationMicros >= logDbCallsExceedingMicros)
log.info("{} - Selected [{}] rows in [{}] µs", logPrefix, r.size: java.lang.Integer,
durationMicros: java.lang.Long)
r
}
}
}
}