public void parse()

in server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java [167:423]


    /**
     * Parses a bulk payload and hands every parsed request to the matching consumer.
     *
     * <p>The payload is read as a sequence of lines terminated by the stream separator of
     * {@code xContentType}. Each entry starts with an action/metadata line of the form
     * {@code {"<action>": {...parameters...}}}. A {@code delete} action is complete on its own;
     * every other action consumes the following line as its body. Only {@code index},
     * {@code create} and {@code update} produce a request; any other action name still consumes
     * its body line but emits nothing.
     *
     * <p>Parsing stops as soon as no further separator is found, so a trailing line that is not
     * terminated by a separator is not processed.
     *
     * @param data                      the raw bulk payload
     * @param defaultIndex              index used when the action line does not specify one
     * @param defaultType               type used when the action line does not specify one
     * @param defaultRouting            routing used when the action line does not specify one
     * @param defaultFetchSourceContext fetch-source settings applied to update requests when the action line has none
     * @param defaultPipeline           pipeline used when the action line does not specify one
     * @param defaultRequireAlias       require-alias flag used when the action line does not specify one;
     *                                  {@code null} is treated as {@code false}
     * @param allowExplicitIndex        when {@code false}, an index given on an action line is rejected
     * @param xContentType              content type of {@code data}
     * @param indexRequestConsumer      receives requests built from {@code index} and {@code create} actions
     * @param updateRequestConsumer     receives requests built from {@code update} actions
     * @param deleteRequestConsumer     receives requests built from {@code delete} actions
     * @throws IOException              if the underlying parser fails to read the payload
     * @throws IllegalArgumentException if an action/metadata line is malformed, contains an unknown parameter,
     *                                  specifies an index while {@code allowExplicitIndex} is {@code false},
     *                                  or an update action carries versioning parameters
     */
    public void parse(
        BytesReference data,
        @Nullable String defaultIndex,
        @Nullable String defaultType,
        @Nullable String defaultRouting,
        @Nullable FetchSourceContext defaultFetchSourceContext,
        @Nullable String defaultPipeline,
        @Nullable Boolean defaultRequireAlias,
        boolean allowExplicitIndex,
        XContentType xContentType,
        Consumer<IndexRequest> indexRequestConsumer,
        Consumer<UpdateRequest> updateRequestConsumer,
        Consumer<DeleteRequest> deleteRequestConsumer
    ) throws IOException {
        XContent xContent = xContentType.xContent();
        // 1-based counter of consumed lines, used only in error messages
        int line = 0;
        // offset in data where the next unread line starts
        int from = 0;
        byte marker = xContent.streamSeparator();
        boolean typesDeprecationLogged = false;
        // Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
        // deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
        // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
        final Map<String, String> stringDeduplicator = new HashMap<>();
        while (true) {
            int nextMarker = findNextMarker(marker, from, data);
            if (nextMarker == -1) {
                // no further separator: nothing more to parse
                break;
            }
            line++;

            // now parse the action
            try (XContentParser parser = createParser(data, xContent, from, nextMarker)) {
                // move pointers
                from = nextMarker + 1;

                // Move to START_OBJECT
                XContentParser.Token token = parser.nextToken();
                if (token == null) {
                    // the line holds no tokens at all; skip it
                    continue;
                }
                if (token != XContentParser.Token.START_OBJECT) {
                    throw new IllegalArgumentException(
                        "Malformed action/metadata line ["
                            + line
                            + "], expected "
                            + XContentParser.Token.START_OBJECT
                            + " but found ["
                            + token
                            + "]"
                    );
                }
                // Move to FIELD_NAME, that's the action
                token = parser.nextToken();
                if (token != XContentParser.Token.FIELD_NAME) {
                    throw new IllegalArgumentException(
                        "Malformed action/metadata line ["
                            + line
                            + "], expected "
                            + XContentParser.Token.FIELD_NAME
                            + " but found ["
                            + token
                            + "]"
                    );
                }
                String action = parser.currentName();

                // per-item parameters, initialised from the request-level defaults and overridden by the action line
                String index = defaultIndex;
                String type = defaultType;
                String id = null;
                String routing = defaultRouting;
                FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
                String opType = null;
                long version = Versions.MATCH_ANY;
                VersionType versionType = VersionType.INTERNAL;
                long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
                long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
                int retryOnConflict = 0;
                String pipeline = defaultPipeline;
                boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

                // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
                // or START_OBJECT which will have another set of parameters
                token = parser.nextToken();

                if (token == XContentParser.Token.START_OBJECT) {
                    String currentFieldName = null;
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            currentFieldName = parser.currentName();
                        } else if (token.isValue()) {
                            if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) {
                                if (!allowExplicitIndex) {
                                    throw new IllegalArgumentException("explicit index in bulk is not allowed");
                                }
                                index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                            } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                                // log the types deprecation at most once per parse call
                                if (warnOnTypeUsage && typesDeprecationLogged == false) {
                                    deprecationLogger.deprecate("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
                                    typesDeprecationLogged = true;
                                }
                                type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                            } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
                                id = parser.text();
                            } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
                                routing = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                            } else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                                opType = parser.text();
                            } else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
                                version = parser.longValue();
                            } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                                versionType = VersionType.fromString(parser.text());
                            } else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
                                ifSeqNo = parser.longValue();
                            } else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
                                ifPrimaryTerm = parser.longValue();
                            } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
                                retryOnConflict = parser.intValue();
                            } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
                                pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                            } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
                                fetchSourceContext = FetchSourceContext.fromXContent(parser);
                            } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
                                requireAlias = parser.booleanValue();
                            } else {
                                throw new IllegalArgumentException(
                                    "Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
                                );
                            }
                        } else if (token == XContentParser.Token.START_ARRAY) {
                            throw new IllegalArgumentException(
                                "Malformed action/metadata line ["
                                    + line
                                    + "], expected a simple value for field ["
                                    + currentFieldName
                                    + "] but found ["
                                    + token
                                    + "]"
                            );
                        } else if (token == XContentParser.Token.START_OBJECT
                            && SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
                            // the source field is the only parameter allowed to carry an object value
                            fetchSourceContext = FetchSourceContext.fromXContent(parser);
                        } else if (token != XContentParser.Token.VALUE_NULL) {
                            // explicit nulls are ignored (the default stays in place); anything else is malformed
                            throw new IllegalArgumentException(
                                "Malformed action/metadata line ["
                                    + line
                                    + "], expected a simple value for field ["
                                    + currentFieldName
                                    + "] but found ["
                                    + token
                                    + "]"
                            );
                        }
                    }
                } else if (token != XContentParser.Token.END_OBJECT) {
                    throw new IllegalArgumentException(
                        "Malformed action/metadata line ["
                            + line
                            + "], expected "
                            + XContentParser.Token.START_OBJECT
                            + " or "
                            + XContentParser.Token.END_OBJECT
                            + " but found ["
                            + token
                            + "]"
                    );
                }

                if ("delete".equals(action)) {
                    // delete has no body line
                    deleteRequestConsumer.accept(
                        new DeleteRequest(index, type, id).routing(routing)
                            .version(version)
                            .versionType(versionType)
                            .setIfSeqNo(ifSeqNo)
                            .setIfPrimaryTerm(ifPrimaryTerm)
                    );
                } else {
                    // every other action is followed by a body line
                    nextMarker = findNextMarker(marker, from, data);
                    if (nextMarker == -1) {
                        break;
                    }
                    line++;

                    // the body is handed over as a slice of the original payload, so the big byte array is not copied
                    // into small per-request chunks.
                    if ("index".equals(action)) {
                        if (opType == null) {
                            indexRequestConsumer.accept(
                                new IndexRequest(index, type, id).routing(routing)
                                    .version(version)
                                    .versionType(versionType)
                                    .setPipeline(pipeline)
                                    .setIfSeqNo(ifSeqNo)
                                    .setIfPrimaryTerm(ifPrimaryTerm)
                                    .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                                    .setRequireAlias(requireAlias)
                            );
                        } else {
                            // an explicit op_type on an index action decides whether this is a create
                            indexRequestConsumer.accept(
                                new IndexRequest(index, type, id).routing(routing)
                                    .version(version)
                                    .versionType(versionType)
                                    .create("create".equals(opType))
                                    .setPipeline(pipeline)
                                    .setIfSeqNo(ifSeqNo)
                                    .setIfPrimaryTerm(ifPrimaryTerm)
                                    .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                                    .setRequireAlias(requireAlias)
                            );
                        }
                    } else if ("create".equals(action)) {
                        indexRequestConsumer.accept(
                            new IndexRequest(index, type, id).routing(routing)
                                .version(version)
                                .versionType(versionType)
                                .create(true)
                                .setPipeline(pipeline)
                                .setIfSeqNo(ifSeqNo)
                                .setIfPrimaryTerm(ifPrimaryTerm)
                                .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                                .setRequireAlias(requireAlias)
                        );
                    } else if ("update".equals(action)) {
                        if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
                            throw new IllegalArgumentException(
                                "Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead"
                            );
                        }
                        UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing)
                            .retryOnConflict(retryOnConflict)
                            .setIfSeqNo(ifSeqNo)
                            .setIfPrimaryTerm(ifPrimaryTerm)
                            .setRequireAlias(requireAlias);
                        try (
                            XContentParser sliceParser = createParser(
                                sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType),
                                xContent
                            )
                        ) {
                            updateRequest.fromXContent(sliceParser);
                        }
                        if (fetchSourceContext != null) {
                            updateRequest.fetchSource(fetchSourceContext);
                        }
                        IndexRequest upsertRequest = updateRequest.upsertRequest();
                        if (upsertRequest != null) {
                            // use the per-item pipeline (which falls back to defaultPipeline) so that a pipeline given on
                            // the action line is honoured for upserts just like for index and create actions
                            upsertRequest.setPipeline(pipeline);
                        }

                        updateRequestConsumer.accept(updateRequest);
                    }
                    // move pointers
                    from = nextMarker + 1;
                }
            }
        }
    }