public long loadData()

in benchmarks/tpcc/src/main/java/com/google/cloud/pgadapter/tpcc/dataloader/DataLoader.java [87:287]


  /**
   * Loads all TPC-C tables, running independent loads in parallel on {@code loadDataExecutor}.
   *
   * <p>If the configuration asks for it, the existing data is truncated first. The tables are then
   * loaded in phases, and each phase only starts once the tables it waits on have finished:
   *
   * <ol>
   *   <li>{@code warehouse} and {@code item} are submitted together.
   *   <li>Once the warehouses are loaded: {@code district}, one task per warehouse.
   *   <li>Once districts and items are loaded: {@code stock} (one task per warehouse) and {@code
   *       customer} (one task per district).
   *   <li>Once customers are loaded: {@code history} and {@code order}, one task each per district.
   *   <li>Once orders and stock are loaded: {@code new_order} and {@code order_line}, one task each
   *       per district.
   * </ol>
   *
   * <p>Warehouse and district identifiers are the bit-reversed loop index ({@link
   * Long#reverse(long)}).
   *
   * <p>The executor is shut down when this method ends, both on success and on failure, so this
   * method cannot submit work to the same executor a second time.
   *
   * @return the number of rows that the configuration implies should be loaded. This is computed
   *     from the configured counts (with {@code customersPerDistrict / 3} rows per district for
   *     {@code new_order} and {@code customersPerDistrict} rows per district for {@code
   *     order_line}); it is not the sum of the row counts reported by the individual load tasks.
   * @throws TimeoutException if the executor has not terminated within 60 seconds after all load
   *     tasks have been waited for
   * @throws Exception if truncating fails, if waiting for a load task is interrupted, or if a load
   *     task fails (surfaced by {@code Future.get()})
   */
  public long loadData() throws Exception {
    if (tpccConfiguration.isTruncateBeforeLoad()) {
      truncate();
    }

    // Read the sizing configuration once instead of on every loop iteration and in every task.
    final int warehouseCount = tpccConfiguration.getWarehouses();
    final int itemCount = tpccConfiguration.getItemCount();
    final int districtsPerWarehouse = tpccConfiguration.getDistrictsPerWarehouse();
    final int customersPerDistrict = tpccConfiguration.getCustomersPerDistrict();
    final int districtCount = warehouseCount * districtsPerWarehouse;

    long totalRowCount = 0L;
    try {
      ListenableFuture<Long> warehouseFuture =
          submitLoad(
              () -> loadTable(new WarehouseRowProducer(status, warehouseCount)),
              "Loaded {} warehouse records");
      totalRowCount += warehouseCount;

      ListenableFuture<Long> itemFuture =
          submitLoad(
              () -> loadTable(new ItemRowProducer(status, itemCount)), "Loaded {} item records");
      totalRowCount += itemCount;

      // Wait for warehouse insert to finish before continuing.
      warehouseFuture.get();
      List<ListenableFuture<Long>> districtFutures = new ArrayList<>(warehouseCount);
      for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
        long warehouseId = Long.reverse(warehouse);
        districtFutures.add(
            submitLoad(
                () ->
                    loadTable(
                        new DistrictRowProducer(status, warehouseId, districtsPerWarehouse)),
                "Loaded {} district records for warehouse {}",
                warehouseId));
        totalRowCount += districtsPerWarehouse;
      }
      // Wait for all districts and items to have been loaded before continuing with customers and
      // stock.
      Futures.allAsList(districtFutures).get();
      itemFuture.get();

      List<ListenableFuture<Long>> customerFutures = new ArrayList<>(districtCount);
      List<ListenableFuture<Long>> stockFutures = new ArrayList<>(warehouseCount);
      for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
        long warehouseId = Long.reverse(warehouse);
        stockFutures.add(
            submitLoad(
                () -> loadTable(new StockRowProducer(status, warehouseId, itemCount)),
                "Loaded {} stock records for warehouse {}",
                warehouseId));
        totalRowCount += itemCount;
        for (int district = 0; district < districtsPerWarehouse; district++) {
          long districtId = Long.reverse(district);
          customerFutures.add(
              submitLoad(
                  () ->
                      loadTable(
                          new CustomerRowProducer(
                              status, warehouseId, districtId, customersPerDistrict)),
                  "Loaded {} customer records for warehouse {} and district {}",
                  warehouseId,
                  districtId));
          totalRowCount += customersPerDistrict;
        }
      }
      // Wait for all customers to have been loaded before continuing with history and orders.
      Futures.allAsList(customerFutures).get();

      List<ListenableFuture<Long>> historyFutures = new ArrayList<>(districtCount);
      List<ListenableFuture<Long>> orderFutures = new ArrayList<>(districtCount);
      for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
        long warehouseId = Long.reverse(warehouse);
        for (int district = 0; district < districtsPerWarehouse; district++) {
          long districtId = Long.reverse(district);
          historyFutures.add(
              submitLoad(
                  () ->
                      loadTable(
                          new HistoryRowProducer(
                              status, warehouseId, districtId, customersPerDistrict)),
                  "Loaded {} history records for warehouse {} and district {}",
                  warehouseId,
                  districtId));
          totalRowCount += customersPerDistrict;

          orderFutures.add(
              submitLoad(
                  () ->
                      loadTable(
                          new OrderRowProducer(
                              status,
                              warehouseId,
                              districtId,
                              customersPerDistrict,
                              customersPerDistrict)),
                  "Loaded {} order records for warehouse {} and district {}",
                  warehouseId,
                  districtId));
          totalRowCount += customersPerDistrict;
        }
      }

      // Wait for all orders and stock to have loaded before continuing with new_orders and
      // order_lines.
      Futures.allAsList(orderFutures).get();
      Futures.allAsList(stockFutures).get();

      List<ListenableFuture<Long>> newOrderFutures = new ArrayList<>(districtCount);
      List<ListenableFuture<Long>> orderLineFutures = new ArrayList<>(districtCount);
      for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
        long warehouseId = Long.reverse(warehouse);
        for (int district = 0; district < districtsPerWarehouse; district++) {
          long districtId = Long.reverse(district);
          newOrderFutures.add(
              submitLoad(
                  () ->
                      loadTable(
                          new NewOrderRowProducer(
                              status, warehouseId, districtId, customersPerDistrict)),
                  "Loaded {} new_order records for warehouse {} and district {}",
                  warehouseId,
                  districtId));
          totalRowCount += customersPerDistrict / 3;

          orderLineFutures.add(
              submitLoad(
                  () ->
                      loadTable(
                          new OrderLineRowProducer(
                              status, warehouseId, districtId, itemCount, customersPerDistrict)),
                  "Loaded {} order_line records for warehouse {} and district {}",
                  warehouseId,
                  districtId));
          totalRowCount += customersPerDistrict;
        }
      }

      // Wait for all remaining data loaders to finish.
      Futures.allAsList(historyFutures).get();
      Futures.allAsList(newOrderFutures).get();
      Futures.allAsList(orderLineFutures).get();
    } finally {
      // Shut the executor down on failure as well as on success. Without this, a failed or
      // interrupted load would leave the executor running with no one left to stop it.
      // shutdown() does not cancel tasks that were already submitted; it only stops new ones
      // from being accepted.
      loadDataExecutor.shutdown();
    }

    if (!loadDataExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
      throw new TimeoutException("Loading data timed out while waiting for executor to shut down.");
    }
    return totalRowCount;
  }

  /** A table load that reports how many rows it loaded and may fail with any exception. */
  @FunctionalInterface
  private interface TableLoadTask {
    long load() throws Exception;
  }

  /**
   * Submits {@code task} to {@code loadDataExecutor} and logs its row count when it completes.
   *
   * @param task the load to run on the executor
   * @param logFormat the log message; its first placeholder receives the row count and any
   *     further placeholders receive {@code ids} in order
   * @param ids the identifiers (for example warehouse and district) to include in the log message
   * @return a future that yields the row count returned by {@code task}
   */
  private ListenableFuture<Long> submitLoad(TableLoadTask task, String logFormat, Object... ids) {
    return loadDataExecutor.submit(
        () -> {
          long rowCount = task.load();
          // The row count always fills the first placeholder, followed by the identifiers.
          Object[] logArgs = new Object[ids.length + 1];
          logArgs[0] = rowCount;
          System.arraycopy(ids, 0, logArgs, 1, ids.length);
          LOG.info(logFormat, logArgs);
          return rowCount;
        });
  }