in x-pack/filebeat/input/httpjson/request.go [36:210]
func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
var (
n int
ids []string
err error
urlCopy url.URL
urlString string
httpResp *http.Response
initialResponse []*http.Response
intermediateResps []*http.Response
finalResps []*http.Response
isChainWithPageExpected bool
chainIndex int
)
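	// Request factories are processed in order: the first performs the root request, and each
	// subsequent factory is a chain step that fans out over the ids collected from the previous
	// step's responses.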
//nolint:bodyclose // response body is closed through drainBody method
for i, rf := range r.requestFactories {
finalResps = nil
intermediateResps = nil
		if i == 0 {
			// perform and store the regular (root) call response
httpResp, err = rf.collectResponse(ctx, trCtx, r)
if err != nil {
return fmt.Errorf("failed to collect first response: %w", err)
}
if rf.saveFirstResponse {
// store first response in transform context
var bodyMap map[string]interface{}
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return fmt.Errorf("failed to read http response body: %w", err)
}
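				// restore the consumed body so it can be read again downstream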
httpResp.Body = io.NopCloser(bytes.NewReader(body))
err = json.Unmarshal(body, &bodyMap)
if err != nil {
r.log.Errorf("unable to unmarshal first_response.body: %v", textContextError{error: err, body: body})
}
firstResponse := response{
url: *httpResp.Request.URL,
header: httpResp.Header.Clone(),
body: bodyMap,
}
trCtx.updateFirstResponse(firstResponse)
}
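			// a single request factory: process and publish the response directly, with no chaining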
if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
p := newPublisher(trCtx, publisher, true, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
}
			// if control reaches here, there is more than one request factory
			// the flags & variables required for chaining are initialized here only if a chain step exists
if r.requestFactories[i+1].isChain {
chainIndex = i + 1
resp, err := cloneResponse(httpResp)
if err != nil {
return err
}
				// the response is cloned and added to finalResps, since the response of the first page
				// (whether or not pagination exists) is sent for further processing to check whether any
				// response processors apply, and to update the last_response, first_event & last_event
				// cursor values.
finalResps = append(finalResps, resp)
				// the flags & variables required for chaining with root-level pagination are initialized
				// here only if a pagination request factory exists at the root level alongside a chain step
if r.responseProcessors[i].pagination.requestFactory != nil {
isChainWithPageExpected = true
resp, err := cloneResponse(httpResp)
if err != nil {
return err
}
initialResponse = append(initialResponse, resp)
}
}
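			// collect the ids needed by the next step from the current response, using the next
			// factory's replace expression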
intermediateResps = append(intermediateResps, httpResp)
ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace)
if err != nil {
return err
}
			// pagination is skipped here since chaining is present, avoiding any unexpected updates to cursor values
p := newPublisher(trCtx, publisher, false, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
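			// iterate over the ids collected from the previous response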
if len(ids) == 0 {
n = 0
continue
}
urlCopy = rf.url
urlString = rf.url.String()
			// a new transform context for every chain step, derived from the parent transform context
var chainTrCtx *transformContext
if rf.isChain {
chainTrCtx = trCtx.clone()
}
var val string
var doReplaceWith bool
var replaceArr []string
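			// replaceWith is a comma-separated pair: the first element is the url placeholder to
			// replace and the second is the transform-context path whose value replaces it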
if rf.replaceWith != "" {
replaceArr = strings.Split(rf.replaceWith, ",")
val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1]))
if err != nil {
return err
}
}
			// perform a request for each collected id
			for _, id := range ids {
				// rewrite the request factory's url using the current id
rf.url, err = generateNewUrl(rf.replace, urlString, id)
if err != nil {
return fmt.Errorf("failed to generate new url: %w", err)
}
				// rewrite the url if a replaceWith clause exists
if doReplaceWith {
rf.url, err = generateNewUrl(strings.TrimSpace(replaceArr[0]), rf.url.String(), val)
if err != nil {
return fmt.Errorf("failed to generate new url with replacement: %w", err)
}
}
// collect data from new urls
httpResp, err = rf.collectResponse(ctx, chainTrCtx, r)
if err != nil {
return fmt.Errorf("failed to collect tail response %d: %w", i, err)
}
// store data according to response type
if i == len(r.requestFactories)-1 && len(ids) != 0 {
finalResps = append(finalResps, httpResp)
} else {
intermediateResps = append(intermediateResps, httpResp)
}
}
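			// restore the original template url so later iterations start from a clean state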
rf.url = urlCopy
var resps []*http.Response
if i == len(r.requestFactories)-1 {
resps = finalResps
} else {
				// This branch runs only when i < len(r.requestFactories)-1, since the last element
				// is handled above, so r.requestFactories[i+1] is never out of bounds.
ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace)
if err != nil {
return err
}
resps = intermediateResps
}
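			// note: i < len(r.requestFactories) always holds inside this loop, so this argument is
			// always true here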
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
if rf.isChain {
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
} else {
r.responseProcessors[i].startProcessing(ctx, trCtx, resps, true, p)
}
n += p.eventCount()
}
}
defer httpResp.Body.Close()
	// if pagination exists for the parent request along with chaining, the chain is processed for each page response
if isChainWithPageExpected {
n += r.processRemainingChainEvents(ctx, trCtx, publisher, initialResponse, chainIndex)
}
r.log.Infof("request finished: %d events published", n)
return nil
}
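
// The sketch below is illustrative and not part of the original file: a minimal approximation of
// what generateNewUrl is assumed to do, substituting a replace token in a url template with a
// collected id. exampleReplaceURL and the exact substitution semantics are hypothetical.
func exampleReplaceURL(replace, urlString, id string) (url.URL, error) {
	// Substitute every occurrence of the token with the id, then re-parse so the caller receives
	// a well-formed url.URL, matching the type of rf.url.
	u, err := url.Parse(strings.ReplaceAll(urlString, replace, id))
	if err != nil {
		return url.URL{}, fmt.Errorf("failed to parse generated url: %w", err)
	}
	return *u, nil
}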