in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [132:217]
private def computePullFromWhere(mq: MessageQueue): Long = {
var result = -1L
val offsetStore = kc.getOffsetStore
val minOffset = kc.minOffset(mq)
val checkpointOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
consumerStrategy match {
case LatestStrategy => {
if (checkpointOffset >= 0) {
//consider the checkpoint offset first
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = kc.maxOffset(mq)
} else {
result = checkpointOffset
}
} else {
// First start,no offset
if (mq.getTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0
} else {
result = kc.maxOffset(mq)
}
}
}
case EarliestStrategy => {
if (checkpointOffset >= 0) {
//consider the checkpoint offset first
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = checkpointOffset
}
} else {
// First start,no offset
result = minOffset
}
}
case SpecificOffsetStrategy(queueToOffset) => {
val specificOffset = queueToOffset.get(mq)
if (checkpointOffset >= 0 && !forceSpecial) {
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = checkpointOffset
}
} else {
specificOffset match {
case Some(ConsumerStrategy.LATEST) => {
result = kc.maxOffset(mq)
}
case Some(ConsumerStrategy.EARLIEST) => {
result = kc.minOffset(mq)
}
case Some(offset) => {
if (offset < minOffset) {
reportDataLoss(s"MessageQueue $mq's specific offset $offset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = offset
}
}
case None => {
if (checkpointOffset >= 0) {
//consider the checkpoint offset first
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = checkpointOffset
}
} else {
logWarning(s"MessageQueue $mq's specific offset and checkpointOffset are none, then use the minOffset")
result = kc.minOffset(mq)
}
}
}
}
}
}
result
}