override suspend fun collect()

in exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/Query.kt [173:220]


            /**
             * Runs the batched query page by page and emits every non-empty page as its own [Flow].
             *
             * Each pass copies the enclosing query and narrows its `WHERE` clause to rows whose
             * `autoIncColumn` value lies beyond the last value seen (greater when fetching in
             * ascending order, less otherwise). The rows of the page are read into memory *before*
             * the page is handed to [collector], so paging no longer depends on how (or whether)
             * the downstream consumer collects the emitted inner flow: it may be collected later,
             * partially, repeatedly, or not at all without changing which pages are fetched and
             * without re-running the SQL.
             *
             * The loop ends after the first page that holds fewer than `batchSize` rows. Empty
             * pages are not emitted.
             *
             * @param collector receives one [Flow] per non-empty page, in fetch order.
             */
            override suspend fun collect(collector: FlowCollector<Flow<ResultRow>>) {
                // Ascending fetches start from 0; descending fetches have no lower/upper bound
                // for the first page, so the original WHERE clause is used unchanged.
                var lastOffset = if (fetchInAscendingOrder) 0L else null
                while (true) {
                    val query = this@Query.copy().adjustWhere {
                        lastOffset?.let { offset ->
                            // NOTE(review): generic arguments are erased at runtime, so the first
                            // `as? Column<EntityID<Long>>` below looks like it succeeds for every
                            // Column, leaving the `?:` fallbacks unreachable — confirm before relying
                            // on the Int-specific branch.
                            whereOp and if (fetchInAscendingOrder) {
                                when (autoIncColumn.columnType) {
                                    is EntityIDColumnType<*> -> {
                                        (autoIncColumn as? Column<EntityID<Long>>)?.let {
                                            (it greater offset)
                                        } ?: (autoIncColumn as? Column<EntityID<Int>>)?.let {
                                            (it greater offset.toInt())
                                        } ?: (autoIncColumn greater offset)
                                    }
                                    else -> (autoIncColumn greater offset)
                                }
                            } else {
                                when (autoIncColumn.columnType) {
                                    is EntityIDColumnType<*> -> {
                                        (autoIncColumn as? Column<EntityID<Long>>)?.let {
                                            (it less offset)
                                        } ?: (autoIncColumn as? Column<EntityID<Int>>)?.let {
                                            (it less offset.toInt())
                                        } ?: (autoIncColumn less offset)
                                    }
                                    else -> (autoIncColumn less offset)
                                }
                            }
                        } ?: whereOp
                    }

                    // Read the whole page up front. Counting rows as a side effect of the
                    // consumer collecting the inner flow would stop paging early whenever the
                    // consumer defers, skips, or truncates that collection.
                    val batch = mutableListOf<ResultRow>()
                    query.collect { row -> batch += row }

                    if (batch.isNotEmpty()) {
                        // A flow over the in-memory page can be replayed without touching the database.
                        collector.emit(flow { batch.forEach { row -> emit(row) } })
                    }

                    // A short page means the table has been exhausted.
                    if (batch.size < batchSize) break

                    // Only reachable with an empty page if batchSize is not positive; stop
                    // instead of re-running the same query forever.
                    val lastRow = batch.lastOrNull() ?: break

                    // The paging column value of a fetched row is expected to be present.
                    lastOffset = toLong(lastRow[autoIncColumn]!!)
                }
            }