in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala [272:303]
def withConnection[A](logPrefix: String)(fun: Connection => Future[A]): Future[A] = {
getConnection(logPrefix).flatMap { connection =>
val startTime = nanoTime()
connection.beginTransaction().asFutureDone().flatMap { _ =>
val result =
try {
fun(connection)
} catch {
case NonFatal(exc) =>
// thrown from statement function
Future.failed(exc)
}
result.failed.foreach { exc =>
if (log.isDebugEnabled())
log.debug("{} - DB call failed: {}", logPrefix: Any, exc.toString: Any)
// ok to rollback async like this, or should it be before completing the returned Future?
rollbackAndClose(connection)
}
result.flatMap { r =>
commitAndClose(connection).map { _ =>
val durationMicros = durationInMicros(startTime)
if (durationMicros >= logDbCallsExceedingMicros)
log.info("{} - DB call completed in [{}] µs", logPrefix, durationMicros)
r
}
}
}
}
}