public void ProcessCQ()

in clicache/integration-test/ThinClientCqPdxTest.cs [598:724]


    public void ProcessCQ(string locators)
    {
      CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
      null, locators, "__TESTPOOL1_", true);

      IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName);
      Portfolio p1 = new Portfolio(1, 100);
      Portfolio p2 = new Portfolio(2, 100);
      Portfolio p3 = new Portfolio(3, 100);
      Portfolio p4 = new Portfolio(4, 100);

      region["1"] = p1;
      region["2"] = p2;
      region["3"] = p3;
      region["4"] = p4;

      var qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService();
      
      CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
      ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
      ICqStatusListener<object, object> cqStatusLstner = new MyCqStatusListener<object, object>(1);

      ICqListener<object, object>[] v = new ICqListener<object, object>[2];
      cqFac.AddCqListener(cqLstner);
      v[0] = cqLstner;
      v[1] = cqStatusLstner;
      cqFac.InitCqListeners(v);
      Util.Log("InitCqListeners called");
      CqAttributes<object, object> cqAttr = cqFac.Create();
      CqQuery<object, object> qry1 = qs.NewCq("CQ1", "select * from /" + QERegionName + "  p where p.ID >= 1", cqAttr, false);
      qry1.Execute();

      Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
      region["4"] = p1;
      region["3"] = p2;
      region["2"] = p3;
      region["1"] = p4;
      Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete

      qry1 = qs.GetCq<object, object>("CQ1");
      cqAttr = qry1.GetCqAttributes();
      ICqListener<object, object>[] vl = cqAttr.getCqListeners();
      Assert.IsNotNull(vl);
      Assert.AreEqual(2, vl.Length);
      cqLstner = vl[0];
      Assert.IsNotNull(cqLstner);
      MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
      Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore());
      Assert.AreEqual(4, myLisner.getEventCountBefore());

      cqStatusLstner = (ICqStatusListener<object, object>)vl[1];
      Assert.IsNotNull(cqStatusLstner);
      MyCqStatusListener<object, object> myStatLisner = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>;
      Util.Log("event count:{0}, error count {1}.", myStatLisner.getEventCountBefore(), myStatLisner.getErrorCountBefore());
      Assert.AreEqual(1, myStatLisner.getCqConnectedCount());
      Assert.AreEqual(4, myStatLisner.getEventCountBefore());

      CqAttributesMutator<object, object> mutator = qry1.GetCqAttributesMutator();
      mutator.RemoveCqListener(cqLstner);
      cqAttr = qry1.GetCqAttributes();
      Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
      Assert.AreEqual(1, cqAttr.getCqListeners().Length);

      mutator.RemoveCqListener(cqStatusLstner);
      cqAttr = qry1.GetCqAttributes();
      Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
      Assert.AreEqual(0, cqAttr.getCqListeners().Length);
      
      ICqListener<object, object>[] v2 = new ICqListener<object, object>[2];
      v2[0] = cqLstner;
      v2[1] = cqStatusLstner;
      MyCqListener<object, object> myLisner2 = (MyCqListener<object, object>)cqLstner;
      myLisner2.Clear();
      MyCqStatusListener<object, object> myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;
      myStatLisner2.Clear();
      mutator.SetCqListeners(v2);
      cqAttr = qry1.GetCqAttributes();
      Assert.AreEqual(2, cqAttr.getCqListeners().Length);

      region["4"] = p1;
      region["3"] = p2;
      region["2"] = p3;
      region["1"] = p4;
      Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete

      qry1 = qs.GetCq<object, object>("CQ1");
      cqAttr = qry1.GetCqAttributes();
      ICqListener<object, object>[] v3 = cqAttr.getCqListeners();
      Assert.IsNotNull(v3);
      Assert.AreEqual(2, vl.Length);
      cqLstner = v3[0];
      Assert.IsNotNull(cqLstner);
      myLisner2 = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
      Util.Log("event count:{0}, error count {1}.", myLisner2.getEventCountBefore(), myLisner2.getErrorCountBefore());
      Assert.AreEqual(4, myLisner2.getEventCountBefore());

      cqStatusLstner = (ICqStatusListener<object, object>)v3[1];
      Assert.IsNotNull(cqStatusLstner);
      myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>;
      Util.Log("event count:{0}, error count {1}.", myStatLisner2.getEventCountBefore(), myStatLisner2.getErrorCountBefore());
      Assert.AreEqual(0, myStatLisner2.getCqConnectedCount());
      Assert.AreEqual(4, myStatLisner2.getEventCountBefore());

      mutator = qry1.GetCqAttributesMutator();
      mutator.RemoveCqListener(cqLstner);
      cqAttr = qry1.GetCqAttributes();
      Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
      Assert.AreEqual(1, cqAttr.getCqListeners().Length);

      mutator.RemoveCqListener(cqStatusLstner);
      cqAttr = qry1.GetCqAttributes();
      Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
      Assert.AreEqual(0, cqAttr.getCqListeners().Length);

      region["4"] = p1;
      region["3"] = p2;
      region["2"] = p3;
      region["1"] = p4;
      Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete

      qry1 = qs.GetCq<object, object>("CQ1");
      cqAttr = qry1.GetCqAttributes();
      ICqListener<object, object>[] v4 = cqAttr.getCqListeners();      
      Assert.IsNotNull(v4);      
      Assert.AreEqual(0, v4.Length);
      Util.Log("cqAttr.getCqListeners() done");
    }