in Sources/Tools/PsiStoreTool/Utility.cs [364:519]
/// <summary>
/// Locates the single task named <paramref name="name"/> among the tasks returned by LoadTasks,
/// binds its parameters, and invokes it: once per message of <paramref name="stream"/> when a
/// stream name is supplied, otherwise once directly.
/// </summary>
/// <param name="stream">Name of the stream to open dynamically; null or empty selects the non-stream mode.</param>
/// <param name="store">Name of the store to open; when null no store is opened and the pipeline is not run.</param>
/// <param name="path">Path of the store; converted to a full path before the store is opened.</param>
/// <param name="name">Task name, compared against each task attribute's Name.</param>
/// <param name="assemblies">Assemblies handed to LoadTasks to discover tasks.</param>
/// <param name="args">Values for the task's remaining parameters, consumed in order; any value not supplied here is prompted for on the console.</param>
/// <returns>Always 0; failures are signalled by exceptions.</returns>
/// <exception cref="Exception">No task, or more than one task, is named <paramref name="name"/>.</exception>
/// <exception cref="ArgumentException">
/// A store or stream is required but no store was supplied, a command-line value cannot be parsed,
/// a parameter has an unsupported type, or console input ends while a value is being prompted for.
/// </exception>
internal static int ExecuteTask(string stream, string store, string path, string name, IEnumerable<string> assemblies, IEnumerable<string> args)
{
    // Join the sequences so the banner shows their contents rather than the collection type name.
    var assemblyText = assemblies == null ? string.Empty : string.Join(",", assemblies);
    var argText = args == null ? string.Empty : string.Join(",", args);
    Console.WriteLine($"Execute Task (stream={stream}, store={store}, path={path}, name={name}, assemblies={assemblyText}, args={argText})");

    // find task
    var tasks = LoadTasks(assemblies).Where(t => t.Attribute.Name == name).ToArray();
    if (tasks.Length == 0)
    {
        throw new Exception($"Could not find task named '{name}'.");
    }
    else if (tasks.Length > 1)
    {
        throw new Exception($"Ambiguous task name ({tasks.Length} tasks found named '{name}').");
    }

    var task = tasks[0];

    // process task
    using (var pipeline = Pipeline.Create(
        deliveryPolicy: task.Attribute.DeliveryPolicyLatestMessage ? DeliveryPolicy.LatestMessage : DeliveryPolicy.Unlimited,
        enableDiagnostics: task.Attribute.EnableDiagnostics))
    {
        var importer = store != null ? PsiStore.Open(pipeline, store, Path.GetFullPath(path)) : null;

        // prepare parameters
        var streamMode = stream?.Length > 0;
        var messageIndex = -1;
        var envelopeIndex = -1;
        var argList = args.ToArray();
        var argIndex = 0;
        var parameterInfo = task.Method.GetParameters();
        var parameters = new object[parameterInfo.Length];
        for (var i = 0; i < parameterInfo.Length; i++)
        {
            var p = parameterInfo[i];
            if (p.ParameterType.IsAssignableFrom(typeof(Importer)))
            {
                parameters[i] = importer ?? throw new ArgumentException("Error: Task requires a store, but no store argument supplied (-s).");
            }
            else if (p.ParameterType.IsAssignableFrom(typeof(Pipeline)))
            {
                parameters[i] = pipeline;
            }
            else if (p.ParameterType.IsAssignableFrom(typeof(Envelope)))
            {
                // filled in per message inside the stream handler below
                envelopeIndex = i;
            }
            else if (streamMode && messageIndex == -1)
            {
                messageIndex = i; // assumed first arg
            }
            else
            {
                // Fills parameters[i] from the next command-line value, or interactively when none remain.
                void ProcessArgs(string friendlyName, Func<string, object> parser)
                {
                    if (argIndex < argList.Length)
                    {
                        // take from command-line args
                        try
                        {
                            parameters[i] = parser(argList[argIndex++]);
                        }
                        catch (Exception ex)
                        {
                            throw new ArgumentException($"Error: Parameter '{p.Name}' ({i}) expected {friendlyName}.", ex);
                        }
                    }
                    else
                    {
                        // get value interactively
                        do
                        {
                            Console.Write($"{p.Name} ({friendlyName})? ");
                            var line = Console.ReadLine();
                            if (line == null)
                            {
                                // ReadLine returns null once input is exhausted (e.g. redirected stdin);
                                // retrying could never succeed, so fail instead of looping forever.
                                throw new ArgumentException($"Error: Parameter '{p.Name}' ({i}) expected {friendlyName}, but no more input is available.");
                            }

                            try
                            {
                                parameters[i] = parser(line);
                            }
                            catch (Exception)
                            {
                                // leave parameters[i] null so the prompt is repeated
                                Console.WriteLine($"Error: Expected {friendlyName}.");
                            }
                        }
                        while (parameters[i] == null);
                    }
                }

                if (p.ParameterType.IsAssignableFrom(typeof(double)))
                {
                    ProcessArgs("double", v => double.Parse(v));
                }
                else if (p.ParameterType.IsAssignableFrom(typeof(int)))
                {
                    ProcessArgs("integer", v => int.Parse(v));
                }
                else if (p.ParameterType.IsAssignableFrom(typeof(bool)))
                {
                    ProcessArgs("boolean", v => bool.Parse(v));
                }
                else if (p.ParameterType.IsAssignableFrom(typeof(DateTime)))
                {
                    ProcessArgs("datetime", v => DateTime.Parse(v));
                }
                else if (p.ParameterType.IsAssignableFrom(typeof(TimeSpan)))
                {
                    ProcessArgs("timespan", v => TimeSpan.Parse(v));
                }
                else if (p.ParameterType.IsAssignableFrom(typeof(string)))
                {
                    ProcessArgs("string", v => v);
                }
                else
                {
                    throw new ArgumentException($"Unexpected parameter type ({p.ParameterType}).");
                }
            }
        }

        if (streamMode)
        {
            if (importer == null)
            {
                throw new ArgumentException("Error: Task requires a stream within a store, but no store argument supplied (-s).");
            }

            // The same parameters array is reused for every message; only the message
            // and envelope slots are overwritten before each invocation.
            importer.OpenDynamicStream(stream).Do((m, e) =>
            {
                if (messageIndex != -1)
                {
                    parameters[messageIndex] = m;
                }

                if (envelopeIndex != -1)
                {
                    parameters[envelopeIndex] = e;
                }

                task.Method.Invoke(null, parameters);
            });
        }
        else
        {
            task.Method.Invoke(null, parameters);
        }

        // The pipeline is only run when a store was opened.
        if (importer != null)
        {
            pipeline.ProgressReportInterval = TimeSpan.FromSeconds(1);
            pipeline.RunAsync(
                task.Attribute.ReplayAllRealTime ? ReplayDescriptor.ReplayAllRealTime : ReplayDescriptor.ReplayAll,
                new Progress<double>(progress => Console.WriteLine($"Progress: {progress * 100.0:F2}%")));
            pipeline.WaitAll();
        }
    }

    return 0;
}