spanner/api/Spanner.Samples/BatchReadRecordsAsync.cs (47 lines of code) (raw):

// Copyright 2020 Google Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // [START spanner_batch_client] using Google.Cloud.Spanner.Data; using System; using System.Linq; using System.Threading; using System.Threading.Tasks; public class BatchReadRecordsAsyncSample { private int _rowsRead; private int _partitionCount; public async Task<(int RowsRead, int Partitions)> BatchReadRecordsAsync(string projectId, string instanceId, string databaseId) { string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}"; using var connection = new SpannerConnection(connectionString); await connection.OpenAsync(); using var transaction = await connection.BeginTransactionAsync( SpannerTransactionCreationOptions.ReadOnly.WithIsDetached(true), new SpannerTransactionOptions { DisposeBehavior = DisposeBehavior.CloseResources }, cancellationToken: default); using var cmd = connection.CreateSelectCommand("SELECT SingerId, FirstName, LastName FROM Singers"); cmd.Transaction = transaction; // A CommandPartition object is serializable and can be used from a different process. // If data boost is enabled, partitioned read and query requests will be executed // using Spanner independent compute resources. var partitions = await cmd.GetReaderPartitionsAsync(PartitionOptions.Default.WithDataBoostEnabled(true)); var transactionId = transaction.TransactionId; await Task.WhenAll(partitions.Select(x => DistributedReadWorkerAsync(x, transactionId))); Console.WriteLine($"Done reading! Total rows read: {_rowsRead:N0} with {_partitionCount} partition(s)"); return (RowsRead: _rowsRead, Partitions: _partitionCount); } private async Task DistributedReadWorkerAsync(CommandPartition readPartition, TransactionId id) { var localId = Interlocked.Increment(ref _partitionCount); using var connection = new SpannerConnection(id.ConnectionString); using var transaction = await connection.BeginTransactionAsync( SpannerTransactionCreationOptions.FromReadOnlyTransactionId(id), transactionOptions: null, cancellationToken: default); using var cmd = connection.CreateCommandWithPartition(readPartition, transaction); using var reader = await cmd.ExecuteReaderAsync(); while (await reader.ReadAsync()) { Interlocked.Increment(ref _rowsRead); Console.WriteLine($"Partition ({localId}) " + $"{reader.GetFieldValue<int>("SingerId")}" + $" {reader.GetFieldValue<string>("FirstName")}" + $" {reader.GetFieldValue<string>("LastName")}"); } Console.WriteLine($"Done with single reader {localId}."); } } // [END spanner_batch_client]