private[kinesis] def removeReprocessed()

in src/it/scala/com/gu/kinesis/MessageUtil.scala [24:61]


  private[kinesis] def removeReprocessed(key: String, messages: IndexedSeq[String]): IndexedSeq[String] = {
    def unwindRetry(sliceCandidate: IndexedSeq[String], from: Int): Int = {
      var i = 0
      while (from + i < messages.size && i < sliceCandidate.size && sliceCandidate(i) == messages(from + i)) i += 1
      i
    }
    def unwindRetries(sliceCandidate: IndexedSeq[String], from: Int): Int = {
      var j = from
      var advanced = 0
      do {
        advanced = unwindRetry(sliceCandidate, j)
        j += advanced
      } while (advanced > 0)
      j
    }
    val distinct = messages.distinct
    var (i, j) = (0, 0)
    var lastRestartedAt = 0
    while (j < messages.size) {
      val lastDistinct = distinct.lift(i)
      val lastMessage = messages(j)
      if (lastDistinct.isEmpty || lastDistinct.get != lastMessage) {
        val restartedAt = distinct.lastIndexOf(lastMessage)
        if (restartedAt < lastRestartedAt) throw new UnexpectedMessageSequence(key, lastMessage, messages)
        lastRestartedAt = restartedAt
        val reprocessedSliceCandidate = distinct.slice(restartedAt, i)
        val lastIndexOfRetrySequence = unwindRetries(reprocessedSliceCandidate, j) - 1
        if (lastIndexOfRetrySequence < j || reprocessedSliceCandidate.last != messages(lastIndexOfRetrySequence)) {
          throw new UnexpectedMessageSequence(key, lastMessage, messages)
        }
        j = lastIndexOfRetrySequence + 1
      } else {
        i += 1
        j += 1
      }
    }
    distinct
  }