public async Task ExecuteQueryAsync()

in Services/DataX.Flow/DataX.Flow.InteractiveQuery/KernelService.cs [451:534]


        public async Task<ApiResult> ExecuteQueryAsync(string code, string kernelId)
        {
            try
            {
                if (string.IsNullOrEmpty(code))
                {
                    return ApiResult.CreateError("{\"Error\":\"Please select a query in the UI\"}");
                }

                if(code.Trim().StartsWith(_QuerySeparator))
                {
                    code = code.Replace(_QuerySeparator, "");
                }

                // Attach to kernel
                IKernel kernel = GetKernel(kernelId);

                // Remove Timewindow
                Regex timeRegex = new Regex(@"TIMEWINDOW\s{0,}\(\s{0,}.*\s{0,}\)", RegexOptions.IgnoreCase);
                Match match = timeRegex.Match(code);
                if (match.Success)
                {
                    code = code.Replace(match.Groups[0].Value, "");
                }

                // Handle WITH UPSERT
                Regex r1 = new Regex(@"\s{0,}([^;]*)WITH\s{1,}UPSERT\s{0,}([^;]*)", RegexOptions.IgnoreCase);
                Match m1 = r1.Match(code);
                if (m1.Success)
                {
                    string newQuery = m1.Groups[2].Value.Trim() + " = " + m1.Groups[1].Value.Trim() + "\r\n";
                    code = code.Replace(m1.Groups[0].Value, newQuery);
                }

                // Now check what kind of code we have, and execute
                code = code.TrimEnd(';');
                Regex r2 = new Regex("(.*)=([^;]*)");
                Match m2 = r2.Match(code);
                Regex r3 = new Regex(@"(\s{1,}CREATE TABLE\s{1,}[^;]*)", RegexOptions.IgnoreCase);
                Match m3 = r3.Match(" " + code);
                string g = Guid.NewGuid().ToString().Replace("-", "");
                string s = "";
                bool isTableCreateCommand = false;
                if (m2.Success)
                {
                    // SQL query assigning results to a table. Example: T1 = SELECT ...
                    string table = m2.Groups[1].Value.Trim();
                    s = $"val {table}Table = sql(\"{m2.Groups[2].Value}\"); {table}Table.toJSON.take({_MaxCount}).foreach(println); {table}Table.createOrReplaceTempView(\"{table}\")";
                }
                else if (m3.Success)
                {
                    // CREATE State query
                    isTableCreateCommand = true;
                    s = $"val results{g} = sql(\"{m3.Groups[1].Value.Trim()}\"); println(\"done\")";
                }
                else
                {
                    // SQL query without assigning results to a table. Example: SELECT ...
                    s = $"val results{g} = sql(\"{code}\"); results{g}.toJSON.take({_MaxCount}).foreach(println)";
                }

                // Execute code                
                string result = await Task.Run(() => kernel.ExecuteCode(ReplaceNewLineFeed(s)));

                // If the table already exists, then it is not an error. Mark this done.
                if (isTableCreateCommand && result != null && result.Contains("TableAlreadyExistsException"))
                {
                    result = "done";
                }

                if (!string.IsNullOrEmpty(result))
                {
                    return ConvertToJson(_MaxCount, result);
                }
                else
                {
                    return ApiResult.CreateError("{\"Error\":\"No Results\"}");
                }
            }
            catch (Exception ex)
            {
                return ApiResult.CreateError(ex.ToString());
            }
        }