protected int loadTable()

in hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/util/LoadTestTool.java [575:763]


  /**
   * Runs the load test described by the parsed command line.
   * <p>
   * The method applies the optional ZooKeeper overrides to {@code conf}, creates the test table
   * unless initialization is skipped, builds the data generator (either the class named by
   * {@code OPT_GENERATOR} or the default generator), grants table permissions to the owner user
   * when an ACL-aware generator is used, and then starts the enabled writer, updater and reader
   * thread groups and waits for each of them to finish.
   * @return {@code 0} when only table initialization was requested; {@code EXIT_SUCCESS} when
   *         the enabled writers/updaters report no write failures and the readers report no read
   *         errors or failures; {@code EXIT_FAILURE} otherwise, including when the generator
   *         arguments are incomplete, the authentication resource is missing, or the
   *         authentication setup or permission grant fails
   * @throws IOException if table initialization, loading the authentication properties, or one
   *                     of the other invoked helpers fails with an I/O error
   */
  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }
    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      // The option value is "<generator class>" followed by COLON-separated arguments.
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          // Indexes 1..3 are read below; fail cleanly instead of with an
          // ArrayIndexOutOfBoundsException when the option value is too short.
          if (clazzAndArgs.length < 4) {
            LOG.error("With security enabled, the ACL data generator needs an authentication file, "
              + "a super user and a user list after the class name; got "
              + (clazzAndArgs.length - 1) + " argument(s)");
            return EXIT_FAILURE;
          }
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          // The generator itself receives everything from the super user onwards.
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          // Properties.load does not close its input, so close the stream here. The stream is
          // null when the resource is not on the classpath.
          try (java.io.InputStream authnStream =
            this.getClass().getClassLoader().getResourceAsStream(authnFileName)) {
            if (authnStream == null) {
              LOG.error("Authentication properties resource not found on classpath: "
                + authnFileName);
              return EXIT_FAILURE;
            }
            authConfig.load(authnStream);
          }
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(KerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          // Indexes 1..2 are read below; validate before indexing.
          if (clazzAndArgs.length < 3) {
            LOG.error("The ACL data generator needs a super user and a user list after the class "
              + "name; got " + (clazzAndArgs.length - 1) + " argument(s)");
            return EXIT_FAILURE;
          }
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1
          ? new String[0]
          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
        Permission.Action.READ, Permission.Action.WRITE };
      // The connection is created only for this grant, so close it once the grant is done.
      try (org.apache.hadoop.hbase.client.Connection connection =
        ConnectionFactory.createConnection(conf)) {
        AccessControlClient.grant(connection, tableName, userOwner.getShortName(), null, null,
          actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL,
          "Error in granting permission for the user " + userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // This will be comma separated list of expressions.
      // The returned User objects are not used in this method; the calls are kept as in the
      // original code (presumably for their login/registration side effects - TODO confirm).
      for (String userStr : userNames.split(",")) {
        if (User.isHBaseSecurityEnabled(conf)) {
          User.create(KerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = cmd.hasOption(OPT_WRITER)
          ? cmd.getOptionValue(OPT_WRITER)
          : MultiThreadedWriter.class.getCanonicalName();
        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
          userOwner, userNames);
      } else {
        String updaterClass = cmd.hasOption(OPT_UPDATER)
          ? cmd.getOptionValue(OPT_UPDATER)
          : MultiThreadedUpdater.class.getCanonicalName();
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads =
          new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames);
      } else {
        String readerClass = cmd.hasOption(OPT_READER)
          ? cmd.getOptionValue(OPT_READER)
          : MultiThreadedReader.class.getCanonicalName();
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the write point");
      // When both updaters and writers run, readers follow the updaters.
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    // Start every enabled thread group first, then wait for them, so they run concurrently.
    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO : currently append and increment operations not tested with tags
      // Will update this after it is done
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    // The run succeeds only if every enabled thread group finished without failures.
    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success =
        success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0;
    }
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }