in server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java [299:524]
private boolean parseActionLine(BytesReference data, int from, int to) throws IOException {
assert currentRequest == null;
// Reset the fields which are accessed during document line parsing
currentType = null;
currentPipeline = defaultPipeline;
currentListExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
currentFetchSourceContext = defaultFetchSourceContext;
try (XContentParser parser = createParser(xContentType.xContent(), data, from, to)) {
// Move to START_OBJECT
XContentParser.Token token = parser.nextToken();
if (token == null) {
return false;
}
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected "
+ XContentParser.Token.START_OBJECT
+ " but found ["
+ token
+ "]"
);
}
// Move to FIELD_NAME, that's the action
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected "
+ XContentParser.Token.FIELD_NAME
+ " but found ["
+ token
+ "]"
);
}
String action = parser.currentName();
if (SUPPORTED_ACTIONS.contains(action) == false) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected field [create], [delete], [index] or [update] but found ["
+ action
+ "]"
);
}
String index = defaultIndex;
String id = null;
String routing = defaultRouting;
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
Map<String, String> dynamicTemplates = Map.of();
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) {
if (allowExplicitIndex == false) {
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (deprecateOrErrorOnType) {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
currentType = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
routing = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
opType = parser.text();
} else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
version = parser.longValue();
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
versionType = VersionType.fromString(parser.text());
} else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNo = parser.longValue();
} else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTerm = parser.longValue();
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
currentPipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
requireAlias = parser.booleanValue();
} else if (REQUIRE_DATA_STREAM.match(currentFieldName, parser.getDeprecationHandler())) {
requireDataStream = parser.booleanValue();
} else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) {
currentListExecutedPipelines = parser.booleanValue();
} else {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
} else if (token == XContentParser.Token.START_ARRAY) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected a simple value for field ["
+ currentFieldName
+ "] but found ["
+ token
+ "]"
);
} else if (token == XContentParser.Token.START_OBJECT
&& DYNAMIC_TEMPLATES.match(currentFieldName, parser.getDeprecationHandler())) {
dynamicTemplates = parser.mapStrings();
} else if (token == XContentParser.Token.START_OBJECT
&& SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (token != XContentParser.Token.VALUE_NULL) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected a simple value for field ["
+ currentFieldName
+ "] but found ["
+ token
+ "]"
);
}
}
} else if (token != XContentParser.Token.END_OBJECT) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected "
+ XContentParser.Token.START_OBJECT
+ " or "
+ XContentParser.Token.END_OBJECT
+ " but found ["
+ token
+ "]"
);
}
checkBulkActionIsProperlyClosed(parser, line);
if ("delete".equals(action)) {
if (dynamicTemplates.isEmpty() == false) {
throw new IllegalArgumentException(
"Delete request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
);
}
currentRequest = new DeleteRequest(index).id(id)
.routing(routing)
.version(version)
.versionType(versionType)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm);
} else {
// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request.
if ("index".equals(action) || "create".equals(action)) {
var indexRequest = new IndexRequest(index).id(id)
.routing(routing)
.version(version)
.versionType(versionType)
.setPipeline(currentPipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
.setRequireDataStream(requireDataStream)
.setListExecutedPipelines(currentListExecutedPipelines)
.setIncludeSourceOnError(config.includeSourceOnError());
if ("create".equals(action)) {
indexRequest = indexRequest.create(true);
} else if (opType != null) {
indexRequest = indexRequest.create("create".equals(opType));
}
currentRequest = indexRequest;
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException(
"Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead"
);
}
if (requireDataStream) {
throw new IllegalArgumentException(
"Update requests do not support the `require_data_stream` flag, "
+ "as data streams do not support update operations"
);
}
// TODO: support dynamic_templates in update requests
if (dynamicTemplates.isEmpty() == false) {
throw new IllegalArgumentException(
"Update request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
);
}
UpdateRequest updateRequest = new UpdateRequest().index(index)
.id(id)
.routing(routing)
.retryOnConflict(retryOnConflict)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.setRequireAlias(requireAlias)
.routing(routing);
currentRequest = updateRequest;
}
}
}
return true;
}