public static void main()

in src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java [60:241]


  /**
   * Runs the many-splits test: creates one table, ingests an initial batch of data into it, clones
   * it until {@code CI_SPLIT_TABLE_COUNT} tables exist, and then repeatedly lowers
   * {@code table.split.threshold} on every table, timing how long it takes until no tablet's
   * summed file size exceeds the new threshold.
   *
   * <p>
   * Each round divides the previous threshold by the configured reduction factor. A summary of
   * every round (threshold change, total split count before/after, splits per second) is logged
   * once all rounds have finished. The tables and the namespace are deleted only when every round
   * completes; if an exception escapes earlier they are left in place.
   *
   * @param args command line arguments, passed through to {@link ContinuousEnv}
   * @throws IllegalArgumentException if the configured table count or the split threshold
   *         reduction factor is not positive
   * @throws Exception if any Accumulo operation fails or the thread is interrupted while waiting
   */
  public static void main(String[] args) throws Exception {
    try (ContinuousEnv env = new ContinuousEnv(args)) {

      AccumuloClient client = env.getAccumuloClient();
      Properties testProps = env.getTestProperties();
      final int tableCount =
          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TABLE_COUNT));
      final long rowMin = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MIN));
      final long rowMax = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MAX));
      final int maxColF = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CF));
      final int maxColQ = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CQ));
      final int initialTabletCount =
          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INITIAL_TABLETS));
      final int initialData =
          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_WRITE_SIZE));
      String initialSplitThresholdStr = testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD);
      final long initialSplitThreshold =
          ConfigurationTypeHelper.getFixedMemoryAsBytes(initialSplitThresholdStr);
      final int splitThresholdReductionFactor =
          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD_REDUCTION_FACTOR));
      final int testRounds =
          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TEST_ROUNDS));

      // disable deletes for ingest
      testProps.setProperty(TestProps.CI_INGEST_DELETE_PROBABILITY, "0.0");

      final Random random = env.getRandom();

      Preconditions.checkArgument(tableCount > 0, "Test cannot run without any tables");
      // Fail fast, before any table is created: the factor is used as a divisor in every round, so
      // zero would throw ArithmeticException and a negative value would yield a negative threshold.
      Preconditions.checkArgument(splitThresholdReductionFactor > 0,
          "Split threshold reduction factor must be greater than 0 but was %s",
          splitThresholdReductionFactor);

      final List<String> tableNames = IntStream.range(1, tableCount + 1)
          .mapToObj(i -> NAMESPACE + ".table" + i).collect(Collectors.toList());

      try {
        client.namespaceOperations().create(NAMESPACE);
      } catch (NamespaceExistsException e) {
        log.warn("The namespace '{}' already exists. Continuing with existing namespace.",
            NAMESPACE);
      }

      final String firstTable = tableNames.get(0);

      Map<String,String> tableProps =
          Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), initialSplitThresholdStr);

      log.info("Properties being used to create tables for this test: {}", tableProps);

      log.info("Creating initial table: {}", firstTable);
      CreateTable.createTable(client, firstTable, initialTabletCount, rowMin, rowMax, tableProps,
          Map.of());

      log.info("Ingesting {} entries into first table, {}.", initialData, firstTable);
      var splitSupplier = ContinuousIngest.createSplitSupplier(client, firstTable);
      var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random);
      var batchWriterFactory =
          ContinuousIngest.BatchWriterFactory.create(client, env, splitSupplier);
      ContinuousIngest.doIngest(client, randomFactory, batchWriterFactory, firstTable, testProps,
          maxColF, maxColQ, initialData, false, random);

      client.tableOperations().flush(firstTable);

      // clone tables instead of ingesting into each. it's a lot quicker
      log.info("Creating {} more tables by cloning the first", tableCount - 1);
      tableNames.stream().parallel().skip(1).forEach(tableName -> {
        try {
          client.tableOperations().clone(firstTable, tableName, true, null, null);
        } catch (TableExistsException e) {
          log.warn(
              "table {} already exists. Continuing with existing table. Previous data will affect splits",
              tableName);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });

      StringBuilder testResults = new StringBuilder();
      testResults.append("Test results:\n");
      testResults.append("Total test rounds: ").append(testRounds).append("\n");
      testResults.append("Table count: ").append(tableCount).append("\n");

      SECONDS.sleep(5);

      // main loop
      // reduce the split threshold then wait for the expected file size per tablet to be reached
      long previousSplitThreshold = initialSplitThreshold;
      for (int i = 0; i < testRounds; i++) {

        // apply the reduction factor to the previous threshold
        final long splitThreshold = previousSplitThreshold / splitThresholdReductionFactor;
        final String splitThresholdStr = bytesToMemoryString(splitThreshold);
        final int totalSplitCountBefore = getTotalSplitCount(client, tableNames);

        log.info("Changing split threshold on all tables from {} to {}",
            bytesToMemoryString(previousSplitThreshold), splitThresholdStr);

        long beforeThresholdUpdate = System.nanoTime();

        // update the split threshold on all tables
        tableNames.stream().parallel().forEach(tableName -> {
          try {
            client.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
                splitThresholdStr);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });

        log.info("Waiting for each tablet to have a sum file size <= {}", splitThresholdStr);

        // wait for all tablets to reach the expected sum file size
        tableNames.stream().parallel().forEach(tableName -> {
          long elapsedMillis = 0;
          final long sleepMillis = SECONDS.toMillis(1);
          try {
            // wait for each tablet to reach the expected sum file size
            while (true) {
              Collection<Long> tabletFileSizes = getTabletFileSizes(client, tableName).values();
              // Keep only the tablets that are still over the threshold. A List (not a Set) is
              // used so that tablets with identical sizes are each counted and averaged.
              List<Long> offendingTabletSizes =
                  tabletFileSizes.stream().filter(tabletFileSize -> tabletFileSize > splitThreshold)
                      .collect(Collectors.toList());
              // if all tablets are good, move on
              if (offendingTabletSizes.isEmpty()) {
                break;
              }

              elapsedMillis += sleepMillis;
              // log every 3 seconds
              if (elapsedMillis % SECONDS.toMillis(3) == 0) {
                double averageFileSize =
                    offendingTabletSizes.stream().mapToLong(l -> l).average().orElse(0);
                long diff = (long) (averageFileSize - splitThreshold);
                log.info(
                    "{} tablets have file sizes not yet <= {} on table {}. Diff of avg offending file(s): {}",
                    offendingTabletSizes.size(), splitThresholdStr, tableName,
                    bytesToMemoryString(diff));
              }
              MILLISECONDS.sleep(sleepMillis);
            }
          } catch (InterruptedException e) {
            // restore the interrupt flag so code further up the stack can observe it
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });

        long timeTakenNanos = System.nanoTime() - beforeThresholdUpdate;
        long seconds = NANOSECONDS.toSeconds(timeTakenNanos);
        long millis = NANOSECONDS.toMillis(timeTakenNanos);

        final int splitCountAfter = getTotalSplitCount(client, tableNames);
        final int splitCountThisRound = splitCountAfter - totalSplitCountBefore;

        log.info(
            "Time taken for all tables to reach expected total file size ({}): {} seconds ({}ms)",
            splitThresholdStr, seconds, millis);

        testResults.append("Test round ").append(i).append(":\n");
        testResults.append("TABLE_SPLIT_THRESHOLD ")
            .append(bytesToMemoryString(previousSplitThreshold)).append(" -> ")
            .append(splitThresholdStr).append("\n");
        testResults.append("Splits count:         ").append(totalSplitCountBefore).append(" -> ")
            .append(splitCountAfter).append("\n");
        // Use fractional seconds: dividing by whole (truncated) seconds inflates the rate and
        // produces Infinity/NaN for rounds that finish in under one second.
        final double elapsedSeconds = timeTakenNanos / (double) SECONDS.toNanos(1);
        final double splitRate = elapsedSeconds > 0 ? splitCountThisRound / elapsedSeconds : 0.0;
        String splitsPerSecond = String.format("%.2f", splitRate);
        testResults.append("Splits per second:    ").append(splitsPerSecond).append("\n");

        previousSplitThreshold = splitThreshold;
      }

      log.info("Test completed successfully.");
      log.info(testResults.toString());
      log.info("Deleting tables");
      tableNames.stream().parallel().forEach(tableName -> {
        try {
          client.tableOperations().delete(tableName);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
      log.info("Deleting namespace");
      client.namespaceOperations().delete(NAMESPACE);

    }
  }