in Services/DataX.Flow/DataX.Flow.InteractiveQuery/KernelService.cs [217:294]
/// <summary>
/// Deserializes the setup steps from <c>_setupStepsXml</c> into <c>_steps</c>, executes them on the
/// given kernel (every step, or only the trailing two when resampling), and then loads the
/// reference data and the functions.
/// </summary>
/// <param name="kernel">Kernel on which each step's code is executed.</param>
/// <param name="isReSample">When true only the last two steps are run; otherwise all steps are run.</param>
/// <param name="referenceDatas">Reference data handed to <c>LoadReferenceData</c> once the steps have run.</param>
/// <param name="functions">Functions (UDFs and UDAFs) handed to <c>LoadFunctions</c> after the reference data.</param>
/// <returns>
/// A success result wrapping a <c>KernelResult</c> that carries the kernel id and, when the
/// normalization flag was raised, a warning message; an error result when the output of the
/// last executed step contains errors or when any exception is thrown.
/// </returns>
private ApiResult LoadandRunSteps(IKernel kernel, bool isReSample, List<ReferenceDataObject> referenceDatas, List<FunctionObject> functions)
{
    try
    {
        bool normalizationWarning = false;

        // Deserialize the setup steps XML
        XmlSerializer ser = new XmlSerializer(typeof(steps));
        using (StringReader sreader = new StringReader(_setupStepsXml))
        using (XmlReader reader = XmlReader.Create(sreader))
        {
            _steps = (steps)ser.Deserialize(reader);
        }

        string result = "";
        if (!isReSample)
        {
            // Run all the steps
            for (int i = 0; i < _steps.Items.Count(); i++)
            {
                // Emitted before execution, so it must not claim the step succeeded.
                Logger.LogInformation($"Running steps.Items[{i}]");
                result = kernel.ExecuteCode(_steps.Items[i].Value);
                LogErrors(result, _steps.Items[i].Value, $"InitializationStep[{i}]");

                // The helper receives the loop index by reference and may change it.
                _steps.Items[i].Value = NormalizationSnippetHelper(ref i, ref normalizationWarning, result, _steps.Items);
            }
        }
        else
        {
            // Run only the last two steps. Clamp the start index so that fewer than
            // two available steps does not produce a negative index.
            int firstResampleStep = Math.Max(0, _steps.Items.Count() - 2);
            for (int i = firstResampleStep; i < _steps.Items.Count(); i++)
            {
                Logger.LogInformation($"steps.Items[i].Value: _steps.Items[{i}]");
                result = kernel.ExecuteCode(_steps.Items[i].Value);
                var errors = CheckErrors(result);
                if (!string.IsNullOrEmpty(errors))
                {
                    Logger.LogError($"Initialization step: {_steps.Items[i].Value}. Resulting Error: {result}");
                }

                // The helper receives the loop index by reference and may change it.
                _steps.Items[i].Value = NormalizationSnippetHelper(ref i, ref normalizationWarning, result, _steps.Items);
            }
        }

        // Now run the steps to load the reference data
        LoadReferenceData(kernel, referenceDatas);

        // Now load the UDFs and UDAFs
        LoadFunctions(kernel, functions);

        // Only the output of the last executed step decides success or failure here.
        var error = CheckErrors(result);
        if (!string.IsNullOrEmpty(error))
        {
            // NOTE(review): the error text is embedded without escaping, so quotes inside it
            // yield a malformed payload. Kept as-is because callers may rely on this shape.
            return ApiResult.CreateError("{'Error':'" + error + "'}");
        }

        KernelResult responseObject = new KernelResult
        {
            Result = kernel.Id,
            Message = normalizationWarning
                ? "Warning: Normalization query in the Input tab could not be applied, probably because some columns in the query are not part of the schema. Please update the schema or try auto-generating it using longer duration in the Input tab."
                : string.Empty
        };
        return ApiResult.CreateSuccess(JObject.FromObject(responseObject));
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, $"ErrorMessage: {ex.Message} SetupSteps: {_setupStepsXml}");
        return ApiResult.CreateError(ex.ToString());
    }
}