in csharp/src/Drivers/BigQuery/BigQueryStatement.cs [81:218]
private async Task<QueryResult> ExecuteQueryInternalAsync()
{
QueryOptions queryOptions = ValidateOptions();
BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null, queryOptions);
JobReference jobReference = job.Reference;
GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true &&
int.TryParse(timeoutSeconds, out int seconds) &&
seconds >= 0)
{
getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
}
Func<Task<BigQueryJob>> checkJobStatus = async () =>
{
while (true)
{
var jobWithStatus = await Client.GetJobAsync(jobReference);
if (jobWithStatus.State == JobState.Done)
{
if (jobWithStatus.Status.ErrorResult != null)
{
// TODO: log
Debug.WriteLine($"Error: {jobWithStatus.Status.ErrorResult.Message}");
}
return jobWithStatus;
}
}
};
await ExecuteWithRetriesAsync<BigQueryJob>(checkJobStatus);
Func<Task<BigQueryResults>> getJobResults = async () =>
{
// if the authentication token was reset, then we need a new job with the latest token
BigQueryJob completedJob = await Client.GetJobAsync(jobReference);
return await completedJob.GetQueryResultsAsync();
};
BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults);
TokenProtectedReadClientManger clientMgr = new TokenProtectedReadClientManger(Credential);
clientMgr.UpdateToken = () => Task.Run(() =>
{
this.bigQueryConnection.SetCredential();
clientMgr.UpdateCredential(Credential);
});
// For multi-statement queries, the results.TableReference is null
if (results.TableReference == null)
{
string statementType = string.Empty;
if (Options?.TryGetValue(BigQueryParameters.StatementType, out string? statementTypeString) == true)
{
statementType = statementTypeString;
}
int statementIndex = 1;
if (Options?.TryGetValue(BigQueryParameters.StatementIndex, out string? statementIndexString) == true &&
int.TryParse(statementIndexString, out int statementIndexInt) &&
statementIndexInt > 0)
{
statementIndex = statementIndexInt;
}
string evaluationKind = string.Empty;
if (Options?.TryGetValue(BigQueryParameters.EvaluationKind, out string? evaluationKindString) == true)
{
evaluationKind = evaluationKindString;
}
Func<Task<BigQueryResults>> getMultiJobResults = async () =>
{
// To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
// Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
ListJobsOptions listJobsOptions = new ListJobsOptions();
listJobsOptions.ParentJobId = results.JobReference.JobId;
var joblist = Client.ListJobs(listJobsOptions)
.Select(job => Client.GetJob(job.Reference))
.Where(job => string.IsNullOrEmpty(evaluationKind) || job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind, StringComparison.OrdinalIgnoreCase))
.Where(job => string.IsNullOrEmpty(statementType) || job.Statistics.Query.StatementType.Equals(statementType, StringComparison.OrdinalIgnoreCase))
.OrderBy(job => job.Resource.Statistics.CreationTime)
.ToList();
if (joblist.Count > 0)
{
if (statementIndex < 1 || statementIndex > joblist.Count)
{
throw new ArgumentOutOfRangeException($"The specified index {statementIndex} is out of range. There are {joblist.Count} jobs available.");
}
return await joblist[statementIndex - 1].GetQueryResultsAsync(getQueryResultsOptions);
}
throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
};
results = await ExecuteWithRetriesAsync(getMultiJobResults);
}
if (results?.TableReference == null)
{
throw new AdbcException("There is no query statement");
}
string table = $"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";
int maxStreamCount = 1;
if (Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency, out string? maxStreamCountString) == true)
{
if (int.TryParse(maxStreamCountString, out int count))
{
if (count >= 0)
{
maxStreamCount = count;
}
}
}
ReadSession rs = new ReadSession { Table = table, DataFormat = DataFormat.Arrow };
Func<Task<ReadSession>> createReadSession = () => clientMgr.ReadClient.CreateReadSessionAsync("projects/" + results.TableReference.ProjectId, rs, maxStreamCount);
ReadSession rrs = await ExecuteWithRetriesAsync<ReadSession>(createReadSession);
long totalRows = results.TotalRows == null ? -1L : (long)results.TotalRows.Value;
var readers = rrs.Streams
.Select(s => ReadChunkWithRetries(clientMgr, s.Name))
.Where(chunk => chunk != null)
.Cast<IArrowReader>();
IArrowArrayStream stream = new MultiArrowReader(TranslateSchema(results.Schema), readers);
return new QueryResult(totalRows, stream);
}