public void write()

in hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileSupport.java [94:234]


    /**
     * Writes one DataX record as a Parquet message through {@code recordConsumer}.
     *
     * <p>When {@code dataxParquetMode} is "fields" (case-insensitive) the work is
     * delegated to {@code writeBaseOnFields}. Otherwise the legacy path below runs:
     * the record is written only if it is non-null, {@code columns} is non-null and
     * the record's column count equals {@code columns.size()}; in every other case
     * the call returns without writing anything.
     *
     * <p>On the legacy path a column whose raw data is {@code null} is omitted from
     * the message. Any exception raised while converting or emitting a column is
     * logged (with stack trace only while {@code printStackTrace} is still set),
     * reported to {@code taskPluginCollector} as a dirty record when a collector is
     * present, and the loop then proceeds with the next column.
     *
     * @param values the record to write; may be {@code null} on the legacy path,
     *               in which case nothing is written
     */
    public void write(Record values) {
        if (dataxParquetMode.equalsIgnoreCase("fields")) {
            writeBaseOnFields(values);
            return;
        }

        // NOTE: the implementation below is known to be not quite right, but code
        //       comments suggest users already depend on it, so its logic is left
        //       as-is for now. This is the code path taken by default.
        if (values == null || columns == null || values.getColumnNumber() != columns.size()) {
            return;
        }

        recordConsumer.startMessage();
        for (int idx = 0; idx < columns.size(); idx++) {
            Column cell = values.getColumn(idx);
            ColumnDescriptor descriptor = columns.get(idx);
            Type fieldType = this.schema.getFields().get(idx);

            if (cell == null) {
                // A missing column object is emitted as the configured null placeholder.
                recordConsumer.addBinary(this.binaryForNull);
                continue;
            }

            try {
                // No skip marker is written: empty fields are illegal in Parquet,
                // so a null value means the field is omitted completely.
                if (cell.getRawData() == null) {
                    continue;
                }

                String fieldName = descriptor.getPath()[0];
                recordConsumer.startField(fieldName, idx);

                if (this.useRawDataTransf) {
                    // Converting via Column -> RawData is really the wrong strategy, since it
                    // serializes DataX's internal representation. Parquet users already rely
                    // on it, though, so it is only switched by a configuration item for now.
                    String raw = cell.getRawData().toString();
                    switch (descriptor.getType()) {
                        case BOOLEAN:
                            recordConsumer.addBoolean(Boolean.parseBoolean(raw));
                            break;
                        case FLOAT:
                            recordConsumer.addFloat(Float.parseFloat(raw));
                            break;
                        case DOUBLE:
                            recordConsumer.addDouble(Double.parseDouble(raw));
                            break;
                        case INT32:
                            OriginalType rawLogicalType = fieldType.getOriginalType();
                            if (rawLogicalType != null && StringUtils.equalsIgnoreCase("DATE", rawLogicalType.name())) {
                                // DATE columns carry epoch milliseconds; Parquet wants epoch days.
                                long epochMillis = Long.parseLong(raw);
                                int epochDay = (int) (new java.sql.Date(epochMillis).toLocalDate().toEpochDay());
                                recordConsumer.addInteger(epochDay);
                            } else {
                                recordConsumer.addInteger(Integer.parseInt(raw));
                            }
                            break;
                        case INT64:
                            recordConsumer.addLong(Long.parseLong(raw));
                            break;
                        case INT96:
                            recordConsumer.addBinary(timestampColToBinary(cell));
                            break;
                        case BINARY:
                            recordConsumer.addBinary(Binary.fromString(raw));
                            break;
                        case FIXED_LEN_BYTE_ARRAY:
                            PrimitiveType rawFixedType = fieldType.asPrimitiveType();
                            if (rawFixedType.getDecimalMetadata() != null) {
                                // decimal
                                recordConsumer.addBinary(decimalToBinary(cell,
                                        rawFixedType.getDecimalMetadata().getPrecision(),
                                        rawFixedType.getDecimalMetadata().getScale()));
                            } else {
                                // non-decimal fixed-length values are written like the default case
                                recordConsumer.addBinary(Binary.fromString(raw));
                            }
                            break;
                        default:
                            recordConsumer.addBinary(Binary.fromString(raw));
                            break;
                    }
                } else {
                    switch (descriptor.getType()) {
                        case BOOLEAN:
                            recordConsumer.addBoolean(cell.asBoolean());
                            break;
                        case FLOAT:
                            recordConsumer.addFloat(cell.asDouble().floatValue());
                            break;
                        case DOUBLE:
                            recordConsumer.addDouble(cell.asDouble());
                            break;
                        case INT32:
                            OriginalType logicalType = fieldType.getOriginalType();
                            if (logicalType != null && StringUtils.equalsIgnoreCase("DATE", logicalType.name())) {
                                // DATE columns carry epoch milliseconds; Parquet wants epoch days.
                                int epochDay = (int) (new java.sql.Date(cell.asLong()).toLocalDate().toEpochDay());
                                recordConsumer.addInteger(epochDay);
                            } else {
                                recordConsumer.addInteger(cell.asLong().intValue());
                            }
                            break;
                        case INT64:
                            recordConsumer.addLong(cell.asLong());
                            break;
                        case INT96:
                            recordConsumer.addBinary(timestampColToBinary(cell));
                            break;
                        case BINARY:
                            // Date columns use the configured formatter when one is set.
                            String text = (Column.Type.DATE == cell.getType() && null != this.dateParse)
                                    ? dateParse.format(cell.asDate())
                                    : cell.asString();
                            recordConsumer.addBinary(Binary.fromString(text));
                            break;
                        case FIXED_LEN_BYTE_ARRAY:
                            PrimitiveType fixedType = fieldType.asPrimitiveType();
                            if (fixedType.getDecimalMetadata() != null) {
                                // decimal
                                recordConsumer.addBinary(decimalToBinary(cell,
                                        fixedType.getDecimalMetadata().getPrecision(),
                                        fixedType.getDecimalMetadata().getScale()));
                            } else {
                                // non-decimal fixed-length values are written like the default case
                                recordConsumer.addBinary(Binary.fromString(cell.asString()));
                            }
                            break;
                        default:
                            recordConsumer.addBinary(Binary.fromString(cell.asString()));
                            break;
                    }
                }

                recordConsumer.endField(fieldName, idx);
            } catch (Exception e) {
                // Log the full stack trace only for the first failure to avoid flooding the log.
                if (printStackTrace) {
                    printStackTrace = false;
                    LOGGER.warn("write to parquet error: {}", e.getMessage(), e);
                }
                // dirty data
                if (null != this.taskPluginCollector) {
                    // taskPluginCollector is null during the merge step in job post
                    this.taskPluginCollector.collectDirtyRecord(values, e, e.getMessage());
                }
            }
        }
        recordConsumer.endMessage();
    }