in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [216:319]
public void configure(Context context) {
  /*
   * Configures this sink from the supplied Flume context.
   *
   * Reads the target table and column family (both required), the batch
   * size (default 100), the event serializer class and its sub-properties,
   * the Kerberos keytab/principal, the WAL and increment-coalescing flags,
   * and the optional ZooKeeper quorum and znode parent, which are copied
   * into this.config. Finally creates the SinkCounter for this sink.
   *
   * Throws ConfigurationException when hasVersionAtLeast2() is false,
   * NullPointerException (via Preconditions) when the table or column family
   * is missing, and FlumeException when the ZooKeeper quorum is malformed or
   * its nodes name different client ports.
   */
  if (!this.hasVersionAtLeast2()) {
    throw new ConfigurationException(
        "HBase major version number must be at least 2 for hbase2sink");
  }

  tableName = context.getString(HBase2SinkConfigurationConstants.CONFIG_TABLE);
  String cf = context.getString(
      HBase2SinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
  batchSize = context.getLong(
      HBase2SinkConfigurationConstants.CONFIG_BATCHSIZE, 100L);
  Context serializerContext = new Context();
  // If not specified, the default serializer class below is used.
  String eventSerializerType = context.getString(
      HBase2SinkConfigurationConstants.CONFIG_SERIALIZER);
  Preconditions.checkNotNull(tableName,
      "Table name cannot be empty, please specify in configuration file");
  Preconditions.checkNotNull(cf,
      "Column family cannot be empty, please specify in configuration file");
  // Check for event serializer, if null or empty fall back to the default type.
  if (eventSerializerType == null || eventSerializerType.isEmpty()) {
    eventSerializerType =
        "org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer";
    logger.info("No serializer defined, Will use default");
  }
  serializerContext.putAll(context.getSubProperties(
      HBase2SinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
  columnFamily = cf.getBytes(Charsets.UTF_8);

  // Load the serializer reflectively and hand it its own sub-context.
  try {
    Class<? extends HBase2EventSerializer> clazz =
        (Class<? extends HBase2EventSerializer>)
            Class.forName(eventSerializerType);
    serializer = clazz.newInstance();
    serializer.configure(serializerContext);
  } catch (Exception e) {
    logger.error("Could not instantiate event serializer.", e);
    Throwables.propagate(e);
  }

  kerberosKeytab = context.getString(HBase2SinkConfigurationConstants.CONFIG_KEYTAB);
  kerberosPrincipal = context.getString(HBase2SinkConfigurationConstants.CONFIG_PRINCIPAL);

  enableWal = context.getBoolean(HBase2SinkConfigurationConstants
      .CONFIG_ENABLE_WAL, HBase2SinkConfigurationConstants.DEFAULT_ENABLE_WAL);
  logger.info("The write to WAL option is set to: " + enableWal);
  if (!enableWal) {
    logger.warn("HBase Sink's enableWal configuration is set to false. All " +
        "writes to HBase will have WAL disabled, and any data in the " +
        "memstore of this region in the Region Server could be lost!");
  }

  batchIncrements = context.getBoolean(
      HBase2SinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
      HBase2SinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
  if (batchIncrements) {
    logger.info("Increment coalescing is enabled. Increments will be " +
        "buffered.");
  }

  String zkQuorum = context.getString(HBase2SinkConfigurationConstants
      .ZK_QUORUM);
  /*
   * HBase allows multiple nodes in the quorum, but all need to use the
   * same client port. Nodes are accepted as "host" or "host:port"; every
   * port that is given must be identical, and if no node names a port the
   * default client port is used.
   */
  if (zkQuorum != null && !zkQuorum.isEmpty()) {
    logger.info("Using ZK Quorum: " + zkQuorum);
    String[] zkHosts = zkQuorum.split(",");
    if (zkHosts.length == 0) {
      // e.g. a quorum of "," splits into nothing at all.
      throw new FlumeException(
          "No Zookeeper nodes found in quorum: " + zkQuorum);
    }
    StringBuilder zkBuilder = new StringBuilder();
    Integer port = null;
    for (int i = 0; i < zkHosts.length; i++) {
      // split() drops trailing empty strings: "host" and "host:" both give a
      // one-element array and ":" gives an empty one, so the length must be
      // checked before indexing.
      String[] zkHostAndPort = zkHosts[i].split(":");
      String host = zkHostAndPort.length > 0 ? zkHostAndPort[0].trim() : "";
      if (host.isEmpty()) {
        throw new FlumeException(
            "Empty host name in Zookeeper quorum: " + zkQuorum);
      }
      if (i > 0) {
        zkBuilder.append(",");
      }
      zkBuilder.append(host);

      String portText = zkHostAndPort.length > 1 ? zkHostAndPort[1].trim() : "";
      if (portText.isEmpty()) {
        // No port on this node: it places no constraint on the client port.
        continue;
      }
      int nodePort = Integer.parseInt(portText);
      if (port == null) {
        port = nodePort;
      } else if (port.intValue() != nodePort) {
        throw new FlumeException("All Zookeeper nodes in the quorum must " +
            "use the same client port.");
      }
    }
    if (port == null) {
      port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
    }
    // Store the quorum as a port-less host list plus a single client port.
    this.config.set(HConstants.ZOOKEEPER_QUORUM, zkBuilder.toString());
    this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port);
  }

  String hbaseZnode = context.getString(
      HBase2SinkConfigurationConstants.ZK_ZNODE_PARENT);
  if (hbaseZnode != null && !hbaseZnode.isEmpty()) {
    this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode);
  }

  sinkCounter = new SinkCounter(this.getName());
}