in src/Modules/LIKQ/FanoutSearch/Core/FanoutSearchModule.Core.cs [297:467]
/// <summary>
/// Builds the list of result paths for a fan-out query from the aggregated
/// path descriptors and, when any hop requests return selections, pulls the
/// selected field values for every node on those paths from the servers.
/// </summary>
/// <param name="transaction_id">Identifier used only in the log messages written by this method.</param>
/// <param name="request">The query message; supplies skip_count, take_count, maxHop and the per-hop return_selection lists.</param>
/// <param name="aggregation_obj">Holds the aggregated results; it is locked while the result window is read.</param>
/// <returns>One <c>ResultPathDescriptor</c> per path inside the requested skip/take window.</returns>
/// <exception cref="MessageTooLongException">
/// Thrown when building or dispatching the node-info messages raises an
/// <c>AccessorResizeException</c> or a <c>MessageTooLongException</c>.
/// </exception>
private List<ResultPathDescriptor> _PullSelectionsAndAssembleResults(int transaction_id, FanoutQueryMessageReader request, AggregationObject aggregation_obj)
{
    int result_set_capacity;
    IEnumerable<FanoutPathDescriptor> result_set;
    List<ResultPathDescriptor> rpaths;
    Protocols.TSL.NodeDescriptor r_desc;
    bool[] has_return_selections;
    bool[] has_outlink_selections;

    lock (aggregation_obj)
    {
        int skip_count = request.skip_count;
        int take_count = request.take_count;

        // Number of results that remain after skipping. Enumerable.Skip treats
        // a negative count as zero, so mirror that when sizing the list.
        int available = aggregation_obj.results.Count - Math.Max(skip_count, 0);
        if (available < 0)
        {
            available = 0;
        }

        if (take_count == 0)
        {
            // take_count == 0 means "no limit": return everything after the skip.
            result_set_capacity = available;
            result_set = aggregation_obj.results.Skip(skip_count);
        }
        else
        {
            // Never pre-allocate more slots than there are results to return;
            // a very large take_count would otherwise request a huge backing array.
            result_set_capacity = Math.Min(take_count, available);
            result_set = aggregation_obj.results.Skip(skip_count).Take(take_count);
        }

        if (result_set_capacity < 0)
        {
            result_set_capacity = 0;
        }

        // Assemble result message.
        rpaths = new List<ResultPathDescriptor>(capacity: result_set_capacity);
        r_desc = new Protocols.TSL.NodeDescriptor(field_selections: null);
        has_return_selections = request.return_selection.Select(_ => _.Count != 0).ToArray();
        has_outlink_selections = request.return_selection.Select(_ => _.Contains(JsonDSL.graph_outlinks)).ToArray();

        // NOTE(review): r_desc is reused as a scratch descriptor for every node.
        // This presumably relies on NodeDescriptor being a value type so that each
        // Add() stores an independent copy -- confirm against the TSL definition.
        foreach (var fpath in result_set)
        {
            ResultPathDescriptor rpath = new ResultPathDescriptor(nodes: new List<Protocols.TSL.NodeDescriptor>());

            r_desc.id = fpath.hop_0;
            r_desc.field_selections = has_return_selections[0] ? new List<string>() : null;
            rpath.nodes.Add(r_desc);

            if (fpath.hop_1.HasValue)
            {
                r_desc.id = fpath.hop_1.Value;
                r_desc.field_selections = has_return_selections[1] ? new List<string>() : null;
                rpath.nodes.Add(r_desc);
            }

            if (fpath.hop_2.HasValue)
            {
                r_desc.id = fpath.hop_2.Value;
                r_desc.field_selections = has_return_selections[2] ? new List<string>() : null;
                rpath.nodes.Add(r_desc);
            }

            if (fpath.hop_3.HasValue)
            {
                r_desc.id = fpath.hop_3.Value;
                r_desc.field_selections = has_return_selections[3] ? new List<string>() : null;
                rpath.nodes.Add(r_desc);
            }

            if (fpath.hop_n != null)
            {
                // Hops beyond the fourth are stored in hop_n, starting at index 4.
                int n = 4;
                foreach (var id in fpath.hop_n)
                {
                    r_desc.id = id;
                    r_desc.field_selections = has_return_selections[n] ? new List<string>() : null;
                    rpath.nodes.Add(r_desc);
                    ++n;
                }
            }

            rpaths.Add(rpath);
        }
    }

    // Nothing to pull when no hop asks for any field.
    if (has_return_selections.Any(_ => _))
    {
        Log.WriteLine("Transaction #{0}: pulling selections.", transaction_id);
        Stopwatch pull_selection_timer = Stopwatch.StartNew();

        int hop_count = request.maxHop + 1;
        List<List<string>> return_selections = request.return_selection.Select(s => s.Select(_ => (string)_).ToList()).ToList();
        GetNodesInfoRequestWriter[,] node_info_writers = new GetNodesInfoRequestWriter[hop_count, Global.ServerCount];
        GetNodesInfoResponse[,] node_info_readers = new GetNodesInfoResponse[hop_count, Global.ServerCount];
        // reader_idx[i, j] is the next unread entry of the response for hop i from server j.
        int[,] reader_idx = new int[hop_count, Global.ServerCount];
        Func<long, int> hash_func = Global.CloudStorage.GetPartitionIdByCellId;

        try
        {
            // One parallel task per hop; each task only touches row i of the arrays above
            // and the i-th node of every path.
            Parallel.For(0, hop_count, (i, state) =>
            {
                if (has_return_selections[i])
                {
                    // create msg
                    for (int j = 0; j < Global.ServerCount; ++j)
                    {
                        node_info_writers[i, j] = new GetNodesInfoRequestWriter(fields: return_selections[i]);
                        if (has_outlink_selections[i])
                        {
                            node_info_writers[i, j].secondary_ids = new List<long>();
                        }
                    }

                    try
                    {
                        // populate msg
                        foreach (var rpath in rpaths)
                        {
                            if (i < rpath.nodes.Count)
                            {
                                var id = rpath.nodes[i].id;
                                int partition = hash_func(id);
                                node_info_writers[i, partition].ids.Add(id);
                                if (has_outlink_selections[i])
                                {
                                    // The id of the next node on the path, or -1 when this is the last node.
                                    long edge_dest_id = (i < rpath.nodes.Count - 1) ? rpath.nodes[i + 1].id : -1;
                                    node_info_writers[i, partition].secondary_ids.Add(edge_dest_id);
                                }
                            }
                        }

                        // dispatch msg
                        Parallel.For(0, Global.ServerCount, j =>
                        {
                            var reader = _GetNodesInfo_impl(j, node_info_writers[i, j]);
                            // NOTE(review): the assignment presumably copies the response out of
                            // the reader, which is why the reader can be disposed right away -- confirm.
                            node_info_readers[i, j] = reader;
                            reader.Dispose();
                        });

                        // consume msg
                        // Paths are walked in the same order as in the populate step, so the
                        // idx-th id sent to server j lines up with infoList[idx].
                        // NOTE(review): assumes the server returns entries in request order -- confirm.
                        foreach (var rpath in rpaths)
                        {
                            if (i < rpath.nodes.Count)
                            {
                                var id = rpath.nodes[i].id;
                                var j = hash_func(id);
                                var idx = reader_idx[i, j]++;
                                rpath.nodes[i].field_selections.AddRange(node_info_readers[i, j].infoList[idx].values);
                            }
                        }
                    }
                    catch (AggregateException ex) when (ex.InnerExceptions.Any(_ => _ is AccessorResizeException || _ is MessageTooLongException))
                    {
                        throw new MessageTooLongException();
                    }
                    catch (AccessorResizeException)
                    {
                        throw new MessageTooLongException();
                    }
                    finally
                    {
                        // destruct msg
                        for (int j = 0; j < Global.ServerCount; ++j)
                        {
                            node_info_writers[i, j].Dispose();
                        }
                    }
                }
            });
        }
        catch (AggregateException ex) when (ex.InnerExceptions.Any(_ => _ is MessageTooLongException || _ is AccessorResizeException))
        {
            // Parallel.For wraps exceptions thrown by the per-hop tasks; surface a single
            // MessageTooLongException to the caller instead.
            throw new MessageTooLongException();
        }

        pull_selection_timer.Stop();
        Log.WriteLine("Transaction #{0}: pulling selections complete. Time = {1}ms.", transaction_id, pull_selection_timer.ElapsedMilliseconds);
    }

    return rpaths;
}