in src/consumer/Rebalance.cpp [555:636]
int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
int64 result = -1;
DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
if (!pConsumer) {
LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed when computePullFromWhere %s",
mq.toString().c_str());
return result;
}
ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET: {
int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
if (lastOffset >= 0) {
LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
result = lastOffset;
} else if (-1 == lastOffset) {
LOG_WARN("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is -1", mq.toString().c_str());
if (UtilAll::startsWith_retry(mq.getTopic())) {
LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is 0", mq.toString().c_str());
result = 0;
} else {
try {
result = pConsumer->maxOffset(mq);
LOG_INFO("CONSUME_FROM_LAST_OFFSET, maxOffset of mq:%s is:%lld", mq.toString().c_str(), result);
} catch (MQException& e) {
LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset of mq:%s is -1", mq.toString().c_str());
result = -1;
}
}
} else {
LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset of mq:%s is -1", mq.toString().c_str());
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
if (lastOffset >= 0) {
LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
result = lastOffset;
} else if (-1 == lastOffset) {
LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return 0", mq.toString().c_str());
result = 0;
} else {
LOG_ERROR("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return -1", mq.toString().c_str());
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
if (lastOffset >= 0) {
LOG_INFO("CONSUME_FROM_TIMESTAMP, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
result = lastOffset;
} else if (-1 == lastOffset) {
if (UtilAll::startsWith_retry(mq.getTopic())) {
try {
result = pConsumer->maxOffset(mq);
LOG_INFO("CONSUME_FROM_TIMESTAMP, maxOffset of mq:%s is:%lld", mq.toString().c_str(), result);
} catch (MQException& e) {
LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s is -1", mq.toString().c_str());
result = -1;
}
} else {
try {
} catch (MQException& e) {
LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s, return 0", mq.toString().c_str());
result = -1;
}
}
} else {
LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s, return -1", mq.toString().c_str());
result = -1;
}
break;
}
default:
break;
}
return result;
}