x-pack/filebeat/input/httpjson/request.go (783 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package httpjson import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "net/url" "reflect" "strconv" "strings" "github.com/PaesslerAG/jsonpath" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/mito/lib/xml" ) const requestNamespace = "request" type httpClient struct { client *http.Client limiter *rateLimiter } func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error { var ( n int ids []string err error urlCopy url.URL urlString string httpResp *http.Response initialResponse []*http.Response intermediateResps []*http.Response finalResps []*http.Response isChainWithPageExpected bool chainIndex int ) //nolint:bodyclose // response body is closed through drainBody method for i, rf := range r.requestFactories { finalResps = nil intermediateResps = nil // iterate over collected ids from last response if i == 0 { // perform and store regular call responses httpResp, err = rf.collectResponse(ctx, trCtx, r) if err != nil { return fmt.Errorf("failed to collect first response: %w", err) } if rf.saveFirstResponse { // store first response in transform context var bodyMap map[string]interface{} body, err := io.ReadAll(httpResp.Body) if err != nil { return fmt.Errorf("failed to read http response body: %w", err) } httpResp.Body = io.NopCloser(bytes.NewReader(body)) err = json.Unmarshal(body, &bodyMap) if err != nil { r.log.Errorf("unable to unmarshal first_response.body: %v", textContextError{error: err, body: body}) } firstResponse := response{ url: *httpResp.Request.URL, header: httpResp.Header.Clone(), body: bodyMap, } trCtx.updateFirstResponse(firstResponse) } if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) p := newPublisher(trCtx, publisher, true, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p) n = p.eventCount() continue } // if flow of control reaches here, that means there are more than 1 request factories // if a chain step exists, only then we will initialize flags & variables here which are required for chaining if r.requestFactories[i+1].isChain { chainIndex = i + 1 resp, err := cloneResponse(httpResp) if err != nil { return err } // the response is cloned and added to finalResps here, since the response of the 1st page (whether pagination exists or not), will // be sent for further processing to check if any response processors can be applied or not and at the same time update the last_response, // first_event & last_event cursor values. finalResps = append(finalResps, resp) // if a pagination request factory exists at the root level along with a chain step, only then we will initialize flags & variables here // which are required for chaining with root level pagination if r.responseProcessors[i].pagination.requestFactory != nil { isChainWithPageExpected = true resp, err := cloneResponse(httpResp) if err != nil { return err } initialResponse = append(initialResponse, resp) } } intermediateResps = append(intermediateResps, httpResp) ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace) if err != nil { return err } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values p := newPublisher(trCtx, publisher, false, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p) n = p.eventCount() } else { if len(ids) == 0 { n = 0 continue } urlCopy = rf.url urlString = rf.url.String() // new transform context for every chain step, derived from parent transform context var chainTrCtx *transformContext if rf.isChain { chainTrCtx = trCtx.clone() } var val string var doReplaceWith bool var replaceArr []string if rf.replaceWith != "" { replaceArr = strings.Split(rf.replaceWith, ",") val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1])) if err != nil { return err } } // perform request over collected ids for _, id := range ids { // reformat urls of requestFactory using ids rf.url, err = generateNewUrl(rf.replace, urlString, id) if err != nil { return fmt.Errorf("failed to generate new url: %w", err) } // reformat url accordingly if replaceWith clause exists if doReplaceWith { rf.url, err = generateNewUrl(strings.TrimSpace(replaceArr[0]), rf.url.String(), val) if err != nil { return fmt.Errorf("failed to generate new url with replacement: %w", err) } } // collect data from new urls httpResp, err = rf.collectResponse(ctx, chainTrCtx, r) if err != nil { return fmt.Errorf("failed to collect tail response %d: %w", i, err) } // store data according to response type if i == len(r.requestFactories)-1 && len(ids) != 0 { finalResps = append(finalResps, httpResp) } else { intermediateResps = append(intermediateResps, httpResp) } } rf.url = urlCopy var resps []*http.Response if i == len(r.requestFactories)-1 { resps = finalResps } else { // The if comdition (i < len(r.requestFactories)) ensures this branch never runs to the last element // of r.requestFactories, therefore r.requestFactories[i+1] will never be out of bounds. ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace) if err != nil { return err } resps = intermediateResps } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) if rf.isChain { rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) } else { r.responseProcessors[i].startProcessing(ctx, trCtx, resps, true, p) } n += p.eventCount() } } defer httpResp.Body.Close() // if pagination exists for the parent request along with chaining, then for each page response the chain is processed if isChainWithPageExpected { n += r.processRemainingChainEvents(ctx, trCtx, publisher, initialResponse, chainIndex) } r.log.Infof("request finished: %d events published", n) return nil } // collectResponse returns response from provided request func (rf *requestFactory) collectResponse(ctx context.Context, trCtx *transformContext, r *requester) (*http.Response, error) { var err error var httpResp *http.Response req, err := rf.newHTTPRequest(ctx, trCtx) if err != nil { return nil, fmt.Errorf("failed to create http request: %w", err) } if rf.isChain && rf.chainClient != nil { httpResp, err = rf.chainClient.do(ctx, req) if err != nil { return nil, fmt.Errorf("failed to execute chain http %s: %w", req.Method, err) } } else { httpResp, err = r.client.do(ctx, req) if err != nil { return nil, fmt.Errorf("failed to execute http %s: %w", req.Method, err) } } return httpResp, nil } func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, error) { resp, err := c.limiter.execute(ctx, func() (*http.Response, error) { resp, err := c.client.Do(req) if err == nil { // Read the whole resp.Body so we can release the connection. // This implementation is inspired by httputil.DumpResponse resp.Body, err = drainBody(resp.Body) } return resp, err }) if err != nil { return nil, err } if resp.StatusCode >= http.StatusBadRequest { body, _ := io.ReadAll(resp.Body) if len(body) == 0 { return nil, fmt.Errorf("server responded with status code %d", resp.StatusCode) } return nil, fmt.Errorf("server responded with status code %d: %s", resp.StatusCode, body) } return resp, nil } type requestFactory struct { chainClient *httpClient url url.URL method string body *mapstr.M transforms []basicTransform user string password string encoder encoderFunc replace string replaceWith string isChain bool until *valueTpl chainResponseProcessor *responseProcessor saveFirstResponse bool log *logp.Logger } func newRequestFactory(ctx context.Context, config config, log *logp.Logger, metrics *inputMetrics, reg *monitoring.Registry) ([]*requestFactory, error) { // config validation already checked for errors here rfs := make([]*requestFactory, 0, len(config.Chain)+1) ts, _ := newBasicTransformsFromConfig(registeredTransforms, config.Request.Transforms, requestNamespace, log) // regular call requestFactory object rf := &requestFactory{ url: *config.Request.URL.URL, method: config.Request.Method, body: config.Request.Body, transforms: ts, log: log, encoder: registeredEncoders[config.Request.EncodeAs], saveFirstResponse: config.Response.SaveFirstResponse, } if config.Auth != nil && config.Auth.Basic.isEnabled() { rf.user = config.Auth.Basic.User rf.password = config.Auth.Basic.Password } var xmlDetails map[string]xml.Detail if config.Response.XSD != "" { var err error xmlDetails, err = xml.Details([]byte(config.Response.XSD)) if err != nil { log.Errorf("error while collecting xml decoder type hints: %v", err) return nil, err } } rfs = append(rfs, rf) for _, ch := range config.Chain { var rf *requestFactory // chain calls requestFactory object if ch.Step != nil { ts, _ := newBasicTransformsFromConfig(registeredTransforms, ch.Step.Request.Transforms, requestNamespace, log) ch.Step.Auth = tryAssignAuth(config.Auth, ch.Step.Auth) client, err := newChainHTTPClient(ctx, ch.Step.Auth, ch.Step.Request, log, reg) if err != nil { return nil, fmt.Errorf("failed in creating chain http client with error: %w", err) } responseProcessor := newChainResponseProcessor(ch, client, xmlDetails, metrics, log) rf = &requestFactory{ url: *ch.Step.Request.URL.URL, method: ch.Step.Request.Method, body: ch.Step.Request.Body, transforms: ts, log: log, encoder: registeredEncoders[config.Request.EncodeAs], replace: ch.Step.Replace, replaceWith: ch.Step.ReplaceWith, isChain: true, chainClient: client, chainResponseProcessor: responseProcessor, } if ch.Step.Auth != nil && ch.Step.Auth.Basic.isEnabled() { rf.user = ch.Step.Auth.Basic.User rf.password = ch.Step.Auth.Basic.Password } } else if ch.While != nil { ts, _ := newBasicTransformsFromConfig(registeredTransforms, ch.While.Request.Transforms, requestNamespace, log) policy := newHTTPPolicy(evaluateResponse, ch.While.Until, log) ch.While.Auth = tryAssignAuth(config.Auth, ch.While.Auth) client, err := newChainHTTPClient(ctx, ch.While.Auth, ch.While.Request, log, reg, policy) if err != nil { return nil, fmt.Errorf("failed in creating chain http client with error: %w", err) } responseProcessor := newChainResponseProcessor(ch, client, xmlDetails, metrics, log) rf = &requestFactory{ url: *ch.While.Request.URL.URL, method: ch.While.Request.Method, body: ch.While.Request.Body, transforms: ts, log: log, encoder: registeredEncoders[config.Request.EncodeAs], replace: ch.While.Replace, replaceWith: ch.While.ReplaceWith, until: ch.While.Until, isChain: true, chainClient: client, chainResponseProcessor: responseProcessor, } if ch.While.Auth != nil && ch.While.Auth.Basic.isEnabled() { rf.user = ch.While.Auth.Basic.User rf.password = ch.While.Auth.Basic.Password } } rfs = append(rfs, rf) } return rfs, nil } func evaluateResponse(expression *valueTpl, data []byte, log *logp.Logger) (bool, error) { var dataMap mapstr.M err := json.Unmarshal(data, &dataMap) if err != nil { return false, fmt.Errorf("error while unmarshalling data: %w", textContextError{error: err, body: data}) } tr := transformable{} paramCtx := &transformContext{ firstEvent: &mapstr.M{}, lastEvent: &mapstr.M{}, firstResponse: &response{}, lastResponse: &response{body: dataMap}, } val, err := expression.Execute(paramCtx, tr, "response_evaluation", nil, log) if err != nil { return false, fmt.Errorf("error while evaluating expression: %w", err) } result, err := strconv.ParseBool(val) if err != nil { return false, fmt.Errorf("error while parsing boolean value of string: %w", err) } return result, nil } func tryAssignAuth(parentConfig *authConfig, childConfig *authConfig) *authConfig { if parentConfig != nil && childConfig == nil { return parentConfig } return childConfig } func (rf *requestFactory) newHTTPRequest(ctx context.Context, trCtx *transformContext) (*http.Request, error) { trReq, err := rf.newRequest(trCtx) if err != nil { return nil, err } var body []byte if rf.method == http.MethodPost { if rf.encoder != nil { body, err = rf.encoder(trReq) } else { body, err = encode(trReq.header().Get("Content-Type"), trReq) } if err != nil { return nil, err } } url := trReq.url() req, err := http.NewRequest(rf.method, url.String(), bytes.NewBuffer(body)) if err != nil { return nil, err } req = req.WithContext(ctx) req.Header = trReq.header().Clone() if rf.user != "" || rf.password != "" { req.SetBasicAuth(rf.user, rf.password) } return req, nil } func (rf *requestFactory) newRequest(ctx *transformContext) (transformable, error) { req := transformable{} req.setURL(rf.url) if rf.body != nil { req.setBody(rf.body.Clone()) } header := http.Header{} header.Set("Accept", "application/json") header.Set("User-Agent", userAgent) req.setHeader(header) var err error for _, t := range rf.transforms { req, err = t.run(ctx, req) if err != nil { return transformable{}, err } } if rf.method == http.MethodPost { header = req.header() if header.Get("Content-Type") == "" { header.Set("Content-Type", "application/json") req.setHeader(header) } } rf.log.Debugw("new request", "req", redact{value: mapstrM(req), fields: []string{"header.Authorization"}}) return req, nil } type requester struct { client *httpClient requestFactories []*requestFactory responseProcessors []*responseProcessor metrics *inputMetrics log *logp.Logger } func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, metrics *inputMetrics, log *logp.Logger) *requester { return &requester{ client: client, requestFactories: reqs, responseProcessors: resps, metrics: metrics, log: log, } } // getIdsFromResponses returns ids from responses func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, replace string) ([]string, error) { var b []byte var ids []string var err error // collect ids from all responses for _, resp := range intermediateResps { if resp.Body != nil { b, err = io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("error while reading response body: %w", err) } } // gracefully close response err = resp.Body.Close() if err != nil { return nil, fmt.Errorf("error closing response body: %w", err) } // get replace values from collected json var v interface{} if err := json.Unmarshal(b, &v); err != nil { return nil, fmt.Errorf("cannot unmarshal data: %w", textContextError{error: err, body: b}) } values, err := jsonpath.Get(replace, v) if err != nil { return nil, fmt.Errorf("error while getting keys: %w", err) } switch tresp := values.(type) { case []interface{}: for _, v := range tresp { switch v.(type) { case float64, string: ids = append(ids, fmt.Sprintf("%v", v)) default: r.log.Errorf("events must a number or string, but got %T: skipping", v) continue } } case float64, string: ids = append(ids, fmt.Sprintf("%v", tresp)) default: r.log.Errorf("cannot collect IDs from type %T: %v", values, values) } } return ids, nil } // processRemainingChainEvents, processes the remaining pagination events for chain blocks func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it p := newChainProcessor(r, trCtx, publisher, chainIndex) r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p) return p.eventCount() } // chainProcessor is a chained processing handler. type chainProcessor struct { req *requester trCtx *transformContext pub inputcursor.Publisher idx int tail bool n int } func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor { return &chainProcessor{ req: req, trCtx: trCtx, pub: pub, idx: idx, } } // handleEvents processes msg as a request body in an execution chain. func (p *chainProcessor) handleEvent(ctx context.Context, msg mapstr.M) { if !p.tail { // Skip first event as it has already been processed. p.tail = true return } var response http.Response response.StatusCode = 200 body := new(bytes.Buffer) // we construct a new response here from each of the pagination events err := json.NewEncoder(body).Encode(msg) if err != nil { p.req.log.Errorf("error processing chain event: %v", err) return } response.Body = io.NopCloser(body) // updates the cursor for pagination last_event & last_response when chaining is present p.trCtx.updateLastEvent(msg) p.trCtx.updateCursor() // for each pagination response, we repeat all the chain steps / blocks n, err := p.req.processChainPaginationEvents(ctx, p.trCtx, p.pub, &response, p.idx, p.req.log) if err != nil { if errors.Is(err, notLogged{}) { p.req.log.Debugf("ignored error processing chain event: %v", err) return } p.req.log.Errorf("error processing chain event: %v", err) return } p.n += n err = response.Body.Close() if err != nil { p.req.log.Errorf("error closing http response body: %v", err) } } func (p *chainProcessor) handleError(err error) { if errors.Is(err, notLogged{}) { p.req.log.Debugf("ignored error processing response: %v", err) return } p.req.log.Errorf("error processing response: %v", err) } // notLogged is an error that is not logged except at DEBUG. type notLogged struct { error } func (notLogged) Is(target error) bool { _, ok := target.(notLogged) return ok } // eventCount returns the number of events that have been processed. func (p *chainProcessor) eventCount() int { return p.n } // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input // //nolint:bodyclose // response body is closed through drainBody method func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, response *http.Response, chainIndex int, log *logp.Logger) (int, error) { var ( n int ids []string err error urlCopy url.URL urlString string httpResp *http.Response intermediateResps []*http.Response finalResps []*http.Response ) intermediateResps = append(intermediateResps, response) ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[chainIndex].replace) if err != nil { return -1, err } for i := chainIndex; i < len(r.requestFactories); i++ { finalResps = nil intermediateResps = nil rf := r.requestFactories[i] if len(ids) == 0 { n = 0 continue } urlCopy = rf.url urlString = rf.url.String() // new transform context for every chain step, derived from parent transform context chainTrCtx := trCtx.clone() var val string var doReplaceWith bool var replaceArr []string if rf.replaceWith != "" { replaceArr = strings.Split(rf.replaceWith, ",") val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1])) if err != nil { return n, err } } // perform request over collected ids for _, id := range ids { // reformat urls of requestFactory using ids rf.url, err = generateNewUrl(rf.replace, urlString, id) if err != nil { return -1, fmt.Errorf("failed to generate new url for chain: %w", err) } // reformat url accordingly if replaceWith clause exists if doReplaceWith { rf.url, err = generateNewUrl(strings.TrimSpace(replaceArr[0]), rf.url.String(), val) if err != nil { return n, fmt.Errorf("failed to generate new url for chain replacement: %w", err) } } // collect data from new urls httpResp, err = rf.collectResponse(ctx, chainTrCtx, r) if err != nil { return -1, fmt.Errorf("failed to collect response: %w", err) } // store data according to response type if i == len(r.requestFactories)-1 && len(ids) != 0 { finalResps = append(finalResps, httpResp) } else { intermediateResps = append(intermediateResps, httpResp) } } rf.url = urlCopy var resps []*http.Response if i == len(r.requestFactories)-1 { resps = finalResps } else { // The if comdition (i < len(r.requestFactories)) ensures this branch never runs to the last element // of r.requestFactories, therefore r.requestFactories[i+1] will never be out of bounds. ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace) if err != nil { return -1, err } resps = intermediateResps } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) n += p.eventCount() } return n, nil } // generateNewUrl returns new url value using replacement from oldUrl with ids. // If oldUrl is an opaque URL, the scheme: is dropped and the remaining string // is used as the replacement target. For example // // placeholder:$.result[:] // // becomes // // $.result[:] // // which is now the replacement target. func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { u, err := url.Parse(oldUrl) if err != nil { return url.URL{}, err } if u.Opaque != "" { oldUrl = u.Opaque } newUrl, err := url.Parse(strings.Replace(oldUrl, replacement, id, 1)) if err != nil { return url.URL{}, fmt.Errorf("failed to replace value in url: %w", err) } return *newUrl, nil } // publisher is an event publication handler. type publisher struct { trCtx *transformContext pub inputcursor.Publisher n int log *logp.Logger metrics *inputMetrics } func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, metrics *inputMetrics, log *logp.Logger) *publisher { if !publish { pub = nil } return &publisher{ trCtx: trCtx, pub: pub, log: log, } } // handleEvent publishes msg to the publishers backing inputcursor.Publisher. func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) { if p.pub != nil { event, err := makeEvent(msg) if err != nil { p.log.Errorf("error creating event: %v: %v", msg, err) return } if err := p.pub.Publish(event, p.trCtx.cursorMap()); err != nil { p.log.Errorf("error publishing event: %v", err) return } } if len(*p.trCtx.firstEventClone()) == 0 { p.trCtx.updateFirstEvent(msg) } p.trCtx.updateLastEvent(msg) p.trCtx.updateCursor() p.metrics.addEventsPublished(1) p.n++ } // handleError logs err. func (p *publisher) handleError(err error) { if errors.Is(err, notLogged{}) { p.log.Debugf("ignored error processing response: %v", err) return } p.log.Errorf("error processing response: %v", err) } // eventCount returns the number of successfully published events. func (p *publisher) eventCount() int { return p.n } const ( // This is generally updated with chain responses, if present, as they continue to occur // Otherwise this is always the last response of the root request w.r.t pagination lastResponse = "last_response" // This is always the first root response firstResponse = "first_response" // This is always the last response of the parent (root) request w.r.t pagination // This is only set if chaining is used parentLastResponse = "parent_last_response" ) func fetchValueFromContext(trCtx *transformContext, expression string) (string, bool, error) { var val interface{} switch keys := processExpression(expression); keys[0] { case lastResponse: respMap, err := responseToMap(trCtx.lastResponse) if err != nil { return "", false, err } val, err = iterateRecursive(respMap, keys[1:], 0) if err != nil { return "", false, err } case parentLastResponse: respMap, err := responseToMap(trCtx.parentTrCtx.lastResponse) if err != nil { return "", false, err } val, err = iterateRecursive(respMap, keys[1:], 0) if err != nil { return "", false, err } case firstResponse: // since first response body is already a map, we do not need to transform it respMap, err := responseToMap(trCtx.firstResponse) if err != nil { return "", false, err } val, err = iterateRecursive(respMap, keys[1:], 0) if err != nil { return "", false, err } // In this scenario we treat the expression as a hardcoded value, with which we will replace the fixed-pattern case expression: return expression, true, nil default: return "", false, fmt.Errorf("context value not supported for key: %q in expression %q", keys[0], expression) } return fmt.Sprint(val), true, nil } // processExpression, splits the expression string based on the separator and looks for // supported keywords. If present, returns an expression array containing separated elements. // If no keywords are present, the expression is treated as a hardcoded value and returned // as a merged string which is the only array element. func processExpression(expression string) []string { if !strings.HasPrefix(expression, ".") { return []string{expression} } switch { case strings.HasPrefix(expression, "."+firstResponse+"."), strings.HasPrefix(expression, "."+lastResponse+"."), strings.HasPrefix(expression, "."+parentLastResponse+"."): return strings.Split(expression, ".")[1:] default: return []string{expression} } } func responseToMap(r *response) (mapstr.M, error) { if r.body == nil { return nil, fmt.Errorf("response body is empty for request url: %s", &r.url) } respMap := map[string]interface{}{ "header": make(mapstr.M), "body": r.body, } for key, value := range r.header { respMap["header"] = mapstr.M{ key: value, } } return respMap, nil } func iterateRecursive(m mapstr.M, keys []string, depth int) (interface{}, error) { val := m[keys[depth]] if val == nil { return nil, fmt.Errorf("value of expression could not be determined for key %s", strings.Join(keys[:depth+1], ".")) } switch v := reflect.ValueOf(val); v.Kind() { case reflect.Bool: return v.Bool(), nil case reflect.Int, reflect.Int8, reflect.Int32, reflect.Int64: return v.Int(), nil case reflect.Uint, reflect.Uint8, reflect.Uint32, reflect.Uint64: return v.Uint(), nil case reflect.Float32, reflect.Float64: return v.Float(), nil case reflect.String: return v.String(), nil case reflect.Map: nextMap, ok := v.Interface().(map[string]interface{}) if !ok { return nil, errors.New("unable to parse the value of the given expression") } depth++ if depth >= len(keys) { return nil, errors.New("value of expression could not be determined") } return iterateRecursive(nextMap, keys, depth) default: return nil, fmt.Errorf("unable to parse the value of the expression %s: type %T is not handled", strings.Join(keys[:depth+1], "."), val) } } // cloneResponse clones required http response attributes func cloneResponse(source *http.Response) (*http.Response, error) { var resp http.Response body, err := io.ReadAll(source.Body) if err != nil { return nil, fmt.Errorf("failed ro read http response body: %w", err) } source.Body = io.NopCloser(bytes.NewReader(body)) resp.Body = io.NopCloser(bytes.NewReader(body)) resp.ContentLength = source.ContentLength resp.Header = source.Header resp.Trailer = source.Trailer resp.StatusCode = source.StatusCode resp.Request = source.Request.Clone(source.Request.Context()) return &resp, nil } // drainBody reads all of b to memory and then returns a equivalent // ReadCloser yielding the same bytes. // // It returns an error if the initial slurp of all bytes fails. It does not attempt // to make the returned ReadCloser have identical error-matching behavior. // // This function is a modified version of drainBody from the http/httputil package. func drainBody(b io.ReadCloser) (r1 io.ReadCloser, err error) { defer b.Close() if b == nil || b == http.NoBody { // No copying needed. Preserve the magic sentinel meaning of NoBody. return http.NoBody, nil } var buf bytes.Buffer if _, err = buf.ReadFrom(b); err != nil { return b, fmt.Errorf("failed to read http.response.body: %w", err) } if err = b.Close(); err != nil { return b, fmt.Errorf("failed to close http.response.body: %w", err) } return io.NopCloser(&buf), nil }