in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala [55:153]
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with StageLogging with OutHandler {
private var queryInProgress = false
private var knownPersistenceIds: Set[String] = Set.empty
private var maybeResultSet = Option.empty[AsyncResultSet]
private var buffer: Queue[String] = Queue.empty[String]
private val queryCallback: AsyncCallback[AsyncResultSet] =
getAsyncCallback[AsyncResultSet] { rs =>
queryInProgress = false
maybeResultSet = Some(rs)
rs.currentPage().forEach { row =>
val s = row.getString("persistence_id")
if (!knownPersistenceIds.contains(s)) {
buffer = buffer.enqueue(s)
knownPersistenceIds += s
}
}
flush()
if (refreshInterval.isEmpty && buffer.isEmpty && isExhausted(rs)) {
complete(out)
} else if (rs.hasMorePages) {
rs.fetchNextPage().thenAccept(queryCallback.invoke)
}
}
private def isExhausted(rs: AsyncResultSet): Boolean = {
rs.remaining() == 0 && !rs.hasMorePages
}
private def query(): Unit = {
def doQuery(): Unit = {
queryInProgress = true
val boundStatement = preparedStatement.bind().setExecutionProfileName(readProfile)
session.executeAsync(boundStatement).thenAccept(queryCallback.invoke)
}
maybeResultSet match {
case None =>
doQuery()
case Some(rs) if isExhausted(rs) && !queryInProgress =>
doQuery()
case _ =>
// ignore query request as either a query is in progress or there's a result set
// which isn't fully exhausted
}
}
private def flush(): Unit = {
while (buffer.nonEmpty && isAvailable(out)) {
val (s, newBuffer) = buffer.dequeue
buffer = newBuffer
push(out, s)
}
}
@nowarn("msg=deprecated") // keep compatible with akka 2.5
override def preStart(): Unit = {
query()
refreshInterval.foreach { interval =>
val initial =
if (interval >= 2.seconds)
(interval / 2) + ThreadLocalRandom.current().nextLong(interval.toMillis / 2).millis
else interval
schedulePeriodicallyWithInitialDelay(Continue, initial, interval)
}
}
override def onTimer(timerKey: Any): Unit = {
timerKey match {
case Continue =>
query()
case _ =>
}
}
def onPull(): Unit = {
flush()
if (buffer.isEmpty && isAvailable(out)) {
maybeResultSet match {
case None =>
query()
case Some(rs) =>
if (refreshInterval.isEmpty && isExhausted(rs)) {
complete(out)
} else {
if (!queryInProgress && rs.remaining() == 0 && rs.hasMorePages) {
rs.fetchNextPage().thenAccept(queryCallback.invoke)
}
}
}
}
}
setHandler(out, this)
}