int64 RebalancePush::computePullFromWhere()

in src/consumer/Rebalance.cpp [555:636]


int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
  int64 result = -1;
  DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
  if (!pConsumer) {
    LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed when computePullFromWhere %s",
              mq.toString().c_str());
    return result;
  }
  ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
  OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
  switch (consumeFromWhere) {
    case CONSUME_FROM_LAST_OFFSET: {
      int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
      if (lastOffset >= 0) {
        LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
        result = lastOffset;
      } else if (-1 == lastOffset) {
        LOG_WARN("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is -1", mq.toString().c_str());
        if (UtilAll::startsWith_retry(mq.getTopic())) {
          LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is 0", mq.toString().c_str());
          result = 0;
        } else {
          try {
            result = pConsumer->maxOffset(mq);
            LOG_INFO("CONSUME_FROM_LAST_OFFSET, maxOffset of mq:%s is:%lld", mq.toString().c_str(), result);
          } catch (MQException& e) {
            LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset  of mq:%s is -1", mq.toString().c_str());
            result = -1;
          }
        }
      } else {
        LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset  of mq:%s is -1", mq.toString().c_str());
        result = -1;
      }
      break;
    }
    case CONSUME_FROM_FIRST_OFFSET: {
      int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
      if (lastOffset >= 0) {
        LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
        result = lastOffset;
      } else if (-1 == lastOffset) {
        LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return 0", mq.toString().c_str());
        result = 0;
      } else {
        LOG_ERROR("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return -1", mq.toString().c_str());
        result = -1;
      }
      break;
    }
    case CONSUME_FROM_TIMESTAMP: {
      int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
      if (lastOffset >= 0) {
        LOG_INFO("CONSUME_FROM_TIMESTAMP, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
        result = lastOffset;
      } else if (-1 == lastOffset) {
        if (UtilAll::startsWith_retry(mq.getTopic())) {
          try {
            result = pConsumer->maxOffset(mq);
            LOG_INFO("CONSUME_FROM_TIMESTAMP, maxOffset  of mq:%s is:%lld", mq.toString().c_str(), result);
          } catch (MQException& e) {
            LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset  of mq:%s is -1", mq.toString().c_str());
            result = -1;
          }
        } else {
          try {
          } catch (MQException& e) {
            LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset  of mq:%s, return 0", mq.toString().c_str());
            result = -1;
          }
        }
      } else {
        LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset  of mq:%s, return -1", mq.toString().c_str());
        result = -1;
      }
      break;
    }
    default:
      break;
  }
  return result;
}