grpc-gcp-benchmarks/src/benchmarkTest/java/BigtableLoadTest.java (100 lines of code) (raw):

/* * Copyright 2019 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import com.google.auth.oauth2.GoogleCredentials; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.BigtableGrpc.BigtableBlockingStub; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.common.collect.ImmutableList; import com.google.grpc.gcp.GcpManagedChannelBuilder; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.auth.MoreCallCredentials; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; /** * A testing program for running 105 streams concurrently using different channels. * * <p>The original ManagedChannel is not able to hold 105 streams concurrently. The 101st stream * will be blocked and throws a TimeoutException. * * <p>On the other hand, our GcpManagedChannel is able to manage 101 streams concurrently. */ public final class BigtableLoadTest { private static final Logger logger = Logger.getLogger(BigtableLoadTest.class.getName()); private static final int DEFAULT_MAX_STREAM = 100; private static final int MAX_MSG_SIZE = 8 * 1024 * 1024; private static final String BIGTABLE_TARGET = "bigtable.googleapis.com"; // The test-table must be big enough so that the rpc will be blocked. private static final String LARGE_TABLE_NAME = "projects/cloudprober-test/instances/test-instance/tables/test-large-table"; private static final String OAUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; private static GoogleCredentials getCreds() { GoogleCredentials creds; try { creds = GoogleCredentials.getApplicationDefault(); } catch (Exception e) { return null; } ImmutableList<String> requiredScopes = ImmutableList.of(OAUTH_SCOPE); creds = creds.createScoped(requiredScopes); return creds; } private static List<Iterator<ReadRowsResponse>> getReadRowsResponses(ManagedChannel channel) { GoogleCredentials creds = getCreds(); BigtableBlockingStub stub = BigtableGrpc.newBlockingStub(channel).withCallCredentials(MoreCallCredentials.from(creds)); List<Iterator<ReadRowsResponse>> res = new ArrayList<>(); ReadRowsRequest request = ReadRowsRequest.newBuilder().setTableName(LARGE_TABLE_NAME).build(); for (int i = 0; i < DEFAULT_MAX_STREAM + 5; i++) { Iterator<ReadRowsResponse> iter = stub.readRows(request); res.add(iter); } return res; } private static boolean readReadRowsResponses(List<Iterator<ReadRowsResponse>> res) { for (Iterator<ReadRowsResponse> iter : res) { iter.next(); } return true; } private static boolean runManyManyStreams(ManagedChannel channel) throws InterruptedException, ExecutionException { boolean finishRunning = true; List<Iterator<ReadRowsResponse>> res = getReadRowsResponses(channel); ExecutorService executor = Executors.newSingleThreadExecutor(); Callable<Object> task = new Callable<Object>() { public Object call() { return readReadRowsResponses(res); } }; Future<Object> future = executor.submit(task); try { Object result = future.get(90, TimeUnit.SECONDS); } catch (TimeoutException e) { finishRunning = false; } finally { executor.shutdownNow(); } return finishRunning; } public static void main(String[] args) throws InterruptedException, ExecutionException { ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(BIGTABLE_TARGET, 443).maxInboundMessageSize(MAX_MSG_SIZE); // Running 105 streams using GcpManagedChannel. ManagedChannel gcpChannel = GcpManagedChannelBuilder.forDelegateBuilder(builder).build(); logger.info("Start running 105 concurrent streams using GcpManagedChannel."); if (runManyManyStreams(gcpChannel)) { logger.info("Finish running 105 concurrent streams using GcpManagedChannel."); } else { logger.severe("Unable to run 105 concurrent streams using GcpManagedChannel."); } gcpChannel.shutdownNow(); // Running 105 streams using ManagedChannel. ManagedChannel channel = builder.build(); logger.info("Start running 105 concurrent streams using ManagedChannel."); if (runManyManyStreams(channel)) { logger.severe("Finish running 105 concurrent streams using ManagedChannel."); } else { logger.info("Unable to run 105 concurrent streams using ManagedChannel."); } channel.shutdownNow(); } }