private void buildTupleRecord()

in ogg-plugin/src/main/java/com/aliyun/odps/ogg/handler/datahub/TableRecordBuilder.java [177:274]


    private void buildTupleRecord(Record record, TableMapping tableMapping, RecordEntry recordEntry) {
        TupleRecordData recordData = new TupleRecordData(tableMapping.getRecordSchema());
        RecordSchema recordSchema = tableMapping.getRecordSchema();
        StringBuilder hashString = new StringBuilder();

        List<DsColumn> columns = record.op.getColumns();

        String rowIdColumn = tableMapping.getRowIdColumn();
        if (StringUtils.isNotBlank(rowIdColumn)) {
            DsToken token = record.rowIdToken;
            if (!token.isSet()) {
                logger.error("BuildRecord failed, oracle table token TKN-ROWID is not set, can not get oracle rowid, table: {}",
                        tableMapping.getOracleFullTableName());
                throw new RuntimeException("oracle table token TKN-ROWID is not set, can not get oracle rowid");
            }
            recordData.setField(rowIdColumn, token.getValue());
        }

        String ctype = tableMapping.getcTypeColumn();
        if (StringUtils.isNotBlank(ctype)) {
            recordData.setField(ctype, record.opType);
        }

        String ctime = tableMapping.getcTimeColumn();
        if (StringUtils.isNotBlank(ctime)) {
            if (recordSchema.getField(ctime).getType() == FieldType.STRING) {
                recordData.setField(ctime, record.op.getTimestamp());
            } else if (recordSchema.getField(ctime).getType() == FieldType.TIMESTAMP) {
                recordData.setField(ctime, convertStrToMicroseconds(record.op.getTimestamp()));
            } else {
                logger.error("BuildRecord failed, cTimeColumn type must be string or timestamp in DataHub, type: {}",
                        recordSchema.getField(ctime).getType().name());
                throw new RuntimeException("cTimeColumn type must be string or timestamp in DataHub");
            }
        }

        String cId = tableMapping.getcIdColumn();
        if (StringUtils.isNotBlank(cId)) {
            recordData.setField(cId, record.recordId);
        }

        Timestamp timestamp = Timestamp.valueOf(record.op.getTimestamp());
        Map<String, String> constMap = tableMapping.getConstColumnMappings();
        if (constMap != null && !constMap.isEmpty()) {
            for (Map.Entry<String, String> entry : constMap.entrySet()) {
                recordData.setField(entry.getKey(), BucketPath.escapeString(entry.getValue(),
                        timestamp.getTime(), tableMapping.getConstColumnMappings()));
            }
        }

        for (int i = 0; i < columns.size(); i++) {

            String columnName = record.op.getTableMeta().getColumnName(i).toLowerCase();
            ColumnMapping columnMapping = tableMapping.getColumnMappings().get(columnName);
            if (columnMapping == null) {
                continue;
            }

            DsColumn dsColumn = columns.get(i);
            String afterValue = dsColumn.hasAfterValue() ? dsColumn.getAfterValue() : null;
            String beforeValue = dsColumn.hasBeforeValue() ? dsColumn.getBeforeValue() : null;

            String dest = columnMapping.getDest();
            String hashVal = null;
            if (StringUtils.isNotBlank(dest)) {
                hashVal = afterValue;
                if (columnMapping.isKeyColumn()) {
                    if (dsColumn.hasAfterValue()) {
                        setTupleData(recordData, recordSchema.getField(dest), afterValue,
                                columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
                    } else {
                        setTupleData(recordData, recordSchema.getField(dest), beforeValue,
                                columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
                        hashVal = beforeValue;
                    }
                } else {
                    setTupleData(recordData, recordSchema.getField(dest), afterValue,
                            columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
                }
            }

            String destOld = columnMapping.getDestOld();
            if (StringUtils.isNotBlank(destOld)) {
                hashVal = hashVal != null ? hashVal : beforeValue;
                setTupleData(recordData, recordSchema.getField(destOld), beforeValue,
                        columnMapping.isDateFormat(), columnMapping.getSimpleDateFormat());
            }

            if (columnMapping.isShardColumn() && hashVal != null) {
                hashString.append(hashVal);
            }
        }

        if (hashString.length() > 0) {
            recordEntry.setPartitionKey(hashString.toString());
        }
        recordEntry.setRecordData(recordData);
    }