in client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java [163:258]
/**
 * Applies an UPDATE event to HBase, writing only the columns that changed in each row.
 *
 * <p>{@code dml.getData()} holds the new row images and {@code dml.getOld()} holds, per row,
 * the columns that were modified; the two lists are read position by position. For every row
 * a row key is derived according to the mapping, one cell is added per changed column
 * (a {@code null} new value is written as a {@code null} cell), and the resulting rows are
 * sent to {@code hbaseTemplate.puts} in batches of {@code commitBatch}.
 *
 * <p>When a composite row key is configured and one of its columns is among the changed
 * columns, the row is not written here; {@code deleteAndInsert(config, dml)} is invoked
 * instead.
 *
 * @param config mapping configuration providing the HBase mapping
 * @param dml    the update event; nothing is done if its data or old list is null or empty
 * @throws RuntimeException if a row key cannot be computed for a row
 */
private void update(MappingConfig config, Dml dml) {
    List<Map<String, Object>> data = dml.getData();
    List<Map<String, Object>> old = dml.getOld();
    if (old == null || old.isEmpty() || data == null || data.isEmpty()) {
        return;
    }

    MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
    MappingConfig.ColumnItem rowKeyColumn = hbaseMapping.getRowKeyColumn();
    Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems();

    // The composite row-key definition does not change between rows, so split it once.
    String[] rowKeyColumns = null;
    if (hbaseMapping.getRowKey() != null) {
        rowKeyColumns = hbaseMapping.getRowKey().trim().split(",");
    }

    List<HRow> rows = new ArrayList<>();
    // Number of rows queued so far (1-based); drives the commit-batch boundary.
    int queued = 1;

    // Iterate by position so that data.get(index) and old.get(index) always stay paired,
    // including when a row is skipped via "continue out" below.
    out: for (int index = 0; index < data.size(); index++) {
        Map<String, Object> r = data.get(index);
        Map<String, Object> changed = old.get(index);

        byte[] rowKeyBytes;
        if (rowKeyColumns != null) {
            // Check whether any column of the composite row key was modified.
            for (String updateColumn : changed.keySet()) {
                for (String rowKeyColumnName : rowKeyColumns) {
                    if (rowKeyColumnName.equalsIgnoreCase(updateColumn)) {
                        // Row key changed: hand over to the delete-and-insert path.
                        // NOTE(review): deleteAndInsert receives the whole dml, so it may be
                        // invoked once per affected row - confirm that this is intended.
                        deleteAndInsert(config, dml);
                        continue out;
                    }
                }
            }
            String rowKeyValue = getRowKeys(rowKeyColumns, r);
            rowKeyBytes = Bytes.toBytes(rowKeyValue);
        } else if (rowKeyColumn == null) {
            // No row-key column configured: use the first value of the row.
            rowKeyBytes = typeConvert(null, hbaseMapping, r.values().iterator().next());
        } else {
            rowKeyBytes = getRowKeyBytes(hbaseMapping, rowKeyColumn, r);
        }
        if (rowKeyBytes == null) {
            throw new RuntimeException("rowKey值为空");
        }

        HRow hRow = new HRow(rowKeyBytes);
        for (String updateColumn : changed.keySet()) {
            if (hbaseMapping.getExcludeColumns() != null
                && hbaseMapping.getExcludeColumns().contains(updateColumn)) {
                continue;
            }

            Object newVal = r.get(updateColumn);
            MappingConfig.ColumnItem columnItem = columnItems.get(updateColumn);
            if (columnItem == null) {
                // Unmapped column: fall back to the default family and the column name.
                String qualifier = updateColumn;
                if (hbaseMapping.isUppercaseQualifier()) {
                    qualifier = qualifier.toUpperCase();
                }
                byte[] cellValue = newVal == null ? null : typeConvert(null, hbaseMapping, newVal);
                hRow.addCell(hbaseMapping.getFamily(), qualifier, cellValue);
            } else {
                // A change to the row-key column itself is not written as a cell.
                if (columnItem.isRowKey()) {
                    continue;
                }
                byte[] cellValue = newVal == null ? null : typeConvert(columnItem, hbaseMapping, newVal);
                hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), cellValue);
            }
        }
        rows.add(hRow);

        // Flush whenever a full batch has been queued.
        if (queued % hbaseMapping.getCommitBatch() == 0) {
            hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
            rows.clear();
        }
        queued++;
    }

    // Flush the trailing partial batch; rows is empty if the last batch was just committed.
    if (!rows.isEmpty()) {
        hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
    }
}