func()

in consumer/consumer.go [800:865]


func (dc *defaultConsumer) computePullFromWhereWithException(mq *primitive.MessageQueue) (int64, error) {
	result := int64(-1)
	lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
	if err != nil {
		// 这里 lastOffset = -1
		return lastOffset, err
	}

	if lastOffset >= 0 {
		result = lastOffset
	} else {
		switch dc.option.FromWhere {
		case ConsumeFromLastOffset:
			if lastOffset == -1 {
				if strings.HasPrefix(mq.Topic, internal.RetryGroupTopicPrefix) {
					result = 0
				} else {
					lastOffset, err := dc.queryMaxOffset(mq)
					if err == nil {
						result = lastOffset
					} else {
						rlog.Warning("query max offset error", map[string]interface{}{
							rlog.LogKeyMessageQueue:  mq,
							rlog.LogKeyUnderlayError: err,
						})
					}
				}
			} else {
				result = -1
			}
		case ConsumeFromFirstOffset:
			if lastOffset == -1 {
				result = 0
			}
		case ConsumeFromTimestamp:
			if lastOffset == -1 {
				if strings.HasPrefix(mq.Topic, internal.RetryGroupTopicPrefix) {
					lastOffset, err := dc.queryMaxOffset(mq)
					if err == nil {
						result = lastOffset
					} else {
						result = -1
						rlog.Warning("query max offset error", map[string]interface{}{
							rlog.LogKeyMessageQueue:  mq,
							rlog.LogKeyUnderlayError: err,
						})
					}
				} else {
					t, err := time.Parse("20060102150405", dc.option.ConsumeTimestamp)
					if err != nil {
						result = -1
					} else {
						lastOffset, err := dc.searchOffsetByTimestamp(mq, t.Unix()*1000)
						if err != nil {
							result = -1
						} else {
							result = lastOffset
						}
					}
				}
			}
		default:
		}
	}
	return result, nil
}