protected boolean executeSqlImport()

in client-adapter/es6x/src/main/java/com/alibaba/otter/canal/client/adapter/es6x/etl/ESEtlService.java [59:199]


    protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
                                       AdapterConfig.AdapterMapping adapterMapping, AtomicLong impCount,
                                       List<String> errMsg) {
        try {
            ESMapping mapping = (ESMapping) adapterMapping;
            Util.sqlRS(ds, sql, values, rs -> {
                int count = 0;
                try {
                    ESBulkRequest esBulkRequest = this.esConnection.new ES6xBulkRequest();

                    long batchBegin = System.currentTimeMillis();
                    while (rs.next()) {
                        Map<String, Object> esFieldData = new LinkedHashMap<>();
                        Object idVal = null;
                        for (FieldItem fieldItem : mapping.getSchemaItem().getSelectFields().values()) {

                            String fieldName = fieldItem.getFieldName();
                            if (mapping.getSkips().contains(fieldName)) {
                                continue;
                            }

                            // 如果是主键字段则不插入
                if (fieldItem.getFieldName().equals(mapping.getId())) {
                    idVal = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                } else {
                    Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                    esFieldData.put(Util.cleanColumn(fieldName), val);
                }

            }

            if (!mapping.getRelations().isEmpty()) {
                mapping.getRelations().forEach((relationField, relationMapping) -> {
                    Map<String, Object> relations = new HashMap<>();
                    relations.put("name", relationMapping.getName());
                    if (StringUtils.isNotEmpty(relationMapping.getParent())) {
                        FieldItem parentFieldItem = mapping.getSchemaItem()
                            .getSelectFields()
                            .get(relationMapping.getParent());
                        Object parentVal;
                        try {
                            parentVal = esTemplate.getValFromRS(mapping,
                                rs,
                                parentFieldItem.getFieldName(),
                                parentFieldItem.getFieldName());
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                        if (parentVal != null) {
                            relations.put("parent", parentVal.toString());
                            esFieldData.put("$parent_routing", parentVal.toString());

                        }
                    }
                    esFieldData.put(Util.cleanColumn(relationField), relations);
                });
            }

            if (idVal != null) {
                String parentVal = (String) esFieldData.remove("$parent_routing");
                if (mapping.isUpsert()) {
                    ESUpdateRequest esUpdateRequest = this.esConnection.new ES6xUpdateRequest(mapping.getIndex(),
                        mapping.getType(),
                        idVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);

                    if (StringUtils.isNotEmpty(parentVal)) {
                        esUpdateRequest.setRouting(parentVal);
                    }

                    esBulkRequest.add(esUpdateRequest);
                } else {
                    ESIndexRequest esIndexRequest = this.esConnection.new ES6xIndexRequest(mapping.getIndex(),
                        mapping.getType(),
                        idVal.toString()).setSource(esFieldData);
                    if (StringUtils.isNotEmpty(parentVal)) {
                        esIndexRequest.setRouting(parentVal);
                    }
                    esBulkRequest.add(esIndexRequest);
                }
            } else {
                idVal = esFieldData.get(mapping.getPk());
                ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.getIndex(),
                    mapping.getType()).setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal)).size(10000);
                SearchResponse response = esSearchRequest.getResponse();
                for (SearchHit hit : response.getHits()) {
                    ESUpdateRequest esUpdateRequest = this.esConnection.new ES6xUpdateRequest(mapping.getIndex(),
                        mapping.getType(),
                        hit.getId()).setDoc(esFieldData);
                    esBulkRequest.add(esUpdateRequest);
                }
            }

            if (esBulkRequest.numberOfActions() % mapping.getCommitBatch() == 0 && esBulkRequest.numberOfActions() > 0) {
                long esBatchBegin = System.currentTimeMillis();
                ESBulkResponse rp = esBulkRequest.bulk();
                if (rp.hasFailures()) {
                    rp.processFailBulkResponse("全量数据 etl 异常 ");
                }

                if (logger.isTraceEnabled()) {
                    logger.trace("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                        (System.currentTimeMillis() - batchBegin),
                        (System.currentTimeMillis() - esBatchBegin),
                        esBulkRequest.numberOfActions(),
                        mapping.getIndex());
                }
                batchBegin = System.currentTimeMillis();
                esBulkRequest.resetBulk();
            }
            count++;
            impCount.incrementAndGet();
        }

        if (esBulkRequest.numberOfActions() > 0) {
            long esBatchBegin = System.currentTimeMillis();
            ESBulkResponse rp = esBulkRequest.bulk();
            if (rp.hasFailures()) {
                rp.processFailBulkResponse("全量数据 etl 异常 ");
            }
            if (logger.isTraceEnabled()) {
                logger.trace("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                    (System.currentTimeMillis() - batchBegin),
                    (System.currentTimeMillis() - esBatchBegin),
                    esBulkRequest.numberOfActions(),
                    mapping.getIndex());
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        errMsg.add(mapping.getIndex() + " etl failed! ==>" + e.getMessage());
        throw new RuntimeException(e);
    }
    return count;
}           );

            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }