internal static int ExecuteTask()

in Sources/Tools/PsiStoreTool/Utility.cs [364:519]


        /// <summary>
        /// Finds the task with the given name among the tasks loaded from the supplied assemblies and executes it,
        /// either once or once per message of a stream within a store.
        /// </summary>
        /// <param name="stream">Name of the stream whose messages are fed to the task; null or empty to invoke the task a single time.</param>
        /// <param name="store">Name of the store to open; null when no store is to be opened.</param>
        /// <param name="path">Path of the store (converted to a full path before the store is opened); only used when <paramref name="store"/> is non-null.</param>
        /// <param name="name">Name of the task, compared against the name in each task's attribute.</param>
        /// <param name="assemblies">Assemblies handed to LoadTasks to discover candidate tasks.</param>
        /// <param name="args">Values for the task's primitive parameters, consumed in order; values that are missing are requested on the console.</param>
        /// <returns>Always 0.</returns>
        /// <exception cref="Exception">No task, or more than one task, has the given name.</exception>
        /// <exception cref="ArgumentException">
        /// A store is required but was not supplied, an argument could not be parsed, a task parameter has an
        /// unsupported type, or console input ended while a parameter value was being requested.
        /// </exception>
        internal static int ExecuteTask(string stream, string store, string path, string name, IEnumerable<string> assemblies, IEnumerable<string> args)
        {
            Console.WriteLine($"Execute Task (stream={stream}, store={store}, path={path}, name={name}, assemblies={assemblies}, args={args})");

            // find task
            var tasks = LoadTasks(assemblies).Where(t => t.Attribute.Name == name).ToArray();
            if (tasks.Length == 0)
            {
                throw new Exception($"Could not find task named '{name}'.");
            }
            else if (tasks.Length > 1)
            {
                throw new Exception($"Ambiguous task name ({tasks.Length} tasks found named '{name}').");
            }

            var task = tasks[0];

            // process task
            using (var pipeline = Pipeline.Create(
                deliveryPolicy: task.Attribute.DeliveryPolicyLatestMessage ? DeliveryPolicy.LatestMessage : DeliveryPolicy.Unlimited,
                enableDiagnostics: task.Attribute.EnableDiagnostics))
            {
                var importer = store != null ? PsiStore.Open(pipeline, store, Path.GetFullPath(path)) : null;

                // prepare parameters
                var streamMode = stream?.Length > 0;
                var messageIndex = -1;
                var envelopeIndex = -1;

                // materialize once; every later access goes through this array so the source sequence
                // is never enumerated a second time (a null sequence is treated as "no arguments")
                var argList = args?.ToArray() ?? Array.Empty<string>();
                var argIndex = 0;
                var parameterInfo = task.Method.GetParameters();
                var parameters = new object[parameterInfo.Length];
                for (var i = 0; i < parameterInfo.Length; i++)
                {
                    var p = parameterInfo[i];
                    if (p.ParameterType.IsAssignableFrom(typeof(Importer)))
                    {
                        parameters[i] = importer ?? throw new ArgumentException("Error: Task requires a store, but no store argument supplied (-s).");
                    }
                    else if (p.ParameterType.IsAssignableFrom(typeof(Pipeline)))
                    {
                        parameters[i] = pipeline;
                    }
                    else if (p.ParameterType.IsAssignableFrom(typeof(Envelope)))
                    {
                        // filled in per message when running in stream mode
                        envelopeIndex = i;
                    }
                    else if (streamMode && messageIndex == -1)
                    {
                        messageIndex = i; // assumed first arg
                    }
                    else
                    {
                        // Fills parameters[i] from the next command-line argument or, when those are used up, from the console.
                        void ProcessArgs(string friendlyName, Func<string, object> parser)
                        {
                            if (argIndex < argList.Length)
                            {
                                // take from command-line args
                                try
                                {
                                    parameters[i] = parser(argList[argIndex++]);
                                }
                                catch (Exception ex)
                                {
                                    throw new ArgumentException($"Error: Parameter '{p.Name}' ({i}) expected {friendlyName}.", ex);
                                }
                            }
                            else
                            {
                                // get value interactively
                                do
                                {
                                    Console.Write($"{p.Name} ({friendlyName})? ");
                                    var line = Console.ReadLine();
                                    if (line == null)
                                    {
                                        // end of input (e.g. redirected stdin ran dry): re-prompting would never terminate
                                        throw new ArgumentException($"Error: Parameter '{p.Name}' ({i}) expected {friendlyName}, but no more input is available.");
                                    }

                                    try
                                    {
                                        parameters[i] = parser(line);
                                    }
                                    catch (Exception ex) when (ex is FormatException || ex is OverflowException)
                                    {
                                        // malformed value: report and ask again
                                        Console.WriteLine($"Error: Expected {friendlyName}.");
                                    }
                                }
                                while (parameters[i] == null);
                            }
                        }

                        if (p.ParameterType.IsAssignableFrom(typeof(double)))
                        {
                            ProcessArgs("double", v => double.Parse(v));
                        }
                        else if (p.ParameterType.IsAssignableFrom(typeof(int)))
                        {
                            ProcessArgs("integer", v => int.Parse(v));
                        }
                        else if (p.ParameterType.IsAssignableFrom(typeof(bool)))
                        {
                            ProcessArgs("boolean", v => bool.Parse(v));
                        }
                        else if (p.ParameterType.IsAssignableFrom(typeof(DateTime)))
                        {
                            ProcessArgs("datetime", v => DateTime.Parse(v));
                        }
                        else if (p.ParameterType.IsAssignableFrom(typeof(TimeSpan)))
                        {
                            ProcessArgs("timespan", v => TimeSpan.Parse(v));
                        }
                        else if (p.ParameterType.IsAssignableFrom(typeof(string)))
                        {
                            ProcessArgs("string", v => v);
                        }
                        else
                        {
                            throw new ArgumentException($"Unexpected parameter type ({p.ParameterType}).");
                        }
                    }
                }

                if (streamMode)
                {
                    if (importer == null)
                    {
                        throw new ArgumentException("Error: Task requires a stream within a store, but no store argument supplied (-s).");
                    }

                    // invoke the task for each message, reusing the parameter array and
                    // overwriting only the message and envelope slots
                    importer.OpenDynamicStream(stream).Do((m, e) =>
                    {
                        if (messageIndex != -1)
                        {
                            parameters[messageIndex] = m;
                        }

                        if (envelopeIndex != -1)
                        {
                            parameters[envelopeIndex] = e;
                        }

                        task.Method.Invoke(null, parameters);
                    });
                }
                else
                {
                    // invoke the task once with the prepared parameters
                    task.Method.Invoke(null, parameters);
                }

                // the pipeline is only run when a store was opened
                if (importer != null)
                {
                    pipeline.ProgressReportInterval = TimeSpan.FromSeconds(1);
                    pipeline.RunAsync(
                        task.Attribute.ReplayAllRealTime ? ReplayDescriptor.ReplayAllRealTime : ReplayDescriptor.ReplayAll,
                        new Progress<double>(p => Console.WriteLine($"Progress: {p * 100.0:F2}%")));
                    pipeline.WaitAll();
                }
            }

            return 0;
        }