private void update()

in client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java [163:258]


    /**
     * Applies an UPDATE DML event to the mapped HBase table.
     * <p>
     * For every row in {@code dml.getData()} the matching entry of
     * {@code dml.getOld()} (same position) supplies the set of changed column
     * names. Only those columns are written; columns listed in the mapping's
     * exclude list and columns mapped as the row key are skipped. Rows are
     * buffered and written through {@code hbaseTemplate.puts} every
     * {@code commitBatch} buffered rows, with a final write for the remainder.
     * <p>
     * When the mapping declares a composite row key ({@code getRowKey() != null})
     * and one of its columns appears among the changed columns, the row is not
     * buffered here; {@code deleteAndInsert} is invoked instead.
     *
     * @param config mapping configuration providing the HBase mapping
     * @param dml    the DML event; nothing is done if its data or old list is
     *               null or empty
     * @throws RuntimeException if no row key bytes can be derived for a row
     */
    private void update(MappingConfig config, Dml dml) {
        List<Map<String, Object>> data = dml.getData();
        List<Map<String, Object>> old = dml.getOld();
        if (old == null || old.isEmpty() || data == null || data.isEmpty()) {
            return;
        }

        MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
        MappingConfig.ColumnItem rowKeyColumn = hbaseMapping.getRowKeyColumn();

        // The composite row key definition does not change between rows, so split it once.
        String compositeRowKey = hbaseMapping.getRowKey();
        String[] rowKeyColumns = compositeRowKey == null ? null : compositeRowKey.trim().split(",");

        // 1-based count of rows buffered so far; drives the periodic flush.
        int buffered = 1;
        List<HRow> rows = new ArrayList<>();

        // An indexed loop is used on purpose: the labelled `continue` below still
        // executes `index++`, keeping old.get(index) aligned with data.get(index).
        out: for (int index = 0; index < data.size(); index++) {
            Map<String, Object> r = data.get(index);
            Map<String, Object> oldRow = old.get(index);
            byte[] rowKeyBytes;

            if (rowKeyColumns != null) {
                // Check whether any column of the composite row key was modified.
                for (String updateColumn : oldRow.keySet()) {
                    for (String rowKeyColumnName : rowKeyColumns) {
                        if (rowKeyColumnName.equalsIgnoreCase(updateColumn)) {
                            // The row key itself changed: hand over to delete + insert.
                            // NOTE(review): deleteAndInsert receives the whole Dml, not just
                            // this row - confirm it is safe to call once per affected row.
                            deleteAndInsert(config, dml);
                            continue out;
                        }
                    }
                }

                String rowKeyValue = getRowKeys(rowKeyColumns, r);
                rowKeyBytes = Bytes.toBytes(rowKeyValue);
            } else if (rowKeyColumn == null) {
                // No row key column configured: the first value of the row is used.
                rowKeyBytes = typeConvert(null, hbaseMapping, r.values().iterator().next());
            } else {
                rowKeyBytes = getRowKeyBytes(hbaseMapping, rowKeyColumn, r);
            }
            if (rowKeyBytes == null) throw new RuntimeException("rowKey值为空");

            Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems();
            HRow hRow = new HRow(rowKeyBytes);
            for (String updateColumn : oldRow.keySet()) {
                if (hbaseMapping.getExcludeColumns() != null
                    && hbaseMapping.getExcludeColumns().contains(updateColumn)) {
                    continue;
                }

                Object newVal = r.get(updateColumn);
                MappingConfig.ColumnItem columnItem = columnItems.get(updateColumn);
                if (columnItem == null) {
                    // Unmapped column: default family, qualifier derived from the column name.
                    String qualifier = updateColumn;
                    if (hbaseMapping.isUppercaseQualifier()) {
                        qualifier = qualifier.toUpperCase();
                    }
                    byte[] cellValue = newVal == null ? null : typeConvert(null, hbaseMapping, newVal);
                    hRow.addCell(hbaseMapping.getFamily(), qualifier, cellValue);
                } else {
                    // Skip the column that is mapped as the row key.
                    if (columnItem.isRowKey()) continue;

                    byte[] cellValue = newVal == null ? null : typeConvert(columnItem, hbaseMapping, newVal);
                    hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), cellValue);
                }
            }

            rows.add(hRow);
            if (buffered % hbaseMapping.getCommitBatch() == 0) {
                hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
                rows.clear();
            }
            buffered++;
        }

        // Write whatever is left after the last full batch.
        if (!rows.isEmpty()) {
            hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
        }
    }