in tablestore/src/main/java/com/alicloud/openservices/tablestore/core/protocol/PlainBufferConversion.java [163:220]
/**
 * Builds a {@link StreamRecord} from a decoded plain-buffer row.
 *
 * <p>The record type is derived from {@code actionType}; the primary key and the
 * columns are converted from {@code pbRow}; the sequence info is read from the
 * extension of {@code pbRow}.
 *
 * @param pbRow the row carrying the primary key, the cells and the sequence info
 * @param pbOriginRow the row carrying the origin cells; when {@code null}, no origin
 *        columns are set on the returned record
 * @param actionType the action that produced the record ({@code PUT_ROW},
 *        {@code UPDATE_ROW} or {@code DELETE_ROW})
 * @param parseInTimeseriesDataFormat when {@code true}, the primary key and every cell
 *        are converted with the timeseries converters instead of the regular ones
 * @return the populated stream record
 * @throws IOException if {@code actionType} is none of the three supported actions,
 *         or if one of the converters called here throws it
 */
public static StreamRecord toStreamRecord(PlainBufferRow pbRow, PlainBufferRow pbOriginRow, OtsInternalApi.ActionType actionType, boolean parseInTimeseriesDataFormat)
        throws IOException {
    StreamRecord result = new StreamRecord();

    // Resolve the record type first so an unsupported action fails before any conversion work.
    StreamRecord.RecordType recordType;
    switch (actionType) {
        case PUT_ROW:
            recordType = StreamRecord.RecordType.PUT;
            break;
        case UPDATE_ROW:
            recordType = StreamRecord.RecordType.UPDATE;
            break;
        case DELETE_ROW:
            recordType = StreamRecord.RecordType.DELETE;
            break;
        default:
            throw new IOException("Unknown stream record action type:" + actionType.name());
    }
    result.setRecordType(recordType);

    // Primary key: the only difference between the two formats is the converter used.
    if (parseInTimeseriesDataFormat) {
        result.setPrimaryKey(toTimeseriesPrimaryKey(pbRow.getPrimaryKey()));
    } else {
        result.setPrimaryKey(toPrimaryKey(pbRow.getPrimaryKey()));
    }

    // Current columns.
    List<RecordColumn> convertedColumns = new ArrayList<RecordColumn>();
    for (PlainBufferCell currentCell : pbRow.getCells()) {
        if (parseInTimeseriesDataFormat) {
            convertedColumns.add(toTimeseriesRecordColumn(currentCell));
        } else {
            convertedColumns.add(toRecordColumn(currentCell));
        }
    }
    result.setColumns(convertedColumns);

    // Origin columns are optional: they are only set when an origin row was supplied.
    if (pbOriginRow != null) {
        List<RecordColumn> convertedOriginColumns = new ArrayList<RecordColumn>();
        for (PlainBufferCell originCell : pbOriginRow.getCells()) {
            if (parseInTimeseriesDataFormat) {
                convertedOriginColumns.add(toTimeseriesRecordColumn(originCell));
            } else {
                convertedOriginColumns.add(toRecordColumn(originCell));
            }
        }
        result.setOriginColumns(convertedOriginColumns);
    }

    // Sequence info (epoch, timestamp, row index) is taken from the extension of pbRow.
    result.setSequenceInfo(new RecordSequenceInfo(
            pbRow.getExtension().getSequenceInfo().getEpoch(),
            pbRow.getExtension().getSequenceInfo().getTimestamp(),
            pbRow.getExtension().getSequenceInfo().getRowIndex()));
    return result;
}