public void process()

in hive_data_transfer_udtf/src/main/java/odps/data/dump/MaxComputeDataTransferUDTFMultiPart.java [88:207]


    public void process(Object[] args) throws HiveException {
        Object table = args[0];
        Object column = args[1];
        Object partition = args[2];
        StringObjectInspector soi0 = (StringObjectInspector)argumentOIs[0];
        StringObjectInspector soi1 = (StringObjectInspector)argumentOIs[1];
        StringObjectInspector soi2 = (StringObjectInspector)argumentOIs[2];
        String tmpTableName = soi0.getPrimitiveJavaObject(table);
        String columnStr = soi2.getPrimitiveJavaObject(column);
        String tmpPartitionName = soi1.getPrimitiveJavaObject(partition);

        if (columnNames == null) {
            columnNames = columnStr.split(",");
        }
        for (int i = 0; i < columnNames.length; i++) {
            columnNames[i] = columnNames[i].trim();
        }
        if (partitionNames == null) {
            partitionNames = tmpPartitionName.split(",");
        }
        for (int i = 0; i < partitionNames.length; i++) {
            partitionNames[i] = partitionNames[i].trim();
        }

        // get partition spec
        StringBuilder sb = new StringBuilder();
        for (int i = 3 + columnNames.length, j = 0; i < args.length; ++i,++j) {
            Object colValue = args[i];
            if (colValue == null) {
                continue;
            }
            PrimitiveObjectInspector poi = (PrimitiveObjectInspector)argumentOIs[i];
            Object colValueJavaObj = poi.getPrimitiveJavaObject(colValue);
            if (colValueJavaObj == null) {
                continue;
            }
            sb.append(partitionNames[j]);
            sb.append("=");
            sb.append(colValueJavaObj.toString());
            if (j < partitionNames.length-1) {
                sb.append(",");
            }
        }
        String tmpPartitionSpec = sb.toString();

        if ((!tableName.equals(tmpTableName)) || (!partitionSpec.equals(tmpPartitionSpec))) {
            System.out.println("switch partition: " + partitionSpec + " to : " + tmpPartitionSpec);
            tableName = tmpTableName.trim();
            partitionSpec = tmpPartitionSpec;
            if (uploadSession != null) {
                try {
                    writer.close();
                    writer = null;
                    uploadSession.commit();
                    uploadSession = null;
                } catch (TunnelException e) {
                    throw new HiveException("commit failed", e);
                } catch (IOException e) {
                    throw new HiveException("commit failed", e);
                }
            }

            try {
                Table t = odps.tables().get(tableName);
                tableSchema = t.getSchema();
                if (StringUtils.isEmpty(partitionSpec)) {
                    uploadSession = tunnel.createUploadSession(odpsConfig.getProjectName(), tableName);
                } else {
                    uploadSession = tunnel.createUploadSession(odpsConfig.getProjectName(), tableName, new PartitionSpec(
                        partitionSpec));
                }

                try {
                    if (writer == null) {
                        writer = uploadSession.openBufferedWriter(true); // compress transfer
                        ((TunnelBufferedWriter)writer).setBufferSize(256 * 1024 * 1024);
                    }
                } catch (TunnelException e) {
                    throw new HiveException("create buffered writer failed", e);
                }
            } catch (OdpsException e) {
                throw new HiveException("create upload session failed!", e);
            }
        }

        ArrayRecord product = (ArrayRecord) uploadSession.newRecord();
        for (int i = 3; i < args.length-partitionNames.length; ++i) {
            Object colValue = args[i];
            if (colValue == null) {
                continue;
            }
            PrimitiveObjectInspector poi = (PrimitiveObjectInspector)argumentOIs[i];
            Object colValueJavaObj = poi.getPrimitiveJavaObject(colValue);
            if (colValueJavaObj == null) {
                continue;
            }
            String columnName = columnNames[i-3];
            try {
                OdpsType odpsType = tableSchema.getColumn(columnName).getType();
                String value = colValueJavaObj.toString();
                SimpleDateFormat format = null;
                if (odpsType.equals(OdpsType.TIMESTAMP)) {
                    for (String fmt : TIMESTAMP_PATTERNS) {
                        if (fmt.length() == value.length()) {
                            format = new SimpleDateFormat(fmt);
                            break;
                        }
                    }
                }
                RecordUtil.setFieldValue(product, columnName, value, odpsType, format);
            } catch (ParseException e) {
                throw new HiveException("set Field value failed", e);
            }
        }
        try {
            writer.write(product);
        } catch (IOException e) {
            throw new HiveException("write record failed", e);
        }
    }