aios/navi/example/ExampleEngine.cpp (1,128 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "autil/StringUtil.h"
#include "autil/EnvUtil.h"
#include "autil/ThreadPool.h"
#include "fslib/fslib.h"
#include "navi/builder/GraphBuilder.h"
#include "navi/engine/Navi.h"
#include "navi/example/TestData.h"
#include "navi/example/TestResource.h"
#include "navi/proto/GraphDef.pb.h"
#include "navi/proto/GraphVis.pb.h"
#include "navi/test_cluster/NaviTestCluster.h"
#include "navi/util/CommonUtil.h"
#include "unittest/unittest.h"
#include <arpc/common/LockFreeQueue.h>
#include <fstream>
#include <iostream>
#include <unistd.h>
using namespace std;
namespace navi {
class ModuleManager;
class NaviExampleTest : public TESTBASE {
public:
void setUp();
void tearDown();
protected:
std::string _loader;
GraphDef *_graphDef = nullptr;
private:
std::unique_ptr<autil::EnvGuard> _naviPythonHome;
public:
static const size_t TEST_COUNT = 1;
};
void NaviExampleTest::setUp() {
// for aios test
_naviPythonHome.reset(
new autil::EnvGuard("NAVI_PYTHON_HOME", NAVI_TEST_PYTHON_HOME));
_loader = NAVI_TEST_DATA_PATH + "test_config_loader.py";
_graphDef = new GraphDef();
}
void NaviExampleTest::tearDown() {
DELETE_AND_SET_NULL(_graphDef);
}
NaviResultPtr showData(const NaviUserResultPtr &result, bool show, int64_t expectCount,
int64_t &dataCount, std::vector<NaviUserData> &dataVec,
std::vector<NaviResultPtr> &resultVec)
{
while (true) {
NaviUserData data;
bool eof = false;
if (result->nextData(data, eof)) {
// if (data.data) {
// std::cout << data.name << ": ";
// data.data->show();
// }
dataVec.push_back(data);
}
if (eof) {
break;
}
}
auto naviResult = result->getNaviResult();
resultVec.push_back(naviResult);
return naviResult;
}
void showResultAndSaveVis(const NaviUserResultPtr &userResult) {
// auto visProto = userResult->getNaviResult()->getVisProto();
// std::ofstream ofile(
// "/home/zhang7/alibaba/aios/navi/http_server/test_graph.vis");
// ofile << visProto->SerializeAsString();
auto rpcInfoMap = userResult->getNaviResult()->getRpcInfoMap();
ASSERT_TRUE(rpcInfoMap);
std::cout << "rpcInfo: " << std::endl;
for (const auto &pair : *rpcInfoMap) {
std::cout << "[" << pair.first.first << "->" << pair.first.second
<< "] " << std::endl
<< autil::StringUtil::toString(pair.second, "\n")
<< std::endl;
}
}
static std::string randItem(const std::vector<std::string> &itemList) {
return itemList[0];
return "server_biz_1";
auto rand = CommonUtil::random64();
return itemList[rand % itemList.size()];
}
bool buildGraph1(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList,
OverrideData &overrideData)
{
std::string splitKernel("StringSplitKernel");
std::string mergeKernel("StringMergeKernel");
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto clientWorld1 = builder.node("client_world_1").kernel("HelloKernel");
auto clientHello1 = builder.node("client_hello_1").kernel("WorldKernel");
auto clientHelloDup1 = builder.node("client_hello_dup_1").kernel("WorldKernel");
auto fork = builder.node("fork").kernel("SubGraphKernel");
auto afterFork = builder.node("after_fork").kernel("WorldKernel");
clientWorld1.out("output1").to(clientHello1.in("input1"));
fork.in("input1").from(clientHelloDup1.out("output1"));
afterFork.in("input1").from(fork.out("output1"));
clientHello1.out("output1").asGraphOutput(outputName);
afterFork.out("output1").asGraphOutput(outputName + "_dup");
fork.out("output2").asGraphOutput(outputName + "_dupdup");
builder.newSubGraph(randItem(bizList));
auto source1 = builder.node("source_node_1").kernel("SourceKernel").jsonAttrs(R"json({"times" : 10})json");
source1.out("output1").to(clientWorld1.in("input1")).merge(mergeKernel);
source1.out("output1").to(clientHelloDup1.in("input1")).merge(mergeKernel);
auto sleep = builder.node("sleep").kernel("SleepKernel");
sleep.in("input1").from(source1.out("output1"));
clientHelloDup1.dependOn(sleep.out("output1"));
return builder.ok();
}
bool buildGraph2(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
std::string splitKernel("StringSplitKernel");
std::string mergeKernel("StringMergeKernel");
GraphBuilder builder(graphDef);
builder.newSubGraph("server_biz_1");
auto clientHello2 = builder.node("client_hello_2").kernel(randItem({"WorldKernel", "SubGraphKernel"}));
auto clientWorld2 = builder.node("client_world_2").kernel(randItem({"WorldKernel", "SubGraphKernel"}));
clientWorld2.out("output1").asGraphOutput(outputName);
builder.newSubGraph("server_biz");
auto source2 = builder.node("source_node_2")
.kernel("SourceKernel")
.jsonAttrs(R"json({"times" : 20})json");
auto sourceE = source2.out("output1").to(clientHello2.in("input1"));
sourceE.split(splitKernel);
sourceE.merge(mergeKernel);
builder.newSubGraph("server_biz_2");
auto remoteHello2 = builder.node("remote_hello_2")
.kernel(randItem({"WorldKernel", "SubGraphKernel"}));
auto remoteE = remoteHello2.in("input1").from(clientHello2.out("output1"));
remoteE.split(splitKernel);
remoteE.merge(mergeKernel);
auto remoteWorld2 = builder.node("remote_world_2").kernel(randItem({"WorldKernel", "SubGraphKernel"}));
remoteWorld2.in("input1").from(remoteHello2.out("output1"));
builder.newSubGraph("client_biz");
auto remoteHello22 = builder.node("remote_hello_2")
.kernel(randItem({"WorldKernel", "SubGraphKernel"}));
auto remote22E = remoteHello22.in("input1").from(remoteWorld2.out("output1"));
remote22E.split(splitKernel);
remote22E.merge(mergeKernel);
auto remoteWorld22 = builder.node("remote_world_2").kernel(randItem({"WorldKernel", "SubGraphKernel"}));
remoteWorld22.in("input1").from(remoteHello22.out("output1"));
auto e3 = remoteWorld22.out("output1").to(clientWorld2.in("input1"));
e3.split(splitKernel);
e3.merge(mergeKernel);
return builder.ok();
}
bool buildGraph3(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
std::string splitKernel("StringSplitKernel");
std::string mergeKernel("StringMergeKernel");
GraphBuilder builder(graphDef);
builder.newSubGraph("server_biz_1");
auto source1 = builder.node("source_node_1").kernel("TFSourceKernel").jsonAttrs(R"json({"times" : 20})json");
auto clientWorld3 = builder.node("client_world_3").kernel("tf.TFIdentityOp");
clientWorld3.in("input_content").from(source1.out("output1"));
clientWorld3.in("input_list").autoNext().from(source1.out("output1"));
clientWorld3.in("input_list").autoNext().from(source1.out("output1"));
clientWorld3.in("input_list").autoNext().from(source1.out("output1"));
size_t count = 20 + 10000;
for (size_t i = 4; i < count; i++) {
std::string current = "client_world_" + autil::StringUtil::toString(i);
auto pre = builder.node("client_world_" + autil::StringUtil::toString(i - 1));
auto n = builder.node(current).kernel("tf.TFIdentityOp");
n.in("input_content").from(pre.out("output_content"));
n.in("input_list").autoNext().from(pre.out("output_list").autoNext());
n.in("input_list").autoNext().from(pre.out("output_list").autoNext());
n.in("input_list").autoNext().from(pre.out("output_list").autoNext());
}
std::string outputNode = "client_world_" + autil::StringUtil::toString(count - 1);
auto outN = builder.node(outputNode);
outN.out("output_content").asGraphOutput(outputName);
outN.out("output_list").index(2).asGraphOutput(outputName + "_ccc");
outN.out("output_list").index(0).asGraphOutput(outputName + "_aaa");
outN.out("output_list").index(1).asGraphOutput(outputName + "_bbb");
outN.out("output_list").index(1).asGraphOutput(outputName + "_ddd");
return builder.ok();
}
bool buildGraph4(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
std::string splitKernel("StringSplitKernel");
std::string mergeKernel("StringMergeKernel");
GraphBuilder builder(graphDef);
builder.newSubGraph("server_biz_1");
auto clientWorld1 = builder.node("client_world_1").kernel("WorldKernel");
auto clientHello1 = builder.node("client_hello_1").kernel("HelloKernel");
clientWorld1.out("output1").asGraphOutput(outputName);
builder.newSubGraph(randItem(bizList));
auto source1 = builder.node("source_node_1").kernel("SourceKernel").jsonAttrs(R"json({"times" : 20})json");
source1.out("output1").to(clientHello1.in("input1"));
auto remoteHello1 = builder.node("remote_hello_1").kernel("HelloKernel");
auto e1 = remoteHello1.in("input1").from(clientHello1.out("output1"));
e1.split(splitKernel);
e1.merge(mergeKernel);
auto e2 = remoteHello1.out("output1").to(clientWorld1.in("input1"));
e2.split(splitKernel);
e2.merge(mergeKernel);
return builder.ok();
}
TEST_F(NaviExampleTest, testDistributeChain) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_4", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_5", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_6", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_7", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz", 1, 0, biz1Config));
// ASSERT_TRUE(cluster.addBiz("host_2", "server_biz", 3, 1, biz1Config));
// ASSERT_TRUE(cluster.addBiz("host_3", "server_biz", 3, 2, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_4", "server_biz_2", 2, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_5", "server_biz_2", 2, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_5", "server_biz_1", 4, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_6", "server_biz_1", 4, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_3", "server_biz_1", 4, 2, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_4", "server_biz_1", 4, 3, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_7", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildGraph2("o1", _graphDef, bizList));
RunGraphParams params;
params.setTimeoutMs(10000000);
params.setTraceLevel("debug");
params.setCollectMetric(true);
params.setCollectPerf(true);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_PERF;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1;
threadCount = 1;
}
}
NaviLoggerProvider logProvider("error");
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
// std::cout << _graphDef->DebugString() << std::endl;
std::ofstream ofile("/home/zhang7/aios/navi/http_server/test_graph.def");
ofile << _graphDef->SerializeAsString();
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
// std::cout << "begin round " << i << std::endl;
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto current = atomic_inc_return(&counter);
if (current == 2000) {
// thisShow = true;
std::cout << "show true" << std::endl;
// params.setTraceLevel("");
}
// if (i % 200 == 0) {
// std::cout << "thread: " << pthread_self() << ", round: " << i << std::endl;
// }
// std::string fileContent;
// EXPECT_EQ(fslib::EC_OK,
// fslib::fs::FileSystem::readFile(
// "/home/zhang7/aios/aios/navi/graph.serialize",
// fileContent));
std::unique_ptr<GraphDef> defPtr(new GraphDef());
auto def = defPtr.get();
// def->CopyFrom(*_graphDef);
bool load = false;
OverrideData overrideData;
if (load) {
// ASSERT_TRUE(def->ParseFromString(fileContent));
} else {
// EXPECT_TRUE(buildGraph1("o1", def, bizList, overrideData));
// EXPECT_TRUE(buildGraph2("o2", def, bizList));
// EXPECT_TRUE(buildGraph2("o22", def, bizList));
// EXPECT_TRUE(buildGraph2("o3", def, bizList));
// EXPECT_TRUE(buildGraph2("o4", def, bizList));
// EXPECT_TRUE(buildGraph2("o5", def, bizList));
// EXPECT_TRUE(buildGraph2("o6", def, bizList));
// EXPECT_TRUE(buildGraph2("o7", def, bizList));
// EXPECT_TRUE(buildGraph1("o8", def, bizList, overrideData));
// EXPECT_TRUE(buildGraph2("o9", def, bizList));
// EXPECT_TRUE(buildGraph1("o10", def, bizList, overrideData));
// EXPECT_TRUE(buildGraph2("o11", def, bizList));
// EXPECT_TRUE(buildGraph2("o12", def, bizList));
EXPECT_TRUE(buildGraph4("o13", def, bizList));
EXPECT_TRUE(buildGraph4("o14", def, bizList));
}
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_1");
auto runParams = params;
// runParams.overrideEdgeData(overrideData);
auto t1 = autil::TimeUtility::currentTime();
auto userResult = navi->runGraph(defPtr.release(), runParams);
auto t2 = autil::TimeUtility::currentTime();
std::cout << "t2 - t1: " << t2 - t1 << std::endl;
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
dataVec.clear();
resultVec.clear();
showResultAndSaveVis(userResult);
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
// usleep(20000 * 1000);
// for (const auto &data : dataVec) {
// std::cout << "session: "
// << CommonUtil::formatSessionId(data.id)
// << std::endl;
// if (data.data) {
// std::cout << "node: " << data.node << ", port: " << data.port
// << ": ";
// data.data->show();
// } else {
// std::cout << "null data" << std::endl;
// }
// }
// for (const auto &result : resultVec) {
// result->show();
// }
std::cout << "begin destruct" << std::endl;
// usleep(1 * 1000 * 1000);
}
bool buildGraphFork(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto sourceFork = builder.node("source_node_fork").kernel("SourceKernel").jsonAttrs(R"json({"times" : 20})json");
auto currentScope = builder.getCurrentScope();
builder.addScope();
auto fork = builder.node("fork").kernel("SubGraphKernel");
builder.scope(currentScope);
auto clientWorldFork = builder.node("client_world_fork").kernel("WorldKernel");
fork.in("input1").from(sourceFork.out("output1"));
clientWorldFork.in("input1").from(fork.out("output2"));
clientWorldFork.out("output1").asGraphOutput(outputName);
return builder.ok();
}
TEST_F(NaviExampleTest, testFork) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_0", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz_1", 3, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_2", "server_biz_1", 3, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_3", "server_biz_1", 3, 2, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildGraphFork("o1", _graphDef, { "client_biz" }));
RunGraphParams params;
params.setTimeoutMs(1000000000);
params.setTraceLevel("schedule1");
params.setCollectMetric(true);
params.setCollectPerf(true);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_SINGLE;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1000000;
threadCount = 1;
}
}
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
// std::cout << _graphDef->DebugString() << std::endl;
std::ofstream ofile("./graph.serialize");
ofile << _graphDef->SerializeAsString();
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
// std::cout << "begin round " << i << std::endl;
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto current = atomic_inc_return(&counter);
if (current == 2000) {
// thisShow = true;
std::cout << "show true" << std::endl;
// params.setTraceLevel("");
}
// if (i % 200 == 0) {
// std::cout << "thread: " << pthread_self() << ", round: " << i << std::endl;
// }
// std::string fileContent;
// EXPECT_EQ(fslib::EC_OK,
// fslib::fs::FileSystem::readFile(
// "/ha3_develop/source_code/navi/graph_31445_3917",
// fileContent));
auto def = new GraphDef();
def->CopyFrom(*_graphDef);
// bool load = false;
// if (load) {
// ASSERT_TRUE(def->ParseFromString(fileContent));
// } else {
// EXPECT_TRUE(buildGraph2("o1", def, bizList));
// EXPECT_TRUE(buildGraph1("o2", def, bizList));
// EXPECT_TRUE(buildGraph2("o3", def, bizList));
// EXPECT_TRUE(buildGraph1("o4", def, bizList));
// EXPECT_TRUE(buildGraph2("o4", def, bizList));
// EXPECT_TRUE(buildGraph3("o6", def, bizList));
// EXPECT_TRUE(buildGraph3("o7", def, bizList));
// }
// if (current % 100 == 0) {
// std::cout << def->DebugString() << std::endl;
// }
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
// auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_0");
auto userResult = navi->runGraph(def, params);
// std::string graphFileName(
// "./graph_" + autil::StringUtil::toString(current) + "_" +
// autil::StringUtil::toString(
// userResult->getNaviResult()->id.queryId));
// std::ofstream graphFile(graphFileName);
// graphFile << graphStr;
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
EXPECT_EQ(9u, dataVec.size());
std::cout << "data count: " << dataVec.size() << std::endl;
dataVec.clear();
resultVec.clear();
showResultAndSaveVis(userResult);
EXPECT_EQ(EC_NONE, naviResult->getErrorCode());
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
// for (const auto &data : dataVec) {
// std::cout << "session: "
// << CommonUtil::formatSessionId(data.id)
// << std::endl;
// if (data.data) {
// std::cout << "node: " << data.node << ", port: " << data.port
// << ": ";
// data.data->show();
// } else {
// std::cout << "null data" << std::endl;
// }
// }
// for (const auto &result : resultVec) {
// result->show();
// }
std::cout << "begin destruct" << std::endl;
// usleep(100 * 1000 * 1000);
}
bool buildGraphForkWithTerminator(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto sourceFork = builder.node("source_node_fork").kernel("SourceKernel").jsonAttrs(R"json({"times" : 20})json");
auto currentScope = builder.getCurrentScope();
builder.addScope();
auto fork = builder.node("fork").kernel("SubGraphKernel");
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope");
}
builder.scope(currentScope);
auto clientWorldFork = builder.node("client_world_fork").kernel("WorldKernel");
fork.in("input1").from(sourceFork.out("output1"));
clientWorldFork.in("input1").from(fork.out("output1"));
clientWorldFork.out("output1").asGraphOutput(outputName);
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope2");
}
return builder.ok();
}
TEST_F(NaviExampleTest, testForkWithTerminator) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_0", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz_1", 3, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_2", "server_biz_1", 3, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_3", "server_biz_1", 3, 2, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildGraphForkWithTerminator("o1", _graphDef, { "client_biz" }));
RunGraphParams params;
params.setTimeoutMs(1000000000);
params.setTraceLevel("ERROR");
params.setCollectMetric(true);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_SINGLE;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1000000;
threadCount = 1;
}
}
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
// std::cout << _graphDef->DebugString() << std::endl;
std::ofstream ofile("./graph.serialize");
ofile << _graphDef->SerializeAsString();
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
// std::cout << "begin round " << i << std::endl;
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto current = atomic_inc_return(&counter);
if (current == 2000) {
// thisShow = true;
std::cout << "show true" << std::endl;
// params.setTraceLevel("");
}
auto def = new GraphDef();
def->CopyFrom(*_graphDef);
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
// auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_0");
auto userResult = navi->runGraph(def, params);
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
EXPECT_EQ(38u, dataVec.size());
std::cout << "data count: " << dataVec.size() << std::endl;
dataVec.clear();
resultVec.clear();
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
std::cout << "begin destruct" << std::endl;
}
bool buildGraphForkDownStreamAbort(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList) {
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto sourceFork = builder.node("source_node_fork").kernel("SourceKernel").jsonAttrs(R"json({"times" : 20})json");
auto currentScope = builder.getCurrentScope();
builder.addScope();
auto fork = builder.node("fork").kernel("SubGraphKernel");
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope");
}
builder.addScope();
auto clientWorldFork = builder.node("client_world_fork").kernel("AbortKernel");
fork.in("input1").from(sourceFork.out("output1"));
clientWorldFork.in("input1").from(fork.out("output1"));
clientWorldFork.out("output1").asGraphOutput(outputName);
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope2");
}
builder.scope(currentScope);
auto identity = builder.node("identity").kernel("IdentityTestKernel");
identity.in("input1").from(fork.out("output1"));
identity.out("output1").asGraphOutput("identity");
return builder.ok();
}
TEST_F(NaviExampleTest, testForkDownStreamAbort) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_0", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz_1", 3, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_2", "server_biz_1", 3, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_3", "server_biz_1", 3, 2, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildGraphForkDownStreamAbort("o1", _graphDef, { "client_biz" }));
RunGraphParams params;
params.setTimeoutMs(1000000000);
params.setTraceLevel("ERROR");
params.setCollectMetric(true);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_SINGLE;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1000000;
threadCount = 1;
}
}
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
// std::cout << _graphDef->DebugString() << std::endl;
std::ofstream ofile("./graph.serialize");
ofile << _graphDef->SerializeAsString();
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
// std::cout << "begin round " << i << std::endl;
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto current = atomic_inc_return(&counter);
if (current == 2000) {
// thisShow = true;
std::cout << "show true" << std::endl;
// params.setTraceLevel("");
}
auto def = new GraphDef();
def->CopyFrom(*_graphDef);
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
// auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_0");
auto userResult = navi->runGraph(def, params);
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
EXPECT_EQ(39u, dataVec.size());
EXPECT_EQ(EC_NONE, naviResult->getErrorCode());
// EXPECT_EQ("abort", naviResult->errorEvent.message);
std::cout << "data count: " << dataVec.size() << std::endl;
dataVec.clear();
resultVec.clear();
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
std::cout << "begin destruct" << std::endl;
}
bool buildGraphForkWithStop(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList) {
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto sourceFork = builder.node("source_node_fork").kernel("SourceKernel").jsonAttrs(R"json({"times" : 20})json");
auto currentScope = builder.getCurrentScope();
builder.addScope();
auto fork = builder.node("fork").kernel("SubGraphKernel");
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope");
}
builder.scope(currentScope);
auto clientWorldFork = builder.node("client_world_fork").kernel("WorldKernel");
auto stop1 = builder.node("stop_node1").kernel("StopKernel");
auto stop2 = builder.node("stop_node2").kernel("StopKernel");
fork.in("input1").from(sourceFork.out("output1"));
stop1.in("input1").from(fork.out("output1"));
stop2.in("input1").from(sourceFork.out("output1"));
clientWorldFork.in("input1").from(fork.out("output1"));
clientWorldFork.out("output1").asGraphOutput(outputName);
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope2");
}
return builder.ok();
}
TEST_F(NaviExampleTest, testForkWithStop) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_0", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz_1", 3, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_2", "server_biz_1", 3, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_3", "server_biz_1", 3, 2, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildGraphForkWithStop("o1", _graphDef, { "client_biz" }));
RunGraphParams params;
params.setTimeoutMs(1000000000);
params.setTraceLevel("ERROR");
params.setCollectMetric(true);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_SINGLE;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1000000;
threadCount = 1;
}
}
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
NaviLoggerProvider logProvider("schedule1");
// std::cout << _graphDef->DebugString() << std::endl;
std::ofstream ofile("./graph.serialize");
ofile << _graphDef->SerializeAsString();
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
// std::cout << "begin round " << i << std::endl;
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto current = atomic_inc_return(&counter);
if (current == 2000) {
// thisShow = true;
std::cout << "show true" << std::endl;
// params.setTraceLevel("");
}
auto def = new GraphDef();
def->CopyFrom(*_graphDef);
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
// auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_0");
auto userResult = navi->runGraph(def, params);
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
EXPECT_EQ(38u, dataVec.size());
std::cout << "data count: " << dataVec.size() << std::endl;
dataVec.clear();
resultVec.clear();
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
std::cout << "begin destruct" << std::endl;
}
bool buildScopeTerminatorWithInput(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto sourceFork = builder.node("source_node_fork").kernel("SourceKernel").jsonAttrs(R"json({"times" : 20})json");
auto abort = builder.node("abort").kernel("WorldKernel");
abort.in("input1").from(sourceFork.out("output1"));
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope");
scopeTerminator.in(SCOPE_TERMINATOR_INPUT_PORT).autoNext().from(abort.out("output1"));
return builder.ok();
}
TEST_F(NaviExampleTest, testScopeTerminatorWithInput) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_0", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz_1", 3, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_2", "server_biz_1", 3, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_3", "server_biz_1", 3, 2, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildScopeTerminatorWithInput("o1", _graphDef, { "client_biz" }));
RunGraphParams params;
params.setTimeoutMs(1000000000);
params.setTraceLevel("ERROR");
params.setCollectMetric(true);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_SINGLE;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1000000;
threadCount = 1;
}
}
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
// std::cout << _graphDef->DebugString() << std::endl;
// std::ofstream ofile("./graph.serialize");
std::ofstream ofile(std::string("/home/zq103303/alibaba/navi/http_server/searcher_graph.def"));
ofile << _graphDef->SerializeAsString();
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
// std::cout << "begin round " << i << std::endl;
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto current = atomic_inc_return(&counter);
if (current == 2000) {
// thisShow = true;
std::cout << "show true" << std::endl;
// params.setTraceLevel("");
}
auto def = new GraphDef();
def->CopyFrom(*_graphDef);
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
// auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_0");
auto userResult = navi->runGraph(def, params);
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
EXPECT_EQ(9u, dataVec.size());
std::cout << "data count: " << dataVec.size() << std::endl;
dataVec.clear();
resultVec.clear();
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
std::cout << "begin destruct" << std::endl;
}
bool buildGraphForkWithError(const std::string &outputName,
GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto sourceFork = builder.node("source_node_fork").kernel("SourceKernel").jsonAttrs(R"json({"times" : 2000000})json");
auto currentScope = builder.getCurrentScope();
builder.addScope();
auto fork = builder.node("fork").kernel("SubGraphKernel").integerAttr("abort", 1);
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope");
}
builder.scope(currentScope);
auto clientWorldFork = builder.node("client_world_fork").kernel("WorldKernel");
fork.in("input1").from(sourceFork.out("output1"));
fork.out("output2").asGraphOutput("fork_output2");
clientWorldFork.in("input1").from(fork.out("output1"));
clientWorldFork.out("output1").asGraphOutput(outputName);
{
auto scopeTerminator = builder.getScopeTerminator();
scopeTerminator.kernel("TestScopeTerminator").out("out").asGraphOutput("scope2");
}
return builder.ok();
}
TEST_F(NaviExampleTest, testForkGraphError) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_0", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz_1", 3, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_2", "server_biz_1", 3, 1, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_3", "server_biz_1", 3, 2, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildGraphForkWithError("o1", _graphDef, { "client_biz" }));
RunGraphParams params;
params.setTimeoutMs(1000000000);
params.setTraceLevel("ERROR");
params.setCollectMetric(true);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_SINGLE;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1000000;
threadCount = 1;
}
}
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
// std::cout << _graphDef->DebugString() << std::endl;
std::ofstream ofile("./graph.serialize");
ofile << _graphDef->SerializeAsString();
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
// std::cout << "begin round " << i << std::endl;
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto current = atomic_inc_return(&counter);
if (current == 2000) {
// thisShow = true;
std::cout << "show true" << std::endl;
// params.setTraceLevel("");
}
auto def = new GraphDef();
def->CopyFrom(*_graphDef);
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
// auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_0");
auto userResult = navi->runGraph(def, params);
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
EXPECT_EQ(28u, dataVec.size());
EXPECT_EQ(EC_NONE, naviResult->getErrorCode());
std::cout << "data count: " << dataVec.size() << std::endl;
dataVec.clear();
resultVec.clear();
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
std::cout << "begin destruct" << std::endl;
}
bool buildGraphForkRecur(const std::string &outputName, GraphDef *graphDef,
const std::vector<std::string> &bizList)
{
GraphBuilder builder(graphDef);
builder.newSubGraph(randItem(bizList));
auto sourceFork = builder.node("source_node_fork").kernel("SourceKernel").jsonAttrs(R"json({"times" : 20})json");
auto currentScope = builder.getCurrentScope();
builder.scopeErrorHandleStrategy(currentScope, EHS_ERROR_AS_FATAL);
builder.addScope();
auto fork = builder.node("fork").kernel("SubGraphRecurKernel");
builder.scope(currentScope);
auto clientWorldFork = builder.node("client_world_fork").kernel("WorldKernel2");
fork.in("input1").from(sourceFork.out("output1"));
clientWorldFork.in("input1").from(fork.out("output2"));
clientWorldFork.out("output1").asGraphOutput(outputName);
fork.out("output1").asGraphOutput(outputName + "_2");
return builder.ok();
}
TEST_F(NaviExampleTest, testForkRecur) {
ResourceMapPtr rootResourceMap(new ResourceMap());
std::string configPath = NAVI_TEST_DATA_PATH + "config/cluster/";
std::string biz1Config = NAVI_TEST_DATA_PATH + "config/cluster/biz1.py";
NaviTestCluster cluster;
ASSERT_TRUE(
cluster.addServer("host_0", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_1", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_2", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_3", _loader, configPath, rootResourceMap));
ASSERT_TRUE(
cluster.addServer("host_4", _loader, configPath, rootResourceMap));
std::vector<std::string> hostList = { "host_0", "host_1", "host_2",
"host_3", "host_4", "host_5",
"host_6", "host_7" };
ASSERT_TRUE(cluster.addBiz("host_0", "client_biz", 1, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_1", "server_biz_1", 2, 0, biz1Config));
ASSERT_TRUE(cluster.addBiz("host_2", "server_biz_1", 2, 1, biz1Config));
ASSERT_TRUE(cluster.start());
rootResourceMap.reset();
auto bizList = cluster.getBizList();
EXPECT_TRUE(buildGraphForkRecur("o1", _graphDef, { "client_biz" }));
EXPECT_TRUE(buildGraphForkRecur("o2", _graphDef, { "client_biz" }));
EXPECT_TRUE(buildGraphForkRecur("o3", _graphDef, { "client_biz" }));
EXPECT_TRUE(buildGraphForkRecur("o4", _graphDef, { "server_biz_1" }));
EXPECT_TRUE(buildGraphForkRecur("o5", _graphDef, { "server_biz_1" }));
EXPECT_TRUE(buildGraphForkRecur("o6", _graphDef, { "server_biz_1" }));
RunGraphParams params;
params.setTimeoutMs(1000000000);
params.setTraceLevel("debug");
params.setCollectMetric(true);
params.setCollectPerf(true);
usleep(3 * 1000 * 1000);
enum RunMode {
RM_SINGLE = 1,
RM_PERF = 2,
};
RunMode mode= RM_SINGLE;
size_t queryCount = 1;
size_t threadCount = 1;
bool show = false;
if (RM_SINGLE == mode) {
show = true;
} else if (RM_PERF == mode) {
string perfParamEnvStr = autil::EnvUtil::getEnv("perf_param");
if (!perfParamEnvStr.empty()) {
auto vec = autil::StringUtil::split(perfParamEnvStr, ";");
ASSERT_EQ(2u, vec.size());
queryCount = autil::StringUtil::fromString<int32_t>(vec[0]);
threadCount = autil::StringUtil::fromString<int32_t>(vec[1]);
} else {
queryCount = 1000000;
threadCount = 1;
}
}
bool abort = false;
atomic64_t counter;
atomic_set(&counter, 0);
auto func = std::bind([&]() {
for (size_t i = 0; i < queryCount; i++) {
if (abort) {
break;
}
auto thisShow = show;
thisShow = true;
auto def = new GraphDef();
def->CopyFrom(*_graphDef);
int64_t expectCount = 20;
int64_t dataCount = 0;
auto graphStr = def->SerializeAsString();
// auto host = randItem(hostList);
// std::cout << host << std::endl;
auto navi = cluster.getNavi("host_1");
auto userResult = navi->runGraph(def, params);
std::vector<NaviUserData> dataVec;
std::vector<NaviResultPtr> resultVec;
auto naviResult = showData(userResult, thisShow, expectCount,
dataCount, dataVec, resultVec);
EXPECT_EQ(378u, dataVec.size());
std::cout << "data count: " << dataVec.size() << std::endl;
dataVec.clear();
resultVec.clear();
showResultAndSaveVis(userResult);
EXPECT_EQ(EC_NONE, naviResult->getErrorCode());
}
std::cout << "thread exited" << std::endl;
});
std::vector<autil::ThreadPtr> threads;
for (size_t i = 0; i < threadCount; i++) {
threads.push_back(autil::Thread::createThread(
func, "test_thread_" + autil::StringUtil::toString(i)));
}
threads.clear();
std::cout << "begin destruct" << std::endl;
}
}