in benchmarks/tpcc/src/main/java/com/google/cloud/pgadapter/tpcc/dataloader/DataLoader.java [87:287]
/**
 * Loads all TPC-C tables by submitting one load task per table (or per warehouse / per district)
 * to {@code loadDataExecutor}, waiting between phases so that a table is only loaded after the
 * tables it is loaded after in the original ordering:
 *
 * <ol>
 *   <li>warehouse and item
 *   <li>district (after warehouse)
 *   <li>customer and stock (after district and item)
 *   <li>history and order (after customer)
 *   <li>new_order and order_line (after order and stock)
 * </ol>
 *
 * <p>If {@code tpccConfiguration.isTruncateBeforeLoad()} is set, {@code truncate()} is called
 * first. The executor is always shut down before this method returns or throws, so it cannot be
 * used for further work afterwards.
 *
 * @return the number of rows this method expects to load, derived purely from the configuration
 *     (warehouses, items, districts per warehouse, customers per district). It is NOT the sum of
 *     the row counts reported by the individual load tasks; those are only logged. The new_order
 *     share is counted as {@code customersPerDistrict / 3} (integer division) and the order_line
 *     share as {@code customersPerDistrict} per district.
 * @throws TimeoutException if the executor has not terminated 60 seconds after shutdown
 * @throws Exception if truncating fails, if any load task fails (surfaced through {@code
 *     Future.get()}), or if the calling thread is interrupted while waiting
 */
public long loadData() throws Exception {
  if (tpccConfiguration.isTruncateBeforeLoad()) {
    truncate();
  }

  // Read the configuration once so every task and the row estimate use the same values.
  final int warehouseCount = tpccConfiguration.getWarehouses();
  final int itemCount = tpccConfiguration.getItemCount();
  final int districtsPerWarehouse = tpccConfiguration.getDistrictsPerWarehouse();
  final int customersPerDistrict = tpccConfiguration.getCustomersPerDistrict();
  final int districtCount = warehouseCount * districtsPerWarehouse;

  long totalRowCount = 0L;
  try {
    // Phase 1: warehouse and item are loaded concurrently.
    totalRowCount += warehouseCount;
    ListenableFuture<Long> warehouseFuture =
        loadDataExecutor.submit(
            () -> {
              long rowCount = loadTable(new WarehouseRowProducer(status, warehouseCount));
              LOG.info("Loaded {} warehouse records", rowCount);
              return rowCount;
            });
    totalRowCount += itemCount;
    ListenableFuture<Long> itemFuture =
        loadDataExecutor.submit(
            () -> {
              long rowCount = loadTable(new ItemRowProducer(status, itemCount));
              LOG.info("Loaded {} item records", rowCount);
              return rowCount;
            });

    // Phase 2: districts, one task per warehouse. Wait for warehouse insert to finish first.
    warehouseFuture.get();
    List<ListenableFuture<Long>> districtFutures = new ArrayList<>(warehouseCount);
    for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
      // IDs are the bit-reversed loop index.
      // NOTE(review): presumably to avoid monotonically increasing keys - confirm.
      long warehouseId = Long.reverse(warehouse);
      districtFutures.add(
          loadDataExecutor.submit(
              () -> {
                long rowCount =
                    loadTable(new DistrictRowProducer(status, warehouseId, districtsPerWarehouse));
                LOG.info("Loaded {} district records for warehouse {}", rowCount, warehouseId);
                return rowCount;
              }));
      totalRowCount += districtsPerWarehouse;
    }

    // Phase 3: stock (per warehouse) and customers (per district). Wait for all districts and
    // items to have been loaded before continuing.
    Futures.allAsList(districtFutures).get();
    itemFuture.get();
    List<ListenableFuture<Long>> customerFutures = new ArrayList<>(districtCount);
    List<ListenableFuture<Long>> stockFutures = new ArrayList<>(warehouseCount);
    for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
      long warehouseId = Long.reverse(warehouse);
      stockFutures.add(
          loadDataExecutor.submit(
              () -> {
                long rowCount = loadTable(new StockRowProducer(status, warehouseId, itemCount));
                LOG.info("Loaded {} stock records for warehouse {}", rowCount, warehouseId);
                return rowCount;
              }));
      totalRowCount += itemCount;
      for (int district = 0; district < districtsPerWarehouse; district++) {
        long districtId = Long.reverse(district);
        customerFutures.add(
            loadDataExecutor.submit(
                () -> {
                  long rowCount =
                      loadTable(
                          new CustomerRowProducer(
                              status, warehouseId, districtId, customersPerDistrict));
                  LOG.info(
                      "Loaded {} customer records for warehouse {} and district {}",
                      rowCount,
                      warehouseId,
                      districtId);
                  return rowCount;
                }));
        totalRowCount += customersPerDistrict;
      }
    }

    // Phase 4: history and orders, one task each per district. Wait for all customers to have
    // been loaded before continuing. Stock may still be loading at this point.
    Futures.allAsList(customerFutures).get();
    List<ListenableFuture<Long>> historyFutures = new ArrayList<>(districtCount);
    List<ListenableFuture<Long>> orderFutures = new ArrayList<>(districtCount);
    for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
      long warehouseId = Long.reverse(warehouse);
      for (int district = 0; district < districtsPerWarehouse; district++) {
        long districtId = Long.reverse(district);
        historyFutures.add(
            loadDataExecutor.submit(
                () -> {
                  long rowCount =
                      loadTable(
                          new HistoryRowProducer(
                              status, warehouseId, districtId, customersPerDistrict));
                  LOG.info(
                      "Loaded {} history records for warehouse {} and district {}",
                      rowCount,
                      warehouseId,
                      districtId);
                  return rowCount;
                }));
        totalRowCount += customersPerDistrict;
        orderFutures.add(
            loadDataExecutor.submit(
                () -> {
                  // The customers-per-district value is passed for both trailing arguments,
                  // exactly as before.
                  long rowCount =
                      loadTable(
                          new OrderRowProducer(
                              status,
                              warehouseId,
                              districtId,
                              customersPerDistrict,
                              customersPerDistrict));
                  LOG.info(
                      "Loaded {} order records for warehouse {} and district {}",
                      rowCount,
                      warehouseId,
                      districtId);
                  return rowCount;
                }));
        totalRowCount += customersPerDistrict;
      }
    }

    // Phase 5: new_orders and order_lines. Wait for all orders and stock to have loaded before
    // continuing.
    Futures.allAsList(orderFutures).get();
    Futures.allAsList(stockFutures).get();
    List<ListenableFuture<Long>> newOrderFutures = new ArrayList<>(districtCount);
    List<ListenableFuture<Long>> orderLineFutures = new ArrayList<>(districtCount);
    for (int warehouse = 0; warehouse < warehouseCount; warehouse++) {
      long warehouseId = Long.reverse(warehouse);
      for (int district = 0; district < districtsPerWarehouse; district++) {
        long districtId = Long.reverse(district);
        newOrderFutures.add(
            loadDataExecutor.submit(
                () -> {
                  long rowCount =
                      loadTable(
                          new NewOrderRowProducer(
                              status, warehouseId, districtId, customersPerDistrict));
                  LOG.info(
                      "Loaded {} new_order records for warehouse {} and district {}",
                      rowCount,
                      warehouseId,
                      districtId);
                  return rowCount;
                }));
        // Estimate only: a third of the per-district customer count (integer division).
        totalRowCount += customersPerDistrict / 3;
        orderLineFutures.add(
            loadDataExecutor.submit(
                () -> {
                  long rowCount =
                      loadTable(
                          new OrderLineRowProducer(
                              status, warehouseId, districtId, itemCount, customersPerDistrict));
                  LOG.info(
                      "Loaded {} order_line records for warehouse {} and district {}",
                      rowCount,
                      warehouseId,
                      districtId);
                  return rowCount;
                }));
        totalRowCount += customersPerDistrict;
      }
    }

    // Wait for all remaining data loaders to finish.
    Futures.allAsList(historyFutures).get();
    Futures.allAsList(newOrderFutures).get();
    Futures.allAsList(orderLineFutures).get();
  } finally {
    // Always release the executor, also when a load task failed or the wait was interrupted;
    // otherwise its worker threads would be left running after the exception propagates.
    loadDataExecutor.shutdown();
  }
  if (!loadDataExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
    throw new TimeoutException("Loading data timed out while waiting for executor to shut down.");
  }
  return totalRowCount;
}