private List _PullSelectionsAndAssembleResults()

in src/Modules/LIKQ/FanoutSearch/Core/FanoutSearchModule.Core.cs [297:467]


        /// <summary>
        /// Assembles the result paths of a fan-out query from the aggregated results
        /// (applying the request's skip/take paging) and, when at least one hop has
        /// return selections, pulls the selected field values from every server and
        /// appends them to the matching node of each path.
        /// </summary>
        /// <param name="transaction_id">Transaction identifier; used only in the log messages.</param>
        /// <param name="request">
        /// The query request. Supplies <c>skip_count</c>, <c>take_count</c> (0 means
        /// "take everything after the skip"), <c>maxHop</c> and <c>return_selection</c>.
        /// </param>
        /// <param name="aggregation_obj">
        /// Holder of the aggregated paths. It is locked while <c>results</c> is read.
        /// </param>
        /// <returns>
        /// One <c>ResultPathDescriptor</c> per selected path, in the enumeration order of
        /// <c>aggregation_obj.results</c>.
        /// </returns>
        /// <exception cref="MessageTooLongException">
        /// Thrown when pulling selections fails with an <c>AccessorResizeException</c> or a
        /// <c>MessageTooLongException</c> (directly or wrapped in an <c>AggregateException</c>).
        /// </exception>
        private List<ResultPathDescriptor> _PullSelectionsAndAssembleResults(int transaction_id, FanoutQueryMessageReader request, AggregationObject aggregation_obj)
        {
            int result_set_capacity;
            IEnumerable<FanoutPathDescriptor> result_set;

            List<ResultPathDescriptor> rpaths;
            Protocols.TSL.NodeDescriptor r_desc;
            bool[] has_return_selections;
            bool[] has_outlink_selections;
            lock (aggregation_obj)
            {
                // Number of results that remain once skip_count entries are skipped.
                int available_count = aggregation_obj.results.Count - request.skip_count;
                if (available_count < 0)
                {
                    available_count = 0;
                }

                if (request.take_count == 0)
                {
                    result_set_capacity = available_count;
                    result_set = aggregation_obj.results.Skip(request.skip_count);
                }
                else
                {
                    // Never pre-allocate more slots than there are results to return:
                    // take_count comes from the request and may be far larger than the
                    // actual result set.
                    result_set_capacity = request.take_count;
                    if (result_set_capacity > available_count)
                    {
                        result_set_capacity = available_count;
                    }
                    result_set = aggregation_obj.results.Skip(request.skip_count).Take(request.take_count);
                }

                if (result_set_capacity < 0)
                {
                    result_set_capacity = 0;
                }

                // Assemble result message.
                rpaths = new List<ResultPathDescriptor>(capacity: result_set_capacity);
                r_desc = new Protocols.TSL.NodeDescriptor(field_selections: null);
                has_return_selections = request.return_selection.Select(_ => _.Count != 0).ToArray();
                has_outlink_selections = request.return_selection.Select(_ => _.Contains(JsonDSL.graph_outlinks)).ToArray();

                // result_set is a deferred LINQ query over aggregation_obj.results, so it is
                // enumerated here, while the lock is still held.
                // NOTE(review): r_desc is reused for every node; this relies on
                // NodeDescriptor being a value type so that Add() stores a copy -- confirm.
                foreach (var fpath in result_set)
                {
                    ResultPathDescriptor rpath = new ResultPathDescriptor(nodes: new List<Protocols.TSL.NodeDescriptor>());

                    r_desc.id = fpath.hop_0;
                    r_desc.field_selections = has_return_selections[0] ? new List<string>() : null;
                    rpath.nodes.Add(r_desc);

                    if (fpath.hop_1.HasValue)
                    {
                        r_desc.id = fpath.hop_1.Value;
                        r_desc.field_selections = has_return_selections[1] ? new List<string>() : null;
                        rpath.nodes.Add(r_desc);
                    }

                    if (fpath.hop_2.HasValue)
                    {
                        r_desc.id = fpath.hop_2.Value;
                        r_desc.field_selections = has_return_selections[2] ? new List<string>() : null;
                        rpath.nodes.Add(r_desc);
                    }

                    if (fpath.hop_3.HasValue)
                    {
                        r_desc.id = fpath.hop_3.Value;
                        r_desc.field_selections = has_return_selections[3] ? new List<string>() : null;
                        rpath.nodes.Add(r_desc);
                    }

                    // Hops beyond the fourth are stored in hop_n; their selection flags
                    // start at index 4.
                    if (fpath.hop_n != null)
                    {
                        int n = 4;
                        foreach (var id in fpath.hop_n)
                        {
                            r_desc.id = id;
                            r_desc.field_selections = has_return_selections[n] ? new List<string>() : null;
                            rpath.nodes.Add(r_desc);
                            ++n;
                        }
                    }

                    rpaths.Add(rpath);
                }
            }

            // Equivalent to asking whether any entry of request.return_selection is
            // non-empty, without enumerating the request again.
            if (has_return_selections.Any(selected => selected))
            {
                Log.WriteLine("Transaction #{0}: pulling selections.", transaction_id);
                Stopwatch pull_selection_timer = Stopwatch.StartNew();

                int hop_count = request.maxHop + 1;
                List<List<string>> return_selections = request.return_selection.Select(s => s.Select(_ => (string)_).ToList()).ToList();
                // Indexed by [hop, server].
                GetNodesInfoRequestWriter[,] node_info_writers = new GetNodesInfoRequestWriter[hop_count, Global.ServerCount];
                GetNodesInfoResponse[,] node_info_readers = new GetNodesInfoResponse[hop_count, Global.ServerCount];
                // Next unread entry of each response's infoList.
                int[,] reader_idx = new int[hop_count, Global.ServerCount];
                Func<long, int> hash_func = Global.CloudStorage.GetPartitionIdByCellId;

                try
                {
                    Parallel.For(0, hop_count, (i, state) =>
                    {
                        if (has_return_selections[i])
                        {
                            //  create msg
                            for (int j = 0; j < Global.ServerCount; ++j)
                            {
                                node_info_writers[i, j] = new GetNodesInfoRequestWriter(fields: return_selections[i]);
                                if (has_outlink_selections[i])
                                {
                                    node_info_writers[i, j].secondary_ids = new List<long>();
                                }
                            }

                            try
                            {
                                //  populate msg
                                foreach (var rpath in rpaths)
                                {
                                    if (i < rpath.nodes.Count)
                                    {
                                        var id = rpath.nodes[i].id;
                                        int partition = hash_func(id);
                                        node_info_writers[i, partition].ids.Add(id);
                                        if (has_outlink_selections[i])
                                        {
                                            // -1 marks "no next node" for the last node of a path.
                                            long edge_dest_id = (i < rpath.nodes.Count - 1) ? rpath.nodes[i + 1].id : -1;
                                            node_info_writers[i, partition].secondary_ids.Add(edge_dest_id);
                                        }
                                    }
                                }

                                //  dispatch msg
                                // NOTE(review): the reader is disposed right after being stored;
                                // presumably the assignment into the GetNodesInfoResponse array
                                // copies the payload out of the reader -- confirm.
                                Parallel.For(0, Global.ServerCount, j =>
                                    {
                                        var reader = _GetNodesInfo_impl(j, node_info_writers[i, j]);
                                        node_info_readers[i, j] = reader;
                                        reader.Dispose();
                                    });

                                //  consume msg
                                // Paths are walked in the same order as in the populate step, so
                                // the idx-th request id for a server is matched with infoList[idx]
                                // (assumes each response keeps the order of the requested ids).
                                foreach (var rpath in rpaths)
                                {
                                    if (i < rpath.nodes.Count)
                                    {
                                        var id = rpath.nodes[i].id;
                                        var j = hash_func(id);
                                        var idx = reader_idx[i, j]++;

                                        rpath.nodes[i].field_selections.AddRange(node_info_readers[i, j].infoList[idx].values);
                                    }
                                }

                            }
                            catch (AggregateException ex) when (ex.InnerExceptions.Any(_ => _ is AccessorResizeException || _ is MessageTooLongException))
                            {
                                throw new MessageTooLongException();
                            }
                            catch (AccessorResizeException)
                            {
                                throw new MessageTooLongException();
                            }
                            finally
                            {
                                //  destruct msg
                                for (int j = 0; j < Global.ServerCount; ++j)
                                {
                                    node_info_writers[i, j].Dispose();
                                }
                            }
                        }
                    });
                }
                catch (AggregateException ex) when (ex.InnerExceptions.Any(_ => _ is MessageTooLongException || _ is AccessorResizeException))
                {
                    // The outer Parallel.For wraps per-hop failures again; surface them as a
                    // single MessageTooLongException.
                    throw new MessageTooLongException();
                }

                pull_selection_timer.Stop();
                Log.WriteLine("Transaction #{0}: pulling selections complete. Time = {1}ms.", transaction_id, pull_selection_timer.ElapsedMilliseconds);
            }
            return rpaths;
        }