protected boolean executeSqlImport()

in client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java [102:274]


    protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
                                       AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
        MappingConfig.HbaseMapping hbaseMapping = (MappingConfig.HbaseMapping) mapping;
        try {
            Util.sqlRS(ds, sql, values, rs -> {
                int i = 1;

                try {
                    boolean complete = false;
                    List<HRow> rows = new ArrayList<>();
                    String[] rowKeyColumns = null;
                    if (hbaseMapping.getRowKey() != null) {
                        rowKeyColumns = hbaseMapping.getRowKey().trim().split(",");
                    }
                    while (rs.next()) {
                        int cc = rs.getMetaData().getColumnCount();
                        int[] jdbcTypes = new int[cc];
                        Class<?>[] classes = new Class[cc];
                        for (int j = 1; j <= cc; j++) {
                            int jdbcType = rs.getMetaData().getColumnType(j);
                            jdbcTypes[j - 1] = jdbcType;
                            classes[j - 1] = JdbcTypeUtil.jdbcType2javaType(jdbcType);
                        }
                        HRow row = new HRow();

                        if (rowKeyColumns != null) {
                            // 取rowKey字段拼接
                StringBuilder rowKeyVale = new StringBuilder();
                for (String rowKeyColumnName : rowKeyColumns) {
                    Object obj = rs.getObject(rowKeyColumnName);
                    if (obj != null) {
                        rowKeyVale.append(obj.toString());
                    }
                    rowKeyVale.append("|");
                }
                int len = rowKeyVale.length();
                if (len > 0) {
                    rowKeyVale.delete(len - 1, len);
                }
                row.setRowKey(Bytes.toBytes(rowKeyVale.toString()));
            }

            for (int j = 1; j <= cc; j++) {
                String columnName = rs.getMetaData().getColumnName(j);

                Object val = JdbcTypeUtil.getRSData(rs, columnName, jdbcTypes[j - 1]);
                if (val == null) {
                    continue;
                }

                MappingConfig.ColumnItem columnItem = hbaseMapping.getColumnItems().get(columnName);
                // 没有配置映射
                if (columnItem == null) {
                    String family = hbaseMapping.getFamily();
                    String qualifile = columnName;
                    if (hbaseMapping.isUppercaseQualifier()) {
                        qualifile = qualifile.toUpperCase();
                    }
                    if (MappingConfig.Mode.STRING == hbaseMapping.getMode()) {
                        if (hbaseMapping.getRowKey() == null && j == 1) {
                            row.setRowKey(Bytes.toBytes(val.toString()));
                        } else {
                            row.addCell(family, qualifile, Bytes.toBytes(val.toString()));
                        }
                    } else if (MappingConfig.Mode.NATIVE == hbaseMapping.getMode()) {
                        Type type = Type.getType(classes[j - 1]);
                        if (hbaseMapping.getRowKey() == null && j == 1) {
                            row.setRowKey(TypeUtil.toBytes(val, type));
                        } else {
                            row.addCell(family, qualifile, TypeUtil.toBytes(val, type));
                        }
                    } else if (MappingConfig.Mode.PHOENIX == hbaseMapping.getMode()) {
                        PhType phType = PhType.getType(classes[j - 1]);
                        if (hbaseMapping.getRowKey() == null && j == 1) {
                            row.setRowKey(PhTypeUtil.toBytes(val, phType));
                        } else {
                            row.addCell(family, qualifile, PhTypeUtil.toBytes(val, phType));
                        }
                    }
                } else {
                    // 如果不需要类型转换
                    if (columnItem.getType() == null || "".equals(columnItem.getType())) {
                        if (val instanceof java.sql.Date) {
                            SimpleDateFormat dateFmt = new SimpleDateFormat("yyyy-MM-dd");
                            val = dateFmt.format((Date) val);
                        } else if (val instanceof Timestamp) {
                            SimpleDateFormat datetimeFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            val = datetimeFmt.format((Date) val);
                        }

                        byte[] valBytes = Bytes.toBytes(val.toString());
                        if (columnItem.isRowKey()) {
                            if (columnItem.getRowKeyLen() != null) {
                                valBytes = Bytes.toBytes(limitLenNum(columnItem.getRowKeyLen(), val));
                                row.setRowKey(valBytes);
                            } else {
                                row.setRowKey(valBytes);
                            }
                        } else {
                            row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
                        }
                    } else {
                        if (MappingConfig.Mode.STRING == hbaseMapping.getMode()) {
                            byte[] valBytes = Bytes.toBytes(val.toString());
                            if (columnItem.isRowKey()) {
                                if (columnItem.getRowKeyLen() != null) {
                                    valBytes = Bytes.toBytes(limitLenNum(columnItem.getRowKeyLen(), val));
                                }
                                row.setRowKey(valBytes);
                            } else {
                                row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
                            }
                        } else if (MappingConfig.Mode.NATIVE == hbaseMapping.getMode()) {
                            Type type = Type.getType(columnItem.getType());
                            if (columnItem.isRowKey()) {
                                if (columnItem.getRowKeyLen() != null) {
                                    String v = limitLenNum(columnItem.getRowKeyLen(), val);
                                    row.setRowKey(Bytes.toBytes(v));
                                } else {
                                    row.setRowKey(TypeUtil.toBytes(val, type));
                                }
                            } else {
                                row.addCell(columnItem.getFamily(),
                                    columnItem.getQualifier(),
                                    TypeUtil.toBytes(val, type));
                            }
                        } else if (MappingConfig.Mode.PHOENIX == hbaseMapping.getMode()) {
                            PhType phType = PhType.getType(columnItem.getType());
                            if (columnItem.isRowKey()) {
                                row.setRowKey(PhTypeUtil.toBytes(val, phType));
                            } else {
                                row.addCell(columnItem.getFamily(),
                                    columnItem.getQualifier(),
                                    PhTypeUtil.toBytes(val, phType));
                            }
                        }
                    }
                }
            }

            if (row.getRowKey() == null) throw new RuntimeException("RowKey 值为空");

            rows.add(row);
            complete = false;
            if (i % hbaseMapping.getCommitBatch() == 0 && !rows.isEmpty()) {
                hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
                rows.clear();
                complete = true;
            }
            i++;
            impCount.incrementAndGet();
            if (logger.isDebugEnabled()) {
                logger.debug("successful import count:" + impCount.get());
            }
        }

        if (!complete && !rows.isEmpty()) {
            hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
        }

    } catch (Exception e) {
        logger.error(hbaseMapping.getHbaseTable() + " etl failed! ==>" + e.getMessage(), e);
        errMsg.add(hbaseMapping.getHbaseTable() + " etl failed! ==>" + e.getMessage());
        // throw new RuntimeException(e);
    }
    return i;
}           );
            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }