private String genMappings()

in elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java [204:377]


        private String genMappings(String dstDynamic, String typeName, boolean isGreaterOrEqualThan7) {
            String mappings;
            Map<String, Object> propMap = new HashMap<String, Object>();
            List<ElasticSearchColumn> columnList = new ArrayList<ElasticSearchColumn>();
            ElasticSearchColumn combineItem = null;

            List column = conf.getList("column");
            if (column != null) {
                for (Object col : column) {
                    JSONObject jo = JSONObject.parseObject(col.toString());
                    String colName = jo.getString("name");
                    String colTypeStr = jo.getString("type");
                    if (colTypeStr == null) {
                        throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, col.toString() + " column must have type");
                    }
                    ElasticSearchFieldType colType = ElasticSearchFieldType.getESFieldType(colTypeStr);
                    if (colType == null) {
                        throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, col.toString() + " unsupported type");
                    }

                    ElasticSearchColumn columnItem = new ElasticSearchColumn();

                    if (Key.PRIMARY_KEY_COLUMN_NAME.equals(colName)) {
                        // 兼容已有版本
                        colType = ElasticSearchFieldType.ID;
                        colTypeStr = "id";
                    }

                    columnItem.setName(colName);
                    columnItem.setType(colTypeStr);

                    JSONArray combineFields = jo.getJSONArray("combineFields");
                    if (combineFields != null && !combineFields.isEmpty() && ElasticSearchFieldType.ID.equals(ElasticSearchFieldType.getESFieldType(colTypeStr))) {
                        List<String> fields = new ArrayList<String>();
                        for (Object item : combineFields) {
                            fields.add((String) item);
                        }
                        columnItem.setCombineFields(fields);
                        combineItem = columnItem;
                    }

                    String combineFieldsValueSeparator = jo.getString("combineFieldsValueSeparator");
                    if (StringUtils.isNotBlank(combineFieldsValueSeparator)) {
                        columnItem.setCombineFieldsValueSeparator(combineFieldsValueSeparator);
                    }

                    // 如果是id,version,routing,不需要创建mapping
                    if (colType == ElasticSearchFieldType.ID || colType == ElasticSearchFieldType.VERSION || colType == ElasticSearchFieldType.ROUTING) {
                        columnList.add(columnItem);
                        continue;
                    }

                    // 如果是组合id中的字段,不需要创建mapping
                    // 所以组合id的定义必须要在columns最前面
                    if (combineItem != null && combineItem.getCombineFields().contains(colName)) {
                        columnList.add(columnItem);
                        continue;
                    }
                    columnItem.setDstArray(false);
                    Boolean array = jo.getBoolean("array");
                    if (array != null) {
                        columnItem.setArray(array);
                        Boolean dstArray = jo.getBoolean("dstArray");
                        if(dstArray!=null) {
                            columnItem.setDstArray(dstArray);
                        }
                    } else {
                        columnItem.setArray(false);
                    }
                    Boolean jsonArray = jo.getBoolean("json_array");
                    if (jsonArray != null) {
                        columnItem.setJsonArray(jsonArray);
                    } else {
                        columnItem.setJsonArray(false);
                    }
                    Map<String, Object> field = new HashMap<String, Object>();
                    field.put("type", colTypeStr);
                    //https://www.elastic.co/guide/en/elasticsearch/reference/5.2/breaking_50_mapping_changes.html#_literal_index_literal_property
                    // https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_deep_dive_on_doc_values.html#_disabling_doc_values
                    field.put("doc_values", jo.getBoolean("doc_values"));
                    field.put("ignore_above", jo.getInteger("ignore_above"));
                    field.put("index", jo.getBoolean("index"));
                    switch (colType) {
                        case STRING:
                            // 兼容string类型,ES5之前版本
                            break;
                        case KEYWORD:
                            // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.html#_warm_up_global_ordinals
                            field.put("eager_global_ordinals", jo.getBoolean("eager_global_ordinals"));
                            break;
                        case TEXT:
                            field.put("analyzer", jo.getString("analyzer"));
                            // 优化disk使用,也同步会提高index性能
                            // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-disk-usage.html
                            field.put("norms", jo.getBoolean("norms"));
                            field.put("index_options", jo.getBoolean("index_options"));
                            if(jo.getString("fields") != null) {
                                field.put("fields", jo.getJSONObject("fields"));
                            }
                            break;
                        case DATE:
                            if (Boolean.TRUE.equals(jo.getBoolean("origin"))) {
                                if (jo.getString("format") != null) {
                                    field.put("format", jo.getString("format"));
                                }
                                // es原生format覆盖原先来的format
                                if (jo.getString("dstFormat") != null) {
                                    field.put("format", jo.getString("dstFormat"));
                                }
                                if(jo.getBoolean("origin") != null) {
                                    columnItem.setOrigin(jo.getBoolean("origin"));
                                }
                            } else {
                                columnItem.setTimeZone(jo.getString("timezone"));
                                columnItem.setFormat(jo.getString("format"));
                            }
                            break;
                        case GEO_SHAPE:
                            field.put("tree", jo.getString("tree"));
                            field.put("precision", jo.getString("precision"));
                            break;
                        case OBJECT:
                        case NESTED:
                            if (jo.getString("dynamic") != null) {
                                field.put("dynamic", jo.getString("dynamic"));
                            }
                            break;
                        default:
                            break;
                    }
                    if (jo.containsKey("other_params")) {
                        field.putAll(jo.getJSONObject("other_params"));
                    }
                    propMap.put(colName, field);
                    columnList.add(columnItem);
                }
            }

            long version = System.currentTimeMillis();
            LOGGER.info("unified version: {}", version);
            conf.set("version", version);
            conf.set(WRITE_COLUMNS, JSON.toJSONString(columnList));

            LOGGER.info(JSON.toJSONString(columnList));

            Map<String, Object> rootMappings = new HashMap<String, Object>();
            Map<String, Object> typeMappings = new HashMap<String, Object>();
            typeMappings.put("properties", propMap);
            rootMappings.put(typeName, typeMappings);

            // 7.x以后版本取消了index中关于type的指定,所以mapping的格式只能支持
            // {
            //      "properties" : {
            //          "abc" : {
            //              "type" : "text"
            //              }
            //           }
            // }
            // properties 外不能再嵌套typeName

            if(StringUtils.isNotBlank(dstDynamic)) {
                typeMappings.put("dynamic", dstDynamic);
            }
            if (isGreaterOrEqualThan7) {
                mappings = JSON.toJSONString(typeMappings);
            } else {
                mappings = JSON.toJSONString(rootMappings);
            }
            if (StringUtils.isBlank(mappings)) {
                throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, "must have mappings");
            }

            return mappings;
        }