in client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/service/ESSyncService.java [695:799]
/**
 * Re-runs the mapping's complete SQL, narrowed by the relation columns of the given join table, and issues one
 * {@code esTemplate.updateByQuery} call per returned row.
 * <p>
 * The mapping SQL is split in front of a {@code GROUP BY} clause (if any), a {@code WHERE} clause built from
 * {@code tableItem.getRelationTableFields()} is appended, and the {@code GROUP BY} clause is put back afterwards.
 *
 * @param config    sync configuration; supplies the ES mapping, the data source key and the destination
 * @param dml       the change event; only its table name is used here, for logging
 * @param data      row values from which the relation column values for the WHERE clause are read
 * @param old       map whose keys are matched against column names to decide which fields get refreshed; when
 *                  {@code null}, every relation select field is copied from the result set (presumably the
 *                  pre-update values of an UPDATE event - confirm against the caller)
 * @param tableItem the join table whose relation fields drive both the WHERE clause and the update parameters
 * @throws RuntimeException wrapping any exception raised while reading the result set or updating ES
 */
private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
                               TableItem tableItem) {
    ESMapping mapping = config.getEsMapping();
    // Split off a trailing GROUP BY so that appending WHERE does not produce unparsable SQL.
    String[] sqlSplit = mapping.getSql().split("GROUP\\ BY(?!(.*)ON)");
    String sqlNoWhere = sqlSplit[0];
    String sqlGroupBy = "";
    if (sqlSplit.length > 1) {
        sqlGroupBy = "GROUP BY " + sqlSplit[1];
    }

    // NOTE(review): values are concatenated into the SQL text by ESSyncUtil.appendCondition; whether they are
    // escaped there cannot be seen from this method - confirm.
    StringBuilder sql = new StringBuilder(sqlNoWhere + " WHERE ");
    for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
        String columnName = fkFieldItem.getColumn().getColumnName();
        Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
        ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
    }
    // Drop the last five characters - presumably the trailing " AND " left by appendCondition (confirm).
    // NOTE(review): if there are no relation fields this cuts into " WHERE " instead; verify callers never
    // reach this method with an empty relation map.
    int len = sql.length();
    sql.delete(len - 5, len);
    sql.append(sqlGroupBy);
    String wholeSql = sql.toString();

    DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    if (logger.isTraceEnabled()) {
        logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
            config.getDestination(),
            dml.getTable(),
            mapping.getIndex(),
            wholeSql.replace("\n", " "));
    }
    Util.sqlRS(ds, wholeSql, rs -> {
        try {
            while (rs.next()) {
                // Fields to write into the matching ES documents, keyed by cleaned field name.
                Map<String, Object> esFieldData = new LinkedHashMap<>();
                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                    String fieldName = fieldItem.getFieldName();
                    if (old != null) {
                        // Join table is a sub-query: refresh the field when one of the sub-query columns it
                        // is built from appears in `old`.
                        out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
                            for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
                                if (fieldItem1.getFieldName().equals(columnItem0.getColumnName())) {
                                    for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                        if (old.containsKey(columnItem.getColumnName())) {
                                            Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                                            // Use the cleaned name, as the other branches do, so the same
                                            // field is never stored under two different keys.
                                            esFieldData.put(Util.cleanColumn(fieldName), val);
                                            break out;
                                        }
                                    }
                                }
                            }
                        }
                        // Join table is not a sub-query: refresh the field when one of its own columns
                        // appears in `old`.
                        for (FieldItem fieldItem1 : tableItem.getRelationSelectFieldItems()) {
                            if (fieldItem1.equals(fieldItem)) {
                                for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                    if (old.containsKey(columnItem.getColumnName())) {
                                        Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                                        esFieldData.put(Util.cleanColumn(fieldName), val);
                                        break;
                                    }
                                }
                            }
                        }
                    } else {
                        // No `old` map: copy every relation select field from the result row.
                        Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                        esFieldData.put(Util.cleanColumn(fieldName), val);
                    }
                }

                // Query parameters passed to updateByQuery to select the documents to update.
                Map<String, Object> paramsTmp = new LinkedHashMap<>();
                for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                    for (FieldItem fieldItem : entry.getValue()) {
                        Object value = esTemplate
                            .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
                        String fieldName = fieldItem.getFieldName();
                        // The field mapped as the document id is addressed as "_id".
                        if (fieldName.equals(mapping.getId())) {
                            fieldName = "_id";
                        }
                        paramsTmp.put(fieldName, value);
                    }
                }

                // Guard with the same level that is actually logged.
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
                        config.getDestination(),
                        dml.getTable(),
                        mapping.getIndex());
                }
                esTemplate.updateByQuery(config, paramsTmp, esFieldData);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return 0;
    });
}