in flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java [105:211]
public void configure(Context context) {
metaStoreUri = context.getString(Config.HIVE_METASTORE);
if (metaStoreUri == null) {
throw new IllegalArgumentException(Config.HIVE_METASTORE + " config setting is not " +
"specified for sink " + getName());
}
if (metaStoreUri.equalsIgnoreCase("null")) { // for testing support
metaStoreUri = null;
}
proxyUser = null; // context.getString("hive.proxyUser"); not supported by hive api yet
database = context.getString(Config.HIVE_DATABASE);
if (database == null) {
throw new IllegalArgumentException(Config.HIVE_DATABASE + " config setting is not " +
"specified for sink " + getName());
}
table = context.getString(Config.HIVE_TABLE);
if (table == null) {
throw new IllegalArgumentException(Config.HIVE_TABLE + " config setting is not " +
"specified for sink " + getName());
}
String partitions = context.getString(Config.HIVE_PARTITION);
if (partitions != null) {
partitionVals = Arrays.asList(partitions.split(","));
}
txnsPerBatchAsk = context.getInteger(Config.HIVE_TXNS_PER_BATCH_ASK, DEFAULT_TXNSPERBATCH);
if (txnsPerBatchAsk < 0) {
LOG.warn(getName() + ". hive.txnsPerBatchAsk must be positive number. Defaulting to "
+ DEFAULT_TXNSPERBATCH);
txnsPerBatchAsk = DEFAULT_TXNSPERBATCH;
}
batchSize = context.getInteger(Config.BATCH_SIZE, DEFAULT_BATCHSIZE);
if (batchSize < 0) {
LOG.warn(getName() + ". batchSize must be positive number. Defaulting to "
+ DEFAULT_BATCHSIZE);
batchSize = DEFAULT_BATCHSIZE;
}
idleTimeout = context.getInteger(Config.IDLE_TIMEOUT, DEFAULT_IDLETIMEOUT);
if (idleTimeout < 0) {
LOG.warn(getName() + ". idleTimeout must be positive number. Defaulting to "
+ DEFAULT_IDLETIMEOUT);
idleTimeout = DEFAULT_IDLETIMEOUT;
}
callTimeout = context.getInteger(Config.CALL_TIMEOUT, DEFAULT_CALLTIMEOUT);
if (callTimeout < 0) {
LOG.warn(getName() + ". callTimeout must be positive number. Defaulting to "
+ DEFAULT_CALLTIMEOUT);
callTimeout = DEFAULT_CALLTIMEOUT;
}
heartBeatInterval = context.getInteger(Config.HEART_BEAT_INTERVAL, DEFAULT_HEARTBEATINTERVAL);
if (heartBeatInterval < 0) {
LOG.warn(getName() + ". heartBeatInterval must be positive number. Defaulting to "
+ DEFAULT_HEARTBEATINTERVAL);
heartBeatInterval = DEFAULT_HEARTBEATINTERVAL;
}
maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS,
DEFAULT_MAXOPENCONNECTIONS);
autoCreatePartitions = context.getBoolean("autoCreatePartitions", true);
// Timestamp processing
useLocalTime = context.getBoolean(Config.USE_LOCAL_TIME_STAMP, false);
String tzName = context.getString(Config.TIME_ZONE);
timeZone = (tzName == null) ? null : TimeZone.getTimeZone(tzName);
needRounding = context.getBoolean(Config.ROUND, false);
String unit = context.getString(Config.ROUND_UNIT, Config.MINUTE);
if (unit.equalsIgnoreCase(Config.HOUR)) {
this.roundUnit = Calendar.HOUR_OF_DAY;
} else if (unit.equalsIgnoreCase(Config.MINUTE)) {
this.roundUnit = Calendar.MINUTE;
} else if (unit.equalsIgnoreCase(Config.SECOND)) {
this.roundUnit = Calendar.SECOND;
} else {
LOG.warn(getName() + ". Rounding unit is not valid, please set one of " +
"minute, hour or second. Rounding will be disabled");
needRounding = false;
}
this.roundValue = context.getInteger(Config.ROUND_VALUE, 1);
if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
"Round value must be > 0 and <= 60");
} else if (roundUnit == Calendar.HOUR_OF_DAY) {
Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
"Round value must be > 0 and <= 24");
}
// Serializer
serializerType = context.getString(Config.SERIALIZER, "");
if (serializerType.isEmpty()) {
throw new IllegalArgumentException("serializer config setting is not " +
"specified for sink " + getName());
}
serializer = createSerializer(serializerType);
serializer.configure(context);
Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
}