in hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java [52:148]
private void validateParameter() {
this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE);
//fileType check
this.fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE);
if (!fileType.equalsIgnoreCase("ORC") && !fileType.equalsIgnoreCase("TEXT") && !fileType.equalsIgnoreCase("PARQUET")) {
String message = "HdfsWriter插件目前只支持ORC、TEXT、PARQUET三种格式的文件,请将filetype选项的值配置为ORC、TEXT或PARQUET";
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
}
//path
this.path = this.writerSliceConfig.getNecessaryValue(Key.PATH, HdfsWriterErrorCode.REQUIRED_VALUE);
if(!path.startsWith("/")){
String message = String.format("请检查参数path:[%s],需要配置为绝对路径", path);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
}else if(path.contains("*") || path.contains("?")){
String message = String.format("请检查参数path:[%s],不能包含*,?等特殊字符", path);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
}
//fileName
this.fileName = this.writerSliceConfig.getNecessaryValue(Key.FILE_NAME, HdfsWriterErrorCode.REQUIRED_VALUE);
//columns check
this.columns = this.writerSliceConfig.getListConfiguration(Key.COLUMN);
if (null == columns || columns.size() == 0) {
throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, "您需要指定 columns");
}else{
for (Configuration eachColumnConf : columns) {
eachColumnConf.getNecessaryValue(Key.NAME, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);
eachColumnConf.getNecessaryValue(Key.TYPE, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);
}
}
//writeMode check
this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE);
writeMode = writeMode.toLowerCase().trim();
Set<String> supportedWriteModes = Sets.newHashSet("append", "nonconflict", "truncate");
if (!supportedWriteModes.contains(writeMode)) {
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("仅支持append, nonConflict, truncate三种模式, 不支持您配置的 writeMode 模式 : [%s]",
writeMode));
}
this.writerSliceConfig.set(Key.WRITE_MODE, writeMode);
//fieldDelimiter check
this.fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER,null);
if(null == fieldDelimiter){
throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE,
String.format("您提供配置文件有误,[%s]是必填参数.", Key.FIELD_DELIMITER));
}else if(1 != fieldDelimiter.length()){
// warn: if have, length must be one
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("仅仅支持单字符切分, 您配置的切分为 : [%s]", fieldDelimiter));
}
//compress check
this.compress = this.writerSliceConfig.getString(Key.COMPRESS,null);
if(fileType.equalsIgnoreCase("TEXT")){
Set<String> textSupportedCompress = Sets.newHashSet("GZIP", "BZIP2");
//用户可能配置的是compress:"",空字符串,需要将compress设置为null
if(StringUtils.isBlank(compress) ){
this.writerSliceConfig.set(Key.COMPRESS, null);
}else {
compress = compress.toUpperCase().trim();
if(!textSupportedCompress.contains(compress) ){
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("目前TEXT FILE仅支持GZIP、BZIP2 两种压缩, 不支持您配置的 compress 模式 : [%s]",
compress));
}
}
}else if(fileType.equalsIgnoreCase("ORC")){
Set<String> orcSupportedCompress = Sets.newHashSet("NONE", "SNAPPY");
if(null == compress){
this.writerSliceConfig.set(Key.COMPRESS, "NONE");
}else {
compress = compress.toUpperCase().trim();
if(!orcSupportedCompress.contains(compress)){
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("目前ORC FILE仅支持SNAPPY压缩, 不支持您配置的 compress 模式 : [%s]",
compress));
}
}
}
//Kerberos check
Boolean haveKerberos = this.writerSliceConfig.getBool(Key.HAVE_KERBEROS, false);
if(haveKerberos) {
this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsWriterErrorCode.REQUIRED_VALUE);
this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsWriterErrorCode.REQUIRED_VALUE);
}
// encoding check
this.encoding = this.writerSliceConfig.getString(Key.ENCODING,Constant.DEFAULT_ENCODING);
try {
encoding = encoding.trim();
this.writerSliceConfig.set(Key.ENCODING, encoding);
Charsets.toCharset(encoding);
} catch (Exception e) {
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("不支持您配置的编码格式:[%s]", encoding), e);
}
}