private unsafe void FanoutSearch_impl_Recv()

in src/Modules/LIKQ/FanoutSearch/Core/FanoutSearchModule.Core.cs [495:619]


        private unsafe void FanoutSearch_impl_Recv(AsynReqArgs request_args)
        {
            int request_size = request_args.Size;
            byte* request_buffer = request_args.Buffer + request_args.Offset;
            int current_hop = *(int*)(request_buffer);
            int request_transaction_id = *(int*)(request_buffer + 4);
            int request_path_cnt = (request_size - 2 * sizeof(int)) / (sizeof(long) * (current_hop + 1));
            long* request_paths_ptr = (long*)(request_buffer + 2 * sizeof(int));
            int single_path_len = current_hop + 1;

            bool msg_too_big = false;

            AggregationObject aggregation_obj;

            //Console.WriteLine("hop: {0}", current_hop);
            //Console.WriteLine("tx: {0}", request_transaction_id);

            if (!this.m_aggregationObjects.TryGetValue(request_transaction_id, out aggregation_obj)
                ||
                _CheckTimeout(aggregation_obj))
            { /* Timeout. */ return; }

            var predicate = current_hop == 0 ?
                p => Action.Continue :
                aggregation_obj.predicates[current_hop - 1];

            if (current_hop == aggregation_obj.maxHop)
            {
                System.Collections.Concurrent.ConcurrentBag<int> result_indices = new System.Collections.Concurrent.ConcurrentBag<int>();

                Parallel.For(0, request_path_cnt, (i, state) =>
                {
                    long cell_id = request_paths_ptr[i * single_path_len + current_hop];
                    try
                    {
                        Verbs.m_path_ptr = request_paths_ptr + i * single_path_len;
                        Verbs.m_path_len = single_path_len;
                        using (var cell = s_useICellFunc(cell_id))
                        {
                            if ((~predicate(cell)).HasFlag(c_action_inv_return))
                                result_indices.Add(i);
                        }
                    }
                    catch { }

                    if (0 == (i & 0xFF) && _CheckTimeout(aggregation_obj))
                    {
                        state.Break();
                    }
                });

                var results = result_indices.Select(i => GetPathDescriptor(&request_paths_ptr[i * single_path_len], current_hop)).ToList();
                CommitAggregationResults(request_transaction_id, aggregation_obj, results);
                return;
            }

            var edgeType_list = aggregation_obj.edgeTypes[current_hop];
            bool enumerate_all_edges = edgeType_list.Count == 0;
            var negate_edge_types = new HashSet<string>(edgeType_list.Where(_ => _.FirstOrDefault() == '!').Select(_ => _.Substring(1)));

            if (negate_edge_types.Count == 0)
                negate_edge_types = null;

            using (var dispatcher = new MessageDispatcher(current_hop + 1, request_transaction_id))
            {
                Lazy<List<FanoutPathDescriptor>> intermediate_result_paths = new Lazy<List<FanoutPathDescriptor>>(isThreadSafe: true);
                Parallel.For(0, request_path_cnt, (i, loopstate) =>
                {
                    long cell_id = request_paths_ptr[i * single_path_len + current_hop];
                    long last_id = -1;
                    if (current_hop > 0)
                        last_id = request_paths_ptr[i * single_path_len + (current_hop - 1)];
                    try
                    {
                        Verbs.m_path_ptr = request_paths_ptr + i * single_path_len;
                        Verbs.m_path_len = single_path_len;
                        using (var cell = s_useICellFunc(cell_id))
                        {
                            var action = ~predicate(cell);

                            if (action.HasFlag(c_action_inv_return))
                            {
                                var intermediate_paths = intermediate_result_paths.Value;
                                lock (intermediate_paths)
                                {
                                    intermediate_paths.Add(GetPathDescriptor(&request_paths_ptr[i * single_path_len], current_hop));
                                }
                            }

                            if (action.HasFlag(c_action_inv_continue))
                            {
                                VisitNeighbors(current_hop, edgeType_list, enumerate_all_edges, negate_edge_types, dispatcher, &request_paths_ptr[i * single_path_len], last_id, cell);
                            }
                        }
                    }
                    catch (MessageTooLongException) { msg_too_big = true; }
                    catch { }

                    if (msg_too_big)
                    {
                        loopstate.Break();
                        return;
                    }

                    if (0 == (i & 0xFF) && _CheckTimeout(aggregation_obj))
                    {
                        loopstate.Break();
                        return;
                    }

                });//END Parallel.For

                if (msg_too_big) throw new MessageTooLongException();

                if (intermediate_result_paths.IsValueCreated)
                {
                    using (var intermediate_results = new FanoutAggregationMessageWriter(intermediate_result_paths.Value, request_transaction_id))
                    {
                        IntermediateResult(aggregation_obj.aggregationServer, intermediate_results);
                    }
                }

                dispatcher.Dispatch();
            }
        }