in ogg-plugin/src/main/java/com/aliyun/odps/ogg/handler/datahub/TableRecordBuilder.java [177:274]
private void buildTupleRecord(Record record, TableMapping tableMapping, RecordEntry recordEntry) {
TupleRecordData recordData = new TupleRecordData(tableMapping.getRecordSchema());
RecordSchema recordSchema = tableMapping.getRecordSchema();
StringBuilder hashString = new StringBuilder();
List<DsColumn> columns = record.op.getColumns();
String rowIdColumn = tableMapping.getRowIdColumn();
if (StringUtils.isNotBlank(rowIdColumn)) {
DsToken token = record.rowIdToken;
if (!token.isSet()) {
logger.error("BuildRecord failed, oracle table token TKN-ROWID is not set, can not get oracle rowid, table: {}",
tableMapping.getOracleFullTableName());
throw new RuntimeException("oracle table token TKN-ROWID is not set, can not get oracle rowid");
}
recordData.setField(rowIdColumn, token.getValue());
}
String ctype = tableMapping.getcTypeColumn();
if (StringUtils.isNotBlank(ctype)) {
recordData.setField(ctype, record.opType);
}
String ctime = tableMapping.getcTimeColumn();
if (StringUtils.isNotBlank(ctime)) {
if (recordSchema.getField(ctime).getType() == FieldType.STRING) {
recordData.setField(ctime, record.op.getTimestamp());
} else if (recordSchema.getField(ctime).getType() == FieldType.TIMESTAMP) {
recordData.setField(ctime, convertStrToMicroseconds(record.op.getTimestamp()));
} else {
logger.error("BuildRecord failed, cTimeColumn type must be string or timestamp in DataHub, type: {}",
recordSchema.getField(ctime).getType().name());
throw new RuntimeException("cTimeColumn type must be string or timestamp in DataHub");
}
}
String cId = tableMapping.getcIdColumn();
if (StringUtils.isNotBlank(cId)) {
recordData.setField(cId, record.recordId);
}
Timestamp timestamp = Timestamp.valueOf(record.op.getTimestamp());
Map<String, String> constMap = tableMapping.getConstColumnMappings();
if (constMap != null && !constMap.isEmpty()) {
for (Map.Entry<String, String> entry : constMap.entrySet()) {
recordData.setField(entry.getKey(), BucketPath.escapeString(entry.getValue(),
timestamp.getTime(), tableMapping.getConstColumnMappings()));
}
}
for (int i = 0; i < columns.size(); i++) {
String columnName = record.op.getTableMeta().getColumnName(i).toLowerCase();
ColumnMapping columnMapping = tableMapping.getColumnMappings().get(columnName);
if (columnMapping == null) {
continue;
}
DsColumn dsColumn = columns.get(i);
String afterValue = dsColumn.hasAfterValue() ? dsColumn.getAfterValue() : null;
String beforeValue = dsColumn.hasBeforeValue() ? dsColumn.getBeforeValue() : null;
String dest = columnMapping.getDest();
String hashVal = null;
if (StringUtils.isNotBlank(dest)) {
hashVal = afterValue;
if (columnMapping.isKeyColumn()) {
if (dsColumn.hasAfterValue()) {
setTupleData(recordData, recordSchema.getField(dest), afterValue,
columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
} else {
setTupleData(recordData, recordSchema.getField(dest), beforeValue,
columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
hashVal = beforeValue;
}
} else {
setTupleData(recordData, recordSchema.getField(dest), afterValue,
columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
}
}
String destOld = columnMapping.getDestOld();
if (StringUtils.isNotBlank(destOld)) {
hashVal = hashVal != null ? hashVal : beforeValue;
setTupleData(recordData, recordSchema.getField(destOld), beforeValue,
columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
}
if (columnMapping.isShardColumn() && hashVal != null) {
hashString.append(hashVal);
}
}
if (hashString.length() > 0) {
recordEntry.setPartitionKey(hashString.toString());
}
recordEntry.setRecordData(recordData);
}