public void parquetFileStartRead()

in hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java [763:892]


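    /**
     * Reads a single parquet file and sends each row to the DataX framework via recordSender.
     * The parquet schema from the reader configuration is used when present; otherwise the
     * schema is parsed from the source file itself.
     */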
    public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        String schemaString = readerSliceConfig.getString(Key.PARQUET_SCHEMA);
        if (StringUtils.isNotBlank(schemaString)) {
            LOG.info("You config parquet schema, use it {}", schemaString);
        } else {
            schemaString = getParquetSchema(sourceParquetFilePath, hadoopConf);
            LOG.info("Parquet schema parsed from: {} , schema is {}", sourceParquetFilePath, schemaString);
            if (StringUtils.isBlank(schemaString)) {
                throw DataXException.asDataXException("ParquetSchema is required, please check your config");
            }
        }
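        // Parse the schema string into a Parquet MessageType and collect per-field metadata.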
        MessageType parquetSchema = null;
        List<org.apache.parquet.schema.Type> parquetTypes = null;
        Map<String, ParquetMeta> parquetMetaMap = null;
        int fieldCount = 0;
        try {
            parquetSchema = MessageTypeParser.parseMessageType(schemaString);
            fieldCount = parquetSchema.getFieldCount();
            parquetTypes = parquetSchema.getFields();
            parquetMetaMap = ParquetMessageHelper.parseParquetTypes(parquetTypes);
        } catch (Exception e) {
            String message = String.format("Error parsing to MessageType via Schema string [%s]", schemaString);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.PARSE_MESSAGE_TYPE_FROM_SCHEMA_ERROR, e);
        }
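        // Read column projection, null format and timestamp settings from the slice configuration.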
        List<ColumnEntry> column = UnstructuredStorageReaderUtil.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        boolean isUtcTimestamp = readerSliceConfig.getBool(Key.PARQUET_UTC_TIMESTAMP, false);
        boolean isReadAllColumns = (column == null || column.isEmpty());
        LOG.info("ReadingAllColumns: {}", isReadAllColumns);

        /**
         * Support for Hive tables that have had columns added in the middle of the schema.
         *
         * The switch defaults to false; enable it when a Hive table has columns added in the
         * middle, in which case fields must be matched by name.
         * Reasons it is not enabled by default:
         * 1. Existing HDFS jobs fetch fields by index only and configure no column names.
         * 2. Adding columns in the middle of a table is a rare scenario.
         * 3. Existing jobs may already have misaligned columns, which must not be silently "corrected".
         */
        boolean supportAddMiddleColumn = readerSliceConfig.getBool(Key.SUPPORT_ADD_MIDDLE_COLUMN, false);

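        // Optional switches: whether to log per-field read failures, and column indexes to emit as null.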
        boolean printNullValueException = readerSliceConfig.getBool("printNullValueException", false);
        List<Integer> ignoreIndex = readerSliceConfig.getList("ignoreIndex", new ArrayList<Integer>(), Integer.class);

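        // Open the parquet file with a GroupReadSupport bound to the resolved schema; the reader is closed in the finally block.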
        JobConf conf = new JobConf(hadoopConf);
        ParquetReader<Group> reader = null;
        try {
            Path parquetFilePath = new Path(sourceParquetFilePath);
            GroupReadSupport readSupport = new GroupReadSupport();
            readSupport.init(conf, null, parquetSchema);
            // Building the ParquetReader calls getFileSystem(); on an HA cluster the failover proxy
            // class is loaded from the Hadoop configuration during this step, so the builder is initialized with conf here.
            ParquetReader.Builder parquetReaderBuilder = ParquetReader.builder(readSupport, parquetFilePath);
            parquetReaderBuilder.withConf(hadoopConf);
            reader = parquetReaderBuilder.build();
            Group g = null;

            // Parse Hive partition information from the file path.
            List<ColumnEntry> hivePartitionColumnEntrys = UnstructuredStorageReaderUtil.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.HIVE_PARTION_COLUMN);
            ArrayList<Column> hivePartitionColumns = new ArrayList<>();
            hivePartitionColumns = UnstructuredStorageReaderUtil.getHivePartitionColumns(sourceParquetFilePath, hivePartitionColumnEntrys);
            List<String> schemaFieldList = null;
            Map<Integer, String> colNameIndexMap = null;
            Map<Integer, Integer> indexMap = null;
            if (supportAddMiddleColumn) {
                boolean nonName = column.stream().anyMatch(columnEntry -> StringUtils.isEmpty(columnEntry.getName()));
                if (nonName) {
                    throw new DataXException("You configured column item without name, please correct it");
                }
                List<org.apache.parquet.schema.Type> parquetFileFields = getParquetFileFields(parquetFilePath, hadoopConf);
                schemaFieldList = parquetFileFields.stream().map(org.apache.parquet.schema.Type::getName).collect(Collectors.toList());
                colNameIndexMap = new ConcurrentHashMap<>();
                Map<Integer, String> finalColNameIndexMap = colNameIndexMap;
                column.forEach(columnEntry -> finalColNameIndexMap.put(columnEntry.getIndex(), columnEntry.getName()));
                Iterator<Map.Entry<Integer, String>> iterator = finalColNameIndexMap.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<Integer, String> next = iterator.next();
                    if (!schemaFieldList.contains(next.getValue())) {
                        iterator.remove();
                    }
                }
                LOG.info("SupportAddMiddleColumn is true, fields from parquet file is {}, " +
                        "colNameIndexMap is {}", JSON.toJSONString(schemaFieldList), JSON.toJSONString(colNameIndexMap));
                fieldCount = column.size();
                indexMap = new HashMap<>();
                for (int j = 0; j < fieldCount; j++) {
                    if (colNameIndexMap.containsKey(j)) {
                        int index = findIndex(schemaFieldList, findEleInMap(colNameIndexMap, j));
                        indexMap.put(j, index);
                    }
                }
            }
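            // Read row by row, convert each configured field and hand the record to the framework.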
            while ((g = reader.read()) != null) {
                List<Object> formattedRecord = new ArrayList<Object>(fieldCount);
                try {
                    for (int j = 0; j < fieldCount; j++) {
                        Object data = null;
                        try {
                            if (null != ignoreIndex && !ignoreIndex.isEmpty() && ignoreIndex.contains(j)) {
                                data = null;
                            } else {
                                if (supportAddMiddleColumn) {
                                    if (!colNameIndexMap.containsKey(j)) {
                                        formattedRecord.add(null);
                                        continue;
                                    } else {
                                        data = DFSUtil.this.readFields(g, parquetTypes.get(indexMap.get(j)), indexMap.get(j), parquetMetaMap, isUtcTimestamp);
                                    }
                                } else {
                                    data = DFSUtil.this.readFields(g, parquetTypes.get(j), j, parquetMetaMap, isUtcTimestamp);
                                }
                            }
                        } catch (RuntimeException e) {
                            if (printNullValueException) {
                                LOG.warn(e.getMessage());
                            }
                        }
                        formattedRecord.add(data);
                    }
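                    // Emit the assembled row together with any Hive partition columns parsed from the path.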
                    transportOneRecord(column, formattedRecord, recordSender, taskPluginCollector, isReadAllColumns, nullFormat, hivePartitionColumns);
                } catch (Exception e) {
                    throw DataXException.asDataXException(HdfsReaderErrorCode.READ_PARQUET_ERROR, e);
                }
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_PARQUET_ERROR, e);
        } finally {
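            // Close the parquet reader regardless of how reading ended.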
            org.apache.commons.io.IOUtils.closeQuietly(reader);
        }
    }
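
The remapping built under supportAddMiddleColumn can be illustrated in isolation. Below is a minimal sketch, using a hypothetical class name and made-up field lists (not part of DFSUtil), of how a record index is resolved to the physical position of the same-named field in the file:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone sketch; names and values are illustrative only.
public class MiddleColumnMappingSketch {
    public static void main(String[] args) {
        // Fields as they appear in the parquet file (a column "c_new" was added in the middle).
        List<String> schemaFieldList = Arrays.asList("a", "c_new", "b");

        // Configured columns: index of the field in the DataX record -> column name.
        Map<Integer, String> colNameIndexMap = new HashMap<>();
        colNameIndexMap.put(0, "a");
        colNameIndexMap.put(1, "b");

        // Record index -> physical index inside the parquet file.
        Map<Integer, Integer> indexMap = new HashMap<>();
        for (Map.Entry<Integer, String> entry : colNameIndexMap.entrySet()) {
            int physicalIndex = schemaFieldList.indexOf(entry.getValue());
            if (physicalIndex >= 0) {
                indexMap.put(entry.getKey(), physicalIndex);
            }
        }

        // Prints {0=0, 1=2}: column "b" is read from file position 2 even though it is record field 1.
        System.out.println(indexMap);
    }
}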