in clicache/integration-test/ThinClientCqAttributesMutatorTests.cs [270:396]
public void ProcessCQ(string locators)
{
CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
null, locators, "__TESTPOOL1_", true);
IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName);
Portfolio p1 = new Portfolio(1, 100);
Portfolio p2 = new Portfolio(2, 100);
Portfolio p3 = new Portfolio(3, 100);
Portfolio p4 = new Portfolio(4, 100);
region["1"] = p1;
region["2"] = p2;
region["3"] = p3;
region["4"] = p4;
var qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService();
CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
ICqStatusListener<object, object> cqStatusLstner = new MyCqStatusListener<object, object>(1);
ICqListener<object, object>[] v = new ICqListener<object, object>[2];
cqFac.AddCqListener(cqLstner);
v[0] = cqLstner;
v[1] = cqStatusLstner;
cqFac.InitCqListeners(v);
Util.Log("InitCqListeners called");
CqAttributes<object, object> cqAttr = cqFac.Create();
CqQuery<object, object> qry1 = qs.NewCq("CQ1", "select * from /" + QERegionName + " p where p.ID >= 1", cqAttr, false);
qry1.Execute();
Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
region["4"] = p1;
region["3"] = p2;
region["2"] = p3;
region["1"] = p4;
Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
qry1 = qs.GetCq<object, object>("CQ1");
cqAttr = qry1.GetCqAttributes();
ICqListener<object, object>[] vl = cqAttr.getCqListeners();
Assert.IsNotNull(vl);
Assert.AreEqual(2, vl.Length);
cqLstner = vl[0];
Assert.IsNotNull(cqLstner);
MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore());
Assert.AreEqual(4, myLisner.getEventCountBefore());
cqStatusLstner = (ICqStatusListener<object, object>)vl[1];
Assert.IsNotNull(cqStatusLstner);
MyCqStatusListener<object, object> myStatLisner = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>;
Util.Log("event count:{0}, error count {1}.", myStatLisner.getEventCountBefore(), myStatLisner.getErrorCountBefore());
Assert.AreEqual(1, myStatLisner.getCqConnectedCount());
Assert.AreEqual(4, myStatLisner.getEventCountBefore());
CqAttributesMutator<object, object> mutator = qry1.GetCqAttributesMutator();
mutator.RemoveCqListener(cqLstner);
cqAttr = qry1.GetCqAttributes();
Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
Assert.AreEqual(1, cqAttr.getCqListeners().Length);
mutator.RemoveCqListener(cqStatusLstner);
cqAttr = qry1.GetCqAttributes();
Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
Assert.AreEqual(0, cqAttr.getCqListeners().Length);
ICqListener<object, object>[] v2 = new ICqListener<object, object>[2];
v2[0] = cqLstner;
v2[1] = cqStatusLstner;
MyCqListener<object, object> myLisner2 = (MyCqListener<object, object>)cqLstner;
myLisner2.Clear();
MyCqStatusListener<object, object> myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;
myStatLisner2.Clear();
mutator.SetCqListeners(v2);
cqAttr = qry1.GetCqAttributes();
Assert.AreEqual(2, cqAttr.getCqListeners().Length);
region["4"] = p1;
region["3"] = p2;
region["2"] = p3;
region["1"] = p4;
Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
qry1 = qs.GetCq<object, object>("CQ1");
cqAttr = qry1.GetCqAttributes();
ICqListener<object, object>[] v3 = cqAttr.getCqListeners();
Assert.IsNotNull(v3);
Assert.AreEqual(2, vl.Length);
cqLstner = v3[0];
Assert.IsNotNull(cqLstner);
myLisner2 = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
Util.Log("event count:{0}, error count {1}.", myLisner2.getEventCountBefore(), myLisner2.getErrorCountBefore());
Assert.AreEqual(4, myLisner2.getEventCountBefore());
cqStatusLstner = (ICqStatusListener<object, object>)v3[1];
Assert.IsNotNull(cqStatusLstner);
myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>;
Util.Log("event count:{0}, error count {1}.", myStatLisner2.getEventCountBefore(), myStatLisner2.getErrorCountBefore());
Assert.AreEqual(0, myStatLisner2.getCqConnectedCount());
Assert.AreEqual(4, myStatLisner2.getEventCountBefore());
mutator = qry1.GetCqAttributesMutator();
mutator.RemoveCqListener(cqLstner);
cqAttr = qry1.GetCqAttributes();
Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
Assert.AreEqual(1, cqAttr.getCqListeners().Length);
mutator.RemoveCqListener(cqStatusLstner);
cqAttr = qry1.GetCqAttributes();
Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
Assert.AreEqual(0, cqAttr.getCqListeners().Length);
region["4"] = p1;
region["3"] = p2;
region["2"] = p3;
region["1"] = p4;
Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
qry1 = qs.GetCq<object, object>("CQ1");
cqAttr = qry1.GetCqAttributes();
ICqListener<object, object>[] v4 = cqAttr.getCqListeners();
Assert.IsNotNull(v4);
Assert.AreEqual(0, v4.Length);
Util.Log("cqAttr.getCqListeners() done");
}