aios/navi/engine/Node.cpp (1,979 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "navi/engine/Node.h"
#include "autil/TimeUtility.h"
#include "navi/engine/Biz.h"
#include "navi/engine/Edge.h"
#include "navi/engine/Graph.h"
#include "navi/engine/GraphDomainFork.h"
#include "navi/engine/InputPortGroup.h"
#include "navi/engine/LocalPortBase.h"
#include "navi/engine/LocalSubGraph.h"
#include "navi/engine/OutputPortGroup.h"
#include "navi/engine/ScopeTerminatorKernel.h"
#include "navi/engine/ComputeScope.h"
#include "navi/log/NaviLogger.h"
#include "navi/ops/ResourceData.h"
#include "navi/proto/GraphVis.pb.h"
#include "navi/util/CommonUtil.h"
#include "navi/util/ReadyBitMap.h"
#include <iostream>
#include <limits>
#include "aios/network/gig/multi_call/common/common.h"
#include "lockless_allocator/LocklessApi.h"
namespace navi {
class DataDestructItem : public NaviNoDropWorkerItem
{
public:
DataDestructItem(NaviWorkerBase *worker,
bool collectPerf,
DataPtr &&data)
: NaviNoDropWorkerItem(worker, collectPerf)
, _data(data)
{
}
public:
void doProcess() override {
if (_data) {
_data.reset();
}
}
private:
DataPtr _data;
};
class KernelDestructItem : public NaviNoDropWorkerItem
{
public:
KernelDestructItem(NaviWorkerBase *worker,
LocalSubGraph *graph,
KernelMetric *metric,
Kernel *kernel)
: NaviNoDropWorkerItem(worker, graph->getParam()->runParams.collectPerf())
, _graph(graph)
, _metric(metric)
, _kernel(kernel)
{
}
public:
void doProcess() override {
if (_kernel) {
const auto ¶m = _graph->getParam()->runParams;
ComputeScope scope(_metric, GET_KERNEL_DELETE_KERNEL,
_graph->getPartId(), _worker, _schedInfo,
param.collectMetric(), param.collectPerf());
delete _kernel;
_kernel = nullptr;
}
}
private:
LocalSubGraph *_graph;
KernelMetric *_metric;
Kernel *_kernel;
};
Node::Node(const NodeDef &nodeDef,
const NaviLoggerPtr &logger,
Biz *biz,
LocalSubGraph *graph,
bool isResourceNode)
: _logger(this, "node", logger)
, _def(&nodeDef)
, _name(_def->name())
, _kernelName(_def->kernel_name())
, _scope(_def->scope())
, _biz(biz)
, _graph(graph)
, _port(nullptr)
, _metric(std::make_shared<KernelMetric>(_graph->getGraphId(), _def->name(), _def->kernel_name()))
, _inputReadyMap(nullptr)
, _inputGroupReadyMap(nullptr)
, _outputReadyMap(nullptr)
, _controlOutputReadyMap(nullptr)
, _controlInputGroupIndex(INVALID_INDEX)
, _resourceInputGroupIndex(INVALID_INDEX)
, _outputSnapshotVec(nullptr)
, _outputDegree(0)
, _kernelCreator(nullptr)
, _kernel(nullptr)
, _kernelBaseAddr(nullptr)
, _dependResourceMap(nullptr)
, _creatorStats(nullptr)
, _forceStopped(false)
, _forkGraphDef(nullptr)
, _forkGraph(nullptr)
{
_hasInput = false;
_hasOutput = false;
_skipConfig = false;
_skipInit = false;
_stopAfterInit = false;
_skipDeleteKernel = false;
_isResourceNode = isResourceNode;
_inlineConfigAndInit = false;
_isInputBorder = false;
_isOutputBorder = false;
_kernelInited = false;
atomic_set(&_nodeSnapshot, 0);
_scheduleStat = SS_INIT;
_frozen = false;
_stopSchedule = false;
_forceStop = false;
_forceStopNode = nullptr;
}
Node::~Node() {
delete _forkGraph;
multi_call::freeProtoMessage(_forkGraphDef);
deleteKernel(true);
ReadyBitMap::freeReadyBitMap(_inputReadyMap);
ReadyBitMap::freeReadyBitMap(_inputGroupReadyMap);
ReadyBitMap::freeReadyBitMap(_outputReadyMap);
ReadyBitMap::freeReadyBitMap(_controlOutputReadyMap);
ReadyBitMap::freeReadyBitMap(_outputSnapshotVec);
for (auto group : _inputGroups) {
delete group;
}
for (auto group : _outputGroups) {
delete group;
}
}
bool Node::init() {
if (!initDef()) {
return false;
}
NaviLoggerScope scope(_logger);
_kernelCreator = _biz->getKernelCreator(getKernelName());
if (!_kernelCreator) {
auto ec = EC_CREATE_KERNEL;
setErrorCode(ec);
NAVI_LOG(ERROR, "kernel not supported, ec[%s]",
CommonUtil::getErrorString(ec));
return false;
}
_inputs.resize(_kernelCreator->inputSize());
_outputs.resize(_kernelCreator->outputSize());
_controlOutputs.resize(_kernelCreator->controlOutputSize());
_inputReadyMap = ReadyBitMap::createReadyBitMap(_inputs.size());
_inputGroupReadyMap = ReadyBitMap::createReadyBitMap(_kernelCreator->inputGroupSize() + 2);
_outputReadyMap = ReadyBitMap::createReadyBitMap(_outputs.size());
_controlOutputReadyMap =
ReadyBitMap::createReadyBitMap(_controlOutputs.size());
initIOGroup();
if (!_outputs.empty() || !_outputGroups.empty()) {
setHasOutput();
}
if (!initCreatorStats()) {
return false;
}
if (!initDependResourceMap()) {
return false;
}
return true;
}
bool Node::initDef() {
if (_name.empty()) {
NAVI_LOG(ERROR, "node name can't be empty, def: %s",
_def->DebugString().c_str());
return false;
}
if (_isResourceNode) {
_inlineConfigAndInit = true;
}
initBuildinAttrs();
_logger.addPrefix("graph %d %s kernel %s", _graph->getGraphId(), _name.c_str(),
_kernelName.c_str());
return true;
}
void Node::initBuildinAttrs() {
if (!_def->has_buildin_attrs()) {
return;
}
const auto &buildinAttrs = _def->buildin_attrs();
_skipConfig = buildinAttrs.skip_config();
_skipInit = buildinAttrs.skip_init();
_stopAfterInit = buildinAttrs.stop_after_init();
_skipDeleteKernel = buildinAttrs.skip_delete_kernel();
}
void Node::initIOGroup() {
const auto *kernelDef = _kernelCreator->def();
for (auto i = 0; i < kernelDef->input_groups().size(); ++i) {
const auto &inputGroupDef = kernelDef->input_groups(i);
auto group = new InputPortGroup(inputGroupDef.type(), inputGroupDef.group_type());
_inputGroups.push_back(group);
NAVI_LOG(SCHEDULE2, "add IO inputGroup[%s]", autil::StringUtil::toString(group).c_str());
}
for (auto i = 0; i < kernelDef->output_groups().size(); ++i) {
auto group = new OutputPortGroup();
_outputGroups.push_back(group);
}
}
bool Node::initCreatorStats() {
if (!isResourceCreateKernel()) {
_creatorStats = &_kernelCreator->getCreatorStats();
} else {
const auto &binaryAttrMap = _def->binary_attrs();
auto it = binaryAttrMap.find(RESOURCE_ATTR_NAME);
if (binaryAttrMap.end() == it) {
NAVI_LOG(ERROR, "can't find resource name in node binary attr");
return false;
}
auto resourceCreator = _biz->getResourceCreator(it->second);
if (!resourceCreator) {
return true;
}
_creatorStats = &resourceCreator->getCreatorStats();
}
return true;
}
bool Node::getBinaryAttr(const std::string &attr, std::string &value) const {
const auto &binaryAttrMap = _def->binary_attrs();
auto it = binaryAttrMap.find(attr);
if (binaryAttrMap.end() == it) {
NAVI_LOG(SCHEDULE3, "can't find attr [%s] in node binary attr", attr.c_str());
return false;
}
value = it->second;
return true;
}
bool Node::postInit() {
if (!initInputSnapshotVec()) {
return false;
}
if (!initOutputSnapshotVec()) {
return false;
}
initOutputDegree();
if (!initResourceIndexMap()) {
return false;
}
return true;
}
bool Node::initInputSnapshotVec() {
IndexType start = _inputs.size();
uint32_t count = _inputGroups.size();
for (uint32_t index = 0; index < count; index++) {
auto group = _inputGroups[index];
if (!group->postInit()) {
NAVI_LOG(ERROR, "invalid input group index");
return false;
}
group->setInputIndexStart(start);
start += group->getInputCount();
_inputGroupReadyMap->setOptional(index, (group->getGroupType() == IT_OPTIONAL));
_inputGroupReadyMap->setFinish(index, false);
}
_inputSnapshotVec.resize(start);
return true;
}
bool Node::initOutputSnapshotVec() {
IndexType start = _outputs.size();
for (auto group : _outputGroups) {
if (!group->postInit()) {
NAVI_LOG(ERROR, "invalid output group index");
return false;
}
group->setOutputIndexStart(start);
start += group->getOutputCount();
}
_outputSnapshotVec = ReadyBitMap::createReadyBitMap(start);
_outputSnapshotVec->setReady(false);
_outputSnapshotVec->setFinish(false);
return true;
}
void Node::initOutputDegree() {
size_t outputDegree = 0;
for (auto edge : _outputs) {
if (edge) {
outputDegree += edge->getOutputDegree();
}
}
for (auto group : _outputGroups) {
outputDegree += group->getOutputDegree();
}
_outputDegree = outputDegree;
}
bool Node::initResourceIndexMap() {
if (INVALID_INDEX == _resourceInputGroupIndex) {
return true;
}
auto resourceGroup = _inputGroups[_resourceInputGroupIndex];
const auto &inputVec = resourceGroup->getInputVec();
auto inputCount = inputVec.size();
for (IndexType index = 0; index < inputCount; index++) {
const auto &edgeInfo = inputVec[index].second;
auto inputNode = edgeInfo.getInputNode();
if (!inputNode) {
NAVI_LOG(ERROR, "unknown error, null resource input node");
return false;
}
std::string resourceName;
if (!inputNode->getBinaryAttr(RESOURCE_ATTR_NAME, resourceName)) {
_flushIndexVec.push_back(index);
continue;
}
_resourceIndexMap.emplace(resourceName, index);
}
return true;
}
void Node::startNormalResourceInput() const {
if (INVALID_INDEX == _resourceInputGroupIndex) {
return;
}
const auto &replaceInfoMap = _graph->getReplaceInfoMap();
for (const auto &depend : *_dependResourceMap) {
const auto &resourceName = replaceInfoMap.getReplaceRTo(depend.first);
if (_isResourceNode || (RS_KERNEL != _biz->getResourceStage(resourceName))) {
startResourceInput(resourceName);
}
}
auto resourceGroup = _inputGroups[_resourceInputGroupIndex];
for (auto index : _flushIndexVec) {
resourceGroup->startUpStreamSchedule(index, this);
}
}
void Node::startKernelResourceInput() const {
if (INVALID_INDEX == _resourceInputGroupIndex) {
return;
}
const auto &replaceInfoMap = _graph->getReplaceInfoMap();
for (const auto &depend : *_dependResourceMap) {
const auto &resourceName = replaceInfoMap.getReplaceRTo(depend.first);
if (RS_KERNEL == _biz->getResourceStage(resourceName)) {
startResourceInput(resourceName);
}
}
}
void Node::startResourceInput(const std::string &resourceName) const {
auto resourceGroup = _inputGroups[_resourceInputGroupIndex];
auto it = _resourceIndexMap.find(resourceName);
if (_resourceIndexMap.end() != it) {
NAVI_LOG(SCHEDULE3, "start input resource [%s] index [%u]", resourceName.c_str(), it->second);
resourceGroup->startUpStreamSchedule(it->second, this);
}
}
bool Node::createSelectResult() {
if (INVALID_INDEX == _resourceInputGroupIndex) {
return true;
}
bool ret = true;
const auto &selectBindVec = _kernelCreator->getBindInfos().selectBindVec;
auto total = selectBindVec.size();
for (size_t i = 0; i < total; i++) {
const auto &selector = selectBindVec[i];
auto tmpCtx = _ctx;
int32_t index = -1;
try {
index = selector.selector(tmpCtx);
} catch (const autil::legacy::ExceptionBase &e) {
NAVI_LOG(
ERROR, "compute select resource failed, selector [%lu], exception: [%s]", i, e.GetMessage().c_str());
ret = false;
continue;
}
int32_t candidateCount = selector.candidates.size();
if (index >= candidateCount) {
NAVI_LOG(ERROR,
"selector index [%d] overflow, total count [%d], selectResource [%lu]",
index,
candidateCount,
i);
ret = false;
continue;
}
NAVI_LOG(SCHEDULE3,
"selected resource [%d] [%s] ok, selector [%lu]",
index,
index >= 0 ? selector.candidates[index].c_str() : "",
i);
_selectorResult.push_back(index);
}
return ret;
}
void Node::startSelectResourceInput() const {
if (INVALID_INDEX == _resourceInputGroupIndex) {
return;
}
const auto &dependResourceMap = _kernelCreator->getDependResourcesWithoutSelect();
auto resourceGroup = _inputGroups[_resourceInputGroupIndex];
const auto &selectBindVec = _kernelCreator->getBindInfos().selectBindVec;
auto total = selectBindVec.size();
for (size_t i = 0; i < total; i++) {
const auto &selector = selectBindVec[i];
auto index = _selectorResult[i];
auto candidateCount = selector.candidates.size();
for (int32_t j = 0; j < candidateCount; j++) {
bool selected = (j == index);
const auto &candidate = selector.candidates[j];
auto it = _resourceIndexMap.find(candidate);
if (_resourceIndexMap.end() == it) {
continue;
}
auto offset = it->second;
if (selected) {
resourceGroup->startUpStreamSchedule(offset, this);
} else {
if (dependResourceMap.end() == dependResourceMap.find(candidate)) {
resourceGroup->setEof(offset, this);
}
}
}
}
}
ResourcePtr Node::buildR(const std::string &name, KernelConfigContext *ctx, ResourceMap &inputResourceMap) {
const auto &enableResourceSet = _kernelCreator->getEnableBuildResourceSet();
if (enableResourceSet.end() == enableResourceSet.find(name)) {
NAVI_LOG(ERROR, "can't build resource [%s], not enabled", name.c_str());
return nullptr;
}
return _graph->buildKernelResourceRecur(name, ctx, _resourceMap, this, inputResourceMap);
}
bool Node::initDependResourceMap() {
if (isResourceCreateKernel()) {
std::string resourceName;
if (!getBinaryAttr(RESOURCE_ATTR_NAME, resourceName)) {
return false;
}
_dependResourceMap = _biz->getResourceDependResourceMap(resourceName);
} else {
_dependResourceMap = &_kernelCreator->getDependResourcesWithoutSelect();
}
return true;
}
ReadyBitMap *Node::getInputBitMap(PortIndex inputIndex) {
if (inputIndex.isGroup()) {
assert(inputIndex.group < _inputGroups.size());
return _inputGroups[inputIndex.group]->getReadyMap();
} else {
return _inputReadyMap;
}
}
NodeAsyncInfoPtr Node::getAsyncInfo() const {
autil::ScopedLock lock(_asyncMutex);
return _asyncInfo;
}
void Node::setAsyncInfo(const NodeAsyncInfoPtr &asyncInfo) {
autil::ScopedLock lock(_asyncMutex);
_asyncInfo = asyncInfo;
}
AsyncPipePtr __attribute__((weak)) Node::createAsyncPipe(ActivateStrategy activateStrategy) {
return doCreateAsyncPipe(activateStrategy);
}
AsyncPipePtr Node::doCreateAsyncPipe(ActivateStrategy activateStrategy) {
autil::ScopedLock lock(_forkLock);
if (_graph->terminated()) {
NAVI_LOG(ERROR, "create async pipe failed, graph terminated");
return nullptr;
}
auto oldInfo = getAsyncInfo();
auto asyncInfo = std::make_shared<NodeAsyncInfo>(this, oldInfo);
auto pipe = asyncInfo->addOne(activateStrategy);
setAsyncInfo(asyncInfo);
return pipe;
}
void Node::notifyPipeTerminate() {
autil::ScopedLock lock(_forkLock);
if (_graph->terminated()) {
return;
}
auto oldInfo = getAsyncInfo();
if (!oldInfo) {
return;
}
auto asyncInfo = std::make_shared<NodeAsyncInfo>(this, oldInfo);
if (asyncInfo->consolidate()) {
setAsyncInfo(asyncInfo);
} else {
setAsyncInfo(nullptr);
}
}
ReadyBitMap *Node::getOutputBitMap(PortIndex outputIndex) {
if (outputIndex.isGroup()) {
assert(outputIndex.group < _outputGroups.size());
return _outputGroups[outputIndex.group]->getReadyMap();
} else if (outputIndex.isControl()) {
return _controlOutputReadyMap;
} else {
return _outputReadyMap;
}
}
bool Node::bindPort(Port *port) {
auto localPort = static_cast<LocalPortBase *>(port);
auto &partInfo = localPort->getPartInfo();
if (IOT_INPUT == localPort->getIoType()) {
assert(0 < _inputGroups.size());
auto group = _inputGroups[0];
if (!group->postInit(partInfo)) {
return false;
}
localPort->setNode(this, group->getReadyMap());
setHasInput();
_isInputBorder = true;
} else {
assert(0 < _outputGroups.size());
auto group = _outputGroups[0];
if (!group->postInit(partInfo)) {
return false;
}
localPort->setNode(this, group->getReadyMap());
setHasOutput();
_isOutputBorder = true;
}
_port = port;
return true;
}
bool Node::addInput(const std::string &portName,
bool require,
const EdgeOutputInfo &edgeOutputInfo,
PortIndex &inputIndex,
std::string &dataType)
{
std::string port;
IndexType index = 0;
if (!CommonUtil::parseGroupPort(portName, port, index)) {
NAVI_LOG(ERROR,
"parse portName [%s] failed, kernel def[%s]",
portName.c_str(),
_kernelCreator->def()->DebugString().c_str());
return false;
}
if (NODE_CONTROL_INPUT_PORT == port) {
auto controlGroup = addControlInputGroup();
auto offset = controlGroup->getInputCount();
controlGroup->addInput(offset, edgeOutputInfo);
setHasInput();
inputIndex.group = _controlInputGroupIndex;
inputIndex.index = offset;
return true;
}
if (NODE_RESOURCE_INPUT_PORT == port) {
auto resourceGroup = addResourceInputGroup();
auto offset = resourceGroup->getInputCount();
resourceGroup->addInput(offset, edgeOutputInfo);
setHasInput();
inputIndex.group = _resourceInputGroupIndex;
inputIndex.index = offset;
dataType = "";
return true;
}
assert(edgeOutputInfo.isValid());
auto portInfo = _kernelCreator->getInputPortInfo(port, index);
if (!portInfo.isValid()) {
NAVI_LOG(ERROR,
"no input port named [%s], check port name or group index, "
"kernel def[%s]",
portName.c_str(),
_kernelCreator->def()->DebugString().c_str());
return false;
}
if (portInfo.isGroup()) {
inputIndex = doAddInputGroup(portInfo, edgeOutputInfo);
} else {
inputIndex = doAddInput(portInfo, edgeOutputInfo, require);
}
auto portDef = static_cast<const InputDef *>(portInfo.def);
dataType = portDef->data_type().name();
if (inputIndex.isValid()) {
setHasInput();
return true;
} else {
return false;
}
}
PortIndex Node::doAddInput(const PortInfo &portInfo,
const EdgeOutputInfo &edgeOutputInfo,
bool edgeRequire)
{
auto portIndex = portInfo.index;
auto inputDef = (InputDef *)portInfo.def;
assert(inputDef);
assert(portIndex.index < _inputs.size());
auto &outputInfo = _inputs[portIndex.index];
if (outputInfo.isValid()) {
NAVI_LOG(ERROR, "multiple input, edge [%s] and edge [%s]",
outputInfo.getDebugName().c_str(),
edgeOutputInfo.getDebugName().c_str());
return PortIndex();
}
outputInfo = edgeOutputInfo;
auto activeStrategy = getInputActivateStrategy(inputDef, edgeRequire);
_inputReadyMap->setOptional(portIndex.index, (activeStrategy == AS_OPTIONAL));
_inputReadyMap->setFinish(portIndex.index, false);
return portIndex;
}
PortIndex Node::doAddInputGroup(const PortInfo &portInfo,
const EdgeOutputInfo &edgeOutputInfo)
{
auto groupIndex = portInfo.index;
assert(groupIndex.group < _inputGroups.size());
auto retIndex = _inputGroups[groupIndex.group]->addInput(groupIndex.index,
edgeOutputInfo);
NAVI_LOG(SCHEDULE2, "add input group[%d,%d], retIndex[%d]", groupIndex.group, groupIndex.index, retIndex);
groupIndex.index = retIndex;
return groupIndex;
}
InputPortGroup *Node::addControlInputGroup() {
if (INVALID_INDEX == _controlInputGroupIndex) {
auto group = new InputPortGroup(IT_REQUIRE, IT_REQUIRE);
_controlInputGroupIndex = _inputGroups.size();
_inputGroups.push_back(group);
NAVI_LOG(SCHEDULE2, "add control inputGroup[%s]", autil::StringUtil::toString(group).c_str());
}
return _inputGroups[_controlInputGroupIndex];
}
InputPortGroup *Node::addResourceInputGroup() {
if (INVALID_INDEX == _resourceInputGroupIndex) {
auto group = new InputPortGroup(IT_REQUIRE, IT_REQUIRE);
_resourceInputGroupIndex = _inputGroups.size();
_inputGroups.push_back(group);
NAVI_LOG(SCHEDULE2, "add resource inputGroup[%s]", autil::StringUtil::toString(group).c_str());
}
return _inputGroups[_resourceInputGroupIndex];
}
void Node::setHasInput() {
_hasInput = true;
}
bool Node::hasInput() const {
return _hasInput || (getAsyncInfo() != nullptr);
}
void Node::setHasOutput() {
_hasOutput = true;
}
bool Node::hasOutput() const {
return _hasOutput;
}
bool Node::addOutput(Edge *output, const std::string &portName,
PortIndex &outputIndex, std::string &dataType)
{
if (!output) {
NAVI_LOG(ERROR, "input is null");
return false;
}
auto portInfo = _kernelCreator->getOutputPortInfo(portName);
if (!portInfo.isValid()) {
NAVI_LOG(ERROR, "no output port named [%s], kernel def[%s]",
portName.c_str(),
_kernelCreator->def()->DebugString().c_str());
return false;
}
if (portInfo.isGroup()) {
outputIndex = doAddOutputGroup(portInfo, output);
} else {
outputIndex = doAddOutput(portInfo, output);
}
auto portDef = static_cast<const OutputDef *>(portInfo.def);
dataType = portDef->data_type().name();
if (outputIndex.isValid()) {
return true;
} else {
return false;
}
}
PortIndex Node::doAddOutput(const PortInfo &portInfo, Edge *output) {
auto portIndex = portInfo.index;
if (portIndex.control) {
assert(portIndex.index < _controlOutputs.size());
if (_controlOutputs[portIndex.index]) {
NAVI_LOG(
ERROR,
"got multiple control link, control output port index [%u]",
portIndex.index);
return PortIndex();
}
_controlOutputs[portIndex.index] = output;
return portIndex;
} else {
assert(portIndex.index < _outputs.size());
assert(!_outputs[portIndex.index]);
_outputs[portIndex.index] = output;
_outputReadyMap->setReady(portIndex.index, true);
_outputReadyMap->setOptional(portIndex.index, false);
_outputReadyMap->setFinish(portIndex.index, false);
return portIndex;
}
}
PortIndex Node::doAddOutputGroup(const PortInfo &portInfo, Edge *output) {
auto groupIndex = portInfo.index;
assert(groupIndex.group < _outputGroups.size());
_outputGroups[groupIndex.group]->addOutput(groupIndex.index, output);
return groupIndex;
}
ActivateStrategy Node::getInputActivateStrategy(
const InputDef *inputDef, bool edgeRequire)
{
switch (inputDef->type()) {
case IT_REQUIRE:
case IT_REQUIRE_WHEN_CONNECTED:
return AS_REQUIRED;
case IT_OPTIONAL:
if (edgeRequire) {
return AS_REQUIRED;
} else {
return AS_OPTIONAL;
}
default:
assert(false);
return AS_REQUIRED;
}
}
bool Node::getPortTypeStr(const std::string &portName,
IoType ioType,
std::string &typeStr) const
{
if (!_kernelCreator) {
return false;
}
return _kernelCreator->getPortTypeStr(portName, ioType, typeStr);
}
bool Node::checkConnection() const {
if (!_kernelCreator) {
return false;
}
auto ret = true;
ret = checkInput() && ret;
ret = checkInputGroup() && ret;
return ret;
}
bool Node::checkInput() const {
for (IndexType i = 0; i < _inputs.size(); i++) {
if (!_inputs[i].isValid()) {
const auto &portName =
_kernelCreator->getInputPortName(PortIndex(i, INVALID_INDEX));
if (portName.empty()) {
NAVI_LOG(ERROR, "unknown error, input index [%d] overflow", i);
return false;
}
NAVI_LOG(ERROR, "input port[%s] not connected", portName.c_str());
return false;
}
}
return true;
}
bool Node::checkInputGroup() const {
for (IndexType i = 0; i < _inputGroups.size(); i++) {
auto bitMap = _inputGroups[i]->getReadyMap();
if (0 == bitMap->size()) {
const auto &portName =
_kernelCreator->getInputPortName(PortIndex(INVALID_INDEX, i));
if (portName.empty()) {
NAVI_LOG(ERROR,
"unknown error, input group index [%d] overflow", i);
return false;
}
NAVI_LOG(ERROR, "input group[%s] not connected", portName.c_str());
return false;
}
}
return true;
}
void Node::finishPendingOutput() {
for (IndexType i = 0; i < _outputs.size(); i++) {
if (!_outputs[i]) {
const auto &portName = _kernelCreator->getOutputPortName(i);
NAVI_LOG(SCHEDULE1, "finish pending output, port [%s]",
portName.c_str());
_outputReadyMap->setFinish(i, true);
}
}
}
bool Node::initResourceMap() {
const auto &dependResourceMap = _kernelCreator->getDependResourcesWithoutSelect();
if (INVALID_INDEX == _resourceInputGroupIndex) {
if (!dependResourceMap.empty()) {
NAVI_LOG(ERROR, "lack resource input");
return false;
} else {
return true;
}
}
InputSnapshot *begin = nullptr;
InputSnapshot *end = nullptr;
if (!snapshotGroupInput(_resourceInputGroupIndex, begin, end)) {
NAVI_LOG(ERROR, "read resource input failed index [%u] size [%lu]",
_resourceInputGroupIndex, _inputGroups.size());
return false;
}
for (; begin != end; begin++) {
DataPtr data(std::move(begin->data.data));
auto resourceData = std::dynamic_pointer_cast<ResourceData>(data);
if (!resourceData) {
continue;
}
const auto &resource = resourceData->_resource;
if (!resource) {
continue;
}
_resourceMap.addResource(resource);
}
_graph->getReplaceInfoMap().replace(_dependResourceMap, _resourceMap);
for (const auto &info : dependResourceMap) {
if (!checkRequireResource(info.first, info.second)) {
return false;
}
}
const auto &selectBindVec = _kernelCreator->getBindInfos().selectBindVec;
for (int i = 0; i < selectBindVec.size(); i++) {
const auto &selectInfo = selectBindVec[i];
auto selectIndex = _selectorResult[i];
if (selectIndex < 0) {
continue;
}
const auto &selected = selectInfo.candidates[selectIndex];
if (!checkRequireResource(selected, selectInfo.require)) {
NAVI_LOG(ERROR, "lack selected resource, select index [%d], selector [%d]", selectIndex, i);
return false;
}
}
return true;
}
bool Node::checkRequireResource(const std::string &resourceName, bool require) const {
bool hasResource = _resourceMap.hasResource(resourceName);
if (require && !hasResource) {
NAVI_LOG(ERROR, "required resource not exist [%s]", resourceName.c_str());
return false;
}
return true;
}
const std::string &Node::getConfigPath() const {
return _biz->getConfigPath();
}
Port *Node::getPort() const {
return _port;
}
const ResourceMap &Node::getResourceMap() const {
return _resourceMap;
}
bool Node::createKernel(const ScheduleInfo &schedInfo) {
const auto ¶m = _graph->getParam()->runParams;
ComputeScope computeScope(_metric.get(), GET_KERNEL_CREATE,
_graph->getPartId(), _graph->getWorker(),
schedInfo, param.collectMetric(),
param.collectPerf());
NaviLoggerScope scope(getLogger());
NAVI_LOG(SCHEDULE1, "create kernel begin");
void *baseAddr = nullptr;
auto kernel = _kernelCreator->create(baseAddr);
if (!kernel) {
NAVI_LOG(ERROR, "create kernel failed, ec[%s]",
CommonUtil::getErrorString(EC_CREATE_KERNEL));
computeScope.setErrorCode(EC_CREATE_KERNEL);
setErrorCode(EC_CREATE_KERNEL);
return false;
}
_kernel = kernel;
_kernelBaseAddr = baseAddr;
kernel->setNodeDef(_def);
auto nodeType = _def->type();
if (NT_SCOPE_TERMINATOR == nodeType) {
if (!dynamic_cast<ScopeTerminatorKernel *>(kernel)) {
NAVI_LOG(ERROR, "scope kernel must inherit ScopeTerminatorKernel");
computeScope.setErrorCode(EC_CREATE_KERNEL);
setErrorCode(EC_CREATE_KERNEL);
return false;
}
}
if (!initAttribute()) {
computeScope.setErrorCode(EC_INVALID_ATTRIBUTE);
setErrorCode(EC_INVALID_ATTRIBUTE);
return false;
}
if (_creatorStats) {
_creatorStats->updateCreateLatency(computeScope.finish());
}
NAVI_LOG(SCHEDULE1, "create kernel success");
return true;
}
bool Node::initKernel(const ScheduleInfo &schedInfo) {
_kernelInited = true;
if (unlikely(_skipInit)) {
NAVI_LOG(DEBUG, "kernel init skipped");
return true;
}
NAVI_LOG(SCHEDULE1, "init kernel begin");
if (!isResourceCreateKernel() && !initResourceMap()) {
setErrorCode(EC_LACK_RESOURCE);
return false;
}
if (!bindResource()) {
setErrorCode(EC_LACK_RESOURCE);
return false;
}
if (!bindNamedData()) {
setErrorCode(EC_BIND_NAMED_DATA);
return false;
}
ErrorCode ec = EC_NONE;
{
const auto ¶m = _graph->getParam()->runParams;
ComputeScope scope(_metric.get(), GET_KERNEL_INIT, _graph->getPartId(),
_graph->getWorker(), schedInfo,
param.collectMetric(), param.collectPerf());
KernelInitContext ctx(this);
try {
ec = _kernel->init(ctx);
} catch (const autil::legacy::ExceptionBase &e) {
NAVI_KERNEL_LOG(ERROR, "kernel init failed, exception: [%s]", e.GetMessage().c_str());
ec = EC_ABORT;
}
scope.setErrorCode(ec);
if (EC_NONE == ec) {
if (_creatorStats) {
_creatorStats->updateInitLatency(scope.finish());
}
}
}
if (EC_NONE != ec) {
NAVI_LOG(ERROR, "kernel init failed");
setErrorCode(ec);
return false;
}
NAVI_LOG(SCHEDULE1, "init kernel success");
return true;
}
bool Node::initAttribute() {
if (unlikely(_skipConfig)) {
NAVI_LOG(DEBUG, "kernel config skipped");
return true;
}
if (!initConfigContext()) {
return false;
}
if (!createSelectResult()) {
return false;
}
NAVI_MEMORY_BARRIER();
startKernelResourceInput();
startSelectResourceInput();
auto ctx = _ctx;
try {
return _kernel->config(ctx);
} catch (const autil::legacy::ExceptionBase &e) {
NAVI_LOG(
ERROR,
"biz[%s], init kernel attribute failed, json attr [%s], msg [%s]",
_biz->getName().c_str(), autil::StringUtil::toString(ctx).c_str(),
e.GetMessage().c_str());
return false;
}
}
bool Node::initConfigContext() {
auto *jsonConfig = _biz->getKernelConfig(getKernelName());
if (!NaviConfig::parseToDocument(_def->json_attrs(), _jsonAttrsDocument)) {
NAVI_LOG(ERROR, "invalid kernel attr, biz [%s]",
_biz->getName().c_str());
return false;
}
_ctx = KernelConfigContext(getConfigPath(), &_jsonAttrsDocument, jsonConfig, _def, _kernelCreator->getJsonAttrs());
return true;
}
const std::vector<int32_t> &Node::getSelectorResult() const {
return _selectorResult;
}
KernelConfigContext Node::getConfigContext() const {
return _ctx;
}
bool Node::getConfigContext(const std::string &dependName, KernelConfigContext &ctx) {
const auto &configKeyMap = _kernelCreator->getConfigKeyMap();
auto it = configKeyMap.find(dependName);
if (configKeyMap.end() == it) {
ctx = _ctx;
return true;
}
const auto &configKey = it->second;
if (!_ctx.hasKey(configKey)) {
NAVI_LOG(ERROR, "lack config key [%s], depend resource [%s]", configKey.c_str(), dependName.c_str());
return false;
}
ctx = _ctx.enter(configKey);
return true;
}
const std::string &Node::getBizName() const { return _biz->getName(); }
bool Node::getBizPartInfo(const std::string &bizName, NaviPartId &partCount, std::vector<NaviPartId> &partIds) const {
return _graph->getParam()->bizManager->getBizPartInfo(bizName, partCount, partIds);
}
bool Node::bindResource() {
const auto &bindInfos = _kernelCreator->getBindInfos();
const auto &dynamicInfoMap = _kernelCreator->getDynamicGroupInfoMap();
if (!ResourceManager::bindResource(
getKernelName(), _kernelBaseAddr, getResourceMap(), dynamicInfoMap, bindInfos, _selectorResult, "kernel")) {
NAVI_LOG(ERROR, "bind kernel resource failed");
return false;
}
return true;
}
bool Node::bindNamedData() {
const auto &bindInfos = _kernelCreator->getBindInfos();
const auto &runGraphParams = _graph->getParam()->runParams;
auto graphId = _graph->getGraphId();
auto namedDataMap = runGraphParams.getSubNamedDataMap(graphId);
if (!ResourceManager::bindNamedData(getKernelName(), *_kernel, namedDataMap,
bindInfos, "kernel"))
{
NAVI_LOG(ERROR, "bind kernel named data failed");
return false;
}
return true;
}
void Node::deleteKernel(bool inDestruct) {
if (inDestruct) {
delete _kernel;
} else {
if (_kernel) {
auto worker = _graph->getWorker();
auto item =
new KernelDestructItem(worker, _graph, _metric.get(), _kernel);
_kernel = nullptr;
worker->schedule(item);
}
}
}
bool Node::inputOk() const {
auto asyncInfo = getAsyncInfo();
if (asyncInfo) {
if (asyncInfo->independentReady()) {
return true;
}
if (!asyncInfo->isOk()) {
return false;
}
}
if (!_inputReadyMap->isOk()) {
return false;
}
uint32_t inputGroupCount = _inputGroups.size();
for (uint32_t index = 0; index < inputGroupCount; index++) {
auto group = _inputGroups[index];
_inputGroupReadyMap->setReady(index, group->isOk());
}
if (!_inputGroupReadyMap->isOk()) {
return false;
}
if (hasInput()) {
return inputFinish() || inputHasReady();
} else {
return true;
}
}
bool Node::resourceInputOk() const {
if (INVALID_INDEX == _resourceInputGroupIndex) {
return true;
}
auto readyMap = _inputGroups[_resourceInputGroupIndex]->getReadyMap();
return readyMap->isOk();
}
bool Node::inputFinish() const {
if (!_inputReadyMap->isFinish()) {
return false;
}
for (const auto &inputGroup : _inputGroups) {
const auto &readyMap = inputGroup->getReadyMap();
if (!readyMap->isFinish()) {
return false;
}
}
auto asyncInfo = getAsyncInfo();
if (asyncInfo && !asyncInfo->isFinish()) {
return false;
}
return true;
}
bool Node::inputHasReady() const {
if (_inputReadyMap->hasReady()) {
return true;
}
for (const auto &inputGroup : _inputGroups) {
if (inputGroup->hasReady()) {
return true;
}
}
auto asyncInfo = getAsyncInfo();
if (asyncInfo && asyncInfo->hasReady()) {
return true;
}
return false;
}
bool Node::inputFinishConsumed() const {
for (const auto &snapshot : _inputSnapshotVec) {
if (!snapshot.data.eof) {
return false;
}
}
auto asyncInfo = getAsyncInfo();
if (asyncInfo && !asyncInfo->isFinishConsumed()) {
return false;
}
return true;
}
bool Node::outputOk() const {
if (!_outputReadyMap->isOk()) {
return false;
}
for (const auto &outputGroup : _outputGroups) {
if (!outputGroup->isOk()) {
return false;
}
}
return true;
}
bool Node::outputFinish() const {
if (!_outputReadyMap->isFinish()) {
return false;
}
for (const auto &outputGroup : _outputGroups) {
const auto &readyMap = outputGroup->getReadyMap();
if (!readyMap->isFinish()) {
return false;
}
}
return true;
}
bool Node::scheduleLock(const Node *callNode) {
incNodeSnapshot();
NAVI_MEMORY_BARRIER();
while (1) {
if (_scheduleStat == SS_INIT) {
NAVI_LOG(SCHEDULE2,
"node not started, schedule by [%p] [%s]",
callNode,
callNode ? callNode->getName().c_str() : "");
return false;
}
if (_scheduleStat == SS_FINISH) { // schedule is checked after compute
NAVI_LOG(SCHEDULE2, "schedule by [%p] [%s], failed for finish",
callNode, callNode ? callNode->getName().c_str() : "");
return false;
}
if (_scheduleStat == SS_RUNNING) { // schedule is checked after compute
NAVI_LOG(SCHEDULE2, "schedule by [%p] [%s], failed for running",
callNode, callNode ? callNode->getName().c_str() : "");
return false;
}
if (_scheduleStat == SS_SCHEDULING) {
NAVI_LOG(SCHEDULE2, "failed for scheduling, snapshot [%ld]", readNodeSnapshot());
return false;
}
if (cmpxchg(&_scheduleStat, SS_IDLE, SS_SCHEDULING)) {
return true;
}
}
}
bool Node::schedule(const Node *callNode, bool ignoreFrozen) {
while (true) {
bool resched = true;
auto ret = doSchedule(callNode, ignoreFrozen, resched);
if (ret) {
return ret;
}
if (resched) {
continue;
}
return ret;
}
}
bool Node::startSchedule(const Node *callNode, bool ignoreFrozen) {
if (cmpxchg(&_scheduleStat, SS_INIT, SS_IDLE)) {
NAVI_LOG(SCHEDULE3, "start schedule by [%s]", callNode ? callNode->getName().c_str() : "");
for (const auto &edgeOutputInfo : _inputs) {
edgeOutputInfo.startUpStreamSchedule(this);
}
IndexType groupCount = _inputGroups.size();
for (IndexType index = 0; index < groupCount; index++) {
if (_resourceInputGroupIndex == index) {
continue;
}
_inputGroups[index]->startUpStreamSchedule(this);
}
startNormalResourceInput();
return schedule(callNode, ignoreFrozen);
}
return true;
}
/*
lock -> checkReady -> running -> checkFinish ->reschedule
finish must be checked after compute
*/
bool Node::doSchedule(const Node *callNode, bool ignoreFrozen, bool &resched) {
if (!ignoreFrozen && _frozen) {
resched = false;
return false;
}
if (!scheduleLock(callNode)) {
resched = false;
return false;
}
auto snapshot = readNodeSnapshot();
_metric->incTryScheduleCount();
NAVI_MEMORY_BARRIER();
// critical region under SS_SCHEDULING stat;
assert(SS_SCHEDULING == _scheduleStat);
if (checkFinish(EC_NONE)) {
resched = false;
return false;
}
bool needCreateKernel = !_kernel;
bool needInitKernel = _kernel && !_kernelInited;
bool ret = false;
if (_forkGraph) {
ret = false;
} else if (_inlineConfigAndInit) {
ret = resourceInputOk() && inputOk() && outputOk();
} else if (needCreateKernel) {
ret = true;
} else if (needInitKernel) {
ret = resourceInputOk();
} else {
ret = (inputOk() && outputOk());
}
NAVI_LOG(SCHEDULE2,
"schedule by node [%p] [%s], ret [%d], forGraph [%p], needCreate [%d], needInit "
"[%d], resourceInputOk [%d], inputOk [%d], outputOk [%d], "
"inputFinish [%d], outputFinish [%d], inputHasReady [%d], hasInput[%d], "
"hasOutput[%d], stat [%ld], snapshot: %ld, computeId: %ld",
callNode, callNode ? callNode->getName().c_str() : "", ret,
_forkGraph, needCreateKernel, needInitKernel, resourceInputOk(), inputOk(),
outputOk(), inputFinish(), outputFinish(), inputHasReady(), hasInput(),
hasOutput(), _scheduleStat, snapshot, getComputeId());
NAVI_LOG(SCHEDULE2,
"inputReadyMap[%s] inputGroup[%s]",
autil::StringUtil::toString(*_inputReadyMap).c_str(),
autil::StringUtil::toString(_inputGroups).c_str());
if (ret) {
_scheduleStat = SS_RUNNING;
if (!_graph->schedule(this)) {
_scheduleStat = SS_IDLE;
}
resched = false;
} else {
cmpxchg(&_scheduleStat, SS_SCHEDULING, SS_IDLE);
auto newSnapshot = readNodeSnapshot();
resched = (newSnapshot != snapshot);
NAVI_LOG(SCHEDULE2,
"schedule by [%p] [%s], failed, old snapshot [%ld], new snapshot [%ld], resched [%d]",
callNode,
callNode ? callNode->getName().c_str() : "",
snapshot,
newSnapshot,
resched);
}
return ret;
}
void Node::stopSchedule() {
_stopSchedule = true;
}
void Node::setForceStopFlag(Node *node) {
_forceStopNode = node;
_forceStop = true;
}
void Node::forceStop() {
doForceStop();
schedule(this);
}
bool Node::checkFinish(ErrorCode ec) {
bool finish = false;
if (_stopSchedule) {
finish = true;
} else if (EC_NONE != ec) {
finish = true;
} else if (hasOutput()) {
finish = outputFinish();
} else {
finish = inputFinish() && inputFinishConsumed();
}
if (!finish && _forceStop) {
NAVI_LOG(SCHEDULE2,
"node [%s] force stopped by node [%s]",
_name.c_str(),
_forceStopNode ? _forceStopNode->_name.c_str() : "");
finish = true;
}
NAVI_LOG(SCHEDULE2,
"check finish [%d], forceStop [%d], stopSchedule [%d], computeId: %ld, hasOutput [%d], outputFinish "
"[%d], inputFinish [%d], inputFinishConsumed [%d]",
finish,
_forceStop,
_stopSchedule,
getComputeId(),
hasOutput(),
outputFinish(),
inputFinish(),
inputFinishConsumed());
if (finish) {
_scheduleStat = SS_FINISH;
// the last edge is finish edge, see creator
auto finishEdge = _controlOutputs[CONTROL_PORT_FINISH_INDEX];
if (finishEdge) {
finishEdge->setEof(this);
}
propagateEof();
if (likely(!_skipDeleteKernel)) {
deleteKernel();
}
}
return finish;
}
void Node::propagateEof() {
for (const auto &edgeOutputInfo : _inputs) {
edgeOutputInfo.setEof(this);
}
for (auto inputGroup : _inputGroups) {
inputGroup->setEof(this);
}
doForceStop();
}
void Node::doForceStop() {
if (_forceStop) {
bool expect = false;
if (!_forceStopped.compare_exchange_weak(expect, true)) {
return;
}
for (auto edge : _outputs) {
if (edge) {
edge->forceEof();
}
}
for (auto group : _outputGroups) {
group->forceEof();
}
if (_port && _port->getIoType() == IOT_OUTPUT) {
_port->setEofAll();
}
}
}
bool Node::isInline() const {
if (unlikely(!_creatorStats)) {
return true;
}
if (!_kernel) {
auto createLatency = _creatorStats->getCreateAvgLatency();
return createLatency < INLINE_LATENCY_THRESHOLD_US;
} else if (!_kernelInited) {
auto initLatency = _creatorStats->getInitAvgLatency();
return initLatency < INLINE_LATENCY_THRESHOLD_US;
} else {
return !_kernel->isExpensive();
}
}
size_t Node::outputDegree() const {
return _outputDegree;
}
void Node::compute(const ScheduleInfo &schedInfo)
{
NaviLoggerScope scope(getLogger());
// critical region under SS_RUNNING stat;
assert(SS_RUNNING == _scheduleStat);
auto *graph = _graph->getGraph();
if (graph->isTimeout()) {
NAVI_LOG(ERROR, "compute failed, graph[%p] timeout", graph);
graph->notifyTimeout();
return;
}
auto snapshot = readNodeSnapshot();
auto computeId = getComputeId() + 1;
ErrorCode ec = EC_NONE;
bool needCreateKernel = !_kernel;
bool needInitKernel = _kernel && !_kernelInited;
if (_inlineConfigAndInit) {
needInitKernel = !_kernelInited;
if (needCreateKernel && !createKernel(schedInfo)) {
ec = EC_CREATE_KERNEL;
NAVI_LOG(ERROR, "create kernel failed, error [%d], abort kernel", ec);
} else if (needInitKernel && !initKernel(schedInfo)) {
ec = EC_INIT_GRAPH;
NAVI_LOG(ERROR, "init kernel failed, error [%d], abort kernel", ec);
} else {
NAVI_LOG(SCHEDULE1, "begin compute, snapshot: %ld, compute id: %ld",
snapshot, computeId);
ec = doCompute(schedInfo);
NAVI_LOG(SCHEDULE1, "end compute, compute id: %ld", computeId);
}
} else {
if (needCreateKernel) {
if (!createKernel(schedInfo)) {
ec = EC_CREATE_KERNEL;
NAVI_LOG(ERROR, "create kernel failed, error [%d], abort kernel", ec);
}
} else if (needInitKernel) {
if (!initKernel(schedInfo)) {
ec = EC_INIT_GRAPH;
NAVI_LOG(ERROR, "init kernel failed, error [%d], abort kernel", ec);
}
} else {
NAVI_LOG(SCHEDULE1, "begin compute, snapshot: %ld, compute id: %ld", snapshot, computeId);
ec = doCompute(schedInfo);
NAVI_LOG(SCHEDULE1, "end compute, compute id: %ld", computeId);
}
}
if (EC_NONE != ec) {
NAVI_LOG(ERROR,
"compute error, error code [%s], compute id: %ld, abort kernel",
CommonUtil::getErrorString(ec), computeId);
notifyFinish(ec);
checkFinish(ec);
return;
}
if (_stopAfterInit && needInitKernel) {
setFrozen(true);
}
// critical region
if (!checkFinish(ec)) {
// block other thread and check snapshot id
cmpxchg(&_scheduleStat, SS_RUNNING, SS_IDLE);
auto newSnapshot = readNodeSnapshot();
auto needSchedule =
(newSnapshot != snapshot) || needCreateKernel || needInitKernel;
if (needSchedule) {
NAVI_LOG(
SCHEDULE2,
"reschedule, prev snapshot [%ld], current snapshot [%ld]",
snapshot, newSnapshot);
schedule(this);
} else {
NAVI_LOG(SCHEDULE2, "no need reschedule");
}
} else {
auto nodeType = _def->type();
if (NT_SCOPE_TERMINATOR == nodeType) {
auto error =
_graph->getParam()->graphResult->makeError(_logger.logger, ec);
_graph->terminateScope(this, getScope(), error);
}
}
}
ErrorCode Node::doCompute(const ScheduleInfo &schedInfo) {
clearControlInput();
KernelComputeContext ctx(this);
ErrorCode ec = EC_NONE;
{
const auto ¶m = _graph->getParam()->runParams;
ComputeScope scope(_metric.get(), GET_KERNEL_COMPUTE,
_graph->getPartId(), _graph->getWorker(), schedInfo,
param.collectMetric(), param.collectPerf());
try {
ec = _kernel->compute(ctx);
} catch (const autil::legacy::ExceptionBase &e) {
NAVI_KERNEL_LOG(ERROR, "kernel compute failed, exception: [%s]", e.GetMessage().c_str());
ec = EC_ABORT;
}
if (ec == EC_NONE && !getAsyncInfo() && ctx.deadLock()) {
ec = EC_DEADLOCK;
}
scope.setErrorCode(ec);
}
resetIOSnapshot();
return ec;
}
void Node::clearControlInput() {
if (INVALID_INDEX == _controlInputGroupIndex) {
return;
}
InputSnapshot *begin = nullptr;
InputSnapshot *end = nullptr;
snapshotGroupInput(_controlInputGroupIndex, begin, end);
for (; begin != end; begin++) {
if (begin->data.data.use_count() == 1) {
destructData(std::move(begin->data.data));
}
begin->data.reset();
}
}
void Node::resetIOSnapshot() {
for (auto &input: _inputSnapshotVec) {
if (input.data.data.use_count() == 1) {
destructData(std::move(input.data.data));
}
input.reset();
}
_outputSnapshotVec->setReady(false);
}
LocalSubGraph *Node::getGraph() const {
return _graph;
}
void Node::destructData(DataPtr &&data) const {
auto worker = _graph->getWorker();
auto item = new DataDestructItem(
worker, _graph->getParam()->runParams.collectPerf(), std::move(data));
worker->schedule(item);
}
const KernelCreator *Node::getKernelCreator() const {
return _kernelCreator;
}
void Node::incNodeSnapshot() {
// snapshot can't be equal
atomic_inc(&_nodeSnapshot);
}
int64_t Node::readNodeSnapshot() const {
return atomic_read(&_nodeSnapshot);
}
int64_t Node::getComputeId() const {
return _metric->scheduleCount();
}
KernelMetric *Node::getMetric() const {
return _metric.get();
}
void Node::collectForkGraphMetric() const {
if (!_forkGraph) {
return;
}
_forkGraph->collectMetric();
}
ErrorCode Node::fork(GraphDef *graphDef, const ForkGraphParam ¶m) {
if (!graphDef) {
NAVI_LOG(ERROR, "null graph def");
return EC_FORK;
}
NAVI_LOG(SCHEDULE1, "fork graph: %s", graphDef->DebugString().c_str());
autil::ScopedLock lock(_forkLock);
if (_graph->terminated()) {
// see terminate
multi_call::freeProtoMessage(graphDef);
NAVI_LOG(ERROR, "fork failed, graph terminated");
return EC_FORK;
}
if (_forkGraph) {
multi_call::freeProtoMessage(graphDef);
NAVI_LOG(ERROR, "double fork");
return EC_FORK;
}
if (!checkForkGraph(graphDef)) {
NAVI_LOG(ERROR, "invalid fork graph def");
return EC_FORK;
}
auto errorAsEof = param.errorAsEof;
auto parentGraph = _graph->getGraph();
auto graphParam = new GraphParam();
auto ec = graphParam->init(_graph->getGraphId(), _graph->getBizName(),
_name, *_graph->getParam(), _logger.logger,
param, getRemainTimeMs());
if (EC_NONE != ec) {
delete graphParam;
NAVI_LOG(ERROR, "init fork graph param failed");
return EC_ABORT;
}
_forkGraphDef = graphDef;
_forkGraph = new Graph(graphParam, parentGraph);
auto forkDomain = new GraphDomainFork(_forkGraph, _graph->getBiz(), this,
_graph->getPartCount(),
_graph->getPartId(), errorAsEof);
GraphDomainPtr domain(forkDomain);
_forkGraph->setForkInfo(PARENT_GRAPH_ID, domain, this, errorAsEof);
_forkGraph->setSubResourceMap(param.subResourceMaps);
ec = _forkGraph->init(_forkGraphDef);
if (EC_NONE != ec) {
_forkGraph->notifyFinishEc(ec);
return ec;
}
if (!bindDomain(forkDomain)) {
return EC_FORK;
}
ec = _forkGraph->run();
if (EC_NONE != ec) {
if (!errorAsEof) {
_forkGraph->notifyFinishEc(ec);
return ec;
} else {
NAVI_LOG(DEBUG, "fork graph failed [%s], error as eof",
CommonUtil::getErrorString(ec));
return EC_NONE;
}
} else {
NAVI_LOG(DEBUG, "fork graph success");
return EC_NONE;
}
}
bool Node::checkForkGraph(const GraphDef *graphDef) const {
bool hasParentGraph = false;
auto subCount = graphDef->sub_graphs_size();
for (int i = 0; i < subCount; i++) {
auto subGraphId = graphDef->sub_graphs(i).graph_id();
if (USER_GRAPH_ID == subGraphId) {
NAVI_LOG(ERROR, "fork graph can't has graph output");
return false;
}
if (PARENT_GRAPH_ID == subGraphId) {
hasParentGraph = true;
break;
}
}
if (hasParentGraph) {
return true;
} else {
NAVI_LOG(ERROR,
"invalid fork graph, no fromInput or toOutput defined ");
return false;
}
}
void Node::terminate(ErrorCode ec) {
{
// _graph->terminated is true,
// lock success means no forking process
autil::ScopedLock lock(_forkLock);
}
auto asyncInfo = getAsyncInfo();
if (asyncInfo) {
setAsyncInfo(nullptr);
asyncInfo->terminate();
}
if (_forkGraph) {
_forkGraph->notifyFinish(nullptr, GFT_PARENT_FINISH);
}
}
bool Node::bindDomain(GraphDomainFork *domain) {
// replace output first
for (size_t i = 0; i < _outputs.size(); i++) {
auto edge = _outputs[i];
const auto &portName = _kernelCreator->getOutputPortName(i);
auto peer = domain->getOutputPeer(portName);
if (!peer.isValid()) {
if (edge && !edge->isEof()) {
NAVI_LOG(ERROR,
"unfinished output must be bound by fork, output: %s",
portName.c_str());
return false;
}
continue;
}
if (!edge) {
peer.setEof(this);
continue;
}
edge->bindInputNode(peer);
}
for (IndexType index = 0; index < _outputGroups.size(); index++) {
auto group = _outputGroups[index];
PortIndex groupIndex(INVALID_INDEX, index);
auto groupName = _kernelCreator->getOutputPortName(groupIndex);
auto groupSize = group->getOutputCount();
for (size_t i = 0; i < groupSize; i++) {
auto edge = group->getEdge(i);
auto portName = CommonUtil::getGroupPortName(groupName, i);
auto peer = domain->getOutputPeer(portName);
if (!peer.isValid()) {
if (edge && !edge->isEof()) {
NAVI_LOG(ERROR,
"unfinished group output must be bound by fork, output: %s, index: %lu",
portName.c_str(),
i);
return false;
}
continue;
}
if (!edge) {
peer.setEof(this);
continue;
}
edge->bindInputNode(peer);
}
}
for (size_t i = 0; i < _inputs.size(); i++) {
const auto &edgeOutputInfo = _inputs[i];
auto edge = edgeOutputInfo.getEdge();
auto slotId = edgeOutputInfo.getSlotId();
auto portName = _kernelCreator->getInputPortName(i);
auto peer = domain->getInputPeer(portName);
if (!peer) {
if (!edge->isEof()) {
NAVI_LOG(ERROR,
"unfinished input must be bound by fork, input: %s",
portName.c_str());
return false;
}
continue;
}
edge->bindOutputNode(slotId, peer);
}
auto inputGroupCount = _kernelCreator->inputGroupSize();
// filter out control and resource input group
for (IndexType index = 0; index < inputGroupCount; index++) {
auto group = _inputGroups[index];
PortIndex groupIndex(INVALID_INDEX, index);
auto groupName = _kernelCreator->getInputPortName(groupIndex);
auto groupSize = group->getInputCount();
for (size_t i = 0; i < groupSize; i++) {
auto edgeOutputInfo = group->getEdgeOutputInfo(i);
auto edge = edgeOutputInfo.getEdge();
auto slotId = edgeOutputInfo.getSlotId();
auto portName = CommonUtil::getGroupPortName(groupName, i);
auto peer = domain->getInputPeer(portName);
if (!peer) {
if (!edge->isEof()) {
NAVI_LOG(ERROR,
"unfinished input group must be bound by fork, input: %s, index: %lu",
portName.c_str(),
i);
return false;
}
continue;
}
edge->bindOutputNode(slotId, peer);
}
}
if (!domain->checkBind()) {
return false;
}
domain->flushData();
return true;
}
bool Node::getInput(PortIndex index, StreamData &streamData) {
IndexType inputIndex = 0;
auto inputInfo = getInputInfo(index, inputIndex);
if (!inputInfo.isValid()) {
return false;
}
const auto &inputSnapshot = doFillInput(inputInfo, inputIndex);
streamData = inputSnapshot.data;
return true;
}
const InputSnapshot &Node::doFillInput(const EdgeOutputInfo &info,
IndexType inputIndex)
{
assert(inputIndex < _inputSnapshotVec.size());
auto &inputSnapshot = _inputSnapshotVec[inputIndex];
if (!inputSnapshot.valid) {
auto ret = info.getData(inputSnapshot.data.data, inputSnapshot.data.eof, this);
inputSnapshot.data.hasValue = ret;
inputSnapshot.valid = true;
}
return inputSnapshot;
}
size_t Node::getInputGroupCount() const {
const auto *kernelDef = _kernelCreator->def();
return kernelDef->input_groups().size();
}
bool Node::getInputGroupSize(IndexType group, size_t &size) {
if (group < _inputGroups.size()) {
size = _inputGroups[group]->getInputCount();
return true;
} else {
return false;
}
}
bool Node::getOutputGroupSize(IndexType group, size_t &size) {
if (group < _outputGroups.size()) {
size = _outputGroups[group]->getOutputCount();
return true;
} else {
return false;
}
}
size_t Node::getOutputGroupCount() const {
return _outputGroups.size();
}
NaviErrorPtr Node::getScopeError() const {
return _graph->getScopeError(getScope());
}
ErrorHandleStrategy Node::getScopeErrorHandleStrategy() const {
return _graph->getScopeErrorHandleStrategy(getScope());
}
int64_t Node::getRemainTimeMs() const {
return _graph->getGraph()->getRemainTimeMs();
}
const TimeoutChecker *Node::getTimeoutChecker() const {
return _graph->getGraph()->getTimeoutChecker();
}
bool Node::snapshotGroupInput(IndexType group,
InputSnapshot *&begin,
InputSnapshot *&end)
{
if (group < _inputGroups.size()) {
const auto &inputGroup = _inputGroups[group];
auto size = inputGroup->getInputCount();
auto inputIndexStart = inputGroup->getInputIndexStart();
auto inputIndex = inputIndexStart;
for (size_t i = 0; i < size; i++) {
doFillInput(inputGroup->getEdgeOutputInfo(i), inputIndex);
inputIndex++;
}
begin = &_inputSnapshotVec[inputIndexStart];
end = begin + size;
return true;
} else {
return false;
}
}
bool Node::setOutput(PortIndex index, const DataPtr &data, bool eof) {
IndexType outputIndex = 0;
Edge *edge = nullptr;
if (!getOutputEdge(index, edge, outputIndex)) {
return false;
}
if (!edge) {
// output not connected
return true;
}
if (_outputSnapshotVec->isReady(outputIndex)) {
NAVI_LOG(ERROR, "output data override, group [%d] index [%d] edge[%p]",
index.group, index.index, edge);
notifyFinish(EC_DATA_OVERRIDE);
return false;
}
if (_outputSnapshotVec->isFinish(outputIndex)) {
NAVI_LOG(ERROR,
"output data after eof, group [%d] index [%d] edge "
"[%p], new data [%p], new eof [%d], %ld",
index.group, index.index, edge, data.get(), eof, getComputeId());
notifyFinish(EC_DATA_AFTER_EOF);
return false;
}
_outputSnapshotVec->setReady(outputIndex, true);
_outputSnapshotVec->setFinish(outputIndex, eof);
return edge->setData(this, data, eof);
}
EdgeOutputInfo Node::getInputInfo(PortIndex index, IndexType &inputIndex) const {
if (_forkGraph) {
NAVI_LOG(ERROR, "input not available after fork");
return {};
}
if (!index.isGroup()) {
if (index.index < _inputs.size()) {
inputIndex = index.index;
return _inputs[index.index];
} else {
NAVI_LOG(ERROR, "invalid input port index [%d], node input size [%lu]",
index.index, _inputs.size());
return {};
}
} else {
if (index.group < _inputGroups.size()) {
auto group = _inputGroups[index.group];
auto inputCount = group->getInputCount();
if (index.index < inputCount) {
inputIndex = group->getInputIndexStart() + index.index;
return group->getEdgeOutputInfo(index.index);
} else {
NAVI_LOG(ERROR,
"invalid input port index [%d], group [%d] input size [%lu]",
index.index, index.group, inputCount);
return {};
}
} else {
NAVI_LOG(ERROR,
"invalid input group index [%d], node input group count [%lu]",
index.group, _inputGroups.size());
return {};
}
}
}
bool Node::getOutputEdge(PortIndex index, Edge *&edge,
IndexType &outputIndex) const
{
if (_forkGraph) {
NAVI_LOG(ERROR, "output not available after fork");
return false;
}
if (!index.isGroup()) {
if (index.index < _outputs.size()) {
outputIndex = index.index;
edge = _outputs[index.index];
return true;
} else {
NAVI_LOG(ERROR,
"invalid output port index [%d], node output size [%lu]",
index.index, _outputs.size());
return false;
}
} else {
if (index.group < _outputGroups.size()) {
auto group = _outputGroups[index.group];
auto outputCount = group->getOutputCount();
if (index.index < outputCount) {
outputIndex = group->getOutputIndexStart() + index.index;
edge = group->getEdge(index.index);
return true;
} else {
NAVI_LOG(ERROR,
"invalid output port index [%d], group [%d] output size [%lu]",
index.index, index.group, outputCount);
return false;
}
} else {
NAVI_LOG(ERROR,
"invalid output group index [%d], node output group count [%lu]",
index.group, _outputGroups.size());
return false;
}
}
}
Kernel *Node::getKernel() const {
return _kernel;
}
void Node::setFrozen(bool frozen) {
_frozen = frozen;
}
bool Node::isFinished() const {
return SS_FINISH == _scheduleStat;
}
bool Node::getIORange(IoType ioType, const std::string &name, PortIndex &start,
PortIndex &end) const
{
PortInfo portInfo;
if (IOT_INPUT == ioType) {
portInfo = _kernelCreator->getInputPortInfo(name);
} else {
portInfo = _kernelCreator->getOutputPortInfo(name);
}
if (!portInfo.isValid()) {
NAVI_LOG(ERROR, "no %s port named [%s]", CommonUtil::getIoType(ioType),
name.c_str());
return false;
}
if (portInfo.isGroup()) {
auto groupIndex = portInfo.index.group;
IndexType portCount = 0;
if (IOT_INPUT == ioType) {
portCount = _inputGroups[groupIndex]->getInputCount();
} else {
portCount = _outputGroups[groupIndex]->getOutputCount();
}
start.group = groupIndex;
start.index = 0;
end.group = groupIndex;
end.index = portCount;
} else {
start.group = INVALID_INDEX;
start.index = portInfo.index.index;
end.group = INVALID_INDEX;
end.index = start.index + 1;
}
return true;
}
bool Node::getInputNode(PortIndex index, std::string &node) const {
IndexType inputIndex = 0;
auto inputInfo = getInputInfo(index, inputIndex);
if (!inputInfo.isValid()) {
return false;
}
node = inputInfo.getInputNodeName();
return true;
}
size_t Node::getInputCount() const {
return _inputSnapshotVec.size();
}
size_t Node::getOutputCount() const {
auto sum = _outputs.size();
for (auto group : _outputGroups) {
sum += group->getOutputCount();
}
return sum;
}
void Node::updateTraceLevel(const std::string &levelStr) {
NAVI_LOG(SCHEDULE1, "update traceLevel[%s]", levelStr.c_str());
_logger.logger->updateTraceLevel(levelStr);
_graph->getParam()->runParams.setTraceLevel(levelStr);
}
bool Node::updateTimeoutMs(int64_t timeoutMs) {
NAVI_LOG(SCHEDULE1, "update timeoutMs[%ld]", timeoutMs);
return _graph->getGraph()->updateTimeoutMs(timeoutMs);
}
void Node::updateCollect(bool collectMetric, bool collectPerf) const {
auto &runParams = _graph->getParam()->runParams;
runParams.setCollectMetric(collectMetric);
runParams.setCollectPerf(collectPerf);
}
void Node::collectTrace(std::vector<std::string> &traceVec) const {
TraceCollector collector;
SymbolTableDef symbolTableDef;
_graph->getParam()->graphResult->collectTrace(collector, symbolTableDef);
collector.format(traceVec);
}
std::shared_ptr<multi_call::GigStreamRpcInfoMap> Node::getRpcInfoMap() const {
return _graph->getParam()->graphResult->getRpcInfoMap();
}
void Node::setErrorCode(ErrorCode ec) {
_graph->getParam()->graphResult->setError(_logger.logger, ec);
}
void Node::notifyFinish(ErrorCode ec) {
auto error = _graph->getParam()->graphResult->makeError(_logger.logger, ec);
_graph->notifyFinish(this, error);
}
void Node::notifyFinish(const NaviErrorPtr &error) {
_graph->notifyFinish(this, error);
}
void Node::reportError(const NaviErrorPtr &error) {
_graph->setScopeError(getScope(), error);
auto forkNode = _graph->getGraph()->getForkNode();
if (forkNode) {
forkNode->reportError(error);
} else {
_graph->getParam()->graphResult->updateError(error);
}
}
}