public void configure()

in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [216:319]


  public void configure(Context context) {
    if (!this.hasVersionAtLeast2()) {
      throw new ConfigurationException(
              "HBase major version number must be at least 2 for hbase2sink");
    }

    tableName = context.getString(HBase2SinkConfigurationConstants.CONFIG_TABLE);
    String cf = context.getString(
        HBase2SinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
    batchSize = context.getLong(
        HBase2SinkConfigurationConstants.CONFIG_BATCHSIZE, 100L);
    Context serializerContext = new Context();
    //If not specified, will use HBase defaults.
    String eventSerializerType = context.getString(
            HBase2SinkConfigurationConstants.CONFIG_SERIALIZER);
    Preconditions.checkNotNull(tableName,
        "Table name cannot be empty, please specify in configuration file");
    Preconditions.checkNotNull(cf,
        "Column family cannot be empty, please specify in configuration file");
    //Check foe event serializer, if null set event serializer type
    if (eventSerializerType == null || eventSerializerType.isEmpty()) {
      eventSerializerType =
          "org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer";
      logger.info("No serializer defined, Will use default");
    }
    serializerContext.putAll(context.getSubProperties(
        HBase2SinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
    columnFamily = cf.getBytes(Charsets.UTF_8);
    try {
      Class<? extends HBase2EventSerializer> clazz =
          (Class<? extends HBase2EventSerializer>)
              Class.forName(eventSerializerType);
      serializer = clazz.newInstance();
      serializer.configure(serializerContext);
    } catch (Exception e) {
      logger.error("Could not instantiate event serializer.", e);
      Throwables.propagate(e);
    }
    kerberosKeytab = context.getString(HBase2SinkConfigurationConstants.CONFIG_KEYTAB);
    kerberosPrincipal = context.getString(HBase2SinkConfigurationConstants.CONFIG_PRINCIPAL);

    enableWal = context.getBoolean(HBase2SinkConfigurationConstants
        .CONFIG_ENABLE_WAL, HBase2SinkConfigurationConstants.DEFAULT_ENABLE_WAL);
    logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));
    if (!enableWal) {
      logger.warn("HBase Sink's enableWal configuration is set to false. All " +
          "writes to HBase will have WAL disabled, and any data in the " +
          "memstore of this region in the Region Server could be lost!");
    }

    batchIncrements = context.getBoolean(
        HBase2SinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
        HBase2SinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);

    if (batchIncrements) {
      logger.info("Increment coalescing is enabled. Increments will be " +
          "buffered.");
    }

    String zkQuorum = context.getString(HBase2SinkConfigurationConstants
        .ZK_QUORUM);
    Integer port = null;
    /*
     * HBase allows multiple nodes in the quorum, but all need to use the
     * same client port. So get the nodes in host:port format,
     * and ignore the ports for all nodes except the first one. If no port is
     * specified, use default.
     */
    if (zkQuorum != null && !zkQuorum.isEmpty()) {
      StringBuilder zkBuilder = new StringBuilder();
      logger.info("Using ZK Quorum: " + zkQuorum);
      String[] zkHosts = zkQuorum.split(",");
      int length = zkHosts.length;
      for (int i = 0; i < length; i++) {
        String[] zkHostAndPort = zkHosts[i].split(":");
        zkBuilder.append(zkHostAndPort[0].trim());
        if (i != length - 1) {
          zkBuilder.append(",");
        } else {
          zkQuorum = zkBuilder.toString();
        }
        if (zkHostAndPort[1] == null) {
          throw new FlumeException("Expected client port for the ZK node!");
        }
        if (port == null) {
          port = Integer.parseInt(zkHostAndPort[1].trim());
        } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) {
          throw new FlumeException("All Zookeeper nodes in the quorum must " +
              "use the same client port.");
        }
      }
      if (port == null) {
        port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
      }
      this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
      this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port);
    }
    String hbaseZnode = context.getString(
        HBase2SinkConfigurationConstants.ZK_ZNODE_PARENT);
    if (hbaseZnode != null && !hbaseZnode.isEmpty()) {
      this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode);
    }
    sinkCounter = new SinkCounter(this.getName());
  }