in exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/Query.kt [173:220]
override suspend fun collect(collector: FlowCollector<Flow<ResultRow>>) {
var lastOffset = if (fetchInAscendingOrder) 0L else null
while (true) {
val query = this@Query.copy().adjustWhere {
lastOffset?.let { lastOffset ->
whereOp and if (fetchInAscendingOrder) {
when (autoIncColumn.columnType) {
is EntityIDColumnType<*> -> {
(autoIncColumn as? Column<EntityID<Long>>)?.let {
(it greater lastOffset)
} ?: (autoIncColumn as? Column<EntityID<Int>>)?.let {
(it greater lastOffset.toInt())
} ?: (autoIncColumn greater lastOffset)
}
else -> (autoIncColumn greater lastOffset)
}
} else {
when (autoIncColumn.columnType) {
is EntityIDColumnType<*> -> {
(autoIncColumn as? Column<EntityID<Long>>)?.let {
(it less lastOffset)
} ?: (autoIncColumn as? Column<EntityID<Int>>)?.let {
(it less lastOffset.toInt())
} ?: (autoIncColumn less lastOffset)
}
else -> (autoIncColumn less lastOffset)
}
}
} ?: whereOp
}
var resultCount = 0
var lastResult: ResultRow? = null
val results = flow { query.collect(this) }.onEach {
resultCount++
lastResult = it
}
collector.emit(results)
if (resultCount < batchSize) break
lastResult?.let {
lastOffset = toLong(it[autoIncColumn]!!)
}
}
}