public void orcFileStartRead()

in hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java [331:422]


    /**
     * Reads one ORC file split by split and hands every row to {@code transportOneRecord}.
     *
     * <p>When no columns are configured, the column count is taken from
     * {@code getAllColumnsCount(sourceOrcFilePath)} and all columns are read; otherwise the
     * highest configured index (from {@code getMaxIndex(column)}) decides how many leading
     * fields are extracted from each row. Every field is declared to the SerDe under the
     * name {@code col} with type {@code string}.
     *
     * <p>If computing the splits fails, {@code skipEmptyOrcFile} is {@code TRUE} and
     * {@code checkIsOrcEmptyFileExecption} recognises the failure, the file is skipped
     * and the method returns without emitting any record.
     *
     * <p>Failures while reading are logged with their stack trace and rethrown through
     * {@code DataXException.asDataXException} with {@code READ_FILE_ERROR}, carrying the
     * original exception as the cause. A maximum column index below zero is reported with
     * {@code BAD_CONFIG_VALUE}.
     *
     * @param sourceOrcFilePath   path of the ORC file to read
     * @param readerSliceConfig   slice configuration providing the column list, the null format
     *                            and the hive partition column entries
     * @param recordSender        passed through to {@code transportOneRecord}
     * @param taskPluginCollector passed through to {@code transportOneRecord}
     */
    public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info("Start Read orcfile [{}].", sourceOrcFilePath);
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);

        // Decide whether every column of the file is read or only the configured ones.
        boolean isReadAllColumns = false;
        int columnIndexMax;
        if (null == column || column.isEmpty()) {
            columnIndexMax = getAllColumnsCount(sourceOrcFilePath) - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }

        // Nothing can be read without at least one column index; fail before touching the file.
        if (columnIndexMax < 0) {
            String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }

        // Build the "columns" / "columns.types" SerDe properties: one "col"/"string" entry
        // per index in [0, columnIndexMax], separated by ',' and ':' respectively.
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        for (int i = 0; i <= columnIndexMax; i++) {
            if (i > 0) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
            allColumns.append("col");
            allColumnTypes.append("string");
        }

        JobConf conf = new JobConf(hadoopConf);
        Path orcFilePath = new Path(sourceOrcFilePath);
        Properties p = new Properties();
        p.setProperty("columns", allColumns.toString());
        p.setProperty("columns.types", allColumnTypes.toString());
        try {
            OrcSerde serde = new OrcSerde();
            serde.initialize(conf, p);
            StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
            InputFormat<?, ?> in = new OrcInputFormat();
            FileInputFormat.setInputPaths(conf, orcFilePath.toString());

            //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
            //Each file as a split
            //TODO multy threads
            // OrcInputFormat getSplits params numSplits not used, splits size = block numbers
            InputSplit[] splits;
            try {
                splits = in.getSplits(conf, 1);
            } catch (Exception splitException) {
                // An empty ORC file is tolerated only when skipping was explicitly enabled.
                if (Boolean.TRUE.equals(this.skipEmptyOrcFile)
                        && checkIsOrcEmptyFileExecption(splitException)) {
                    LOG.info("skipEmptyOrcFile: true, \"{}\" is an empty orc file, skip it!", sourceOrcFilePath);
                    return;
                }
                throw splitException;
            }

            for (InputSplit split : splits) {
                RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                // Close the reader even when reading or transporting a record fails,
                // so a failing split does not leak the underlying file handle.
                try {
                    Object key = reader.createKey();
                    Object value = reader.createValue();
                    // Fetch the column (struct field) descriptors.
                    List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                    while (reader.next(key, value)) {
                        List<Object> recordFields = new ArrayList<Object>(columnIndexMax + 1);
                        for (int i = 0; i <= columnIndexMax; i++) {
                            recordFields.add(inspector.getStructFieldData(value, fields.get(i)));
                        }
                        // NOTE(review): these two lookups depend only on the file path and the slice
                        // config, so they look hoistable out of the record loop; kept per record
                        // because it is not visible here whether transportOneRecord mutates the list.
                        List<ColumnEntry> hivePartitionColumnEntrys = UnstructuredStorageReaderUtil.getListColumnEntry(
                                readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.HIVE_PARTION_COLUMN);
                        ArrayList<Column> hivePartitionColumns = UnstructuredStorageReaderUtil
                                .getHivePartitionColumns(sourceOrcFilePath, hivePartitionColumnEntrys);
                        transportOneRecord(column, recordFields, recordSender,
                                taskPluginCollector, isReadAllColumns, nullFormat, hivePartitionColumns);
                    }
                } finally {
                    reader.close();
                }
            }
        } catch (Exception e) {
            String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                    , sourceOrcFilePath);
            // Log and propagate the root cause; previously the original exception was discarded.
            LOG.error(message, e);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
        }
    }