bigquery-storage/api/BigQueryStorage.Samples/AppendRowsPending.cs
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// [START bigquerystorage_append_rows_pending]
using Google.Api.Gax.Grpc;
using Google.Cloud.BigQuery.Storage.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using static Google.Cloud.BigQuery.Storage.V1.AppendRowsRequest.Types;
public class AppendRowsPendingSample
{
/// <summary>
/// This code sample demonstrates how to write records in pending mode.
/// Create a write stream, write some sample data, and commit the stream to append the rows.
/// The CustomerRecord proto used in this sample can be found in the Resources folder, and the generated C# is placed in the Data folder of
/// https://github.com/GoogleCloudPlatform/dotnet-docs-samples/tree/main/bigquery-storage/api/BigQueryStorage.Samples
/// </summary>
public async Task AppendRowsPendingAsync(string projectId, string datasetId, string tableId)
{
BigQueryWriteClient bigQueryWriteClient = await BigQueryWriteClient.CreateAsync();
// Initialize a write stream for the specified table.
// When creating the stream, choose the type. Use the Pending type so that rows
// are not visible until the stream is committed. See:
// https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
WriteStream stream = new WriteStream { Type = WriteStream.Types.Type.Pending };
TableName tableName = TableName.FromProjectDatasetTable(projectId, datasetId, tableId);
stream = await bigQueryWriteClient.CreateWriteStreamAsync(tableName, stream);
// Initialize the streaming call, retrieving the stream object used to write rows.
BigQueryWriteClient.AppendRowsStream rowAppender = bigQueryWriteClient.AppendRows();
// Sending requests and retrieving responses can be arbitrarily interleaved.
// The exact sequence will depend on client/server behavior.
// Create a task to do something with the responses from the server.
Task appendResultsHandlerTask = Task.Run(async () =>
{
AsyncResponseStream<AppendRowsResponse> appendRowResults = rowAppender.GetResponseStream();
while (await appendRowResults.MoveNextAsync())
{
AppendRowsResponse responseItem = appendRowResults.Current;
// Do something with responses.
Console.WriteLine($"Appending rows resulted in: {responseItem.AppendResult}");
}
// The response stream has completed.
});
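// The CustomerRecord message used below is generated from a .proto file in the
// Resources folder; it is assumed to look roughly like this (field names and
// numbers are illustrative):
//
//   message CustomerRecord {
//     optional string customer_name = 1;
//     optional int64 customer_number = 2;
//   }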
// List of records to be appended to the table.
List<CustomerRecord> records = new List<CustomerRecord>
{
new CustomerRecord { CustomerNumber = 1, CustomerName = "Alice" },
new CustomerRecord { CustomerNumber = 2, CustomerName = "Bob" }
};
// Create a batch of row data by appending serialized bytes to the
// SerializedRows repeated field.
ProtoData protoData = new ProtoData
{
WriterSchema = new ProtoSchema { ProtoDescriptor = CustomerRecord.Descriptor.ToProto() },
Rows = new ProtoRows { SerializedRows = { records.Select(r => r.ToByteString()) } }
};
// Initialize the append row request.
AppendRowsRequest appendRowRequest = new AppendRowsRequest
{
WriteStreamAsWriteStreamName = stream.WriteStreamName,
ProtoRows = protoData
};
// Stream a request to the server.
await rowAppender.WriteAsync(appendRowRequest);
// Append a second batch of data.
protoData = new ProtoData
{
Rows = new ProtoRows { SerializedRows = { new CustomerRecord { CustomerNumber = 3, CustomerName = "Charles" }.ToByteString() } }
};
// Since this is the second request, you only need to include the row data.
// The name of the stream and the protocol buffer descriptor are only needed in
// the first request.
appendRowRequest = new AppendRowsRequest
{
// If Offset is not present, the write is performed at the current end of the stream.
ProtoRows = protoData
};
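// To control placement explicitly, the request could instead carry an offset,
// e.g. appendRowRequest.Offset = 2; (a sketch; the value must match the
// current end of the stream or the append is rejected).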
await rowAppender.WriteAsync(appendRowRequest);
// Complete writing requests to the stream.
await rowAppender.WriteCompleteAsync();
// Await the handler. This will complete once all server responses have been processed.
await appendResultsHandlerTask;
// A Pending type stream must be "finalized" before being committed. No new
// records can be written to the stream after this method has been called.
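// FinalizeWriteStreamAsync also returns the final row count of the stream,
// which could be captured for verification; this sample ignores it.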
await bigQueryWriteClient.FinalizeWriteStreamAsync(stream.Name);
BatchCommitWriteStreamsRequest batchCommitWriteStreamsRequest = new BatchCommitWriteStreamsRequest
{
Parent = tableName.ToString(),
WriteStreams = { stream.Name }
};
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
await bigQueryWriteClient.BatchCommitWriteStreamsAsync(batchCommitWriteStreamsRequest);
if (batchCommitWriteStreamsResponse.StreamErrors?.Count > 0)
{
// Handle errors here.
Console.WriteLine("Error committing write streams. Individual errors:");
foreach (StorageError error in batchCommitWriteStreamsResponse.StreamErrors)
{
Console.WriteLine(error.ErrorMessage);
}
}
else
{
Console.WriteLine($"Writes to stream {stream.Name} have been committed.");
}
}
}
// [END bigquerystorage_append_rows_pending]
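// A minimal usage sketch (identifiers are placeholders; the target table is
// assumed to already exist with a schema matching CustomerRecord):
//
//   var sample = new AppendRowsPendingSample();
//   await sample.AppendRowsPendingAsync("my-project", "my_dataset", "my_table");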