grpc-gcp-benchmarks/src/main/SpannerClientTestCases.java [58:234]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.numOfRpcs = numOfRpcs;
    this.numOfThreads = numOfThreads;
    this.spannerOptions = SpannerOptions.newBuilder().build();

    int columnBytes = Integer.min(payload, MAX_SIZE_PER_COLUMN);
    if (payload > columnBytes) {
      throw new IllegalStateException(
          "Payload in SpannerClient mode cannot be larger than + " + MAX_SIZE_PER_COLUMN);
    }
    char[] charArray = new char[columnBytes];
    Arrays.fill(charArray, 'z');
    this.colContent = new String(charArray);
  }

  private DatabaseClient getDbClient(Spanner spanner) {
    System.out.println("Project id:" + spannerOptions.getProjectId().toString());
    return spanner.getDatabaseClient(DatabaseId.of("cloudprober-test", INSTANCE_ID, DATABASE_ID));
  }

  public void prepareTestData() throws InterruptedException {
    Spanner spanner = spannerOptions.getService();
    DatabaseClient db = getDbClient(spanner);

    try {
      long start = System.currentTimeMillis();
      // Clean the existing data.
      List<Mutation> deletes = new ArrayList<>();
      deletes.add(Mutation.delete(LARGE_TABLE, KeySet.all()));
      db.write(deletes);
      System.out.println(
          String.format(
              "\nDeleted the previous large_table in %d ms.", System.currentTimeMillis() - start));

      List<Mutation> mutations = new ArrayList<>();
      for (int j = 1; j <= numOfThreads; j++) {
        for (int k = 1; k <= numOfRpcs; k++) {
          mutations.add(
              Mutation.newInsertBuilder(LARGE_TABLE)
                  .set("id")
                  .to("SpannerClient-rpc" + k + "thread" + j)
                  .set("data")
                  .to(colContent)
                  .build());
        }
      }
      start = System.currentTimeMillis();
      db.write(mutations);
      System.out.println(
          String.format(
              "\nLarge test table generated in %d ms.", System.currentTimeMillis() - start));
    } finally {
      spanner.close();
    }
  }

  public void testUpdateData() throws InterruptedException {
    Spanner spanner = spannerOptions.getService();
    DatabaseClient db = getDbClient(spanner);
    System.out.println("\nTestUpdateData");
    try {
      // Warm ups.
      updateDataOneThread(db, 1, null);
      Func func = (int thread, List<Long> result) -> updateDataOneThread(db, thread, result);
      runTest(func);
    } finally {
      spanner.close();
    }
  }

  public void testRead() throws InterruptedException {
    Spanner spanner = spannerOptions.getService();
    DatabaseClient db = getDbClient(spanner);
    System.out.println("\nTestRead");
    try {
      // Warm ups.
      readOneThread(db, 1, null);
      Func func = (int thread, List<Long> result) -> readOneThread(db, thread, result);
      runTest(func);
    } finally {
      spanner.close();
    }
  }

  public void readOneThread(DatabaseClient db, int thread, List<Long> result) {
    for (int i = 0; i < numOfRpcs; i++) {
      int lineCount = 0;
      long start = System.currentTimeMillis();
      try (ResultSet resultSet =
          db.singleUse().read(LARGE_TABLE, KeySet.all(), Arrays.asList("id", "data", "rpc"))) {
        while (resultSet.next()) {
          lineCount++;
        }
      }
      if (result != null) {
        result.add(System.currentTimeMillis() - start);
      }
      if (lineCount != numOfRpcs * numOfThreads) {
        System.out.println("WARNING: Imcomplete data.");
      }
    }
  }

  private void updateDataOneThread(DatabaseClient db, int thread, List<Long> result) {
    for (int i = 1; i <= numOfRpcs; i++) {
      List<Mutation> mutations = new ArrayList<>();
      mutations.add(
          Mutation.newUpdateBuilder("LARGE_TABLE")
              .set("id")
              .to("SpannerClient-rpc" + i + "thread" + thread)
              .set("data")
              .to(colContent)
              .set("rpc")
              .to(i)
              .build());
      long start = System.currentTimeMillis();
      db.write(mutations);
      if (result != null) {
        result.add(System.currentTimeMillis() - start);
      }
    }
  }

  private void runTest(Func func) throws InterruptedException {
    List<Long> result = new CopyOnWriteArrayList<>();
    List<Thread> threads = new ArrayList<>();
    if (numOfThreads > 1) {
      for (int t = 0; t < numOfThreads; t++) {
        final int threadNum = t + 1;
        threads.add(new Thread(() -> func.operate(threadNum, result)));
      }
    }
    long start = System.currentTimeMillis();
    if (numOfThreads > 1) {
      for (Thread t : threads) {
        t.start();
      }
    } else {
      func.operate(1, result);
    }

    for (Thread t : threads) {
      t.join();
    }
    // Waiting for all responses.
    while (result.size() < numOfRpcs * numOfThreads) {}

    // Summerize the result.
    long dur = System.currentTimeMillis() - start;
    int numOfChannels = 1;
    Collections.sort(result);
    System.out.println(
        String.format(
            "Number of threads: %d\t "
                + "Number of Channels: %d\t"
                + "Total number of RPC's: %d\n"
                + "\t\tAvg"
                + "\tMin"
                + "\tMed"
                + "\tp90"
                + "\tp99"
                + "\tp100"
                + "\tQPS\n"
                + "  Time(ms)\t%d\t%d\t%d\t%d\t%d\t%d\t%f",
            numOfThreads,
            numOfChannels,
            numOfThreads * numOfRpcs,
            result.stream().mapToLong(Long::longValue).sum() / result.size(),
            result.get(0),
            result.get((int) (result.size() * 0.5)),
            result.get((int) (result.size() * 0.9)),
            result.get((int) (result.size() * 0.99)),
            result.get(result.size() - 1),
            numOfRpcs * numOfThreads / (double) dur));
  }

  private interface Func {
    void operate(int thread, List<Long> result);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



grpc-gcp-benchmarks/src/main/java/com/google/grpc/gcp/SpannerClientTestCases.java [60:237]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.numOfRpcs = numOfRpcs;
    this.numOfThreads = numOfThreads;
    // setUseGrpcGcpExtension(true)
    this.spannerOptions = SpannerOptions.newBuilder().build();

    int columnBytes = Integer.min(payload, MAX_SIZE_PER_COLUMN);
    if (payload > columnBytes) {
      throw new IllegalStateException(
          "Payload in SpannerClient mode cannot be larger than + " + MAX_SIZE_PER_COLUMN);
    }
    char[] charArray = new char[columnBytes];
    Arrays.fill(charArray, 'z');
    this.colContent = new String(charArray);
  }

  private DatabaseClient getDbClient(Spanner spanner) {
    System.out.println("Project id:" + spannerOptions.getProjectId().toString());
    return spanner.getDatabaseClient(DatabaseId.of("cloudprober-test", INSTANCE_ID, DATABASE_ID));
  }

  public void prepareTestData() throws InterruptedException {
    Spanner spanner = spannerOptions.getService();
    DatabaseClient db = getDbClient(spanner);

    try {
      long start = System.currentTimeMillis();
      // Clean the existing data.
      List<Mutation> deletes = new ArrayList<>();
      deletes.add(Mutation.delete(LARGE_TABLE, KeySet.all()));
      db.write(deletes);
      System.out.println(
          String.format(
              "\nDeleted the previous large_table in %d ms.", System.currentTimeMillis() - start));

      List<Mutation> mutations = new ArrayList<>();
      for (int j = 1; j <= numOfThreads; j++) {
        for (int k = 1; k <= numOfRpcs; k++) {
          mutations.add(
              Mutation.newInsertBuilder(LARGE_TABLE)
                  .set("id")
                  .to("SpannerClient-rpc" + k + "thread" + j)
                  .set("data")
                  .to(colContent)
                  .build());
        }
      }
      start = System.currentTimeMillis();
      db.write(mutations);
      System.out.println(
          String.format(
              "\nLarge test table generated in %d ms.", System.currentTimeMillis() - start));
    } finally {
      spanner.close();
    }
  }

  public void testUpdateData() throws InterruptedException {
    Spanner spanner = spannerOptions.getService();
    DatabaseClient db = getDbClient(spanner);
    System.out.println("\nTestUpdateData");
    try {
      // Warm ups.
      updateDataOneThread(db, 1, null);
      Func func = (int thread, List<Long> result) -> updateDataOneThread(db, thread, result);
      runTest(func);
    } finally {
      spanner.close();
    }
  }

  public void testRead() throws InterruptedException {
    Spanner spanner = spannerOptions.getService();
    DatabaseClient db = getDbClient(spanner);
    System.out.println("\nTestRead");
    try {
      // Warm ups.
      readOneThread(db, 1, null);
      Func func = (int thread, List<Long> result) -> readOneThread(db, thread, result);
      runTest(func);
    } finally {
      spanner.close();
    }
  }

  public void readOneThread(DatabaseClient db, int thread, List<Long> result) {
    for (int i = 0; i < numOfRpcs; i++) {
      int lineCount = 0;
      long start = System.currentTimeMillis();
      try (ResultSet resultSet =
          db.singleUse().read(LARGE_TABLE, KeySet.all(), Arrays.asList("id", "data", "rpc"))) {
        while (resultSet.next()) {
          lineCount++;
        }
      }
      if (result != null) {
        result.add(System.currentTimeMillis() - start);
      }
      if (lineCount != numOfRpcs * numOfThreads) {
        System.out.println("WARNING: Imcomplete data.");
      }
    }
  }

  private void updateDataOneThread(DatabaseClient db, int thread, List<Long> result) {
    for (int i = 1; i <= numOfRpcs; i++) {
      List<Mutation> mutations = new ArrayList<>();
      mutations.add(
          Mutation.newUpdateBuilder("LARGE_TABLE")
              .set("id")
              .to("SpannerClient-rpc" + i + "thread" + thread)
              .set("data")
              .to(colContent)
              .set("rpc")
              .to(i)
              .build());
      long start = System.currentTimeMillis();
      db.write(mutations);
      if (result != null) {
        result.add(System.currentTimeMillis() - start);
      }
    }
  }

  private void runTest(Func func) throws InterruptedException {
    List<Long> result = new CopyOnWriteArrayList<>();
    List<Thread> threads = new ArrayList<>();
    if (numOfThreads > 1) {
      for (int t = 0; t < numOfThreads; t++) {
        final int threadNum = t + 1;
        threads.add(new Thread(() -> func.operate(threadNum, result)));
      }
    }
    long start = System.currentTimeMillis();
    if (numOfThreads > 1) {
      for (Thread t : threads) {
        t.start();
      }
    } else {
      func.operate(1, result);
    }

    for (Thread t : threads) {
      t.join();
    }
    // Waiting for all responses.
    while (result.size() < numOfRpcs * numOfThreads) {}

    // Summerize the result.
    long dur = System.currentTimeMillis() - start;
    int numOfChannels = 1;
    Collections.sort(result);
    System.out.println(
        String.format(
            "Number of threads: %d\t "
                + "Number of Channels: %d\t"
                + "Total number of RPC's: %d\n"
                + "\t\tAvg"
                + "\tMin"
                + "\tMed"
                + "\tp90"
                + "\tp99"
                + "\tp100"
                + "\tQPS\n"
                + "  Time(ms)\t%d\t%d\t%d\t%d\t%d\t%d\t%f",
            numOfThreads,
            numOfChannels,
            numOfThreads * numOfRpcs,
            result.stream().mapToLong(Long::longValue).sum() / result.size(),
            result.get(0),
            result.get((int) (result.size() * 0.5)),
            result.get((int) (result.size() * 0.9)),
            result.get((int) (result.size() * 0.99)),
            result.get(result.size() - 1),
            numOfRpcs * numOfThreads / (double) dur));
  }

  private interface Func {
    void operate(int thread, List<Long> result);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



