in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java [190:303]
public void configure(Context context) {
  this.context = context;

  // Destination path and file naming (prefix/suffix for finished and in-use files)
  filePath = Preconditions.checkNotNull(
      context.getString("hdfs.path"), "hdfs.path is required");
  fileName = context.getString("hdfs.filePrefix", defaultFileName);
  this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
  inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
  boolean emptyInUseSuffix = context.getBoolean("hdfs.emptyInUseSuffix", false);
  if (emptyInUseSuffix) {
    inUseSuffix = "";
    String tmpInUseSuffix = context.getString(IN_USE_SUFFIX_PARAM_NAME);
    if (tmpInUseSuffix != null) {
      LOG.warn("Ignoring parameter " + IN_USE_SUFFIX_PARAM_NAME +
          " for hdfs sink: " + getName());
    }
  } else {
    inUseSuffix = context.getString(IN_USE_SUFFIX_PARAM_NAME, defaultInUseSuffix);
  }
  String tzName = context.getString("hdfs.timeZone");
  timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);

  // Roll policy, batching and writer settings
  rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
  rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
  rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
  batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
  idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
  String codecName = context.getString("hdfs.codeC");
  fileType = context.getString("hdfs.fileType", defaultFileType);
  maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
  callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
  threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
      defaultThreadPoolSize);
  rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
      defaultRollTimerPoolSize);

  // Kerberos / proxy-user settings used to build the privileged executor below
  String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal");
  String kerbKeytab = context.getString("hdfs.kerberosKeytab");
  String proxyUser = context.getString("hdfs.proxyUser");
  tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
  if (tryCount <= 0) {
    LOG.warn("Retry count value : " + tryCount + " is not " +
        "valid. The sink will try to close the file until the file " +
        "is eventually closed.");
    tryCount = defaultTryCount;
  }
  retryInterval = context.getLong("hdfs.retryInterval", defaultRetryInterval);
  if (retryInterval <= 0) {
    LOG.warn("Retry Interval value: " + retryInterval + " is not " +
        "valid. If the first close of a file fails, " +
        "it may remain open and will not be renamed.");
    tryCount = 1;
  }
  Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
  if (codecName == null) {
    codeC = null;
    compType = CompressionType.NONE;
  } else {
    codeC = getCodec(codecName);
    // TODO : set proper compression type
    compType = CompressionType.BLOCK;
  }

  // Do not allow the user to set fileType DataStream together with codeC,
  // to prevent output files getting a compression extension (like .snappy)
  if (fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType) && codecName != null) {
    throw new IllegalArgumentException("fileType: " + fileType +
        " does NOT support compressed output. Please don't set codeC" +
        " or change the fileType if compressed output is desired.");
  }
  if (fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
    Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
        + " when fileType is: " + fileType);
  }
  // Get the appropriate executor for HDFS calls (Kerberos and/or proxy user)
  this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(
      kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser);

  // Optional rounding of the event timestamp used when resolving the file path
  needRounding = context.getBoolean("hdfs.round", false);
  if (needRounding) {
    String unit = context.getString("hdfs.roundUnit", "second");
    if (unit.equalsIgnoreCase("hour")) {
      this.roundUnit = Calendar.HOUR_OF_DAY;
    } else if (unit.equalsIgnoreCase("minute")) {
      this.roundUnit = Calendar.MINUTE;
    } else if (unit.equalsIgnoreCase("second")) {
      this.roundUnit = Calendar.SECOND;
    } else {
      LOG.warn("Rounding unit is not valid, please set one of " +
          "minute, hour, or second. Rounding will be disabled");
      needRounding = false;
    }
    this.roundValue = context.getInteger("hdfs.roundValue", 1);
    if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
      Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
          "Round value must be > 0 and <= 60");
    } else if (roundUnit == Calendar.HOUR_OF_DAY) {
      Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
          "Round value must be > 0 and <= 24");
    }
  }
  this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
  if (useLocalTime) {
    clock = new SystemClock();
  }

  if (sinkCounter == null) {
    sinkCounter = new SinkCounter(getName());
  }
}
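
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, NOT part of HDFSEventSink.java): shows how
// the keys read by configure(Context) above could be supplied programmatically.
// In a running agent they come from the agent's properties file. The class
// name, sink name, and all values below are hypothetical examples; Flume and
// the Hadoop client libraries are assumed to be on the classpath.
// ---------------------------------------------------------------------------
import org.apache.flume.Context;
import org.apache.flume.sink.hdfs.HDFSEventSink;

class HdfsSinkConfigSketch {
  static HDFSEventSink newConfiguredSink() {
    Context ctx = new Context();
    ctx.put("hdfs.path", "hdfs://namenode/flume/events/%Y-%m-%d"); // required
    ctx.put("hdfs.filePrefix", "events");
    ctx.put("hdfs.fileType", "DataStream");  // must NOT be combined with hdfs.codeC
    ctx.put("hdfs.rollInterval", "300");     // roll every 300 s; 0 disables time-based rolls
    ctx.put("hdfs.rollSize", "134217728");   // roll at 128 MB; 0 disables size-based rolls
    ctx.put("hdfs.rollCount", "0");          // 0 disables event-count-based rolls
    ctx.put("hdfs.batchSize", "1000");       // must be > 0
    ctx.put("hdfs.round", "true");           // round the timestamp used in the path...
    ctx.put("hdfs.roundUnit", "minute");     // ...down to the nearest
    ctx.put("hdfs.roundValue", "10");        // 10 minutes (must be in (0, 60])

    HDFSEventSink sink = new HDFSEventSink();
    sink.setName("hdfs-sink-1");             // the name is also used for the SinkCounter
    sink.configure(ctx);
    return sink;
  }
}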