in aios/apps/facility/build_service/build_service/admin/taskcontroller/ProcessorTask.cpp [774:903]
bool ProcessorTask::run(ProcessorNodes& processorNodes)
{
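// a finished task never needs to run again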
if (_taskStatus == TASK_FINISHED) {
return true;
}
// batch mode does not use this optimization
if (_alreadyRun && !_batchMode) {
return false;
}
_alreadyRun = true;
registBrokerTopics();
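// honor suspend requests before doing any further work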
if (_taskStatus == TASK_SUSPENDED) {
return false;
}
if (_taskStatus == TASK_SUSPENDING) {
waitSuspended(processorNodes);
return false;
}
if (_step == BUILD_STEP_INC && !_hasIncDatasource) {
// do nothing when there is no inc data source
return false;
}
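// all data descriptions have been consumed; nothing left to process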
if (_input.src >= _input.dataDescriptions.size()) {
return false;
}
// use newNodes so that targets are generated before the workers start
if (!_nodesUpdater) {
_nodesUpdater = createProcessorNodesUpdater();
}
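// refresh node status; AdaptiveScaling() may adjust _partitionCount / _parallelNum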
_nodeStatusManager->Update(processorNodes);
AdaptiveScaling();
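// the processor layout changed: reset detectors, release the old nodes and restart workers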
if (_partitionCount * _parallelNum != _nodeStatusManager->GetAllNodeGroups().size() ||
_partitionCount != _runningPartitionCount || _parallelNum != _runningParallelNum) {
_slowNodeDetector.reset();
_adaptiveScaler.reset();
if (_nodesStartTimestamp == -1) {
_nodesStartTimestamp = autil::TimeUtility::currentTimeInMicroSeconds();
}
BS_LOG(INFO,
"release processor nodes [%s], whose [partitionCount, parallelNum] changed from [%u, %u] to [%u, %u], "
"current nodes size [%lu]",
getTaskIdentifier().c_str(), _runningPartitionCount, _runningParallelNum, _partitionCount, _parallelNum,
_nodeStatusManager->GetAllNodeGroups().size());
processorNodes.clear();
startWorkers(processorNodes);
if (_nodeStatusManager->GetAllNodeGroups().size() == 0) {
recoverFromBackInfo(processorNodes);
}
}
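// handle pending writer work first; stop this round if it is not done yet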
if (!executeWriterTodo(processorNodes)) {
return false;
}
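// with safe write enabled, do not proceed while the writer version still needs to be updated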
_toUpdateWriterInfo = _writerVersion.update(processorNodes, _runningPartitionCount, _runningParallelNum);
if (_toUpdateWriterInfo && _safeWrite) {
return false;
}
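// detect slow nodes and push the latest targets (src, offset, partition layout, writer version) to the workers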
detectSlowNodes(processorNodes);
_nodesUpdater->update(processorNodes, _writerVersion,
/*ProcessorBasicInfo*/ {_input.src, _input.offset, _partitionCount, _parallelNum},
_lastTargetInfos);
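// collect fatal errors reported by the processor nodes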
if (!_fatalErrorDiscoverer) {
_fatalErrorDiscoverer.reset(new FatalErrorDiscoverer());
}
_fatalErrorDiscoverer->collectNodeFatalErrors(processorNodes);
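// persist a new checkpoint whenever the processed offset advances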
if (_input.offset != _nodesUpdater->getCheckPoint()) {
_input.offset = _nodesUpdater->getCheckPoint();
updateCheckpoint();
}
ReportCheckpointFreshness(processorNodes);
if (!_nodesUpdater->isAllProcessorFinished()) {
// cannot switch the data source until all workers have finished
return false;
}
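// all processors have finished: sync writer versions and release the nodes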
syncWriterVersions();
_nodesStartTimestamp = -1;
BS_LOG(INFO, "release processor nodes [%s], which have finished", getTaskIdentifier().c_str());
processorNodes.clear();
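// a full build counts as finished only in the last data source once a switch timestamp has been recorded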
bool isFullFinished =
_step == BUILD_STEP_FULL && isInLastDataSource() && _switchTimestamp != DEFAULT_SWITCH_TIMESTAMP;
if (!isFullFinished) {
deregistBrokerTopics();
switchDataSource();
registBrokerTopics();
// return true;
}
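// the task can finish once the full build is done or every data source has been consumed;
// only full-step or batch-mode tasks finish here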
bool ret = false;
if (isFullFinished || _input.src >= _input.dataDescriptions.size()) {
if (_switchTimestamp != DEFAULT_SWITCH_TIMESTAMP) {
_input.offset = _switchTimestamp;
_switchTimestamp = DEFAULT_SWITCH_TIMESTAMP;
}
ret = _step == BUILD_STEP_FULL || _batchMode;
}
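// optionally skip the trailing swift (realtime) data source during a full build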
bool needSkipLastSwiftDataSource = !_fullBuildProcessLastSwiftSrc && _step == BUILD_STEP_FULL &&
isInLastDataSource() &&
DataSourceHelper::isRealtime(_input.dataDescriptions[_input.src]);
if (needSkipLastSwiftDataSource) {
ret = true;
}
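// finish the task: persist the checkpoint, release broker topics and report completion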
if (ret) {
updateCheckpoint();
deregistBrokerTopics();
_taskStatus = TASK_FINISHED;
string ss = ProtoUtil::toStepString(_step) + " processor task for[";
for (size_t i = 0; i < _clusterNames.size(); i++) {
if (i != 0) {
ss = ss + ",";
}
ss = ss + _clusterNames[i] + ":" + _clusterToSchemaId[_clusterNames[i]];
}
ss = ss + "] finished!";
BEEPER_REPORT(GENERATION_STATUS_COLLECTOR_NAME, ss, *_beeperTags);
if (_batchMode) {
string msg = "batch mode processor finish batchMask " + StringUtil::toString(_input.batchMask);
BEEPER_REPORT(BATCHMODE_INFO_COLLECTOR_NAME, msg, *_beeperTags);
}
}
return ret;
}