bool ProcessorTask::run()

in aios/apps/facility/build_service/build_service/admin/taskcontroller/ProcessorTask.cpp [774:903]


/**
 * Runs one scheduling round of the processor task.
 *
 * Keeps `processorNodes` consistent with the configured partition count and
 * parallel num, feeds the nodes through the nodes updater, tracks checkpoints
 * and fatal errors, and — once every processor reports finished — either
 * switches to the next data source or marks the task TASK_FINISHED.
 *
 * @param processorNodes  the worker nodes of this task. Cleared and restarted
 *                        when the running layout no longer matches the
 *                        configured one; cleared again once all processors
 *                        have finished the current data source.
 * @return true when the task is finished (either already TASK_FINISHED on
 *         entry, or finished during this call); false otherwise (suspended,
 *         suspending, nothing to process, blocked on writer updates, or
 *         processors still running).
 */
bool ProcessorTask::run(ProcessorNodes& processorNodes)
{
    if (_taskStatus == TASK_FINISHED) {
        return true;
    }
    // Outside batch mode, a call made while `_alreadyRun` is still set does nothing
    // (batch mode does not use this optimization).
    // NOTE(review): `_alreadyRun` is presumably cleared elsewhere between rounds — confirm.
    if (_alreadyRun && !_batchMode) {
        return false;
    }
    _alreadyRun = true;
    registBrokerTopics();
    if (_taskStatus == TASK_SUSPENDED) {
        return false;
    }
    if (_taskStatus == TASK_SUSPENDING) {
        waitSuspended(processorNodes);
        return false;
    }
    if (_step == BUILD_STEP_INC && !_hasIncDatasource) {
        // Do nothing when the incremental step has no data source.
        return false;
    }
    if (_input.src >= _input.dataDescriptions.size()) {
        // The current data-source index is past the last description: nothing to process.
        return false;
    }

    // Use newNodes to ensure targets have been generated before workers start.
    if (!_nodesUpdater) {
        _nodesUpdater = createProcessorNodesUpdater();
    }
    _nodeStatusManager->Update(processorNodes);
    AdaptiveScaling();

    // Restart the workers whenever the current node groups do not match the configured
    // layout, or the configured partition/parallel numbers differ from the running ones.
    if (_partitionCount * _parallelNum != _nodeStatusManager->GetAllNodeGroups().size() ||
        _partitionCount != _runningPartitionCount || _parallelNum != _runningParallelNum) {
        _slowNodeDetector.reset();
        _adaptiveScaler.reset();
        // Keep the timestamp of the first (re)start; it is reset once all processors finish.
        if (_nodesStartTimestamp == -1) {
            _nodesStartTimestamp = autil::TimeUtility::currentTimeInMicroSeconds();
        }
        BS_LOG(INFO,
               "release processor nodes [%s], whose partition count change from [%u, %u] to [%u, %u], current nodes "
               "size [%lu]",
               getTaskIdentifier().c_str(), _runningPartitionCount, _runningParallelNum, _partitionCount, _parallelNum,
               _nodeStatusManager->GetAllNodeGroups().size());
        processorNodes.clear();
        startWorkers(processorNodes);
        if (_nodeStatusManager->GetAllNodeGroups().size() == 0) {
            recoverFromBackInfo(processorNodes);
        }
    }
    if (!executeWriterTodo(processorNodes)) {
        return false;
    }
    _toUpdateWriterInfo = _writerVersion.update(processorNodes, _runningPartitionCount, _runningParallelNum);
    // In safe-write mode, wait for the pending writer-info update before driving the nodes further.
    if (_toUpdateWriterInfo && _safeWrite) {
        return false;
    }
    detectSlowNodes(processorNodes);
    _nodesUpdater->update(processorNodes, _writerVersion,
                          /*PorcessorBasicInfo*/ {_input.src, _input.offset, _partitionCount, _parallelNum},
                          _lastTargetInfos);

    if (!_fatalErrorDiscoverer) {
        _fatalErrorDiscoverer.reset(new FatalErrorDiscoverer());
    }

    _fatalErrorDiscoverer->collectNodeFatalErrors(processorNodes);
    // Persist the checkpoint only when the updater reports a different one.
    if (_input.offset != _nodesUpdater->getCheckPoint()) {
        _input.offset = _nodesUpdater->getCheckPoint();
        updateCheckpoint();
    }
    ReportCheckpointFreshness(processorNodes);

    if (!_nodesUpdater->isAllProcessorFinished()) {
        // Cannot switch data source until all workers have finished.
        return false;
    }
    syncWriterVersions();

    _nodesStartTimestamp = -1;
    BS_LOG(INFO, "release processor nodes [%s], which have finished", getTaskIdentifier().c_str());
    processorNodes.clear();

    // A full build whose last data source is done and which has a switch timestamp is complete;
    // otherwise move on to the next data source (re-registering broker topics around the switch).
    bool isFullFinished =
        _step == BUILD_STEP_FULL && isInLastDataSource() && _switchTimestamp != DEFAULT_SWITCH_TIMESTAMP;
    if (!isFullFinished) {
        deregistBrokerTopics();
        switchDataSource();
        registBrokerTopics();
    }

    bool ret = false;
    if (isFullFinished || _input.src >= _input.dataDescriptions.size()) {
        // Carry the switch timestamp over as the input offset, consuming it.
        if (_switchTimestamp != DEFAULT_SWITCH_TIMESTAMP) {
            _input.offset = _switchTimestamp;
            _switchTimestamp = DEFAULT_SWITCH_TIMESTAMP;
        }
        ret = _step == BUILD_STEP_FULL || _batchMode;
    }

    // A full build may be configured to stop before a trailing realtime data source.
    // `_input.src` may have been advanced by switchDataSource() above, so bounds-check it
    // explicitly before indexing instead of relying solely on isInLastDataSource().
    bool needSkipLastSwiftDataSource = !_fullBuildProcessLastSwiftSrc && _step == BUILD_STEP_FULL &&
                                       isInLastDataSource() && _input.src < _input.dataDescriptions.size() &&
                                       DataSourceHelper::isRealtime(_input.dataDescriptions[_input.src]);
    if (needSkipLastSwiftDataSource) {
        ret = true;
    }

    if (ret) {
        updateCheckpoint();
        deregistBrokerTopics();
        _taskStatus = TASK_FINISHED;

        string ss = ProtoUtil::toStepString(_step) + " processor task for[";
        for (size_t i = 0; i < _clusterNames.size(); ++i) {
            const auto& clusterName = _clusterNames[i];
            if (i != 0) {
                ss += ",";
            }
            ss += clusterName;
            ss += ":";
            // Use find() rather than operator[]: building a report message must not insert
            // empty entries into _clusterToSchemaId for clusters without a schema id.
            auto schemaIt = _clusterToSchemaId.find(clusterName);
            if (schemaIt != _clusterToSchemaId.end()) {
                ss += schemaIt->second;
            }
        }
        ss += "] finished!";
        BEEPER_REPORT(GENERATION_STATUS_COLLECTOR_NAME, ss, *_beeperTags);
        if (_batchMode) {
            string msg = "batch mode processor finish batchMask " + StringUtil::toString(_input.batchMask);
            BEEPER_REPORT(BATCHMODE_INFO_COLLECTOR_NAME, msg, *_beeperTags);
        }
    }
    return ret;
}