override def shape: SourceShape[T] = SourceShape.of()

in app/services/migrationcomponents/DBObjectSource.scala [19:61]


  override def shape: SourceShape[T] = SourceShape.of(out)

  protected def getNextPage(recordsRead:Int)(implicit db:PostgresProfile#Backend#Database):Future[Seq[T]]

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var internalBuffer:scala.collection.Seq[T] = Seq()
    private var recordsRead = 0
    private implicit val db = dbConfig.get[PostgresProfile].db

    val pushNextRecordCb = createAsyncCallback[Unit](_=>internalBuffer.headOption match {
      case Some(rec)=>
        this.synchronized {
          internalBuffer = internalBuffer.tail
        }
        push(out, rec)
      case None=>
        logger.info("Scanned out all commissions")
        complete(out)
    })

    val errCb = createAsyncCallback[Throwable](err=>failStage(err))

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = {
        internalBuffer.headOption match {
          case Some(_)=>
            pushNextRecordCb.invoke( () )
          case None=>
            getNextPage(recordsRead).onComplete({
              case Success(newRecords)=>
                this.synchronized {
                  internalBuffer = internalBuffer ++ newRecords
                  recordsRead += newRecords.length
                }
                pushNextRecordCb.invoke(())
              case Failure(err)=>
                logger.error("Could not retrieve more records: ", err)
                errCb.invoke(err)
            })
        }
      }
    })
  }