in elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchWriter.java [673:1058]
/**
 * Converts a buffer of records into one bulk request and sends it with retry.
 *
 * <p>For every record a document map is built from its columns:
 * <ul>
 *   <li>columns that belong to {@code combinedIdColumn} are left out of the document;</li>
 *   <li>array columns are split with {@code splitter} and, when the destination is an array,
 *       converted to the configured element type;</li>
 *   <li>columns typed ID / PARENT / ROUTING / VERSION are concatenated into the matching
 *       request metadata instead of being stored in the document;</li>
 *   <li>{@code primaryKeyInfo} and {@code esPartitionColumn}, when configured, override the
 *       id and routing values respectively.</li>
 * </ul>
 * A record whose conversion throws is reported to the task plugin collector as dirty and
 * contributes no bulk action. Items the server rejects are reported as dirty as well, using the
 * record that produced the action at the same position in the request.
 *
 * @param writerBuffer records to write; must not be {@code null}
 * @throws DataXException if the bulk write finally fails and
 *                        {@code Key.isIgnoreWriteError(conf)} is {@code false}
 */
private void doBatchInsert(final List<Record> writerBuffer) {
    Map<String, Object> data = null;
    Bulk.Builder bulkactionTmp = null;
    int totalNumber = writerBuffer.size();
    int dirtyDataNumber = 0;
    // Records that actually produced a bulk action, in the order the actions were added.
    // Bulk response items are positional, so failed items must be mapped through this list and
    // not through writerBuffer, whose indexes shift as soon as one record is skipped.
    final List<Record> bulkRecords = new ArrayList<Record>(totalNumber);
    if (this.isGreaterOrEqualThan7) {
        bulkactionTmp = new Bulk.Builder().defaultIndex(this.index);
    } else {
        bulkactionTmp = new Bulk.Builder().defaultIndex(this.index).defaultType(this.type);
    }
    final Bulk.Builder bulkaction = bulkactionTmp;
    // append the configured url parameters to the bulk request
    for (Map.Entry<String, Object> entry : urlParams.entrySet()) {
        bulkaction.setParameter(entry.getKey(), entry.getValue());
    }
    for (Record record : writerBuffer) {
        data = new HashMap<String, Object>();
        String id = null;
        String parent = null;
        String routing = null;
        String version = null;
        String columnName = null;
        Column column = null;
        try {
            for (int i = 0; i < record.getColumnNumber(); i++) {
                column = record.getColumn(i);
                columnName = columnList.get(i).getName();
                // when a combined id is configured, its source fields are not written to the document
                if (combinedIdColumn != null) {
                    if (combinedIdColumn.getCombineFields().contains(columnName)) {
                        continue;
                    }
                }
                // a json array column is handled as a nested object
                ElasticSearchFieldType columnType = columnList.get(i).isJsonArray() ? ElasticSearchFieldType.NESTED : typeList.get(i);
                Boolean dstArray = columnList.get(i).isDstArray();
                // an array column arrives as a delimited string and may also be null
                if (columnList.get(i).isArray() && null != column.asString()) {
                    String[] dataList = column.asString().split(splitter);
                    if (!columnType.equals(ElasticSearchFieldType.DATE)) {
                        if (dstArray) {
                            try {
                                // convert the elements to the type configured by the user
                                switch (columnType) {
                                    case BYTE:
                                    case KEYWORD:
                                    case TEXT:
                                        data.put(columnName, dataList);
                                        break;
                                    case SHORT:
                                    case INTEGER:
                                        if (StringUtils.isBlank(column.asString().trim())) {
                                            data.put(columnName, null);
                                        } else {
                                            // blank elements stay null in the resulting array
                                            Integer[] intDataList = new Integer[dataList.length];
                                            for (int j = 0; j < dataList.length; j++) {
                                                dataList[j] = dataList[j].trim();
                                                if (StringUtils.isNotBlank(dataList[j])) {
                                                    intDataList[j] = Integer.valueOf(dataList[j]);
                                                }
                                            }
                                            data.put(columnName, intDataList);
                                        }
                                        break;
                                    case LONG:
                                        if (StringUtils.isBlank(column.asString().trim())) {
                                            data.put(columnName, null);
                                        } else {
                                            Long[] longDataList = new Long[dataList.length];
                                            for (int j = 0; j < dataList.length; j++) {
                                                dataList[j] = dataList[j].trim();
                                                if (StringUtils.isNotBlank(dataList[j])) {
                                                    longDataList[j] = Long.valueOf(dataList[j]);
                                                }
                                            }
                                            data.put(columnName, longDataList);
                                        }
                                        break;
                                    case FLOAT:
                                    case DOUBLE:
                                        if (StringUtils.isBlank(column.asString().trim())) {
                                            data.put(columnName, null);
                                        } else {
                                            Double[] doubleDataList = new Double[dataList.length];
                                            for (int j = 0; j < dataList.length; j++) {
                                                dataList[j] = dataList[j].trim();
                                                if (StringUtils.isNotBlank(dataList[j])) {
                                                    doubleDataList[j] = Double.valueOf(dataList[j]);
                                                }
                                            }
                                            data.put(columnName, doubleDataList);
                                        }
                                        break;
                                    default:
                                        data.put(columnName, dataList);
                                        break;
                                }
                            } catch (Exception e) {
                                LOGGER.info("脏数据,记录:{}", record.toString());
                                // NOTE(review): this only skips the current column; the record is
                                // still written without this field and is not counted as dirty.
                                continue;
                            }
                        } else {
                            data.put(columnName, dataList);
                        }
                    } else {
                        data.put(columnName, dataList);
                    }
                } else {
                    switch (columnType) {
                        // metadata columns: several columns of the same kind are concatenated
                        case ID:
                            if (id != null) {
                                id += record.getColumn(i).asString();
                            } else {
                                id = record.getColumn(i).asString();
                            }
                            break;
                        case PARENT:
                            if (parent != null) {
                                parent += record.getColumn(i).asString();
                            } else {
                                parent = record.getColumn(i).asString();
                            }
                            break;
                        case ROUTING:
                            if (routing != null) {
                                routing += record.getColumn(i).asString();
                            } else {
                                routing = record.getColumn(i).asString();
                            }
                            break;
                        case VERSION:
                            if (version != null) {
                                version += record.getColumn(i).asString();
                            } else {
                                version = record.getColumn(i).asString();
                            }
                            break;
                        case DATE:
                            String dateStr = getDateStr(columnList.get(i), column);
                            data.put(columnName, dateStr);
                            break;
                        case KEYWORD:
                        case STRING:
                        case TEXT:
                        case IP:
                        case GEO_POINT:
                        case IP_RANGE:
                            data.put(columnName, column.asString());
                            break;
                        case BOOLEAN:
                            data.put(columnName, column.asBoolean());
                            break;
                        case BYTE:
                        case BINARY:
                            // json serialization does not support the byte type; the es binary
                            // type must be supplied base64-encoded
                            data.put(columnName, column.asString());
                            break;
                        case LONG:
                            data.put(columnName, column.asLong());
                            break;
                        case INTEGER:
                            data.put(columnName, column.asLong());
                            break;
                        case SHORT:
                            data.put(columnName, column.asLong());
                            break;
                        case FLOAT:
                        case DOUBLE:
                            data.put(columnName, column.asDouble());
                            break;
                        case GEO_SHAPE:
                        case DATE_RANGE:
                        case INTEGER_RANGE:
                        case FLOAT_RANGE:
                        case LONG_RANGE:
                        case DOUBLE_RANGE:
                            if (null == column.asString()) {
                                data.put(columnName, column.asString());
                            } else {
                                data.put(columnName, JSON.parse(column.asString()));
                            }
                            break;
                        case NESTED:
                        case OBJECT:
                            if (null == column.asString()) {
                                data.put(columnName, column.asString());
                            } else {
                                // parse the string into a json structure
                                data.put(columnName, JSON.parse(column.asString()));
                            }
                            break;
                        default:
                            throw DataXException.asDataXException(ElasticSearchWriterErrorCode.BAD_CONFIG_VALUE, String.format(
                                    "Type error: unsupported type %s for column %s", columnType, columnName));
                    }
                }
            }
            // a configured primary key replaces any id collected from ID-typed columns
            if (this.hasPrimaryKeyInfo) {
                List<String> idData = new ArrayList<String>();
                for (String eachCol : this.primaryKeyInfo.getColumn()) {
                    Column recordColumn = record.getColumn(this.colNameToIndexMap.get(eachCol));
                    idData.add(recordColumn.asString());
                }
                id = StringUtils.join(idData, this.primaryKeyInfo.getFieldDelimiter());
            }
            // configured partition columns replace any routing collected from ROUTING-typed columns
            if (this.hasEsPartitionColumn) {
                List<String> idData = new ArrayList<String>();
                for (PartitionColumn eachCol : this.esPartitionColumn) {
                    Column recordColumn = record.getColumn(this.colNameToIndexMap.get(eachCol.getName()));
                    idData.add(recordColumn.asString());
                }
                routing = StringUtils.join(idData, "");
            }
        } catch (Exception e) {
            // dirty data
            super.getTaskPluginCollector().collectDirtyRecord(record,
                    String.format("parse error for column: %s errorMessage: %s", columnName, e.getMessage()));
            dirtyDataNumber++;
            // move on to the next record
            continue;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("id: {} routing: {} data: {}", id, routing, JSON.toJSONString(data));
        }
        if (isDeleteRecord(record)) {
            // NOTE(review): the delete action carries only the id (no routing/parent) — confirm
            // this is intended for routed indices.
            Delete.Builder builder = new Delete.Builder(id);
            bulkaction.addAction(builder.build());
            bulkRecords.add(record);
        } else {
            // use the user-defined combined unique key
            if (combinedIdColumn != null) {
                try {
                    id = processIDCombineFields(record, combinedIdColumn);
                } catch (Exception e) {
                    // dirty data
                    super.getTaskPluginCollector().collectDirtyRecord(record,
                            String.format("parse error for column: %s errorMessage: %s", columnName, e.getMessage()));
                    // move on to the next record
                    dirtyDataNumber++;
                    continue;
                }
            }
            switch (actionType) {
                case INDEX:
                    // serialize to json up front; per the original author, the jest client's gson
                    // serialization would html-escape the equals sign
                    Index.Builder builder = null;
                    if (this.enableWriteNull) {
                        builder = new Index.Builder(
                                JSONObject.toJSONString(data, JSONWriter.Feature.WriteMapNullValue,
                                        JSONWriter.Feature.WriteEnumUsingToString));
                    } else {
                        builder = new Index.Builder(JSONObject.toJSONString(data));
                    }
                    if (id != null) {
                        builder.id(id);
                    }
                    if (parent != null) {
                        builder.setParameter(Parameters.PARENT, parent);
                    }
                    if (routing != null) {
                        builder.setParameter(Parameters.ROUTING, routing);
                    }
                    if (version != null) {
                        builder.setParameter(Parameters.VERSION, version);
                        builder.setParameter(Parameters.VERSION_TYPE, "external");
                    }
                    bulkaction.addAction(builder.build());
                    bulkRecords.add(record);
                    break;
                case UPDATE:
                    // doc: https://www.cnblogs.com/crystaltu/articles/6992935.html
                    // doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
                    Map<String, Object> updateDoc = new HashMap<String, Object>();
                    updateDoc.put("doc", data);
                    updateDoc.put("doc_as_upsert", true);
                    Update.Builder update = null;
                    if (this.enableWriteNull) {
                        // write: {a:"1",b:null}
                        // only the write-null-map-value behaviour is added on top of the defaults
                        update = new Update.Builder(
                                JSONObject.toJSONString(updateDoc, JSONWriter.Feature.WriteMapNullValue,
                                        JSONWriter.Feature.WriteEnumUsingToString));
                    } else {
                        // write: {"a":"1"}
                        update = new Update.Builder(JSONObject.toJSONString(updateDoc));
                    }
                    if (id != null) {
                        update.id(id);
                    }
                    if (parent != null) {
                        update.setParameter(Parameters.PARENT, parent);
                    }
                    if (routing != null) {
                        update.setParameter(Parameters.ROUTING, routing);
                    }
                    // version type [EXTERNAL] is not supported by the update API
                    if (version != null) {
                        update.setParameter(Parameters.VERSION, version);
                    }
                    bulkaction.addAction(update.build());
                    bulkRecords.add(record);
                    break;
                default:
                    // no action is added for other action types, so nothing is tracked either
                    break;
            }
        }
    }
    if (dirtyDataNumber >= totalNumber) {
        // the whole batch is dirty data, nothing to send
        LOGGER.warn("all this batch is dirty data, dirtyDataNumber: {} totalDataNumber: {}", dirtyDataNumber,
                totalNumber);
        return;
    }
    BulkResult bulkResult = null;
    try {
        bulkResult = RetryUtil.executeWithRetry(new Callable<BulkResult>() {
            /**
             * Sends the bulk request once. Returns {@code null} on full success, the bulk result
             * when only ignorable (status 400) item failures occurred, and throws otherwise.
             */
            @Override
            public BulkResult call() throws Exception {
                JestResult jestResult = esClient.bulkInsert(bulkaction);
                if (jestResult.isSucceeded()) {
                    return null;
                }
                String msg = String.format("response code: [%d] error :[%s]", jestResult.getResponseCode(),
                        jestResult.getErrorMessage());
                LOGGER.warn(msg);
                if (esClient.isBulkResult(jestResult)) {
                    BulkResult brst = (BulkResult) jestResult;
                    List<BulkResult.BulkResultItem> failedItems = brst.getFailedItems();
                    for (BulkResult.BulkResultItem item : failedItems) {
                        if (item.status != 400) {
                            // anything other than 400 BAD_REQUEST is not a data problem and
                            // must not be ignored
                            throw DataXException.asDataXException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT,
                                    String.format("status:[%d], error: %s", item.status, item.error));
                        } else {
                            // throw when the user chose not to ignore parse errors
                            if (!Key.isIgnoreParseError(conf)) {
                                throw new NoReRunException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT,
                                        String.format(
                                                "status:[%d], error: %s, config not ignoreParseError so throw this error",
                                                item.status, item.error));
                            }
                        }
                    }
                    return brst;
                } else {
                    Integer status = esClient.getStatus(jestResult);
                    // null-safe comparison: switching on a null Integer would throw a
                    // NullPointerException and hide the real write error
                    if (status != null && status == 429) { // TOO_MANY_REQUESTS
                        LOGGER.warn("server response too many requests, so auto reduce speed");
                    }
                    throw DataXException.asDataXException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT,
                            jestResult.getErrorMessage());
                }
            }
        }, this.trySize, this.tryInterval, false, Arrays.asList(DataXException.class));
    } catch (Exception e) {
        if (Key.isIgnoreWriteError(this.conf)) {
            LOGGER.warn(String.format("Retry [%d] write failed, ignore the error, continue to write!", trySize));
        } else {
            throw DataXException.asDataXException(ElasticSearchWriterErrorCode.ES_INDEX_INSERT, e.getMessage(), e);
        }
    }
    if (null != bulkResult) {
        List<BulkResult.BulkResultItem> items = bulkResult.getItems();
        // response items line up with the actions that were sent, i.e. with bulkRecords
        for (int idx = 0; idx < items.size() && idx < bulkRecords.size(); ++idx) {
            BulkResult.BulkResultItem item = items.get(idx);
            if (item.error != null && !"".equals(item.error)) {
                super.getTaskPluginCollector().collectDirtyRecord(bulkRecords.get(idx),
                        String.format("status:[%d], error: %s", item.status, item.error));
            }
        }
    }
}