in hive_data_transfer_udtf/src/main/java/odps/data/dump/MaxComputeDataTransferUDTFMultiPart.java [88:207]
/**
 * Writes one input row to the current tunnel upload session, switching to a new
 * session whenever the target table or partition spec changes.
 *
 * <p>Argument layout, as consumed by this method:
 * <ol>
 *   <li>{@code args[0]} - target table name;</li>
 *   <li>{@code args[1]} - comma-separated column names;</li>
 *   <li>{@code args[2]} - comma-separated partition column names;</li>
 *   <li>the next {@code columnNames.length} arguments - the column values;</li>
 *   <li>all remaining arguments - the partition values, matched by position
 *       against the partition column names.</li>
 * </ol>
 *
 * <p>The column and partition name lists are parsed only while the corresponding
 * field is still {@code null}; later rows reuse the cached arrays. Null column or
 * partition values are skipped.
 *
 * @param args the row arguments, decoded with the inspectors in {@code argumentOIs}
 * @throws HiveException if a required name argument is null, if more partition
 *         values than partition names are supplied, or if committing the previous
 *         session, creating the new session/writer, converting a field value or
 *         writing the record fails
 */
public void process(Object[] args) throws HiveException {
    // Every argument has to be decoded with the inspector stored at the SAME index.
    StringObjectInspector tableOI = (StringObjectInspector) argumentOIs[0];
    StringObjectInspector columnOI = (StringObjectInspector) argumentOIs[1];
    StringObjectInspector partitionOI = (StringObjectInspector) argumentOIs[2];

    String tmpTableName = tableOI.getPrimitiveJavaObject(args[0]);
    String columnStr = columnOI.getPrimitiveJavaObject(args[1]);
    String tmpPartitionName = partitionOI.getPrimitiveJavaObject(args[2]);

    if (tmpTableName == null) {
        throw new HiveException("table name argument must not be null");
    }

    // Parse and trim the name lists once; they are cached in fields afterwards.
    if (columnNames == null) {
        if (columnStr == null) {
            throw new HiveException("column names argument must not be null");
        }
        columnNames = columnStr.split(",");
        for (int i = 0; i < columnNames.length; i++) {
            columnNames[i] = columnNames[i].trim();
        }
    }
    if (partitionNames == null) {
        if (tmpPartitionName == null) {
            throw new HiveException("partition names argument must not be null");
        }
        partitionNames = tmpPartitionName.split(",");
        for (int i = 0; i < partitionNames.length; i++) {
            partitionNames[i] = partitionNames[i].trim();
        }
    }

    // Column values occupy [3, firstPartitionArg); partition values follow them.
    int firstPartitionArg = 3 + columnNames.length;

    // get partition spec
    StringBuilder sb = new StringBuilder();
    for (int i = firstPartitionArg, j = 0; i < args.length; ++i, ++j) {
        Object colValue = args[i];
        if (colValue == null) {
            continue;
        }
        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) argumentOIs[i];
        Object colValueJavaObj = poi.getPrimitiveJavaObject(colValue);
        if (colValueJavaObj == null) {
            continue;
        }
        if (j >= partitionNames.length) {
            throw new HiveException("got more partition values than partition names ("
                + partitionNames.length + ")");
        }
        // Separate on demand so skipped (null) values never leave a dangling comma.
        if (sb.length() > 0) {
            sb.append(",");
        }
        sb.append(partitionNames[j]);
        sb.append("=");
        sb.append(colValueJavaObj.toString());
    }
    String tmpPartitionSpec = sb.toString();

    // Compare the trimmed name, because that is the form stored in tableName.
    // The locals are the receivers so that null fields cannot cause an NPE.
    String newTableName = tmpTableName.trim();
    boolean targetChanged =
        !newTableName.equals(tableName) || !tmpPartitionSpec.equals(partitionSpec);

    // A session is also required when none is open yet, otherwise newRecord() below fails.
    if (uploadSession == null || targetChanged) {
        System.out.println("switch partition: " + partitionSpec + " to : " + tmpPartitionSpec);
        tableName = newTableName;
        partitionSpec = tmpPartitionSpec;

        // Flush and commit whatever was written to the previous target.
        if (uploadSession != null) {
            try {
                if (writer != null) {
                    writer.close();
                    writer = null;
                }
                uploadSession.commit();
                uploadSession = null;
            } catch (TunnelException e) {
                throw new HiveException("commit failed", e);
            } catch (IOException e) {
                throw new HiveException("commit failed", e);
            }
        }

        try {
            Table t = odps.tables().get(tableName);
            tableSchema = t.getSchema();
            if (StringUtils.isEmpty(partitionSpec)) {
                uploadSession = tunnel.createUploadSession(odpsConfig.getProjectName(), tableName);
            } else {
                uploadSession = tunnel.createUploadSession(odpsConfig.getProjectName(), tableName,
                    new PartitionSpec(partitionSpec));
            }
            try {
                if (writer == null) {
                    writer = uploadSession.openBufferedWriter(true); // compress transfer
                    ((TunnelBufferedWriter) writer).setBufferSize(256 * 1024 * 1024);
                }
            } catch (TunnelException e) {
                throw new HiveException("create buffered writer failed", e);
            }
        } catch (OdpsException e) {
            throw new HiveException("create upload session failed!", e);
        }
    }

    ArrayRecord product = (ArrayRecord) uploadSession.newRecord();
    // Bound the data columns by the column-name count (the same boundary the partition
    // loop starts from) so columnNames[i - 3] is always valid and no data column is
    // dropped when the partition-name list does not match the trailing argument count.
    int dataEnd = Math.min(args.length, firstPartitionArg);
    for (int i = 3; i < dataEnd; ++i) {
        Object colValue = args[i];
        if (colValue == null) {
            continue;
        }
        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) argumentOIs[i];
        Object colValueJavaObj = poi.getPrimitiveJavaObject(colValue);
        if (colValueJavaObj == null) {
            continue;
        }
        String columnName = columnNames[i - 3];
        try {
            OdpsType odpsType = tableSchema.getColumn(columnName).getType();
            String value = colValueJavaObj.toString();
            SimpleDateFormat format = null;
            if (odpsType.equals(OdpsType.TIMESTAMP)) {
                // Pick the first known pattern whose length equals the value's length.
                for (String fmt : TIMESTAMP_PATTERNS) {
                    if (fmt.length() == value.length()) {
                        format = new SimpleDateFormat(fmt);
                        break;
                    }
                }
            }
            RecordUtil.setFieldValue(product, columnName, value, odpsType, format);
        } catch (ParseException e) {
            throw new HiveException("set Field value failed", e);
        }
    }

    try {
        writer.write(product);
    } catch (IOException e) {
        throw new HiveException("write record failed", e);
    }
}