in app/services/migrationcomponents/DBObjectSource.scala [19:61]
/** The shape of this stage: a source exposing the single outlet `out`. */
override def shape: SourceShape[T] = SourceShape(out)
/**
  * Fetches the next page of records to emit. Implemented by concrete sources.
  *
  * The stage calls this whenever downstream pulls and its internal buffer is empty.
  * Returning an empty sequence makes the stage complete its outlet; a failed Future
  * fails the stage.
  *
  * @param recordsRead running total of records returned by all previous calls
  *                    (presumably intended as the query offset -- confirm in implementations)
  * @param db          database handle to run the query against
  * @return a Future of the next page of records
  */
protected def getNextPage(recordsRead:Int)(implicit db:PostgresProfile#Backend#Database):Future[Seq[T]]
/**
  * Builds the stage logic: a paged source that buffers one page at a time and
  * emits one record per downstream pull.
  *
  * All mutable state (`internalBuffer`, `recordsRead`) is only ever touched from
  * within the stage itself -- either directly in `onPull` or inside an async callback --
  * so no locking is required. The Future returned by `getNextPage` completes on an
  * arbitrary thread, therefore its result is handed back to the stage through
  * an async callback rather than being applied in the Future's own callback.
  *
  * @param inheritedAttributes attributes inherited from the enclosing graph (unused)
  * @return the logic driving this source
  */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
  // Records fetched from the database that have not been pushed downstream yet.
  private var internalBuffer: scala.collection.Seq[T] = Seq()
  // Total number of records fetched so far; passed to getNextPage on each fetch.
  private var recordsRead = 0
  private implicit val db: PostgresProfile#Backend#Database = dbConfig.get[PostgresProfile].db

  /** Pushes the head of the buffer, or completes the outlet when the buffer is empty. */
  private def pushNextOrComplete(): Unit = internalBuffer.headOption match {
    case Some(rec) =>
      internalBuffer = internalBuffer.tail
      push(out, rec)
    case None =>
      logger.info("Scanned out all commissions")
      complete(out)
  }

  // Re-enters the stage with a freshly fetched page. An empty page leaves the
  // buffer empty, so pushNextOrComplete() completes the stream.
  private val pageLoadedCb = createAsyncCallback[Seq[T]](newRecords => {
    internalBuffer = internalBuffer ++ newRecords
    recordsRead += newRecords.length
    pushNextOrComplete()
  })

  // Re-enters the stage to fail it when a page fetch fails.
  private val errCb = createAsyncCallback[Throwable](err => failStage(err))

  setHandler(out, new AbstractOutHandler {
    override def onPull(): Unit = {
      if (internalBuffer.nonEmpty) {
        // Already inside the stage: push directly, no async hop needed.
        pushNextOrComplete()
      } else {
        // Only one fetch can be in flight: downstream will not pull again until we push.
        getNextPage(recordsRead).onComplete({
          case Success(newRecords) =>
            pageLoadedCb.invoke(newRecords)
          case Failure(err) =>
            logger.error("Could not retrieve more records: ", err)
            errCb.invoke(err)
        })
      }
    }
  })
}