func()

in x-pack/filebeat/input/httpjson/request.go [36:210]


func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
	var (
		n                       int
		ids                     []string
		err                     error
		urlCopy                 url.URL
		urlString               string
		httpResp                *http.Response
		initialResponse         []*http.Response
		intermediateResps       []*http.Response
		finalResps              []*http.Response
		isChainWithPageExpected bool
		chainIndex              int
	)

	//nolint:bodyclose // response body is closed through drainBody method
	for i, rf := range r.requestFactories {
		finalResps = nil
		intermediateResps = nil
		// iterate over collected ids from last response
		if i == 0 {
			// perform and store regular call responses
			httpResp, err = rf.collectResponse(ctx, trCtx, r)
			if err != nil {
				return fmt.Errorf("failed to collect first response: %w", err)
			}

			if rf.saveFirstResponse {
				// store first response in transform context
				var bodyMap map[string]interface{}
				body, err := io.ReadAll(httpResp.Body)
				if err != nil {
					return fmt.Errorf("failed to read http response body: %w", err)
				}
				httpResp.Body = io.NopCloser(bytes.NewReader(body))
				err = json.Unmarshal(body, &bodyMap)
				if err != nil {
					r.log.Errorf("unable to unmarshal first_response.body: %v", textContextError{error: err, body: body})
				}
				firstResponse := response{
					url:    *httpResp.Request.URL,
					header: httpResp.Header.Clone(),
					body:   bodyMap,
				}
				trCtx.updateFirstResponse(firstResponse)
			}

			if len(r.requestFactories) == 1 {
				finalResps = append(finalResps, httpResp)
				p := newPublisher(trCtx, publisher, true, r.metrics, r.log)
				r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p)
				n = p.eventCount()
				continue
			}

			// if flow of control reaches here, that means there are more than 1 request factories
			// if a chain step exists, only then we will initialize flags & variables here which are required for chaining
			if r.requestFactories[i+1].isChain {
				chainIndex = i + 1
				resp, err := cloneResponse(httpResp)
				if err != nil {
					return err
				}
				// the response is cloned and added to finalResps here, since the response of the 1st page (whether pagination exists or not), will
				// be sent for further processing to check if any response processors can be applied or not and at the same time update the last_response,
				// first_event & last_event cursor values.
				finalResps = append(finalResps, resp)

				// if a pagination request factory exists at the root level along with a chain step, only then we will initialize flags & variables here
				// which are required for chaining with root level pagination
				if r.responseProcessors[i].pagination.requestFactory != nil {
					isChainWithPageExpected = true
					resp, err := cloneResponse(httpResp)
					if err != nil {
						return err
					}
					initialResponse = append(initialResponse, resp)
				}
			}

			intermediateResps = append(intermediateResps, httpResp)
			ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace)
			if err != nil {
				return err
			}
			// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
			p := newPublisher(trCtx, publisher, false, r.metrics, r.log)
			r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p)
			n = p.eventCount()
		} else {
			if len(ids) == 0 {
				n = 0
				continue
			}
			urlCopy = rf.url
			urlString = rf.url.String()

			// new transform context for every chain step, derived from parent transform context
			var chainTrCtx *transformContext
			if rf.isChain {
				chainTrCtx = trCtx.clone()
			}

			var val string
			var doReplaceWith bool
			var replaceArr []string
			if rf.replaceWith != "" {
				replaceArr = strings.Split(rf.replaceWith, ",")
				val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1]))
				if err != nil {
					return err
				}
			}

			// perform request over collected ids
			for _, id := range ids {
				// reformat urls of requestFactory using ids
				rf.url, err = generateNewUrl(rf.replace, urlString, id)
				if err != nil {
					return fmt.Errorf("failed to generate new url: %w", err)
				}

				// reformat url accordingly if replaceWith clause exists
				if doReplaceWith {
					rf.url, err = generateNewUrl(strings.TrimSpace(replaceArr[0]), rf.url.String(), val)
					if err != nil {
						return fmt.Errorf("failed to generate new url with replacement: %w", err)
					}
				}
				// collect data from new urls
				httpResp, err = rf.collectResponse(ctx, chainTrCtx, r)
				if err != nil {
					return fmt.Errorf("failed to collect tail response %d: %w", i, err)
				}
				// store data according to response type
				if i == len(r.requestFactories)-1 && len(ids) != 0 {
					finalResps = append(finalResps, httpResp)
				} else {
					intermediateResps = append(intermediateResps, httpResp)
				}
			}
			rf.url = urlCopy

			var resps []*http.Response
			if i == len(r.requestFactories)-1 {
				resps = finalResps
			} else {
				// The if comdition (i < len(r.requestFactories)) ensures this branch never runs to the last element
				// of r.requestFactories, therefore r.requestFactories[i+1] will never be out of bounds.
				ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace)
				if err != nil {
					return err
				}
				resps = intermediateResps
			}

			p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
			if rf.isChain {
				rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
			} else {
				r.responseProcessors[i].startProcessing(ctx, trCtx, resps, true, p)
			}
			n += p.eventCount()
		}
	}

	defer httpResp.Body.Close()
	// if pagination exists for the parent request along with chaining, then for each page response the chain is processed
	if isChainWithPageExpected {
		n += r.processRemainingChainEvents(ctx, trCtx, publisher, initialResponse, chainIndex)
	}
	r.log.Infof("request finished: %d events published", n)

	return nil
}