in consumer/consumer.go [800:865]
func (dc *defaultConsumer) computePullFromWhereWithException(mq *primitive.MessageQueue) (int64, error) {
result := int64(-1)
lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
if err != nil {
// 这里 lastOffset = -1
return lastOffset, err
}
if lastOffset >= 0 {
result = lastOffset
} else {
switch dc.option.FromWhere {
case ConsumeFromLastOffset:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic, internal.RetryGroupTopicPrefix) {
result = 0
} else {
lastOffset, err := dc.queryMaxOffset(mq)
if err == nil {
result = lastOffset
} else {
rlog.Warning("query max offset error", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
rlog.LogKeyUnderlayError: err,
})
}
}
} else {
result = -1
}
case ConsumeFromFirstOffset:
if lastOffset == -1 {
result = 0
}
case ConsumeFromTimestamp:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic, internal.RetryGroupTopicPrefix) {
lastOffset, err := dc.queryMaxOffset(mq)
if err == nil {
result = lastOffset
} else {
result = -1
rlog.Warning("query max offset error", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
rlog.LogKeyUnderlayError: err,
})
}
} else {
t, err := time.Parse("20060102150405", dc.option.ConsumeTimestamp)
if err != nil {
result = -1
} else {
lastOffset, err := dc.searchOffsetByTimestamp(mq, t.Unix()*1000)
if err != nil {
result = -1
} else {
result = lastOffset
}
}
}
}
default:
}
}
return result, nil
}