private void doBatchInsert()

in elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java [673:1058]


        /**
         * Converts a buffer of records into a single bulk request and submits it through {@code esClient}.
         *
         * <p>For every record a field map is built according to the configured column list and type list.
         * Columns typed ID / PARENT / ROUTING / VERSION are not written as fields; their string values are
         * concatenated into the corresponding action metadata. A record whose conversion throws is handed
         * to the task plugin collector as a dirty record and is left out of the bulk request.
         *
         * <p>The bulk call is executed through {@code RetryUtil.executeWithRetry}. When the response is a
         * bulk result whose failed items are all status 400 and parse errors are configured to be ignored,
         * the failed items are reported as dirty records instead of failing the batch.
         *
         * @param writerBuffer the records of this batch; must not be {@code null}
         * @throws DataXException if the bulk write fails and {@code Key.isIgnoreWriteError(conf)} is false
         */
        private void doBatchInsert(final List<Record> writerBuffer) {
            int totalNumber = writerBuffer.size();
            int dirtyDataNumber = 0;
            // No default type is set on the bulk request when isGreaterOrEqualThan7 is true.
            final Bulk.Builder bulkaction = this.isGreaterOrEqualThan7
                    ? new Bulk.Builder().defaultIndex(this.index)
                    : new Bulk.Builder().defaultIndex(this.index).defaultType(this.type);
            // Records that actually contributed an action to the bulk request, in request order.
            // Dirty records are skipped below, so positions in the bulk response line up with this
            // list and NOT with writerBuffer.
            final List<Record> bulkRecords = new ArrayList<Record>(totalNumber);
            // Append the configured URL parameters to the bulk request.
            for (Map.Entry<String, Object> entry : urlParams.entrySet()) {
                bulkaction.setParameter(entry.getKey(), entry.getValue());
            }
            for (Record record : writerBuffer) {
                Map<String, Object> data = new HashMap<String, Object>();
                String id = null;
                String parent = null;
                String routing = null;
                String version = null;
                String columnName = null;
                Column column = null;
                try {
                    for (int i = 0; i < record.getColumnNumber(); i++) {
                        column = record.getColumn(i);
                        columnName = columnList.get(i).getName();
                        // When a combined id is configured, the fields that make it up are not written as document fields.
                        if (combinedIdColumn != null) {
                            if (combinedIdColumn.getCombineFields().contains(columnName)) {
                                continue;
                            }
                        }
                        // A JSON-array column is handled as a nested object type.
                        ElasticSearchFieldType columnType = columnList.get(i).isJsonArray() ? ElasticSearchFieldType.NESTED : typeList.get(i);

                        Boolean dstArray = columnList.get(i).isDstArray();

                        // Array columns arrive as a delimited string, which may also be null.
                        if (columnList.get(i).isArray() && null != column.asString()) {
                            String[] dataList = column.asString().split(splitter);
                            if (!columnType.equals(ElasticSearchFieldType.DATE)) {
                                if (dstArray) {
                                    try {
                                        // Convert the elements to the type the user configured.
                                        switch (columnType) {
                                            case BYTE:
                                            case KEYWORD:
                                            case TEXT:
                                                data.put(columnName, dataList);
                                                break;
                                            case SHORT:
                                            case INTEGER:
                                                if (StringUtils.isBlank(column.asString().trim())) {
                                                    data.put(columnName, null);
                                                } else {
                                                    // Blank elements stay null in the converted array.
                                                    Integer[] intDataList = new Integer[dataList.length];
                                                    for (int j = 0; j < dataList.length; j++) {
                                                        dataList[j] = dataList[j].trim();
                                                        if (StringUtils.isNotBlank(dataList[j])) {
                                                            intDataList[j] = Integer.valueOf(dataList[j]);
                                                        }
                                                    }
                                                    data.put(columnName, intDataList);
                                                }
                                                break;
                                            case LONG:
                                                if (StringUtils.isBlank(column.asString().trim())) {
                                                    data.put(columnName, null);
                                                } else {
                                                    Long[] longDataList = new Long[dataList.length];
                                                    for (int j = 0; j < dataList.length; j++) {
                                                        dataList[j] = dataList[j].trim();
                                                        if (StringUtils.isNotBlank(dataList[j])) {
                                                            longDataList[j] = Long.valueOf(dataList[j]);
                                                        }
                                                    }
                                                    data.put(columnName, longDataList);
                                                }
                                                break;
                                            case FLOAT:
                                            case DOUBLE:
                                                if (StringUtils.isBlank(column.asString().trim())) {
                                                    data.put(columnName, null);
                                                } else {
                                                    Double[] doubleDataList = new Double[dataList.length];
                                                    for (int j = 0; j < dataList.length; j++) {
                                                        dataList[j] = dataList[j].trim();
                                                        if (StringUtils.isNotBlank(dataList[j])) {
                                                            doubleDataList[j] = Double.valueOf(dataList[j]);
                                                        }
                                                    }
                                                    data.put(columnName, doubleDataList);
                                                }
                                                break;
                                            default:
                                                data.put(columnName, dataList);
                                                break;
                                        }
                                    } catch (Exception e) {
                                        // NOTE(review): a failed element conversion only drops this column; the
                                        // record is still written and is not counted as dirty. Kept as-is.
                                        LOGGER.info("脏数据,记录:{}", record.toString());
                                        continue;
                                    }
                                } else {
                                    data.put(columnName, dataList);
                                }
                            } else {
                                data.put(columnName, dataList);
                            }
                        } else {
                            switch (columnType) {
                                case ID:
                                    // Several ID columns are concatenated in column order.
                                    if (id != null) {
                                        id += record.getColumn(i).asString();
                                    } else {
                                        id = record.getColumn(i).asString();
                                    }
                                    break;
                                case PARENT:
                                    if (parent != null) {
                                        parent += record.getColumn(i).asString();
                                    } else {
                                        parent = record.getColumn(i).asString();
                                    }
                                    break;
                                case ROUTING:
                                    if (routing != null) {
                                        routing += record.getColumn(i).asString();
                                    } else {
                                        routing = record.getColumn(i).asString();
                                    }
                                    break;

                                case VERSION:
                                    if (version != null) {
                                        version += record.getColumn(i).asString();
                                    } else {
                                        version = record.getColumn(i).asString();
                                    }
                                    break;
                                case DATE:
                                    String dateStr = getDateStr(columnList.get(i), column);
                                    data.put(columnName, dateStr);
                                    break;
                                case KEYWORD:
                                case STRING:
                                case TEXT:
                                case IP:
                                case GEO_POINT:
                                case IP_RANGE:
                                    data.put(columnName, column.asString());
                                    break;
                                case BOOLEAN:
                                    data.put(columnName, column.asBoolean());
                                    break;
                                case BYTE:
                                case BINARY:
                                    // JSON serialization has no byte type; the ES binary type must be supplied as base64.
                                    data.put(columnName, column.asString());
                                    break;
                                case LONG:
                                    data.put(columnName, column.asLong());
                                    break;
                                case INTEGER:
                                    data.put(columnName, column.asLong());
                                    break;
                                case SHORT:
                                    data.put(columnName, column.asLong());
                                    break;
                                case FLOAT:
                                case DOUBLE:
                                    data.put(columnName, column.asDouble());
                                    break;
                                case GEO_SHAPE:
                                case DATE_RANGE:
                                case INTEGER_RANGE:
                                case FLOAT_RANGE:
                                case LONG_RANGE:
                                case DOUBLE_RANGE:
                                    if (null == column.asString()) {
                                        data.put(columnName, column.asString());
                                    } else {
                                        data.put(columnName, JSON.parse(column.asString()));
                                    }
                                    break;
                                case NESTED:
                                case OBJECT:
                                    if (null == column.asString()) {
                                        data.put(columnName, column.asString());
                                    } else {
                                        // Parse the string into a JSON value.
                                        data.put(columnName, JSON.parse(column.asString()));
                                    }
                                    break;
                                default:
                                    throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, String.format(
                                            "Type error: unsupported type %s for column %s", columnType, columnName));
                            }
                        }
                    }

                    // An explicitly configured primary key overrides any id gathered from ID-typed columns.
                    if (this.hasPrimaryKeyInfo) {
                        List<String> idData = new ArrayList<String>();
                        for (String eachCol : this.primaryKeyInfo.getColumn()) {
                            Column recordColumn = record.getColumn(this.colNameToIndexMap.get(eachCol));
                            idData.add(recordColumn.asString());
                        }
                        id = StringUtils.join(idData, this.primaryKeyInfo.getFieldDelimiter());
                    }
                    // Configured partition columns override any routing gathered from ROUTING-typed columns.
                    if (this.hasEsPartitionColumn) {
                        List<String> idData = new ArrayList<String>();
                        for (PartitionColumn eachCol : this.esPartitionColumn) {
                            Column recordColumn = record.getColumn(this.colNameToIndexMap.get(eachCol.getName()));
                            idData.add(recordColumn.asString());
                        }
                        routing = StringUtils.join(idData, "");
                    }
                } catch (Exception e) {
                    // Dirty record: report it and move on to the next record.
                    super.getTaskPluginCollector().collectDirtyRecord(record,
                            String.format("parse error for column: %s errorMessage: %s", columnName, e.getMessage()));
                    dirtyDataNumber++;
                    continue;
                }

                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("id: {} routing: {} data: {}", id, routing, JSON.toJSONString(data));
                }

                if (isDeleteRecord(record)) {
                    Delete.Builder builder = new Delete.Builder(id);
                    bulkaction.addAction(builder.build());
                    bulkRecords.add(record);
                } else {
                    // Use the user-defined combined unique key as the id.
                    if (combinedIdColumn != null) {
                        try {
                            id = processIDCombineFields(record, combinedIdColumn);
                        } catch (Exception e) {
                            // Dirty record: report it and move on to the next record.
                            super.getTaskPluginCollector().collectDirtyRecord(record,
                                    String.format("parse error for column: %s errorMessage: %s", columnName, e.getMessage()));
                            dirtyDataNumber++;
                            continue;
                        }
                    }
                    switch (actionType) {
                        case INDEX:
                            // Serialize to JSON up front; per the original note, the Jest client's Gson
                            // serialization would escape the equals sign as HTML.
                            Index.Builder builder = null;
                            if (this.enableWriteNull) {
                                builder = new Index.Builder(
                                        JSONObject.toJSONString(data, JSONWriter.Feature.WriteMapNullValue,
                                                JSONWriter.Feature.WriteEnumUsingToString));
                            } else {
                                builder = new Index.Builder(JSONObject.toJSONString(data));
                            }
                            if (id != null) {
                                builder.id(id);
                            }
                            if (parent != null) {
                                builder.setParameter(Parameters.PARENT, parent);
                            }
                            if (routing != null) {
                                builder.setParameter(Parameters.ROUTING, routing);
                            }
                            if (version != null) {
                                builder.setParameter(Parameters.VERSION, version);
                                builder.setParameter(Parameters.VERSION_TYPE, "external");
                            }
                            bulkaction.addAction(builder.build());
                            bulkRecords.add(record);
                            break;
                        case UPDATE:
                            // doc: https://www.cnblogs.com/crystaltu/articles/6992935.html
                            // doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
                            Map<String, Object> updateDoc = new HashMap<String, Object>();
                            updateDoc.put("doc", data);
                            updateDoc.put("doc_as_upsert", true);
                            Update.Builder update = null;
                            if (this.enableWriteNull) {
                                // Null-valued fields are written, e.g. {a:"1",b:null}.
                                update = new Update.Builder(
                                        JSONObject.toJSONString(updateDoc, JSONWriter.Feature.WriteMapNullValue,
                                                JSONWriter.Feature.WriteEnumUsingToString));
                            } else {
                                // Null-valued fields are omitted, e.g. {"a":"1"}.
                                update = new Update.Builder(JSONObject.toJSONString(updateDoc));
                            }
                            if (id != null) {
                                update.id(id);
                            }
                            if (parent != null) {
                                update.setParameter(Parameters.PARENT, parent);
                            }
                            if (routing != null) {
                                update.setParameter(Parameters.ROUTING, routing);
                            }
                            // version type [EXTERNAL] is not supported by the update API
                            if (version != null) {
                                update.setParameter(Parameters.VERSION, version);
                            }
                            bulkaction.addAction(update.build());
                            bulkRecords.add(record);
                            break;
                        default:
                            // Other action types add nothing to the bulk request.
                            break;
                    }
                }
            }

            if (dirtyDataNumber >= totalNumber) {
                // all batch is dirty data
                LOGGER.warn("all this batch is dirty data, dirtyDataNumber: {} totalDataNumber: {}", dirtyDataNumber,
                        totalNumber);
                return;
            }

            BulkResult bulkResult = null;
            try {
                // The callable returns null on full success, or the BulkResult when the only failures are
                // ignorable 400 items; every other outcome is signalled by an exception.
                // NOTE(review): the 'false' flag and the DataXException list are passed to RetryUtil as-is;
                // confirm their exact retry semantics against RetryUtil.executeWithRetry.
                bulkResult = RetryUtil.executeWithRetry(new Callable<BulkResult>() {
                    @Override
                    public BulkResult call() throws Exception {
                        JestResult jestResult = esClient.bulkInsert(bulkaction);
                        if (jestResult.isSucceeded()) {
                            return null;
                        }
                        String msg = String.format("response code: [%d] error :[%s]", jestResult.getResponseCode(),
                                jestResult.getErrorMessage());
                        LOGGER.warn(msg);
                        if (esClient.isBulkResult(jestResult)) {
                            BulkResult brst = (BulkResult) jestResult;
                            List<BulkResult.BulkResultItem> failedItems = brst.getFailedItems();
                            for (BulkResult.BulkResultItem item : failedItems) {
                                if (item.status != 400) {
                                    // Anything other than 400 BAD_REQUEST is a request failure rather than a
                                    // data problem and must not be ignored.
                                    throw DataXException.asDataXException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT,
                                            String.format("status:[%d], error: %s", item.status, item.error));
                                } else {
                                    // Throw when the user chose not to ignore parse errors
                                    // (the original comment states that ignoring is the default).
                                    if (!Key.isIgnoreParseError(conf)) {
                                        throw new NoReRunException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT,
                                                String.format(
                                                        "status:[%d], error: %s, config not ignoreParseError so throw this error",
                                                        item.status, item.error));
                                    }
                                }
                            }
                            return brst;
                        } else {
                            Integer status = esClient.getStatus(jestResult);
                            switch (status) {
                            case 429: // TOO_MANY_REQUESTS
                                LOGGER.warn("server response too many requests, so auto reduce speed");
                                break;
                            default:
                                break;
                            }
                            throw DataXException.asDataXException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT,
                                    jestResult.getErrorMessage());
                        }
                    }
                }, this.trySize, this.tryInterval, false, Arrays.asList(DataXException.class));
            } catch (Exception e) {
                if (Key.isIgnoreWriteError(this.conf)) {
                    // Keep the cause in the log so an ignored failure can still be diagnosed.
                    LOGGER.warn(String.format("Retry [%d] write failed, ignore the error, continue to write!", trySize), e);
                } else {
                    throw DataXException.asDataXException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT, e.getMessage(), e);
                }
            }

            if (null != bulkResult) {
                // Response items follow the order of the submitted actions, so they are matched against
                // bulkRecords; writerBuffer may contain skipped (dirty) records that shift its indices.
                List<BulkResult.BulkResultItem> items = bulkResult.getItems();
                int reportable = Math.min(items.size(), bulkRecords.size());
                for (int idx = 0; idx < reportable; ++idx) {
                    BulkResult.BulkResultItem item = items.get(idx);
                    if (item.error != null && !"".equals(item.error)) {
                        super.getTaskPluginCollector().collectDirtyRecord(bulkRecords.get(idx),
                                String.format("status:[%d], error: %s", item.status, item.error));
                    }
                }
            }
        }