cppcache/integration-test/testThinClientCqDurable.cpp (831 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <string> #include <sstream> #include <thread> #include <chrono> #include <geode/CqAttributesFactory.hpp> #include <geode/CqAttributes.hpp> #include <geode/CqListener.hpp> #include <geode/CqQuery.hpp> #include <geode/RegionFactory.hpp> #include "fw_dunit.hpp" #include "ThinClientHelper.hpp" #define ROOT_SCOPE DISTRIBUTED_ACK #include <geode/Query.hpp> #include <geode/QueryService.hpp> #include <geode/RegionShortcut.hpp> #include "QueryStrings.hpp" #include "QueryHelper.hpp" #include "ThinClientCQ.hpp" #define CLIENT1 s1p1 #define SERVER1 s2p1 #define CLIENT2 s1p2 using apache::geode::client::Cacheable; using apache::geode::client::CacheFactory; using apache::geode::client::CqAttributesFactory; using apache::geode::client::CqEvent; using apache::geode::client::CqListener; using apache::geode::client::CqOperation; using apache::geode::client::Exception; using apache::geode::client::IllegalStateException; using apache::geode::client::QueryService; using apache::geode::client::RegionShortcut; const char *durableIds[] = {"DurableId1", "DurableId2"}; const char *cqName = "MyCq"; const char *durableCQNamesClient1[] = { "durableCQ1Client1", "durableCQ2Client1", "durableCQ3Client1", "durableCQ4Client1", "durableCQ5Client1", "durableCQ6Client1", "durableCQ7Client1", "durableCQ8Client1"}; const char *durableCQNamesClient2[] = { "durableCQ1Client2", "durableCQ2Client2", "durableCQ3Client2", "durableCQ4Client2", "durableCQ5Client2", "durableCQ6Client2", "durableCQ7Client2", "durableCQ8Client2"}; static bool m_isPdx = false; void initClientWithId(int ClientIdx, bool typeRegistered = false) { auto pp = Properties::create(); pp->insert("durable-client-id", durableIds[ClientIdx]); pp->insert("durable-timeout", std::chrono::seconds(60)); pp->insert("notify-ack-interval", std::chrono::seconds(1)); initClient(true, pp); if (typeRegistered == false) { try { auto serializationRegistry = CacheRegionHelper::getCacheImpl(cacheHelper->getCache().get()) ->getSerializationRegistry(); serializationRegistry->addDataSerializableType( Position::createDeserializable, 2); serializationRegistry->addDataSerializableType( Portfolio::createDeserializable, 3); serializationRegistry->addPdxSerializableType( PositionPdx::createDeserializable); serializationRegistry->addPdxSerializableType( PortfolioPdx::createDeserializable); } catch (const IllegalStateException &) { // ignore exception } } } class MyCqListener1 : public CqListener { public: static int m_cntEvents; void onEvent(const CqEvent &cqe) override { m_cntEvents++; const char *opStr = "Default"; std::shared_ptr<CacheableInt32> value( std::dynamic_pointer_cast<CacheableInt32>(cqe.getNewValue())); std::shared_ptr<CacheableInt32> key( std::dynamic_pointer_cast<CacheableInt32>(cqe.getKey())); switch (cqe.getQueryOperation()) { case CqOperation::OP_TYPE_CREATE: { opStr = "CREATE"; break; } case CqOperation::OP_TYPE_UPDATE: { opStr = "UPDATE"; break; } case CqOperation::OP_TYPE_DESTROY: { opStr = "UPDATE"; break; } case CqOperation::OP_TYPE_INVALID: case CqOperation::OP_TYPE_INVALIDATE: case CqOperation::OP_TYPE_REGION_CLEAR: case CqOperation::OP_TYPE_MARKER: break; } LOGINFO("MyCqListener1::OnEvent called with %s, key[%s], value=(%s)", opStr, key->toString().c_str(), value->toString().c_str()); } void onError(const CqEvent &) override { LOGINFO("MyCqListener1::OnError called"); } void close() override { LOGINFO("MyCqListener1::close called"); } }; int MyCqListener1::m_cntEvents = 0; const char *regionNamesCq[] = {"Portfolios", "Positions", "Portfolios2", "Portfolios3"}; int onEventCount = 0; int onErrorCount = 0; int onEventCountBefore = 0; class MyCqListener : public CqListener { void onEvent(const CqEvent &) override { // LOG("MyCqListener::OnEvent called"); onEventCount++; } void onError(const CqEvent &) override { // LOG("MyCqListener::OnError called"); onErrorCount++; } void close() override { LOG("MyCqListener::close called"); } }; DUNIT_TASK_DEFINITION(SERVER1, CreateLocator) { if (isLocator) CacheHelper::initLocator(1); LOG("Locator1 started"); } END_TASK_DEFINITION void createServer(bool locator = false) { LOG("Starting SERVER1..."); if (isLocalServer) { CacheHelper::initServer(1, "remotequery.xml", locator ? locatorsG : std::string{}); } LOG("SERVER1 started"); } void createServer_XML() { LOG("Starting SERVER..."); if (isLocalServer) { CacheHelper::initServer(1, "serverDurableClient.xml"); } LOG("SERVER started"); } DUNIT_TASK_DEFINITION(SERVER1, CreateServer1) { createServer(false); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(SERVER1, CreateServer) { createServer_XML(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(SERVER1, CreateServer1_Locator) { createServer(true); } END_TASK_DEFINITION void stepOne() { initClientWithId(0); createRegionForCQ(regionNamesCq[0], USE_ACK, true); auto regptr = getHelper()->getRegion(regionNamesCq[0]); auto subregPtr = regptr->createSubregion(regionNamesCq[1], regptr->getAttributes()); LOG("StepOne complete."); } void RunDurableCqClient() { // Create durable client's properties using api. auto pp = Properties::create(); pp->insert("durable-client-id", "DurableClientId"); pp->insert("durable-timeout", std::chrono::seconds(3600)); // Create a Geode Cache Programmatically. auto cacheFactory = CacheFactory(pp); auto cachePtr = std::make_shared<Cache>(cacheFactory.create()); auto poolFactory = cachePtr->getPoolManager().createFactory(); poolFactory.setSubscriptionEnabled(true) .setSubscriptionAckInterval(std::chrono::milliseconds(5000)) .setSubscriptionMessageTrackingTimeout(std::chrono::milliseconds(50000)) .create(""); LOGINFO("Created the Geode Cache Programmatically"); auto regionFactory = cachePtr->createRegionFactory(RegionShortcut::CACHING_PROXY); // Create the Region Programmatically. auto regionPtr = regionFactory.create("DistRegionAck"); LOGINFO("Created the Region Programmatically"); // Get the QueryService from the Cache. auto qrySvcPtr = cachePtr->getQueryService(); // Create CqAttributes and Install Listener CqAttributesFactory cqFac; auto cqLstner = std::make_shared<MyCqListener1>(); cqFac.addCqListener(cqLstner); auto cqAttr = cqFac.create(); LOGINFO("Attached CqListener"); // create a new Cq Query const char *qryStr = "select * from /DistRegionAck "; auto qry = qrySvcPtr->newCq("MyCq", qryStr, cqAttr, true); LOGINFO("Created new CqQuery"); // execute Cq Query qry->execute(); std::this_thread::sleep_for(std::chrono::seconds(10)); LOGINFO("Executed new CqQuery"); // Send ready for Event message to Server( only for Durable Clients ). // Server will send queued events to client after recieving this. cachePtr->readyForEvents(); LOGINFO("Sent ReadyForEvents message to server"); // wait for some time to recieve events std::this_thread::sleep_for(std::chrono::seconds(10)); // Close the Geode Cache with keepalive = true. Server will queue events // for // durable registered keys and will deliver all events when client will // reconnect // within timeout period and send "readyForEvents()" cachePtr->close(true); LOGINFO("Closed the Geode Cache with keepalive as true"); } void RunFeederClient() { auto cacheFactory = CacheFactory(); LOGINFO("Feeder connected to the Geode Distributed System"); auto cachePtr = std::make_shared<Cache>(cacheFactory.create()); LOGINFO("Created the Geode Cache"); auto regionFactory = cachePtr->createRegionFactory(RegionShortcut::PROXY); LOGINFO("Created the RegionFactory"); // Create the Region Programmatically. auto regionPtr = regionFactory.create("DistRegionAck"); LOGINFO("Created the Region Programmatically."); for (int i = 0; i < 10; i++) { auto keyPtr = CacheableInt32::create(i); auto valPtr = CacheableInt32::create(i); regionPtr->put(keyPtr, valPtr); } std::this_thread::sleep_for(std::chrono::seconds(10)); LOGINFO("put on 0-10 keys done."); // Close the Geode Cache cachePtr->close(); LOGINFO("Closed the Geode Cache"); } void RunFeederClient1() { auto cacheFactory = CacheFactory(); LOGINFO("Feeder connected to the Geode Distributed System"); auto cachePtr = std::make_shared<Cache>(cacheFactory.create()); LOGINFO("Created the Geode Cache"); auto regionFactory = cachePtr->createRegionFactory(RegionShortcut::PROXY); LOGINFO("Created the RegionFactory"); // Create the Region Programmatically. auto regionPtr = regionFactory.create("DistRegionAck"); LOGINFO("Created the Region Programmatically."); for (int i = 10; i < 20; i++) { auto keyPtr = CacheableInt32::create(i); auto valPtr = CacheableInt32::create(i); regionPtr->put(keyPtr, valPtr); } std::this_thread::sleep_for(std::chrono::seconds(10)); LOGINFO("put on 0-10 keys done."); // Close the Geode Cache cachePtr->close(); LOGINFO("Closed the Geode Cache"); } DUNIT_TASK_DEFINITION(CLIENT1, RunDurableClient) { RunDurableCqClient(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, RunFeeder) { RunFeederClient(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, RunFeeder1) { RunFeederClient1(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, StepOne_PoolLocator) { stepOne(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, VerifyEvents) { LOGINFO("MyCqListener1::m_cntEvents = %d ", MyCqListener1::m_cntEvents); ASSERT(MyCqListener1::m_cntEvents == 20, "Incorrect events, expected 20"); } END_TASK_DEFINITION void stepOne2() { initClientWithId(1); createRegionForCQ(regionNamesCq[0], USE_ACK, true); auto regptr = getHelper()->getRegion(regionNamesCq[0]); auto subregPtr = regptr->createSubregion(regionNamesCq[1], regptr->getAttributes()); LOG("StepOne2 complete."); } DUNIT_TASK_DEFINITION(CLIENT2, StepOne2_PoolLocator) { stepOne2(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, StepTwo) { auto regPtr0 = getHelper()->getRegion(regionNamesCq[0]); auto subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]); QueryHelper *qh = &QueryHelper::getHelper(); if (!m_isPdx) { qh->populatePortfolioData(regPtr0, 130, 20, 20); qh->populatePositionData(subregPtr0, 130, 20); } else { qh->populatePortfolioPdxData(regPtr0, 130, 20, 20); qh->populatePositionPdxData(subregPtr0, 130, 20); } LOG("StepTwo complete."); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, StepThree) { QueryHelper::getHelper(); auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } CqAttributesFactory cqFac; auto cqLstner = std::make_shared<MyCqListener>(); cqFac.addCqListener(cqLstner); auto cqAttr = cqFac.create(); auto qryStr = "select * from /Portfolios p where p.ID < 3"; auto &&qry = qs->newCq(cqName, qryStr, cqAttr); try { LOG("EXECUTE 1 START"); auto &&results = qry->executeWithInitialResults(); LOG("EXECUTE 1 STOP"); LOG(std::string("results size=") + std::to_string(results->size())); } catch (const Exception &excp) { std::string logmsg = ""; logmsg += excp.getName(); logmsg += ": "; logmsg += excp.what(); LOG(logmsg.c_str()); LOG(excp.getStackTrace()); } LOG("StepThree complete."); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, StepTwo2) { auto regPtr0 = getHelper()->getRegion(regionNamesCq[0]); auto subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]); QueryHelper *qh = &QueryHelper::getHelper(); qh->populatePortfolioData(regPtr0, 140, 30, 20); qh->populatePositionData(subregPtr0, 140, 30); std::shared_ptr<Cacheable> port = nullptr; for (int i = 1; i < 140; i++) { if (!m_isPdx) { port = std::shared_ptr<Cacheable>(new Portfolio(i, 20)); } else { port = std::shared_ptr<Cacheable>(new PortfolioPdx(i, 20)); } auto keyport = CacheableKey::create("port1-1"); regPtr0->put(keyport, port); SLEEP(10); // sleep a while to allow server query to complete } LOG("StepTwo2 complete."); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, Client1Down) { getHelper()->disconnect(true); cleanProc(); LOG("Clnt1Down complete: Keepalive = True"); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, Client2Down) { getHelper()->disconnect(true); cleanProc(); LOG("Clnt2Down complete: Keepalive = True"); } END_TASK_DEFINITION void client1Up() { // No RegisterIntrest again initClientWithId(0, true); createRegionForCQ(regionNamesCq[0], USE_ACK, true); LOG("Client1Up complete."); QueryHelper::getHelper(); std::shared_ptr<QueryService> qs = getHelper() ->getCache() ->getPoolManager() .find(regionNamesCq[0]) ->getQueryService(); CqAttributesFactory cqFac; auto cqLstner = std::make_shared<MyCqListener>(); cqFac.addCqListener(cqLstner); auto cqAttr = cqFac.create(); const char *qryStr = "select * from /Portfolios p where p.ID < 3"; auto qry = qs->newCq(cqName, qryStr, cqAttr); try { LOG("EXECUTE 1 START"); qry->execute(); LOG("EXECUTE 1 STOP"); } catch (const Exception &excp) { std::string logmsg = ""; logmsg += excp.getName(); logmsg += ": "; logmsg += excp.what(); LOG(logmsg.c_str()); LOG(excp.getStackTrace()); } try { getHelper()->cachePtr->readyForEvents(); } catch (...) { LOG("Exception occured while sending readyForEvents"); } } void client1UpDurableCQList() { // No RegisterIntrest again initClientWithId(0, true); createRegionForCQ(regionNamesCq[0], USE_ACK, true); LOG("Client1Up complete."); QueryHelper::getHelper(); } void client2UpDurableCQList() { // No RegisterIntrest again initClientWithId(1, true); createRegionForCQ(regionNamesCq[0], USE_ACK, true); LOG("Client2Up complete."); QueryHelper::getHelper(); } DUNIT_TASK_DEFINITION(CLIENT1, Client1Up_Pool) { client1Up(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, Client1UpDurableCQList_Pool) { client1UpDurableCQList(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, Client2UpDurableCQList_Pool) { client2UpDurableCQList(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, StepFour) { QueryHelper::getHelper(); auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } char buf[1024]; try { // TEST_COVERAGE LOGINFO( "CLIENT-1 StepFour: verifying getCq() behaviour for the invalid CQ " "Name"); auto invalidCqptr = qs->getCq("InValidCQ"); ASSERT( invalidCqptr == nullptr, "Cqptr must be nullptr, as it getCq() is invoked on invalid CQ name"); /*if(invalidCqptr == nullptr){ LOGINFO("Testing getCq(InvalidName) :: invalidCqptr is nullptr"); }else{ LOGINFO("Testing getCq(InvalidName) :: invalidCqptr is NOT nullptr"); }*/ auto cqy = qs->getCq(cqName); cqy->stop(); SLEEP(1500); // sleep 0.025 min to allow server stop query to complete auto cqStats = cqy->getStatistics(); std::stringstream strm; strm << "numInserts[" << cqStats->numInserts() << "], numDeletes[" << cqStats->numDeletes() << "], numUpdates[" << cqStats->numUpdates() << "], numEvents[" << cqStats->numEvents() << "]"; LOG(strm.str()); strm.str(""); strm << "MyCount:onEventCount=" << onEventCount << ", onErrorCount = " << onErrorCount; LOG(strm.str()); ASSERT(cqStats->numEvents() > 0, "stats incorrect!"); cqy->close(); } catch (Exception &excp) { std::string failmsg = ""; failmsg += excp.getName(); failmsg += ": "; failmsg += excp.what(); LOG(failmsg.c_str()); FAIL(failmsg.c_str()); LOG(excp.getStackTrace()); } LOG("StepFour complete."); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, CloseCache1) { LOG("cleanProc 1..."); cleanProc(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, CloseCache2) { LOG("cleanProc 2..."); cleanProc(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(SERVER1, CloseServer1) { LOG("closing Server1..."); if (isLocalServer) { CacheHelper::closeServer(1); LOG("SERVER1 stopped"); } } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(SERVER1, CloseLocator) { if (isLocator) { CacheHelper::closeLocator(1); LOG("Locator1 stopped"); } } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, SetPortfolioTypeToPdx) { m_isPdx = true; } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, UnsetPortfolioTypeToPdx) { m_isPdx = false; } END_TASK_DEFINITION bool isDurableCQName(const char *continuousQueryName, int clientID, bool isRecycled) { bool bRetVal = false; int i = 0; if (clientID == 1) { if (!isRecycled) { for (i = 0; i < 4; i++) { if (strcmp(continuousQueryName, durableCQNamesClient1[i]) == 0) break; } if (i < 4) bRetVal = true; } else { for (i = 0; i < 8; i++) { if (strcmp(continuousQueryName, durableCQNamesClient1[i]) == 0) break; } if (i < 8) bRetVal = true; } } else if (clientID == 2) { if (!isRecycled) { for (i = 0; i < 4; i++) { if (strcmp(continuousQueryName, durableCQNamesClient2[i]) == 0) break; } if (i < 4) bRetVal = true; } else { for (i = 0; i < 8; i++) { if (strcmp(continuousQueryName, durableCQNamesClient2[i]) == 0) break; } if (i < 8) bRetVal = true; } } return bRetVal; } void doThinClientCqDurable() { CALL_TASK(CreateLocator); CALL_TASK(CreateServer1_Locator); CALL_TASK(StepOne_PoolLocator); CALL_TASK(StepOne2_PoolLocator); CALL_TASK(StepTwo); CALL_TASK(StepThree); CALL_TASK(StepTwo2); CALL_TASK(Client1Down); CALL_TASK(Client1Up_Pool); CALL_TASK(StepFour); CALL_TASK(CloseCache1); CALL_TASK(CloseCache2); CALL_TASK(CloseServer1); CALL_TASK(CloseLocator); } DUNIT_TASK_DEFINITION(CLIENT1, RegisterCqs1) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } CqAttributesFactory cqFac; auto cqAttr = cqFac.create(); qs->newCq(durableCQNamesClient1[0], "select * from /Portfolios p where p.ID < 3", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient1[1], "select * from /Portfolios p where p.ID > 5", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient1[2], "select * from /Portfolios p where p.ID > 10", cqAttr, false) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient1[3], "select * from /Portfolios p where p.ID = 0", cqAttr, false) ->executeWithInitialResults(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, RegisterCqsAfterClientup1) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } CqAttributesFactory cqFac; auto cqAttr = cqFac.create(); qs->newCq(durableCQNamesClient1[4], "select * from /Portfolios p where p.ID < 3", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient1[5], "select * from /Portfolios p where p.ID > 5", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient1[6], "select * from /Portfolios p where p.ID > 10", cqAttr, false) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient1[7], "select * from /Portfolios p where p.ID = 0", cqAttr, false) ->executeWithInitialResults(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, RegisterCqs2) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } CqAttributesFactory cqFac; auto cqAttr = cqFac.create(); qs->newCq(durableCQNamesClient2[0], "select * from /Portfolios p where p.ID < 3", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient2[1], "select * from /Portfolios p where p.ID > 5", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient2[2], "select * from /Portfolios p where p.ID > 10", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient2[3], "select * from /Portfolios p where p.ID = 5", cqAttr, true) ->executeWithInitialResults(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, RegisterCqsAfterClientup2) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } CqAttributesFactory cqFac; auto cqAttr = cqFac.create(); qs->newCq(durableCQNamesClient2[4], "select * from /Portfolios p where p.ID < 3", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient2[5], "select * from /Portfolios p where p.ID > 5", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient2[6], "select * from /Portfolios p where p.ID > 10", cqAttr, true) ->executeWithInitialResults(); qs->newCq(durableCQNamesClient2[7], "select * from /Portfolios p where p.ID = 5", cqAttr, true) ->executeWithInitialResults(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, VerifyCqs1) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } auto durableCqListPtr = qs->getAllDurableCqsFromServer(); ASSERT(durableCqListPtr != nullptr, "Durable CQ List should not be null"); ASSERT(durableCqListPtr->size() == 2, "Durable CQ List lenght should be 2"); ASSERT( isDurableCQName(durableCqListPtr->at(0)->toString().c_str(), 1, false), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(1)->toString().c_str(), 1, false), "Durable CQ name should be in the durable cq list"); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT1, VerifyCqsAfterClientup1) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } auto durableCqListPtr = qs->getAllDurableCqsFromServer(); ASSERT(durableCqListPtr != nullptr, "Durable CQ List should not be null"); ASSERT(durableCqListPtr->size() == 4, "Durable CQ List length should be 4"); ASSERT( isDurableCQName(durableCqListPtr->at(0)->toString().c_str(), 1, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(1)->toString().c_str(), 1, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(2)->toString().c_str(), 1, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(3)->toString().c_str(), 1, true), "Durable CQ name should be in the durable cq list"); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, VerifyCqs2) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } auto durableCqListPtr = qs->getAllDurableCqsFromServer(); ASSERT(durableCqListPtr != nullptr, "Durable CQ List should not be null"); ASSERT(durableCqListPtr->size() == 4, "Durable CQ List lenght should be 4"); ASSERT( isDurableCQName(durableCqListPtr->at(0)->toString().c_str(), 2, false), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(1)->toString().c_str(), 2, false), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(2)->toString().c_str(), 2, false), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(3)->toString().c_str(), 2, false), "Durable CQ name should be in the durable cq list"); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, VerifyCqsAfterClientup2) { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } auto durableCqListPtr = qs->getAllDurableCqsFromServer(); ASSERT(durableCqListPtr != nullptr, "Durable CQ List should not be null"); ASSERT(durableCqListPtr->size() == 8, "Durable CQ List lenght should be 8"); ASSERT( isDurableCQName(durableCqListPtr->at(0)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(1)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(2)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(3)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(4)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(5)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(6)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); ASSERT( isDurableCQName(durableCqListPtr->at(7)->toString().c_str(), 2, true), "Durable CQ name should be in the durable cq list"); } END_TASK_DEFINITION void verifyEmptyDurableCQList() { auto pool = getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]); std::shared_ptr<QueryService> qs; if (pool != nullptr) { qs = pool->getQueryService(); } else { qs = getHelper()->cachePtr->getQueryService(); } auto durableCqListPtr = qs->getAllDurableCqsFromServer(); ASSERT(durableCqListPtr == nullptr, "Durable CQ List should be null"); } DUNIT_TASK_DEFINITION(CLIENT1, VerifyEmptyDurableCQList1) { verifyEmptyDurableCQList(); } END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, VerifyEmptyDurableCQList2) { verifyEmptyDurableCQList(); } END_TASK_DEFINITION void getDurableCQsFromServerEmptyList() { CALL_TASK(CreateLocator); CALL_TASK(CreateServer1_Locator); CALL_TASK(StepOne_PoolLocator); CALL_TASK(StepOne2_PoolLocator); CALL_TASK(VerifyEmptyDurableCQList1); CALL_TASK(VerifyEmptyDurableCQList2); CALL_TASK(CloseCache1); CALL_TASK(CloseCache2); CALL_TASK(CloseServer1); CALL_TASK(CloseLocator); } void getDurableCQsFromServer() { CALL_TASK(CreateLocator); CALL_TASK(CreateServer1_Locator); CALL_TASK(StepOne_PoolLocator); CALL_TASK(StepOne2_PoolLocator); CALL_TASK(RegisterCqs1); CALL_TASK(RegisterCqs2); CALL_TASK(VerifyCqs1); CALL_TASK(VerifyCqs2); CALL_TASK(CloseCache1); CALL_TASK(CloseCache2); CALL_TASK(CloseServer1); CALL_TASK(CloseLocator); } void getDurableCQsFromServerWithCyclicClients() { CALL_TASK(CreateLocator); CALL_TASK(CreateServer1_Locator); CALL_TASK(StepOne_PoolLocator); CALL_TASK(StepOne2_PoolLocator); CALL_TASK(RegisterCqs1); CALL_TASK(RegisterCqs2); CALL_TASK(VerifyCqs1); CALL_TASK(VerifyCqs2); CALL_TASK(Client1Down); CALL_TASK(Client2Down); CALL_TASK(Client1UpDurableCQList_Pool); CALL_TASK(Client2UpDurableCQList_Pool); CALL_TASK(RegisterCqsAfterClientup1); CALL_TASK(RegisterCqsAfterClientup2); CALL_TASK(VerifyCqsAfterClientup1); CALL_TASK(VerifyCqsAfterClientup2); CALL_TASK(CloseCache1); CALL_TASK(CloseCache2); CALL_TASK(CloseServer1); CALL_TASK(CloseLocator); } void setPortfolioPdxType() { CALL_TASK(SetPortfolioTypeToPdx); } void UnsetPortfolioType() { CALL_TASK(UnsetPortfolioTypeToPdx); } void doThinClientCqDurable1() { CALL_TASK(CreateServer); // First Run of Durable Client CALL_TASK(RunDurableClient); // Intermediate Feeder, feeding events CALL_TASK(RunFeeder); // Reconnect Durable Client CALL_TASK(RunDurableClient); // Intermediate Feeder, feeding events again CALL_TASK(RunFeeder1); // Reconnect Durable Client again CALL_TASK(RunDurableClient); // Verify we get 20 events CALL_TASK(VerifyEvents); CALL_TASK(CloseServer1); } DUNIT_MAIN { UnsetPortfolioType(); for (int runIdx = 1; runIdx <= 2; ++runIdx) { doThinClientCqDurable(); setPortfolioPdxType(); getDurableCQsFromServerEmptyList(); getDurableCQsFromServer(); getDurableCQsFromServerWithCyclicClients(); } doThinClientCqDurable1(); } END_MAIN