in src/CRA.ClientLibrary/DataProcessing/Utilities/DeploymentUtils.cs [103:141]
private static async Task<bool> DeployTask(int taskIndex, TaskBase[] tasks, string[] tasksIds, Dictionary<string, bool> tasksDeploymentStatus, CRAClientLibrary client, OperatorsToplogy topology)
{
if (!tasksDeploymentStatus[tasksIds[taskIndex]])
{
bool isSuccessful = true;
foreach (var fromInput in tasks[taskIndex].EndpointsDescriptor.FromInputs.Keys)
{
int fromInputIndex = RetrieveTaskIndexOfOperator(fromInput, tasksIds);
isSuccessful = isSuccessful & await DeployTask(fromInputIndex, tasks, tasksIds, tasksDeploymentStatus, client, topology);
}
foreach (var fromSecondaryInput in tasks[taskIndex].EndpointsDescriptor.SecondaryFromInputs.Keys)
{
int fromSecondaryInputIndex = RetrieveTaskIndexOfOperator(fromSecondaryInput, tasksIds);
isSuccessful = isSuccessful & await DeployTask(fromSecondaryInputIndex, tasks, tasksIds, tasksDeploymentStatus, client, topology);
}
if (isSuccessful)
{
if (tasks[taskIndex].OperationType == OperatorType.Produce)
{
isSuccessful = isSuccessful & await DeployProduceTask(client, (ProduceTask)tasks[taskIndex], topology);
if (isSuccessful) tasksDeploymentStatus[tasksIds[taskIndex]] = true;
}
else if (tasks[taskIndex].OperationType == OperatorType.Subscribe)
{
isSuccessful = isSuccessful & await DeploySubscribeTask(client, (SubscribeTask)tasks[taskIndex], topology);
if (isSuccessful) tasksDeploymentStatus[tasksIds[taskIndex]] = true;
}
else if (tasks[taskIndex].OperationType == OperatorType.Move)
{
isSuccessful = isSuccessful & await DeployShuffleReduceTask(client, (ShuffleTask)tasks[taskIndex], topology);
if (isSuccessful) tasksDeploymentStatus[tasksIds[taskIndex]] = true;
}
}
return isSuccessful;
}
else
return true;
}