processing-pipelines/bigquery/query-runner/csharp/Startup.cs (76 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Threading.Tasks;
using Common;
using Google.Cloud.BigQuery.V2;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace QueryRunner
{
public class Startup
{
private const string CloudEventType = "dev.knative.samples.querycompleted";
private const string CloudEventSource = "knative/eventing/samples/queryrunner";
private const string DatasetId = "covid19_jhu_csse";
private string _tableId;
public void ConfigureServices(IServiceCollection services)
{
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
logger.LogInformation("Service is starting...");
app.UseRouting();
var eventReader = new CloudEventReader(logger);
var configReader = new ConfigReader(logger, CloudEventSource, CloudEventType);
var projectId = configReader.Read("PROJECT_ID");
var eventWriter = configReader.ReadEventWriter();
app.UseEndpoints(endpoints =>
{
endpoints.MapPost("/", async context =>
{
var client = await BigQueryClient.CreateAsync(projectId);
var country = await eventReader.ReadCloudSchedulerData(context);
_tableId = country.Replace(" ", "").ToLowerInvariant();
var results = await RunQuery(client, country, logger);
logger.LogInformation("Executed query");
var replyData = new {datasetId = DatasetId, tableId = _tableId, country = country};
await eventWriter.Write(replyData, context);
});
});
}
private async Task<BigQueryResults> RunQuery(BigQueryClient client, string country, ILogger<Startup> logger)
{
var sql = $@"SELECT date, cumulative_confirmed as num_reports
FROM `bigquery-public-data.covid19_open_data.covid19_open_data`
WHERE cumulative_confirmed > 0 and country_name = '{country}' and subregion1_code is NULL";
var table = await GetOrCreateTable(client, logger);
logger.LogInformation($"Executing query: \n{sql}");
return await client.ExecuteQueryAsync(sql, null, new QueryOptions
{
DestinationTable = table.Reference
});
}
private async Task<BigQueryTable> GetOrCreateTable(BigQueryClient client, ILogger<Startup> logger)
{
logger.LogInformation($"Getting/creating destination dataset: {DatasetId}");
var dataset = await client.GetOrCreateDatasetAsync(DatasetId);
try
{
await client.DeleteTableAsync(DatasetId, _tableId); // Start fresh each time
}
catch (Exception e)
{
// Ignore. The table probably did not exist.
logger.LogError($"Table {_tableId} deletion failed: {e.Message}");
}
logger.LogInformation($"Getting/creation destination table: {_tableId}");
var table = await dataset.GetOrCreateTableAsync(_tableId, new TableSchemaBuilder
{
{ "date", BigQueryDbType.Date },
{ "num_reports", BigQueryDbType.Int64 },
}.Build());
return table;
}
}
}