in cupid-table-api/src/main/java/com/aliyun/odps/cupid/table/v1/writer/adaptor/ColDataRowWriter.java [217:331]
/**
 * Encodes the value of column {@code ind} of {@code rec} into this writer's column buffers
 * at row slot {@code rCnt}.
 *
 * <p>Layout written by this method, per ODPS type:
 * <ul>
 *   <li>BINARY / CHAR / VARCHAR / STRING (types for which {@code isString} is true): the raw bytes
 *       (UTF-8 for the textual types) are appended to {@code deepBuf[ind]} and their length is
 *       stored as a 4-byte int at {@code dataBuf[ind][rCnt * 4]}. CHAR/VARCHAR values longer than
 *       the declared length are truncated with {@code String.substring}, i.e. by UTF-16 code
 *       units.</li>
 *   <li>BOOLEAN, TINYINT: 1 byte per row; SMALLINT: 2 bytes; INT, FLOAT: 4 bytes;
 *       BIGINT, DOUBLE, DATE, DATETIME: 8 bytes.</li>
 *   <li>TIMESTAMP: 12 bytes per row, an 8-byte whole-second count followed by the 4-byte
 *       {@link Timestamp#getNanos()} value.</li>
 *   <li>DECIMAL: chosen by declared precision. {@code > 38}: decimal string, stored like a string
 *       column; {@code 19..38}: 16-byte little-endian two's complement of the unscaled value;
 *       {@code 10..18}: 8-byte long; {@code 5..9}: 4-byte int; otherwise 2-byte short.</li>
 * </ul>
 *
 * <p>NOTE(review): no null check is done on the value read from {@code rec}; presumably the
 * caller filters null cells before calling this method -- confirm against the call site.
 *
 * @param odpsTypeInfo type descriptor of the column being written
 * @param rec          source row
 * @param ind          column index, used both for {@code rec} and for the per-column buffers
 * @param rCnt         zero-based row slot inside the current batch
 * @throws UnsupportedEncodingException declared because of {@code String.getBytes("utf-8")}
 * @throws IllegalArgumentException     if a DECIMAL with precision 19..38 has an unscaled value
 *                                      that does not fit in 16 bytes
 * @throws RuntimeException             if the column type is not handled here
 */
private void trans(TypeInfo odpsTypeInfo, Row rec, int ind, int rCnt) throws UnsupportedEncodingException {
    OdpsType odpsType = odpsTypeInfo.getOdpsType();

    if (isString(odpsType)) {
        byte[] utf8bytes;
        if (odpsType == OdpsType.BINARY) {
            utf8bytes = rec.getBytes(ind);
        } else {
            String utf8String = null;
            switch (odpsType) {
                case CHAR:
                    utf8String = rec.getChar(ind).getValue();
                    break;
                case VARCHAR:
                    utf8String = rec.getVarchar(ind).getValue();
                    break;
                case STRING:
                    utf8String = rec.getString(ind);
                    break;
                default:
                    break;
            }
            if (odpsType == OdpsType.CHAR || odpsType == OdpsType.VARCHAR) {
                // Enforce the declared CHAR(n)/VARCHAR(n) length before encoding.
                int maxLength = ((AbstractCharTypeInfo) odpsTypeInfo).getLength();
                if (utf8String.length() > maxLength) {
                    utf8String = utf8String.substring(0, maxLength);
                }
            }
            utf8bytes = utf8String.getBytes("utf-8");
        }
        appendVarLenBytes(ind, rCnt, utf8bytes);
        return;
    }

    switch (odpsType) {
        case BOOLEAN:
            Platform.putBoolean(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt, rec.getBoolean(ind));
            break;
        case TINYINT:
            Platform.putByte(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt, rec.getByte(ind));
            break;
        case SMALLINT:
            Platform.putShort(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 2, rec.getShort(ind));
            break;
        case FLOAT:
            Platform.putFloat(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 4, rec.getFloat(ind));
            break;
        case INT:
            Platform.putInt(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 4, rec.getInt(ind));
            break;
        case DATE:
            Platform.putLong(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 8,
                    DateUtils.getDayOffset(rec.getDate(ind)));
            break;
        case DATETIME:
            Platform.putLong(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 8,
                    DateUtils.date2ms(rec.getDatetime(ind)));
            break;
        case TIMESTAMP: {
            Timestamp timeStamp = rec.getTimeStamp(ind);
            int nanoSeconds = timeStamp.getNanos();
            // getTime() is epoch milliseconds and already contains the millisecond part of the
            // nanos field. Strip that part first, then convert to whole seconds; the subtraction
            // must be parenthesised, otherwise only (nanos / 1e6) is divided by 1000 (always 0)
            // and milliseconds end up in the seconds slot.
            long seconds = (timeStamp.getTime() - nanoSeconds / 1000000) / 1000;
            Platform.putLong(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 12, seconds);
            Platform.putInt(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 12 + 8, nanoSeconds);
            break;
        }
        case BIGINT:
            Platform.putLong(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 8, rec.getLong(ind));
            break;
        case DOUBLE:
            Platform.putDouble(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 8, rec.getDouble(ind));
            break;
        case DECIMAL: {
            DecimalTypeInfo decimalInfo = (DecimalTypeInfo) odpsTypeInfo;
            BigDecimal decimal = rec.getDecimal(ind);
            int precision = decimalInfo.getPrecision();
            if (precision > 38) {
                // Too wide for a fixed-width slot: store the textual form like a string column.
                // The charset is given explicitly so the output does not depend on the platform default.
                appendVarLenBytes(ind, rCnt, decimal.toString().getBytes("utf-8"));
            } else if (precision > 18) {
                // BigInteger.toByteArray() is big-endian two's complement; the slot is little-endian.
                byte[] unscaled = decimal.unscaledValue().toByteArray();
                int length = unscaled.length;
                if (length > 16) {
                    // Writing more than 16 bytes would spill into the next row's slot.
                    throw new IllegalArgumentException(
                            "decimal value does not fit in 16 bytes: " + decimal);
                }
                for (int i = 0; i < length; i++) {
                    Platform.putByte(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 16 + i,
                            unscaled[length - i - 1]);
                }
                // Sign-extend to the full 16 bytes. Non-negative values are padded with zeros
                // explicitly so that stale bytes from a previously used buffer cannot leak in.
                byte pad = (byte) (decimal.signum() < 0 ? -1 : 0);
                for (int i = length; i < 16; i++) {
                    Platform.putByte(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 16 + i, pad);
                }
            } else if (precision > 9) {
                Platform.putLong(dataBuf[ind],
                        Platform.BYTE_ARRAY_OFFSET + rCnt * 8, decimal.unscaledValue().longValue());
            } else if (precision > 4) {
                Platform.putInt(dataBuf[ind],
                        Platform.BYTE_ARRAY_OFFSET + rCnt * 4, decimal.unscaledValue().intValue());
            } else {
                Platform.putShort(dataBuf[ind],
                        Platform.BYTE_ARRAY_OFFSET + rCnt * 2, decimal.unscaledValue().shortValue());
            }
            break;
        }
        default:
            throw new RuntimeException("not supported type " + odpsType);
    }
}

/**
 * Appends {@code bytes} to the variable-length buffer of column {@code ind} and records the
 * byte count as a 4-byte int in the column's fixed-width buffer at row slot {@code rCnt}.
 *
 * <p>{@code deepBuf[ind]} is reallocated to {@code (oldLength + bytes.length) * 2} when the new
 * bytes do not fit; {@code stringColSizeMap} is updated with the new used size.
 */
private void appendVarLenBytes(int ind, int rCnt, byte[] bytes) {
    int numBytes = bytes.length;
    int used = stringColSizeMap.get(ind);
    if (used + numBytes > deepBuf[ind].length) {
        byte[] grown = new byte[(deepBuf[ind].length + numBytes) * 2];
        System.arraycopy(deepBuf[ind], 0, grown, 0, used);
        deepBuf[ind] = grown;
    }
    System.arraycopy(bytes, 0, deepBuf[ind], used, numBytes);
    stringColSizeMap.put(ind, used + numBytes);
    Platform.putInt(dataBuf[ind], Platform.BYTE_ARRAY_OFFSET + rCnt * 4, numBytes);
}