private async Task ExecuteQueryInternalAsync()

in csharp/src/Drivers/BigQuery/BigQueryStatement.cs [81:218]


        private async Task<QueryResult> ExecuteQueryInternalAsync()
        {
            QueryOptions queryOptions = ValidateOptions();
            BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null, queryOptions);

            JobReference jobReference = job.Reference;
            GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();

            if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true &&
                int.TryParse(timeoutSeconds, out int seconds) &&
                seconds >= 0)
            {
                getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
            }

            Func<Task<BigQueryJob>> checkJobStatus = async () =>
            {
                while (true)
                {
                    var jobWithStatus = await Client.GetJobAsync(jobReference);

                    if (jobWithStatus.State == JobState.Done)
                    {
                        if (jobWithStatus.Status.ErrorResult != null)
                        {
                            // TODO: log
                            Debug.WriteLine($"Error: {jobWithStatus.Status.ErrorResult.Message}");
                        }

                        return jobWithStatus;
                    }
                }
            };

            await ExecuteWithRetriesAsync<BigQueryJob>(checkJobStatus);

            Func<Task<BigQueryResults>> getJobResults = async () =>
            {
                // if the authentication token was reset, then we need a new job with the latest token
                BigQueryJob completedJob = await Client.GetJobAsync(jobReference);
                return await completedJob.GetQueryResultsAsync();
            };

            BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults);

            TokenProtectedReadClientManger clientMgr = new TokenProtectedReadClientManger(Credential);
            clientMgr.UpdateToken = () => Task.Run(() =>
            {
                this.bigQueryConnection.SetCredential();
                clientMgr.UpdateCredential(Credential);
            });

            // For multi-statement queries, the results.TableReference is null
            if (results.TableReference == null)
            {
                string statementType = string.Empty;
                if (Options?.TryGetValue(BigQueryParameters.StatementType, out string? statementTypeString) == true)
                {
                    statementType = statementTypeString;
                }
                int statementIndex = 1;
                if (Options?.TryGetValue(BigQueryParameters.StatementIndex, out string? statementIndexString) == true &&
                    int.TryParse(statementIndexString, out int statementIndexInt) &&
                    statementIndexInt > 0)
                {
                    statementIndex = statementIndexInt;
                }
                string evaluationKind = string.Empty;
                if (Options?.TryGetValue(BigQueryParameters.EvaluationKind, out string? evaluationKindString) == true)
                {
                    evaluationKind = evaluationKindString;
                }

                Func<Task<BigQueryResults>> getMultiJobResults = async () =>
                {
                    // To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
                    // Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
                    ListJobsOptions listJobsOptions = new ListJobsOptions();
                    listJobsOptions.ParentJobId = results.JobReference.JobId;
                    var joblist = Client.ListJobs(listJobsOptions)
                        .Select(job => Client.GetJob(job.Reference))
                        .Where(job => string.IsNullOrEmpty(evaluationKind) || job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind, StringComparison.OrdinalIgnoreCase))
                        .Where(job => string.IsNullOrEmpty(statementType) || job.Statistics.Query.StatementType.Equals(statementType, StringComparison.OrdinalIgnoreCase))
                        .OrderBy(job => job.Resource.Statistics.CreationTime)
                        .ToList();

                    if (joblist.Count > 0)
                    {
                        if (statementIndex < 1 || statementIndex > joblist.Count)
                        {
                            throw new ArgumentOutOfRangeException($"The specified index {statementIndex} is out of range. There are {joblist.Count} jobs available.");
                        }
                        return await joblist[statementIndex - 1].GetQueryResultsAsync(getQueryResultsOptions);
                    }

                    throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
                };

                results = await ExecuteWithRetriesAsync(getMultiJobResults);
            }

            if (results?.TableReference == null)
            {
                throw new AdbcException("There is no query statement");
            }

            string table = $"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";

            int maxStreamCount = 1;

            if (Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency, out string? maxStreamCountString) == true)
            {
                if (int.TryParse(maxStreamCountString, out int count))
                {
                    if (count >= 0)
                    {
                        maxStreamCount = count;
                    }
                }
            }

            ReadSession rs = new ReadSession { Table = table, DataFormat = DataFormat.Arrow };

            Func<Task<ReadSession>> createReadSession = () => clientMgr.ReadClient.CreateReadSessionAsync("projects/" + results.TableReference.ProjectId, rs, maxStreamCount);

            ReadSession rrs = await ExecuteWithRetriesAsync<ReadSession>(createReadSession);

            long totalRows = results.TotalRows == null ? -1L : (long)results.TotalRows.Value;

            var readers = rrs.Streams
                             .Select(s => ReadChunkWithRetries(clientMgr, s.Name))
                             .Where(chunk => chunk != null)
                             .Cast<IArrowReader>();

            IArrowArrayStream stream = new MultiArrowReader(TranslateSchema(results.Schema), readers);

            return new QueryResult(totalRows, stream);
        }