public Record dataxRecordToOdpsRecord()

in odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriterProxy.java [250:436]


    /**
     * Converts one DataX record into an ODPS {@link ArrayRecord} created from {@code slaveUpload}.
     *
     * <p>Each source column index is mapped through {@code columnPositions} to a target column
     * index; a mapping of {@code -1} means the source column is skipped. The target column's
     * {@link OdpsType} (taken from {@code tableOriginalColumnTypeList}) selects the setter used.
     * Source columns that are {@code null}, or empty strings when {@code emptyAsNull} is set,
     * are left unset in the ODPS record.
     *
     * <p>Any exception raised while converting a column is caught: the source record is handed
     * to {@code taskPluginCollector} as a dirty record and {@code null} is returned.
     *
     * @param dataXRecord the source record to convert
     * @return the populated ODPS record, or {@code null} if the record was collected as dirty
     * @throws Exception if the source record has more columns than were configured (raised as a
     *                   {@code DataXException}), or if creating the empty ODPS record fails
     */
    public Record dataxRecordToOdpsRecord(com.alibaba.datax.common.element.Record dataXRecord) throws Exception {
        int sourceColumnCount = dataXRecord.getColumnNumber();
        ArrayRecord odpsRecord = (ArrayRecord) slaveUpload.newRecord();

        int userConfiguredColumnNumber = this.columnPositions.size();

        if (sourceColumnCount > userConfiguredColumnNumber) {
            throw DataXException.asDataXException(OdpsWriterErrorCode.ILLEGAL_VALUE,
                    MESSAGE_SOURCE.message("odpswriterproxy.1", sourceColumnCount, userConfiguredColumnNumber));
        } else if (sourceColumnCount < userConfiguredColumnNumber) {
            // Warn only the first time a short record is seen; the flag is cleared afterwards.
            if (printColumnLess) {
                LOG.warn(MESSAGE_SOURCE.message("odpswriterproxy.2", sourceColumnCount, userConfiguredColumnNumber));
            }
            printColumnLess = false;
        }

        // Both indexes are declared outside the try block so the catch block can report
        // which column was being converted when the failure happened.
        int currentIndex = 0;
        int sourceIndex = 0;
        try {
            com.alibaba.datax.common.element.Column columnValue;

            for (; sourceIndex < sourceColumnCount; sourceIndex++) {
                // Skip partition columns.
                if (this.columnPositions.get(sourceIndex) == -1) {
                    continue;
                }
                currentIndex = columnPositions.get(sourceIndex);
                TypeInfo typeInfo = this.tableOriginalColumnTypeList.get(currentIndex);
                OdpsType type = typeInfo.getOdpsType();
                String typeName = typeInfo.getTypeName();
                columnValue = dataXRecord.getColumn(sourceIndex);

                if (columnValue == null) {
                    continue;
                }
                // for compatible dt lib, "" as null
                if (this.emptyAsNull && columnValue instanceof StringColumn && "".equals(columnValue.asString())) {
                    continue;
                }

                switch (type) {
                case STRING:
                    String newValue = (String) OdpsUtil.processOverLengthData(columnValue.asString(), OdpsType.STRING, this.overLengthRule, this.maxFieldLength, this.enableOverLengthOutput);
                    odpsRecord.setString(currentIndex, newValue);
                    break;
                case BIGINT:
                    odpsRecord.setBigint(currentIndex, columnValue.asLong());
                    break;
                case BOOLEAN:
                    odpsRecord.setBoolean(currentIndex, columnValue.asBoolean());
                    break;
                case DATETIME:
                    odpsRecord.setDatetime(currentIndex, columnValue.asDate());
                    break;
                case DATE:
                    // NOTE(review): a DATE column is written through setDatetime() unless
                    // useDateWithCalendar is set; kept as-is, confirm this is intended.
                    Date dateData = columnValue.asDate();
                    if (null == dateData) {
                        odpsRecord.setDatetime(currentIndex, null);
                    } else {
                        if (this.useDateWithCalendar) {
                            odpsRecord.setDate(currentIndex, new java.sql.Date(dateData.getTime()), this.calendarForDate);
                        } else {
                            odpsRecord.setDatetime(currentIndex, new java.sql.Date(dateData.getTime()));
                        }
                    }
                    break;
                case DOUBLE:
                    odpsRecord.setDouble(currentIndex, columnValue.asDouble());
                    break;
                case FLOAT:
                    Double floatValue = columnValue.asDouble();
                    if (null == floatValue) {
                        odpsRecord.setFloat(currentIndex, null);
                    } else {
                        odpsRecord.setFloat(currentIndex, floatValue.floatValue());
                    }
                    break;
                case DECIMAL:
                    odpsRecord.setDecimal(currentIndex, columnValue.asBigDecimal());
                    // Reject values whose decimal point sits at index 36 or later in the string form.
                    String columnStr = columnValue.asString();
                    if (columnStr != null && columnStr.indexOf(".") >= 36) {
                        throw new Exception(MESSAGE_SOURCE.message("odpswriterproxy.3"));
                    }
                    break;
                case TINYINT:
                    Long tinyintValueStr = columnValue.asLong();
                    if (null == tinyintValueStr) {
                        odpsRecord.setTinyint(currentIndex, null);
                    } else {
                        // Byte.valueOf(String) throws NumberFormatException for values outside
                        // the byte range, which routes the record to the dirty-record collector.
                        odpsRecord.setTinyint(currentIndex,
                                Byte.valueOf(String.valueOf(tinyintValueStr)));
                    }
                    break;
                case SMALLINT:
                    Long smallIntValue = columnValue.asLong();
                    if (null == smallIntValue) {
                        odpsRecord.setSmallint(currentIndex, null);
                    } else {
                        // shortValue() would silently wrap an out-of-range value; reject it
                        // instead, the same way the TINYINT branch does.
                        if (smallIntValue < Short.MIN_VALUE || smallIntValue > Short.MAX_VALUE) {
                            throw new NumberFormatException(
                                    "Value out of range for SMALLINT: " + smallIntValue);
                        }
                        odpsRecord.setSmallint(currentIndex, smallIntValue.shortValue());
                    }
                    break;
                case INT:
                    Long intValue = columnValue.asLong();
                    if (null == intValue) {
                        odpsRecord.setInt(currentIndex, null);
                    } else {
                        // intValue() would silently wrap an out-of-range value; reject it
                        // instead, the same way the TINYINT branch does.
                        if (intValue < Integer.MIN_VALUE || intValue > Integer.MAX_VALUE) {
                            throw new NumberFormatException(
                                    "Value out of range for INT: " + intValue);
                        }
                        odpsRecord.setInt(currentIndex, intValue.intValue());
                    }
                    break;
                case VARCHAR:
                    // warn: the odps sdk has a BUG when columnValue.asString() is null;
                    // the Varchar default constructor must not be used, otherwise there is an NPE.
                    String varcharValueStr = columnValue.asString();
                    Varchar varcharData = null;
                    if (varcharValueStr != null) {
                        varcharData = new Varchar(varcharValueStr);
                    }
                    odpsRecord.setVarchar(currentIndex, varcharData);
                    break;
                case CHAR:
                    String charValueStr = columnValue.asString();
                    Char charData = null;
                    if (charValueStr != null) {
                        charData = new Char(charValueStr);
                    }
                    odpsRecord.setChar(currentIndex, charData);
                    break;
                case TIMESTAMP:
                    Date timestampData = columnValue.asDate();
                    if (null == timestampData) {
                        odpsRecord.setTimestamp(currentIndex, null);
                    } else {
                        Timestamp timestampDataForOdps = new Timestamp(timestampData.getTime());
                        if (timestampData instanceof java.sql.Timestamp) {
                            // Carry over the nanoseconds.
                            timestampDataForOdps.setNanos(((java.sql.Timestamp) timestampData).getNanos());
                        }
                        // Possible optimization: if the source value is already a Timestamp,
                        // using it directly would save one object allocation.
                        odpsRecord.setTimestamp(currentIndex, timestampDataForOdps);
                    }
                    break;
                case BINARY:
                    // Read the bytes once and short-circuit null before wrapping them, so a
                    // null payload never reaches the over-length processing.
                    byte[] binaryBytes = columnValue.asBytes();
                    Binary binaryData = null;
                    if (binaryBytes != null) {
                        binaryData = (Binary) OdpsUtil.processOverLengthData(new Binary(binaryBytes), OdpsType.BINARY, this.overLengthRule, this.maxFieldLength, this.enableOverLengthOutput);
                    }
                    odpsRecord.setBinary(currentIndex, binaryData);
                    break;
                case ARRAY:
                    JSONArray arrayJson = JSON.parseArray(columnValue.asString());
                    odpsRecord.setArray(currentIndex, parseArray(arrayJson, (ArrayTypeInfo) typeInfo));
                    break;
                case MAP:
                    JSONObject mapJson = JSON.parseObject(columnValue.asString());
                    odpsRecord.setMap(currentIndex, parseMap(mapJson, (MapTypeInfo) typeInfo));
                    break;
                case STRUCT:
                    JSONObject structJson = JSON.parseObject(columnValue.asString());
                    odpsRecord.setStruct(currentIndex,
                            parseStruct(structJson, (StructTypeInfo) typeInfo));
                    break;
                default:
                    // NOTE(review): any other type is left unset without a warning or error;
                    // confirm this is intended.
                    break;
                }
            }

            return odpsRecord;
        } catch (Exception e) {
            String dirtyColumnName = "";
            try {
                dirtyColumnName = this.allColumns.get(currentIndex);
            } catch (Exception ignored) {
                // Best effort only: the column name is used just for the dirty-record message,
                // so an empty name is reported if the lookup fails.
            }
            String message = MESSAGE_SOURCE.message("odpswriterproxy.4", sourceIndex, dirtyColumnName);
            this.taskPluginCollector.collectDirtyRecord(dataXRecord, e, message);
            return null;
        }

    }