in server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java [167:423]
public void parse(
    BytesReference data,
    @Nullable String defaultIndex,
    @Nullable String defaultType,
    @Nullable String defaultRouting,
    @Nullable FetchSourceContext defaultFetchSourceContext,
    @Nullable String defaultPipeline,
    @Nullable Boolean defaultRequireAlias,
    boolean allowExplicitIndex,
    XContentType xContentType,
    Consumer<IndexRequest> indexRequestConsumer,
    Consumer<UpdateRequest> updateRequestConsumer,
    Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
    // Overview
    // --------
    // Walks `data` as a sequence of lines delimited by the content type's stream separator. Each bulk item starts with an
    // action/metadata line of the form {"<action>": {<params>}}; every action other than "delete" is followed by exactly one
    // payload line. For each item a request is built and handed to the matching consumer:
    //   delete          -> deleteRequestConsumer
    //   index / create  -> indexRequestConsumer (payload line becomes the document source)
    //   update          -> updateRequestConsumer (payload line is parsed by UpdateRequest#fromXContent)
    //
    // The default* arguments seed the per-item values and are overridden by parameters found on the action/metadata line.
    // A null defaultRequireAlias is treated as false. When allowExplicitIndex is false, any index parameter on an
    // action/metadata line is rejected.
    //
    // Throws IllegalArgumentException when an action/metadata line is malformed, carries an unknown parameter, names an
    // index while explicit indices are disallowed, or when an update action specifies version/version_type.
    //
    // NOTE(review): an action name other than delete/index/create/update is not rejected here; its payload line is consumed
    // and no request is emitted. Confirm whether that leniency is intentional before tightening it.
    XContent xContent = xContentType.xContent();
    int line = 0;
    int from = 0;
    byte marker = xContent.streamSeparator();
    // Ensures the types deprecation warning is emitted at most once per parse call.
    boolean typesDeprecationLogged = false;
    // Bulk requests can contain a lot of repeated strings for the index, type, pipeline and routing parameters. This map is
    // used to deduplicate the strings parsed for these parameters. While it does not prevent instantiating the duplicates, it
    // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
    final Map<String, String> stringDeduplicator = new HashMap<>();
    while (true) {
        int nextMarker = findNextMarker(marker, from, data);
        if (nextMarker == -1) {
            // no further separator: nothing left to parse
            break;
        }
        line++;
        // now parse the action
        try (XContentParser parser = createParser(data, xContent, from, nextMarker)) {
            // move pointers past the action/metadata line; `from` now points at the (potential) payload line
            from = nextMarker + 1;
            // Move to START_OBJECT
            XContentParser.Token token = parser.nextToken();
            if (token == null) {
                // the line held no tokens at all; skip it
                continue;
            }
            if (token != XContentParser.Token.START_OBJECT) {
                throw new IllegalArgumentException(
                    "Malformed action/metadata line ["
                        + line
                        + "], expected "
                        + XContentParser.Token.START_OBJECT
                        + " but found ["
                        + token
                        + "]"
                );
            }
            // Move to FIELD_NAME, that's the action
            token = parser.nextToken();
            if (token != XContentParser.Token.FIELD_NAME) {
                throw new IllegalArgumentException(
                    "Malformed action/metadata line ["
                        + line
                        + "], expected "
                        + XContentParser.Token.FIELD_NAME
                        + " but found ["
                        + token
                        + "]"
                );
            }
            String action = parser.currentName();

            // Per-item values, seeded from the request-level defaults and overridden by the action/metadata parameters.
            String index = defaultIndex;
            String type = defaultType;
            String id = null;
            String routing = defaultRouting;
            FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
            String opType = null;
            long version = Versions.MATCH_ANY;
            VersionType versionType = VersionType.INTERNAL;
            long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
            long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
            int retryOnConflict = 0;
            String pipeline = defaultPipeline;
            boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

            // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
            // or START_OBJECT which will have another set of parameters
            token = parser.nextToken();
            if (token == XContentParser.Token.START_OBJECT) {
                String currentFieldName = null;
                while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                    if (token == XContentParser.Token.FIELD_NAME) {
                        currentFieldName = parser.currentName();
                    } else if (token.isValue()) {
                        if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) {
                            if (!allowExplicitIndex) {
                                throw new IllegalArgumentException("explicit index in bulk is not allowed");
                            }
                            index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                            if (warnOnTypeUsage && typesDeprecationLogged == false) {
                                deprecationLogger.deprecate("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
                                typesDeprecationLogged = true;
                            }
                            type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
                            id = parser.text();
                        } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
                            routing = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                            opType = parser.text();
                        } else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
                            version = parser.longValue();
                        } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                            versionType = VersionType.fromString(parser.text());
                        } else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
                            ifSeqNo = parser.longValue();
                        } else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
                            ifPrimaryTerm = parser.longValue();
                        } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
                            retryOnConflict = parser.intValue();
                        } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
                            pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
                            // source filtering given as a simple value
                            fetchSourceContext = FetchSourceContext.fromXContent(parser);
                        } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
                            requireAlias = parser.booleanValue();
                        } else {
                            throw new IllegalArgumentException(
                                "Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
                            );
                        }
                    } else if (token == XContentParser.Token.START_ARRAY) {
                        throw new IllegalArgumentException(
                            "Malformed action/metadata line ["
                                + line
                                + "], expected a simple value for field ["
                                + currentFieldName
                                + "] but found ["
                                + token
                                + "]"
                        );
                    } else if (token == XContentParser.Token.START_OBJECT
                        && SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
                            // the source parameter is the only one allowed to carry an object value
                            fetchSourceContext = FetchSourceContext.fromXContent(parser);
                        } else if (token != XContentParser.Token.VALUE_NULL) {
                            // explicit nulls are tolerated and leave the current value untouched; anything else is an error
                            throw new IllegalArgumentException(
                                "Malformed action/metadata line ["
                                    + line
                                    + "], expected a simple value for field ["
                                    + currentFieldName
                                    + "] but found ["
                                    + token
                                    + "]"
                            );
                        }
                }
            } else if (token != XContentParser.Token.END_OBJECT) {
                throw new IllegalArgumentException(
                    "Malformed action/metadata line ["
                        + line
                        + "], expected "
                        + XContentParser.Token.START_OBJECT
                        + " or "
                        + XContentParser.Token.END_OBJECT
                        + " but found ["
                        + token
                        + "]"
                );
            }

            if ("delete".equals(action)) {
                // delete has no payload line, so `from` is already positioned on the next action/metadata line
                deleteRequestConsumer.accept(
                    new DeleteRequest(index, type, id).routing(routing)
                        .version(version)
                        .versionType(versionType)
                        .setIfSeqNo(ifSeqNo)
                        .setIfPrimaryTerm(ifPrimaryTerm)
                );
            } else {
                // every other action is followed by a payload line spanning [from, nextMarker)
                nextMarker = findNextMarker(marker, from, data);
                if (nextMarker == -1) {
                    // no separator found for the payload line: stop without emitting a request for this action
                    break;
                }
                line++;
                // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
                // of index request.
                if ("index".equals(action)) {
                    if (opType == null) {
                        indexRequestConsumer.accept(
                            new IndexRequest(index, type, id).routing(routing)
                                .version(version)
                                .versionType(versionType)
                                .setPipeline(pipeline)
                                .setIfSeqNo(ifSeqNo)
                                .setIfPrimaryTerm(ifPrimaryTerm)
                                .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                                .setRequireAlias(requireAlias)
                        );
                    } else {
                        // an explicit op_type on an index action decides whether the request is a create
                        indexRequestConsumer.accept(
                            new IndexRequest(index, type, id).routing(routing)
                                .version(version)
                                .versionType(versionType)
                                .create("create".equals(opType))
                                .setPipeline(pipeline)
                                .setIfSeqNo(ifSeqNo)
                                .setIfPrimaryTerm(ifPrimaryTerm)
                                .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                                .setRequireAlias(requireAlias)
                        );
                    }
                } else if ("create".equals(action)) {
                    indexRequestConsumer.accept(
                        new IndexRequest(index, type, id).routing(routing)
                            .version(version)
                            .versionType(versionType)
                            .create(true)
                            .setPipeline(pipeline)
                            .setIfSeqNo(ifSeqNo)
                            .setIfPrimaryTerm(ifPrimaryTerm)
                            .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                            .setRequireAlias(requireAlias)
                    );
                } else if ("update".equals(action)) {
                    if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
                        throw new IllegalArgumentException(
                            "Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead"
                        );
                    }
                    UpdateRequest updateRequest = new UpdateRequest(index, type, id).retryOnConflict(retryOnConflict)
                        .setIfSeqNo(ifSeqNo)
                        .setIfPrimaryTerm(ifPrimaryTerm)
                        .setRequireAlias(requireAlias)
                        .routing(routing);
                    try (
                        XContentParser sliceParser = createParser(
                            sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType),
                            xContent
                        )
                    ) {
                        updateRequest.fromXContent(sliceParser);
                    }
                    if (fetchSourceContext != null) {
                        updateRequest.fetchSource(fetchSourceContext);
                    }
                    IndexRequest upsertRequest = updateRequest.upsertRequest();
                    if (upsertRequest != null) {
                        // Use the effective pipeline (the per-action value when one was given, otherwise the request-level
                        // default) so a pipeline named on the update's action/metadata line is not dropped for the upsert.
                        upsertRequest.setPipeline(pipeline);
                    }
                    updateRequestConsumer.accept(updateRequest);
                }
                // move pointers past the payload line
                from = nextMarker + 1;
            }
        }
    }
}