in src/Modules/LIKQ/FanoutSearch/Core/FanoutSearchModule.Core.cs [495:619]
private unsafe void FanoutSearch_impl_Recv(AsynReqArgs request_args)
{
int request_size = request_args.Size;
byte* request_buffer = request_args.Buffer + request_args.Offset;
int current_hop = *(int*)(request_buffer);
int request_transaction_id = *(int*)(request_buffer + 4);
int request_path_cnt = (request_size - 2 * sizeof(int)) / (sizeof(long) * (current_hop + 1));
long* request_paths_ptr = (long*)(request_buffer + 2 * sizeof(int));
int single_path_len = current_hop + 1;
bool msg_too_big = false;
AggregationObject aggregation_obj;
//Console.WriteLine("hop: {0}", current_hop);
//Console.WriteLine("tx: {0}", request_transaction_id);
if (!this.m_aggregationObjects.TryGetValue(request_transaction_id, out aggregation_obj)
||
_CheckTimeout(aggregation_obj))
{ /* Timeout. */ return; }
var predicate = current_hop == 0 ?
p => Action.Continue :
aggregation_obj.predicates[current_hop - 1];
if (current_hop == aggregation_obj.maxHop)
{
System.Collections.Concurrent.ConcurrentBag<int> result_indices = new System.Collections.Concurrent.ConcurrentBag<int>();
Parallel.For(0, request_path_cnt, (i, state) =>
{
long cell_id = request_paths_ptr[i * single_path_len + current_hop];
try
{
Verbs.m_path_ptr = request_paths_ptr + i * single_path_len;
Verbs.m_path_len = single_path_len;
using (var cell = s_useICellFunc(cell_id))
{
if ((~predicate(cell)).HasFlag(c_action_inv_return))
result_indices.Add(i);
}
}
catch { }
if (0 == (i & 0xFF) && _CheckTimeout(aggregation_obj))
{
state.Break();
}
});
var results = result_indices.Select(i => GetPathDescriptor(&request_paths_ptr[i * single_path_len], current_hop)).ToList();
CommitAggregationResults(request_transaction_id, aggregation_obj, results);
return;
}
var edgeType_list = aggregation_obj.edgeTypes[current_hop];
bool enumerate_all_edges = edgeType_list.Count == 0;
var negate_edge_types = new HashSet<string>(edgeType_list.Where(_ => _.FirstOrDefault() == '!').Select(_ => _.Substring(1)));
if (negate_edge_types.Count == 0)
negate_edge_types = null;
using (var dispatcher = new MessageDispatcher(current_hop + 1, request_transaction_id))
{
Lazy<List<FanoutPathDescriptor>> intermediate_result_paths = new Lazy<List<FanoutPathDescriptor>>(isThreadSafe: true);
Parallel.For(0, request_path_cnt, (i, loopstate) =>
{
long cell_id = request_paths_ptr[i * single_path_len + current_hop];
long last_id = -1;
if (current_hop > 0)
last_id = request_paths_ptr[i * single_path_len + (current_hop - 1)];
try
{
Verbs.m_path_ptr = request_paths_ptr + i * single_path_len;
Verbs.m_path_len = single_path_len;
using (var cell = s_useICellFunc(cell_id))
{
var action = ~predicate(cell);
if (action.HasFlag(c_action_inv_return))
{
var intermediate_paths = intermediate_result_paths.Value;
lock (intermediate_paths)
{
intermediate_paths.Add(GetPathDescriptor(&request_paths_ptr[i * single_path_len], current_hop));
}
}
if (action.HasFlag(c_action_inv_continue))
{
VisitNeighbors(current_hop, edgeType_list, enumerate_all_edges, negate_edge_types, dispatcher, &request_paths_ptr[i * single_path_len], last_id, cell);
}
}
}
catch (MessageTooLongException) { msg_too_big = true; }
catch { }
if (msg_too_big)
{
loopstate.Break();
return;
}
if (0 == (i & 0xFF) && _CheckTimeout(aggregation_obj))
{
loopstate.Break();
return;
}
});//END Parallel.For
if (msg_too_big) throw new MessageTooLongException();
if (intermediate_result_paths.IsValueCreated)
{
using (var intermediate_results = new FanoutAggregationMessageWriter(intermediate_result_paths.Value, request_transaction_id))
{
IntermediateResult(aggregation_obj.aggregationServer, intermediate_results);
}
}
dispatcher.Dispatch();
}
}