in Services/DataX.Flow/DataX.Flow.InteractiveQuery/KernelService.cs [451:534]
public async Task<ApiResult> ExecuteQueryAsync(string code, string kernelId)
{
try
{
if (string.IsNullOrEmpty(code))
{
return ApiResult.CreateError("{\"Error\":\"Please select a query in the UI\"}");
}
if(code.Trim().StartsWith(_QuerySeparator))
{
code = code.Replace(_QuerySeparator, "");
}
// Attach to kernel
IKernel kernel = GetKernel(kernelId);
// Remove Timewindow
Regex timeRegex = new Regex(@"TIMEWINDOW\s{0,}\(\s{0,}.*\s{0,}\)", RegexOptions.IgnoreCase);
Match match = timeRegex.Match(code);
if (match.Success)
{
code = code.Replace(match.Groups[0].Value, "");
}
// Handle WITH UPSERT
Regex r1 = new Regex(@"\s{0,}([^;]*)WITH\s{1,}UPSERT\s{0,}([^;]*)", RegexOptions.IgnoreCase);
Match m1 = r1.Match(code);
if (m1.Success)
{
string newQuery = m1.Groups[2].Value.Trim() + " = " + m1.Groups[1].Value.Trim() + "\r\n";
code = code.Replace(m1.Groups[0].Value, newQuery);
}
// Now check what kind of code we have, and execute
code = code.TrimEnd(';');
Regex r2 = new Regex("(.*)=([^;]*)");
Match m2 = r2.Match(code);
Regex r3 = new Regex(@"(\s{1,}CREATE TABLE\s{1,}[^;]*)", RegexOptions.IgnoreCase);
Match m3 = r3.Match(" " + code);
string g = Guid.NewGuid().ToString().Replace("-", "");
string s = "";
bool isTableCreateCommand = false;
if (m2.Success)
{
// SQL query assigning results to a table. Example: T1 = SELECT ...
string table = m2.Groups[1].Value.Trim();
s = $"val {table}Table = sql(\"{m2.Groups[2].Value}\"); {table}Table.toJSON.take({_MaxCount}).foreach(println); {table}Table.createOrReplaceTempView(\"{table}\")";
}
else if (m3.Success)
{
// CREATE State query
isTableCreateCommand = true;
s = $"val results{g} = sql(\"{m3.Groups[1].Value.Trim()}\"); println(\"done\")";
}
else
{
// SQL query without assigning results to a table. Example: SELECT ...
s = $"val results{g} = sql(\"{code}\"); results{g}.toJSON.take({_MaxCount}).foreach(println)";
}
// Execute code
string result = await Task.Run(() => kernel.ExecuteCode(ReplaceNewLineFeed(s)));
// If the table already exists, then it is not an error. Mark this done.
if (isTableCreateCommand && result != null && result.Contains("TableAlreadyExistsException"))
{
result = "done";
}
if (!string.IsNullOrEmpty(result))
{
return ConvertToJson(_MaxCount, result);
}
else
{
return ApiResult.CreateError("{\"Error\":\"No Results\"}");
}
}
catch (Exception ex)
{
return ApiResult.CreateError(ex.ToString());
}
}