in src/it/scala/com/gu/kinesis/MessageUtil.scala [24:61]
/**
  * Checks that `messages` is its own de-duplicated sequence, optionally interleaved with
  * replays of already-seen slices, and returns the de-duplicated sequence.
  *
  * The messages are walked in order alongside `messages.distinct`. A message that is not the
  * next expected distinct message is treated as the start of a replay: the replay must begin
  * at a distinct position no earlier than the previous replay's start, may repeat any number
  * of times (each repetition restarting from the first replayed message), and must finish on
  * the last message that had been seen before the replay began.
  *
  * @param key      only used to build the [[UnexpectedMessageSequence]] error
  *                 (presumably identifies the stream/partition under test; confirm with callers)
  * @param messages the messages in the order they were observed
  * @return `messages.distinct`
  * @throws UnexpectedMessageSequence if a replay starts before the previous replay's start,
  *                                   or does not end on the last already-seen message
  */
private[kinesis] def removeReprocessed(key: String, messages: IndexedSeq[String]): IndexedSeq[String] = {
  val unique = messages.distinct

  // Number of leading elements of `replayed` that match `messages` starting at `offset`.
  def matchedPrefixLength(replayed: IndexedSeq[String], offset: Int): Int =
    replayed.indices
      .takeWhile(k => offset + k < messages.size && replayed(k) == messages(offset + k))
      .length

  // Index just past the run of consecutive (possibly partial) replays of `replayed` that starts at `offset`.
  @scala.annotation.tailrec
  def endOfReplays(replayed: IndexedSeq[String], offset: Int): Int = {
    val matched = matchedPrefixLength(replayed, offset)
    if (matched > 0) endOfReplays(replayed, offset + matched) else offset
  }

  // `nextUnique` indexes the next unseen element of `unique`, `cursor` indexes `messages`,
  // and `previousRestart` is the distinct position at which the most recent replay started.
  @scala.annotation.tailrec
  def verify(nextUnique: Int, cursor: Int, previousRestart: Int): Unit =
    if (cursor < messages.size) {
      val current = messages(cursor)
      if (unique.lift(nextUnique).contains(current)) {
        // The expected new message: consume it from both sequences.
        verify(nextUnique + 1, cursor + 1, previousRestart)
      } else {
        // An already-seen message: processing restarted from its position in `unique`.
        val restart = unique.lastIndexOf(current)
        if (restart < previousRestart) throw new UnexpectedMessageSequence(key, current, messages)

        val replayed = unique.slice(restart, nextUnique)
        val lastReplayedIndex = endOfReplays(replayed, cursor) - 1
        // The run of replays has to finish on the final message of the replayed slice.
        if (lastReplayedIndex < cursor || replayed.last != messages(lastReplayedIndex)) {
          throw new UnexpectedMessageSequence(key, current, messages)
        }
        verify(nextUnique, lastReplayedIndex + 1, restart)
      }
    }

  verify(0, 0, 0)
  unique
}