clicache/integration-test/ThinClientCqPdxTest.cs (619 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading; namespace Apache.Geode.Client.UnitTests { using NUnit.Framework; using Apache.Geode.DUnitFramework; using Apache.Geode.Client.Tests; using Apache.Geode.Client; [TestFixture] [Category("group3")] [Category("unicast_only")] [Category("generics")] public class ThinClientCqPdxTest : ThinClientRegionSteps { public class MyCqListener<TKey, TResult> : ICqListener<TKey, TResult> { #region Private members private bool m_failedOver = false; private UInt32 m_eventCountBefore = 0; private UInt32 m_errorCountBefore = 0; private UInt32 m_eventCountAfter = 0; private UInt32 m_errorCountAfter = 0; #endregion #region Public accessors public void failedOver() { m_failedOver = true; } public UInt32 getEventCountBefore() { return m_eventCountBefore; } public UInt32 getErrorCountBefore() { return m_errorCountBefore; } public UInt32 getEventCountAfter() { return m_eventCountAfter; } public UInt32 getErrorCountAfter() { return m_errorCountAfter; } #endregion public virtual void OnEvent(CqEvent<TKey, TResult> ev) { Util.Log("MyCqListener::OnEvent called"); if (m_failedOver == true) m_eventCountAfter++; else m_eventCountBefore++; //ISerializable val = ev.getNewValue(); //ICacheableKey key = ev.getKey(); TResult val = (TResult)ev.getNewValue(); /*ICacheableKey*/ TKey key = ev.getKey(); CqOperation opType = ev.getQueryOperation(); //CacheableString keyS = key as CacheableString; string keyS = key.ToString(); //as string; Portfolio pval = val as Portfolio; PortfolioPdx pPdxVal = val as PortfolioPdx; Assert.IsTrue((pPdxVal != null) || (pval != null)); //string opStr = "DESTROY"; /*if (opType == CqOperation.OP_TYPE_CREATE) opStr = "CREATE"; else if (opType == CqOperation.OP_TYPE_UPDATE) opStr = "UPDATE";*/ //Util.Log("key {0}, value ({1},{2}), op {3}.", keyS, // pval.ID, pval.Pkid, opStr); } public virtual void OnError(CqEvent<TKey, TResult> ev) { Util.Log("MyCqListener::OnError called"); if (m_failedOver == true) m_errorCountAfter++; else m_errorCountBefore++; } public virtual void Close() { Util.Log("MyCqListener::close called"); } public virtual void Clear() { Util.Log("MyCqListener::Clear called"); m_eventCountBefore = 0; m_errorCountBefore = 0; m_eventCountAfter = 0; m_errorCountAfter = 0; } } public class MyCqListener1<TKey, TResult> : ICqListener<TKey, TResult> { public static UInt32 m_cntEvents = 0; public virtual void OnEvent(CqEvent<TKey, TResult> ev) { m_cntEvents++; Util.Log("MyCqListener1::OnEvent called"); Object val = (Object)ev.getNewValue(); Object pkey = (Object)ev.getKey(); int value = (int)val; int key = (int)pkey; CqOperation opType = ev.getQueryOperation(); String opStr = "Default"; if (opType == CqOperation.OP_TYPE_CREATE) opStr = "CREATE"; else if (opType == CqOperation.OP_TYPE_UPDATE) opStr = "UPDATE"; Util.Log("MyCqListener1::OnEvent called with {0} , key = {1}, value = {2} ", opStr, key, value); } public virtual void OnError(CqEvent<TKey, TResult> ev) { Util.Log("MyCqListener1::OnError called"); } public virtual void Close() { Util.Log("MyCqListener1::close called"); } } public class MyCqStatusListener<TKey, TResult> : ICqStatusListener<TKey, TResult> { #region Private members private bool m_failedOver = false; private UInt32 m_eventCountBefore = 0; private UInt32 m_errorCountBefore = 0; private UInt32 m_eventCountAfter = 0; private UInt32 m_errorCountAfter = 0; private UInt32 m_CqConnectedCount = 0; private UInt32 m_CqDisConnectedCount = 0; #endregion #region Public accessors public MyCqStatusListener(int id) { } public void failedOver() { m_failedOver = true; } public UInt32 getEventCountBefore() { return m_eventCountBefore; } public UInt32 getErrorCountBefore() { return m_errorCountBefore; } public UInt32 getEventCountAfter() { return m_eventCountAfter; } public UInt32 getErrorCountAfter() { return m_errorCountAfter; } public UInt32 getCqConnectedCount() { return m_CqConnectedCount; } public UInt32 getCqDisConnectedCount() { return m_CqDisConnectedCount; } #endregion public virtual void OnEvent(CqEvent<TKey, TResult> ev) { Util.Log("MyCqStatusListener::OnEvent called"); if (m_failedOver == true) m_eventCountAfter++; else m_eventCountBefore++; TResult val = (TResult)ev.getNewValue(); TKey key = ev.getKey(); CqOperation opType = ev.getQueryOperation(); string keyS = key.ToString(); //as string; } public virtual void OnError(CqEvent<TKey, TResult> ev) { Util.Log("MyCqStatusListener::OnError called"); if (m_failedOver == true) m_errorCountAfter++; else m_errorCountBefore++; } public virtual void Close() { Util.Log("MyCqStatusListener::close called"); } public virtual void OnCqConnected() { m_CqConnectedCount++; Util.Log("MyCqStatusListener::OnCqConnected called"); } public virtual void OnCqDisconnected() { m_CqDisConnectedCount++; Util.Log("MyCqStatusListener::OnCqDisconnected called"); } public virtual void Clear() { Util.Log("MyCqStatusListener::Clear called"); m_eventCountBefore = 0; m_errorCountBefore = 0; m_eventCountAfter = 0; m_errorCountAfter = 0; m_CqConnectedCount = 0; m_CqDisConnectedCount = 0; } } #region Private members private static bool m_usePdxObjects = false; private UnitProcess m_client1; private UnitProcess m_client2; private static string[] QueryRegionNames = { "Portfolios", "Positions", "Portfolios2", "Portfolios3" }; private static string QERegionName = "Portfolios"; private static string CqName = "MyCq"; private static string CqName1 = "testCQAllServersLeave"; private static string CqName2 = "testCQAllServersLeave1"; private static string CqQuery1 = "select * from /DistRegionAck"; private static string CqQuery2 = "select * from /DistRegionAck1"; //private static string CqName1 = "MyCq1"; #endregion protected override ClientBase[] GetClients() { m_client1 = new UnitProcess(); m_client2 = new UnitProcess(); return new ClientBase[] { m_client1, m_client2 }; } [TestFixtureSetUp] public override void InitTests() { base.InitTests(); m_client1.Call(InitClient); m_client2.Call(InitClient); } [TearDown] public override void EndTest() { CacheHelper.StopJavaServers(); base.EndTest(); } public void InitClient() { CacheHelper.Init(); try { CacheHelper.DCache.TypeRegistry.RegisterType(Portfolio.CreateDeserializable, 8); CacheHelper.DCache.TypeRegistry.RegisterType(Position.CreateDeserializable, 7); } catch (IllegalStateException) { // ignore since we run multiple iterations for pool and non pool configs } } public void StepOne(string locators) { CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[0], true, true, null, locators, "__TESTPOOL1_", true); CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[1], true, true, null, locators, "__TESTPOOL1_", true); CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[2], true, true, null, locators, "__TESTPOOL1_", true); CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[3], true, true, null, locators, "__TESTPOOL1_", true); CacheHelper.CreateTCRegion_Pool<object, object>("DistRegionAck", true, true, null, locators, "__TESTPOOL1_", true); IRegion<object, object> region = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); Apache.Geode.Client.RegionAttributes<object, object> regattrs = region.Attributes; region.CreateSubRegion(QueryRegionNames[1], regattrs); } public void StepTwo(bool usePdxObject) { IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]); IRegion<object, object> region1 = CacheHelper.GetRegion<object, object>(QueryRegionNames[1]); IRegion<object, object> region2 = CacheHelper.GetRegion<object, object>(QueryRegionNames[2]); IRegion<object, object> region3 = CacheHelper.GetRegion<object, object>(QueryRegionNames[3]); QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache); Util.Log("Object type is pdx = " + m_usePdxObjects); Util.Log("SetSize {0}, NumSets {1}.", qh.PortfolioSetSize, qh.PortfolioNumSets); if (!usePdxObject) { qh.PopulatePortfolioData(region0, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePositionData(subRegion0, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePositionData(region1, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePortfolioData(region2, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePortfolioData(region3, qh.PortfolioSetSize, qh.PortfolioNumSets); } else { CacheHelper.DCache.TypeRegistry.RegisterPdxType(PortfolioPdx.CreateDeserializable); CacheHelper.DCache.TypeRegistry.RegisterPdxType(PositionPdx.CreateDeserializable); qh.PopulatePortfolioPdxData(region0, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePortfolioPdxData(subRegion0, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePortfolioPdxData(region1, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePortfolioPdxData(region2, qh.PortfolioSetSize, qh.PortfolioNumSets); qh.PopulatePortfolioPdxData(region3, qh.PortfolioSetSize, qh.PortfolioNumSets); } } public void StepTwoQT() { IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]); QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache); qh.PopulatePortfolioData(region0, 100, 20, 100); qh.PopulatePositionData(subRegion0, 100, 20); } public void StepOneQE(string locators) { CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true, null, locators, "__TESTPOOL1_", true); IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName); Portfolio p1 = new Portfolio(1, 100); Portfolio p2 = new Portfolio(2, 100); Portfolio p3 = new Portfolio(3, 100); Portfolio p4 = new Portfolio(4, 100); region["1"] = p1; region["2"] = p2; region["3"] = p3; region["4"] = p4; var qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); ICqListener<object, object> cqLstner = new MyCqListener<object, object>(); cqFac.AddCqListener(cqLstner); CqAttributes<object, object> cqAttr = cqFac.Create(); CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false); qry.Execute(); Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete region["4"] = p1; region["3"] = p2; region["2"] = p3; region["1"] = p4; Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete qry = qs.GetCq<object, object>(CqName); CqServiceStatistics cqSvcStats = qs.GetCqStatistics(); Assert.AreEqual(1, cqSvcStats.numCqsActive()); Assert.AreEqual(1, cqSvcStats.numCqsCreated()); Assert.AreEqual(1, cqSvcStats.numCqsOnClient()); cqAttr = qry.GetCqAttributes(); ICqListener<object, object>[] vl = cqAttr.getCqListeners(); Assert.IsNotNull(vl); Assert.AreEqual(1, vl.Length); cqLstner = vl[0]; Assert.IsNotNull(cqLstner); MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore()); CqStatistics cqStats = qry.GetStatistics(); Assert.AreEqual(cqStats.numEvents(), myLisner.getEventCountBefore()); if (myLisner.getEventCountBefore() + myLisner.getErrorCountBefore() == 0) { Assert.Fail("cq before count zero"); } qry.Stop(); Assert.AreEqual(1, cqSvcStats.numCqsStopped()); qry.Close(); Assert.AreEqual(1, cqSvcStats.numCqsClosed()); // Bring down the region region.GetLocalView().DestroyRegion(); } public void StepOnePdxQE(string locators) { CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true, null, locators, "__TESTPOOL1_", true); IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName); PortfolioPdx p1 = new PortfolioPdx(1, 100); PortfolioPdx p2 = new PortfolioPdx(2, 100); PortfolioPdx p3 = new PortfolioPdx(3, 100); PortfolioPdx p4 = new PortfolioPdx(4, 100); region["1"] = p1; region["2"] = p2; region["3"] = p3; region["4"] = p4; var qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); ICqListener<object, object> cqLstner = new MyCqListener<object, object>(); cqFac.AddCqListener(cqLstner); CqAttributes<object, object> cqAttr = cqFac.Create(); CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false); qry.Execute(); Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete region["4"] = p1; region["3"] = p2; region["2"] = p3; region["1"] = p4; Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete qry = qs.GetCq<object, object>(CqName); CqServiceStatistics cqSvcStats = qs.GetCqStatistics(); Assert.AreEqual(1, cqSvcStats.numCqsActive()); Assert.AreEqual(1, cqSvcStats.numCqsCreated()); Assert.AreEqual(1, cqSvcStats.numCqsOnClient()); cqAttr = qry.GetCqAttributes(); ICqListener<object, object>[] vl = cqAttr.getCqListeners(); Assert.IsNotNull(vl); Assert.AreEqual(1, vl.Length); cqLstner = vl[0]; Assert.IsNotNull(cqLstner); MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore()); CqStatistics cqStats = qry.GetStatistics(); Assert.AreEqual(cqStats.numEvents(), myLisner.getEventCountBefore()); if (myLisner.getEventCountBefore() + myLisner.getErrorCountBefore() == 0) { Assert.Fail("cq before count zero"); } qry.Stop(); Assert.AreEqual(1, cqSvcStats.numCqsStopped()); qry.Close(); Assert.AreEqual(1, cqSvcStats.numCqsClosed()); // Bring down the region region.GetLocalView().DestroyRegion(); } public void KillServer() { CacheHelper.StopJavaServer(1); Util.Log("Cacheserver 1 stopped."); } public delegate void KillServerDelegate(); /* public void StepOneFailover() { // This is here so that Client1 registers information of the cacheserver // that has been already started CacheHelper.SetupJavaServers("remotequery.xml", "cqqueryfailover.xml"); CacheHelper.StartJavaServer(1, "GFECS1"); Util.Log("Cacheserver 1 started."); CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true); Region region = CacheHelper.GetVerifyRegion(QueryRegionNames[0]); Portfolio p1 = new Portfolio(1, 100); Portfolio p2 = new Portfolio(2, 200); Portfolio p3 = new Portfolio(3, 300); Portfolio p4 = new Portfolio(4, 400); region.Put("1", p1); region.Put("2", p2); region.Put("3", p3); region.Put("4", p4); } */ /* public void StepTwoFailover() { CacheHelper.StartJavaServer(2, "GFECS2"); Util.Log("Cacheserver 2 started."); IAsyncResult killRes = null; KillServerDelegate ksd = new KillServerDelegate(KillServer); CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true); IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QueryRegionNames[0]); var qs = CacheHelper.DCache.GetQueryService(); CqAttributesFactory cqFac = new CqAttributesFactory(); ICqListener cqLstner = new MyCqListener(); cqFac.AddCqListener(cqLstner); CqAttributes cqAttr = cqFac.Create(); CqQuery qry = qs.NewCq(CqName1, "select * from /" + QERegionName + " p where p.ID!<4", cqAttr, true); qry.Execute(); Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete qry = qs.GetCq(CqName1); cqAttr = qry.GetCqAttributes(); ICqListener[] vl = cqAttr.getCqListeners(); Assert.IsNotNull(vl); Assert.AreEqual(1, vl.Length); cqLstner = vl[0]; Assert.IsNotNull(cqLstner); MyCqListener myLisner = cqLstner as MyCqListener; if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() != 0) { Assert.Fail("cq after count not zero"); } killRes = ksd.BeginInvoke(null, null); Thread.Sleep(18000); // sleep 0.3min to allow failover complete myLisner.failedOver(); Portfolio p1 = new Portfolio(1, 100); Portfolio p2 = new Portfolio(2, 200); Portfolio p3 = new Portfolio(3, 300); Portfolio p4 = new Portfolio(4, 400); region.Put("4", p1); region.Put("3", p2); region.Put("2", p3); region.Put("1", p4); Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete qry = qs.GetCq(CqName1); cqAttr = qry.GetCqAttributes(); vl = cqAttr.getCqListeners(); cqLstner = vl[0]; Assert.IsNotNull(vl); Assert.AreEqual(1, vl.Length); cqLstner = vl[0]; Assert.IsNotNull(cqLstner); myLisner = cqLstner as MyCqListener; if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() == 0) { Assert.Fail("no cq after failover"); } killRes.AsyncWaitHandle.WaitOne(); ksd.EndInvoke(killRes); qry.Stop(); qry.Close(); } */ public void ProcessCQ(string locators) { CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true, null, locators, "__TESTPOOL1_", true); IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName); Portfolio p1 = new Portfolio(1, 100); Portfolio p2 = new Portfolio(2, 100); Portfolio p3 = new Portfolio(3, 100); Portfolio p4 = new Portfolio(4, 100); region["1"] = p1; region["2"] = p2; region["3"] = p3; region["4"] = p4; var qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); ICqListener<object, object> cqLstner = new MyCqListener<object, object>(); ICqStatusListener<object, object> cqStatusLstner = new MyCqStatusListener<object, object>(1); ICqListener<object, object>[] v = new ICqListener<object, object>[2]; cqFac.AddCqListener(cqLstner); v[0] = cqLstner; v[1] = cqStatusLstner; cqFac.InitCqListeners(v); Util.Log("InitCqListeners called"); CqAttributes<object, object> cqAttr = cqFac.Create(); CqQuery<object, object> qry1 = qs.NewCq("CQ1", "select * from /" + QERegionName + " p where p.ID >= 1", cqAttr, false); qry1.Execute(); Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete region["4"] = p1; region["3"] = p2; region["2"] = p3; region["1"] = p4; Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete qry1 = qs.GetCq<object, object>("CQ1"); cqAttr = qry1.GetCqAttributes(); ICqListener<object, object>[] vl = cqAttr.getCqListeners(); Assert.IsNotNull(vl); Assert.AreEqual(2, vl.Length); cqLstner = vl[0]; Assert.IsNotNull(cqLstner); MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore()); Assert.AreEqual(4, myLisner.getEventCountBefore()); cqStatusLstner = (ICqStatusListener<object, object>)vl[1]; Assert.IsNotNull(cqStatusLstner); MyCqStatusListener<object, object> myStatLisner = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>; Util.Log("event count:{0}, error count {1}.", myStatLisner.getEventCountBefore(), myStatLisner.getErrorCountBefore()); Assert.AreEqual(1, myStatLisner.getCqConnectedCount()); Assert.AreEqual(4, myStatLisner.getEventCountBefore()); CqAttributesMutator<object, object> mutator = qry1.GetCqAttributesMutator(); mutator.RemoveCqListener(cqLstner); cqAttr = qry1.GetCqAttributes(); Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); Assert.AreEqual(1, cqAttr.getCqListeners().Length); mutator.RemoveCqListener(cqStatusLstner); cqAttr = qry1.GetCqAttributes(); Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); Assert.AreEqual(0, cqAttr.getCqListeners().Length); ICqListener<object, object>[] v2 = new ICqListener<object, object>[2]; v2[0] = cqLstner; v2[1] = cqStatusLstner; MyCqListener<object, object> myLisner2 = (MyCqListener<object, object>)cqLstner; myLisner2.Clear(); MyCqStatusListener<object, object> myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner; myStatLisner2.Clear(); mutator.SetCqListeners(v2); cqAttr = qry1.GetCqAttributes(); Assert.AreEqual(2, cqAttr.getCqListeners().Length); region["4"] = p1; region["3"] = p2; region["2"] = p3; region["1"] = p4; Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete qry1 = qs.GetCq<object, object>("CQ1"); cqAttr = qry1.GetCqAttributes(); ICqListener<object, object>[] v3 = cqAttr.getCqListeners(); Assert.IsNotNull(v3); Assert.AreEqual(2, vl.Length); cqLstner = v3[0]; Assert.IsNotNull(cqLstner); myLisner2 = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; Util.Log("event count:{0}, error count {1}.", myLisner2.getEventCountBefore(), myLisner2.getErrorCountBefore()); Assert.AreEqual(4, myLisner2.getEventCountBefore()); cqStatusLstner = (ICqStatusListener<object, object>)v3[1]; Assert.IsNotNull(cqStatusLstner); myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>; Util.Log("event count:{0}, error count {1}.", myStatLisner2.getEventCountBefore(), myStatLisner2.getErrorCountBefore()); Assert.AreEqual(0, myStatLisner2.getCqConnectedCount()); Assert.AreEqual(4, myStatLisner2.getEventCountBefore()); mutator = qry1.GetCqAttributesMutator(); mutator.RemoveCqListener(cqLstner); cqAttr = qry1.GetCqAttributes(); Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); Assert.AreEqual(1, cqAttr.getCqListeners().Length); mutator.RemoveCqListener(cqStatusLstner); cqAttr = qry1.GetCqAttributes(); Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); Assert.AreEqual(0, cqAttr.getCqListeners().Length); region["4"] = p1; region["3"] = p2; region["2"] = p3; region["1"] = p4; Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete qry1 = qs.GetCq<object, object>("CQ1"); cqAttr = qry1.GetCqAttributes(); ICqListener<object, object>[] v4 = cqAttr.getCqListeners(); Assert.IsNotNull(v4); Assert.AreEqual(0, v4.Length); Util.Log("cqAttr.getCqListeners() done"); } public void CreateAndExecuteCQ_StatusListener(string poolName, string cqName, string cqQuery, int id) { var qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService(); CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); cqFac.AddCqListener(new MyCqStatusListener<object, object>(id)); CqAttributes<object, object> cqAttr = cqFac.Create(); CqQuery<object, object> qry = qs.NewCq(cqName, cqQuery, cqAttr, false); qry.Execute(); Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete } public void CreateAndExecuteCQ_Listener(string poolName, string cqName, string cqQuery, int id) { var qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService(); CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); cqFac.AddCqListener(new MyCqListener<object, object>(/*id*/)); CqAttributes<object, object> cqAttr = cqFac.Create(); CqQuery<object, object> qry = qs.NewCq(cqName, cqQuery, cqAttr, false); qry.Execute(); Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete } public void CheckCQStatusOnConnect(string poolName, string cqName, int onCqStatusConnect) { var qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService(); CqQuery<object, object> query = qs.GetCq<object, object>(cqName); CqAttributes<object, object> cqAttr = query.GetCqAttributes(); ICqListener<object, object>[] vl = cqAttr.getCqListeners(); MyCqStatusListener<object, object> myCqStatusLstr = (MyCqStatusListener<object, object>) vl[0]; Util.Log("CheckCQStatusOnConnect = {0} ", myCqStatusLstr.getCqConnectedCount()); Assert.AreEqual(onCqStatusConnect, myCqStatusLstr.getCqConnectedCount()); } public void CheckCQStatusOnDisConnect(string poolName, string cqName, int onCqStatusDisConnect) { var qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService(); CqQuery<object, object> query = qs.GetCq<object, object>(cqName); CqAttributes<object, object> cqAttr = query.GetCqAttributes(); ICqListener<object, object>[] vl = cqAttr.getCqListeners(); MyCqStatusListener<object, object> myCqStatusLstr = (MyCqStatusListener<object, object>)vl[0]; Util.Log("CheckCQStatusOnDisConnect = {0} ", myCqStatusLstr.getCqDisConnectedCount()); Assert.AreEqual(onCqStatusDisConnect, myCqStatusLstr.getCqDisConnectedCount()); } public void PutEntries(string regionName) { IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(regionName); for (int i = 1; i <= 10; i++) { region["key-" + i] = "val-" + i; } Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete } public void CheckCQStatusOnPutEvent(string poolName, string cqName, int onCreateCount) { var qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService(); CqQuery<object, object> query = qs.GetCq<object, object>(cqName); CqAttributes<object, object> cqAttr = query.GetCqAttributes(); ICqListener<object, object>[] vl = cqAttr.getCqListeners(); MyCqStatusListener<object, object> myCqStatusLstr = (MyCqStatusListener<object, object>)vl[0]; Util.Log("CheckCQStatusOnPutEvent = {0} ", myCqStatusLstr.getEventCountBefore()); Assert.AreEqual(onCreateCount, myCqStatusLstr.getEventCountBefore()); } public void CreateRegion(string locators, string servergroup, string regionName, string poolName) { CacheHelper.CreateTCRegion_Pool<object, object>(regionName, true, true, null, locators, poolName, true, servergroup); } void runCqQueryPdxTest() { CacheHelper.SetupJavaServers(true, "remotequeryN.xml"); CacheHelper.StartJavaLocator(1, "GFELOC"); Util.Log("Locator started"); CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); Util.Log("Cacheserver 1 started."); m_client1.Call(StepOne, CacheHelper.Locators); Util.Log("StepOne complete."); m_client1.Call(StepTwo, m_usePdxObjects); Util.Log("StepTwo complete."); if (!m_usePdxObjects) m_client1.Call(StepOneQE, CacheHelper.Locators); else m_client1.Call(StepOnePdxQE, CacheHelper.Locators); Util.Log("StepOne complete."); m_client1.Call(Close); CacheHelper.StopJavaServer(1); Util.Log("Cacheserver 1 stopped."); CacheHelper.StopJavaLocator(1); Util.Log("Locator stopped"); } [Test] public void CqQueryPdxTest() { m_usePdxObjects = true; runCqQueryPdxTest(); m_usePdxObjects = false; } } }