public void visit()

in src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java [59:110]


  public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
    @SuppressWarnings({"unchecked"})
    List<String> tables = (List<String>) state.get("tableList");

    if (tables.isEmpty()) {
      log.trace("No tables to ingest into");
      return;
    }

    String tableName = tables.get(env.getRandom().nextInt(tables.size()));

    String uuid = UUID.randomUUID().toString();
    final Path dir = new Path("/tmp/bulk", uuid);
    final Path fail = new Path(dir + "_fail");
    final FileSystem fs = (FileSystem) state.get("fs");
    fs.mkdirs(fail);
    final int parts = env.getRandom().nextInt(10) + 1;

    TreeSet<String> rows = new TreeSet<>();
    for (int i = 0; i < ROWS; i++)
      rows.add(uuid + String.format("__%06d", i));

    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
    log.debug("Preparing bulk import to {}", tableName);

    for (int i = 0; i < parts; i++) {
      String fileName = dir + "/" + String.format("part_%d.rf", i);
      try (RFileWriter f = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
        f.startDefaultLocalityGroup();
        for (String r : rows) {
          Text row = new Text(r);
          for (Column col : COLNAMES) {
            f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE);
          }
          f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
        }
      }
    }
    log.debug("Starting bulk import to {}", tableName);
    try {
      env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
          .tableTime(true).load();

      fs.delete(dir, true);
      fs.delete(fail, true);
      log.debug("Finished bulk import to {} start: {} last: {} marker: {}", tableName, rows.first(),
          rows.last(), markerColumnQualifier);
    } catch (TableNotFoundException tnfe) {
      log.debug("Table {} was deleted", tableName);
      tables.remove(tableName);
    }
  }