in app/streamcomponents/ProjectSearchSource.scala [20:64]
override def shape = SourceShape.of(out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
private val dbConfig = dbConfigProvider.get[PostgresProfile]
private var cache:List[T] = List()
private var resultCounter = 0
setHandler(out, new AbstractOutHandler(){
val nextResultCb = createAsyncCallback[T](entry=>push(out,entry))
val failureCb = createAsyncCallback[Throwable](err=>fail(out, err))
val completionCb = createAsyncCallback[Unit](_=>complete(out))
override def onPull() = {
if(cache.isEmpty) { //cache is empty, we need to pull more results from the database
logger.debug("empty cache, fetching more results")
dbConfig.db.run(queryFunc.drop(resultCounter).take(pageSize).result).map(results=>{
logger.debug(s"ProjectEntry search returned ${results.length} more items")
resultCounter+=results.length
cache = results.toList
if(cache.isEmpty){
logger.debug("cache is still empty, assuming we got everything")
//if the cache is still empty then we have iterated everything.
completionCb.invoke(() )
} else {
val nextResult = cache.head
logger.debug(s"pushing next result $nextResult")
cache = cache.tail
nextResultCb.invoke(nextResult)
}
}).recover({
case err:Throwable=>
logger.error(s"Could not perform ProjectEntry search: ", err)
failureCb.invoke(err)
})
} else { //we have results in the cache, don't serve from the database
logger.debug("cache is not empty, serving from cache")
val nextResult = cache.head
cache = cache.tail
push(out,nextResult)
}
}
})
}