in elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java [204:377]
/**
 * Builds the index mapping JSON from the configured {@code column} list.
 *
 * <p>As a side effect it stores a unified {@code version} timestamp in {@code conf}. It also
 * stores the parsed column definitions in {@code conf} under {@code WRITE_COLUMNS}. Columns of
 * type id, version or routing are recorded as write columns but get no mapping entry. The same
 * applies to columns listed in a combined id's {@code combineFields}.
 *
 * @param dstDynamic value for the top-level {@code dynamic} mapping setting; ignored when blank
 * @param typeName type name wrapping {@code properties}; only used for clusters older than 7.x
 * @param isGreaterOrEqualThan7 when true, emit the typeless (7.x+) mapping format
 * @return the mapping serialized as a JSON string
 * @throws DataXException with {@code BAD_CONFIG_VALUE} when a column lacks a type or uses an
 *     unsupported type
 */
private String genMappings(String dstDynamic, String typeName, boolean isGreaterOrEqualThan7) {
    Map<String, Object> propMap = new HashMap<String, Object>();
    List<ElasticSearchColumn> columnList = new ArrayList<ElasticSearchColumn>();
    ElasticSearchColumn combineItem = null;
    List<?> column = conf.getList("column");
    if (column != null) {
        for (Object col : column) {
            JSONObject jo = JSONObject.parseObject(col.toString());
            String colName = jo.getString("name");
            String colTypeStr = jo.getString("type");
            if (colTypeStr == null) {
                throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, col.toString() + " column must have type");
            }
            ElasticSearchFieldType colType = ElasticSearchFieldType.getESFieldType(colTypeStr);
            if (colType == null) {
                throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, col.toString() + " unsupported type");
            }
            ElasticSearchColumn columnItem = new ElasticSearchColumn();
            if (Key.PRIMARY_KEY_COLUMN_NAME.equals(colName)) {
                // Compatibility with existing versions: the primary-key column name is always an id.
                colType = ElasticSearchFieldType.ID;
                colTypeStr = "id";
            }
            columnItem.setName(colName);
            columnItem.setType(colTypeStr);
            JSONArray combineFields = jo.getJSONArray("combineFields");
            if (combineFields != null && !combineFields.isEmpty() && colType == ElasticSearchFieldType.ID) {
                List<String> fields = new ArrayList<String>();
                for (Object item : combineFields) {
                    fields.add((String) item);
                }
                columnItem.setCombineFields(fields);
                combineItem = columnItem;
            }
            String combineFieldsValueSeparator = jo.getString("combineFieldsValueSeparator");
            if (StringUtils.isNotBlank(combineFieldsValueSeparator)) {
                columnItem.setCombineFieldsValueSeparator(combineFieldsValueSeparator);
            }
            // id, version and routing columns do not need a mapping entry.
            if (colType == ElasticSearchFieldType.ID || colType == ElasticSearchFieldType.VERSION || colType == ElasticSearchFieldType.ROUTING) {
                columnList.add(columnItem);
                continue;
            }
            // Members of a combined id do not need a mapping entry either. This is only detected
            // for columns that come after the combined-id column, so that definition must appear
            // first in "column".
            if (combineItem != null && combineItem.getCombineFields().contains(colName)) {
                columnList.add(columnItem);
                continue;
            }
            columnItem.setDstArray(false);
            Boolean array = jo.getBoolean("array");
            if (array != null) {
                columnItem.setArray(array);
                Boolean dstArray = jo.getBoolean("dstArray");
                if (dstArray != null) {
                    columnItem.setDstArray(dstArray);
                }
            } else {
                columnItem.setArray(false);
            }
            Boolean jsonArray = jo.getBoolean("json_array");
            if (jsonArray != null) {
                columnItem.setJsonArray(jsonArray);
            } else {
                columnItem.setJsonArray(false);
            }
            Map<String, Object> field = new HashMap<String, Object>();
            field.put("type", colTypeStr);
            // https://www.elastic.co/guide/en/elasticsearch/reference/5.2/breaking_50_mapping_changes.html#_literal_index_literal_property
            // https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_deep_dive_on_doc_values.html#_disabling_doc_values
            field.put("doc_values", jo.getBoolean("doc_values"));
            field.put("ignore_above", jo.getInteger("ignore_above"));
            field.put("index", jo.getBoolean("index"));
            putTypeSpecificMappingOptions(colType, jo, field, columnItem);
            if (jo.containsKey("other_params")) {
                field.putAll(jo.getJSONObject("other_params"));
            }
            propMap.put(colName, field);
            columnList.add(columnItem);
        }
    }
    long version = System.currentTimeMillis();
    LOGGER.info("unified version: {}", version);
    conf.set("version", version);
    // Serialize once and reuse for both the config entry and the log line.
    String writeColumnsJson = JSON.toJSONString(columnList);
    conf.set(WRITE_COLUMNS, writeColumnsJson);
    LOGGER.info(writeColumnsJson);
    Map<String, Object> rootMappings = new HashMap<String, Object>();
    Map<String, Object> typeMappings = new HashMap<String, Object>();
    typeMappings.put("properties", propMap);
    rootMappings.put(typeName, typeMappings);
    // Since 7.x an index no longer specifies a type, so the mapping must look like
    // {
    //   "properties" : {
    //     "abc" : {
    //       "type" : "text"
    //     }
    //   }
    // }
    // i.e. "properties" must not be nested under typeName.
    if (StringUtils.isNotBlank(dstDynamic)) {
        typeMappings.put("dynamic", dstDynamic);
    }
    String mappings = JSON.toJSONString(isGreaterOrEqualThan7 ? typeMappings : rootMappings);
    if (StringUtils.isBlank(mappings)) {
        throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, "must have mappings");
    }
    return mappings;
}

/**
 * Adds the mapping options that only apply to {@code colType} to {@code field}.
 *
 * <p>For date columns it also records the origin, format and timezone settings on
 * {@code columnItem}.
 *
 * @param colType resolved field type of the column
 * @param jo the column's JSON configuration
 * @param field mapping entry being built for the column (mutated)
 * @param columnItem write-column descriptor for the column (mutated for date columns)
 */
private void putTypeSpecificMappingOptions(ElasticSearchFieldType colType, JSONObject jo,
        Map<String, Object> field, ElasticSearchColumn columnItem) {
    switch (colType) {
        case STRING:
            // Kept for the "string" type used before ES 5.
            break;
        case KEYWORD:
            // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.html#_warm_up_global_ordinals
            field.put("eager_global_ordinals", jo.getBoolean("eager_global_ordinals"));
            break;
        case TEXT:
            field.put("analyzer", jo.getString("analyzer"));
            // Disabling norms saves disk and also improves indexing performance.
            // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-disk-usage.html
            field.put("norms", jo.getBoolean("norms"));
            // index_options is a string in ES (docs/freqs/positions/offsets), not a boolean;
            // reading it as a boolean fails for every valid value.
            field.put("index_options", jo.getString("index_options"));
            if (jo.getString("fields") != null) {
                field.put("fields", jo.getJSONObject("fields"));
            }
            break;
        case DATE:
            if (Boolean.TRUE.equals(jo.getBoolean("origin"))) {
                String format = jo.getString("format");
                if (format != null) {
                    field.put("format", format);
                }
                // The native ES format (dstFormat) overrides the source format.
                String dstFormat = jo.getString("dstFormat");
                if (dstFormat != null) {
                    field.put("format", dstFormat);
                }
                // origin is known to be TRUE in this branch.
                columnItem.setOrigin(true);
            } else {
                columnItem.setTimeZone(jo.getString("timezone"));
                columnItem.setFormat(jo.getString("format"));
            }
            break;
        case GEO_SHAPE:
            field.put("tree", jo.getString("tree"));
            field.put("precision", jo.getString("precision"));
            break;
        case OBJECT:
        case NESTED:
            if (jo.getString("dynamic") != null) {
                field.put("dynamic", jo.getString("dynamic"));
            }
            break;
        default:
            break;
    }
}