in hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java [763:892]
public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
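// Resolve the Parquet schema: prefer the schema configured by the user, otherwise parse it from the source file.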
String schemaString = readerSliceConfig.getString(Key.PARQUET_SCHEMA);
if (StringUtils.isNotBlank(schemaString)) {
LOG.info("You config parquet schema, use it {}", schemaString);
} else {
schemaString = getParquetSchema(sourceParquetFilePath, hadoopConf);
LOG.info("Parquet schema parsed from: {} , schema is {}", sourceParquetFilePath, schemaString);
if (StringUtils.isBlank(schemaString)) {
throw DataXException.asDataXException("ParquetSchema is required, please check your config");
}
}
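// Parse the schema string into a Parquet MessageType and derive the field list and per-field metadata.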
MessageType parquetSchema = null;
List<org.apache.parquet.schema.Type> parquetTypes = null;
Map<String, ParquetMeta> parquetMetaMap = null;
int fieldCount = 0;
try {
parquetSchema = MessageTypeParser.parseMessageType(schemaString);
fieldCount = parquetSchema.getFieldCount();
parquetTypes = parquetSchema.getFields();
parquetMetaMap = ParquetMessageHelper.parseParquetTypes(parquetTypes);
} catch (Exception e) {
String message = String.format("Failed to parse schema string [%s] into a Parquet MessageType", schemaString);
LOG.error(message);
throw DataXException.asDataXException(HdfsReaderErrorCode.PARSE_MESSAGE_TYPE_FROM_SCHEMA_ERROR, message, e);
}
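// Load the column configuration and reader options (null format, UTC timestamp handling).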
List<ColumnEntry> column = UnstructuredStorageReaderUtil.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
boolean isUtcTimestamp = readerSliceConfig.getBool(Key.PARQUET_UTC_TIMESTAMP, false);
boolean isReadAllColumns = (column == null || column.isEmpty());
LOG.info("ReadingAllColumns: {}", isReadAllColumns);
/**
 * Support the scenario where columns were added in the middle of a Hive table.
 *
 * The switch defaults to false; enable it when columns have been added in the middle of a Hive table,
 * in which case fields must be matched by name.
 * Why it is not enabled by default:
 * 1. Existing HDFS jobs read fields by index only and have no name configured.
 * 2. Adding columns in the middle of a table is a rare scenario.
 * 3. Existing jobs may already have misaligned columns, which must not be silently "corrected".
 */
boolean supportAddMiddleColumn = readerSliceConfig.getBool(Key.SUPPORT_ADD_MIDDLE_COLUMN, false);
boolean printNullValueException = readerSliceConfig.getBool("printNullValueException", false);
List<Integer> ignoreIndex = readerSliceConfig.getList("ignoreIndex", new ArrayList<Integer>(), Integer.class);
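// Wrap the Hadoop configuration in a JobConf so GroupReadSupport can be initialized with the requested schema.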
JobConf conf = new JobConf(hadoopConf);
ParquetReader<Group> reader = null;
try {
Path parquetFilePath = new Path(sourceParquetFilePath);
GroupReadSupport readSupport = new GroupReadSupport();
readSupport.init(conf, null, parquetSchema);
// Initializing the parquetReader calls getFileSystem; on an HA cluster this loads the failover proxy class from the Hadoop config, so the builder is created with conf here.
ParquetReader.Builder parquetReaderBuilder = ParquetReader.builder(readSupport, parquetFilePath);
parquetReaderBuilder.withConf(hadoopConf);
reader = parquetReaderBuilder.build();
Group g = null;
// Parse Hive partition information from the file path.
List<ColumnEntry> hivePartitionColumnEntrys = UnstructuredStorageReaderUtil.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.HIVE_PARTION_COLUMN);
ArrayList<Column> hivePartitionColumns = UnstructuredStorageReaderUtil.getHivePartitionColumns(sourceParquetFilePath, hivePartitionColumnEntrys);
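// When supportAddMiddleColumn is enabled, configured column names are mapped to their positions in the actual file schema.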
List<String> schemaFieldList = null;
Map<Integer, String> colNameIndexMap = null;
Map<Integer, Integer> indexMap = null;
if (supportAddMiddleColumn) {
boolean hasUnnamedColumn = column.stream().anyMatch(columnEntry -> StringUtils.isEmpty(columnEntry.getName()));
if (hasUnnamedColumn) {
throw new DataXException("You configured a column entry without a name, please correct it");
}
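// Read the field list from the Parquet file itself so configured column names can be matched against the current field order; configured columns no longer present in the file are dropped from the mapping.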
List<org.apache.parquet.schema.Type> parquetFileFields = getParquetFileFields(parquetFilePath, hadoopConf);
schemaFieldList = parquetFileFields.stream().map(org.apache.parquet.schema.Type::getName).collect(Collectors.toList());
colNameIndexMap = new ConcurrentHashMap<>();
Map<Integer, String> finalColNameIndexMap = colNameIndexMap;
column.forEach(columnEntry -> finalColNameIndexMap.put(columnEntry.getIndex(), columnEntry.getName()));
Iterator<Map.Entry<Integer, String>> iterator = finalColNameIndexMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, String> next = iterator.next();
if (!schemaFieldList.contains(next.getValue())) {
iterator.remove();
}
}
LOG.info("SupportAddMiddleColumn is true, fields from parquet file is {}, " +
"colNameIndexMap is {}", JSON.toJSONString(schemaFieldList), JSON.toJSONString(colNameIndexMap));
fieldCount = column.size();
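// indexMap: configured column index -> position of the same-named field in the file schema.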
indexMap = new HashMap<>();
for (int j = 0; j < fieldCount; j++) {
if (colNameIndexMap.containsKey(j)) {
int index = findIndex(schemaFieldList, findEleInMap(colNameIndexMap, j));
indexMap.put(j, index);
}
}
}
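// Main read loop: convert each Parquet Group into a DataX record, field by field.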
while ((g = reader.read()) != null) {
List<Object> formattedRecord = new ArrayList<Object>(fieldCount);
try {
for (int j = 0; j < fieldCount; j++) {
Object data = null;
try {
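// Columns listed in ignoreIndex are emitted as null.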
if (null != ignoreIndex && !ignoreIndex.isEmpty() && ignoreIndex.contains(j)) {
data = null;
} else {
if (supportAddMiddleColumn) {
if (!colNameIndexMap.containsKey(j)) {
formattedRecord.add(null);
continue;
} else {
data = DFSUtil.this.readFields(g, parquetTypes.get(indexMap.get(j)), indexMap.get(j), parquetMetaMap, isUtcTimestamp);
}
} else {
data = DFSUtil.this.readFields(g, parquetTypes.get(j), j, parquetMetaMap, isUtcTimestamp);
}
}
} catch (RuntimeException e) {
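// A failed field read falls back to null; the cause is logged only when printNullValueException is enabled.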
if (printNullValueException) {
LOG.warn(e.getMessage());
}
}
formattedRecord.add(data);
}
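// Hand the assembled row, plus any Hive partition columns parsed from the file path, to the record sender.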
transportOneRecord(column, formattedRecord, recordSender, taskPluginCollector, isReadAllColumns, nullFormat, hivePartitionColumns);
} catch (Exception e) {
throw DataXException.asDataXException(HdfsReaderErrorCode.READ_PARQUET_ERROR, e);
}
}
} catch (Exception e) {
throw DataXException.asDataXException(HdfsReaderErrorCode.READ_PARQUET_ERROR, e);
} finally {
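// Always release the reader; closeQuietly swallows any exception thrown on close.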
org.apache.commons.io.IOUtils.closeQuietly(reader);
}
}
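// --- Hedged usage sketch (illustrative only, not part of DFSUtil.java) ---
// A minimal, self-contained example of the ParquetReader + GroupReadSupport pattern the method
// above is built on. The file path is an assumption; schema projection, HA configuration and the
// DataX-specific record conversion are omitted.
class ParquetGroupReadSketch {
public static void main(String[] args) throws Exception {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
// Hypothetical local file, used only for illustration.
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path("/tmp/sample.parquet");
org.apache.parquet.hadoop.example.GroupReadSupport readSupport = new org.apache.parquet.hadoop.example.GroupReadSupport();
try (org.apache.parquet.hadoop.ParquetReader<org.apache.parquet.example.data.Group> reader =
org.apache.parquet.hadoop.ParquetReader.builder(readSupport, path).withConf(conf).build()) {
org.apache.parquet.example.data.Group g;
while ((g = reader.read()) != null) {
// Group.toString() prints the record's field name/value pairs.
System.out.println(g);
}
}
}
}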