func()

in backend/helpers/pluginhelper/api/api_collector.go [411:504]


// fetchAsync builds a single API request from reqData and hands it to the
// collector's ApiClient through DoPostAsync (when the configured method is
// POST) or DoGetAsync (otherwise), together with a response callback.
//
// If reqData.Pager is nil it is initialised to page 1, size 100, skip 0.
// The URL, query, request body and headers are produced by the collector's
// configured callbacks; any error they return causes a panic, because this
// method has no error return of its own.
//
// The response callback:
//  1. reads the whole response body into memory and closes the original body,
//  2. passes the response (with a replayable body) to args.ResponseParser,
//  3. stores every parsed item as a RawData row in collector.table,
//  4. increments the task progress by one, and
//  5. invokes handler with the item count, the raw body and the response.
//
// handler may be nil. It is not invoked when the parser yields zero items,
// nor after the parser has returned ErrFinishCollect; in the latter case any
// items returned alongside that error are still saved.
func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int, []byte, *http.Response) errors.Error) {
	if reqData.Pager == nil {
		reqData.Pager = &Pager{
			Page: 1,
			Size: 100,
			Skip: 0,
		}
	}
	apiUrl, err := collector.generateUrl(reqData.Pager, reqData.Input)
	if err != nil {
		panic(err)
	}
	var apiQuery url.Values
	if collector.args.Query != nil {
		apiQuery, err = collector.args.Query(reqData)
		if err != nil {
			panic(err)
		}
	}
	// RequestBody does not return an error, so there is nothing to check here.
	var reqBody interface{}
	if collector.args.RequestBody != nil {
		reqBody = collector.args.RequestBody(reqData)
	}

	apiHeader := (http.Header)(nil)
	if collector.args.Header != nil {
		apiHeader, err = collector.args.Header(reqData)
		if err != nil {
			panic(err)
		}
	}
	logger := collector.args.Ctx.GetLogger()
	logger.Debug("fetchAsync <<< enqueueing for %s %v", apiUrl, apiQuery)
	responseHandler := func(res *http.Response) errors.Error {
		defer logger.Debug("fetchAsync >>> done for %s %v", apiUrl, apiQuery)
		// Read the body into a buffer so it can be replayed for both the
		// parser and the caller-supplied handler.
		body, err := io.ReadAll(res.Body)
		// Close the original body whether or not the read succeeded, so the
		// failure path does not leave it open.
		res.Body.Close()
		if err != nil {
			return errors.Default.Wrap(err, fmt.Sprintf("error reading response from %s", apiUrl))
		}
		res.Body = io.NopCloser(bytes.NewBuffer(body))
		// convert body to array of RawJSON
		items, err := collector.args.ResponseParser(res)
		if err != nil {
			if errors.Is(err, ErrFinishCollect) {
				// The parser asked to stop: keep whatever items it returned,
				// but drop the handler so no follow-up fetch is triggered.
				logger.Info("a fetch stop by parser, reqInput: #%s", reqData.Params)
				handler = nil
			} else {
				return errors.Default.Wrap(err, fmt.Sprintf("error parsing response from %s", apiUrl))
			}
		}
		// Nothing to save: count the request as done and stop here.
		count := len(items)
		if count == 0 {
			collector.args.Ctx.IncProgress(1)
			return nil
		}
		// save to db
		db := collector.args.Ctx.GetDal()
		urlString := res.Request.URL.String()
		rows := make([]*RawData, count)
		for i, msg := range items {
			rows[i] = &RawData{
				Params: collector.params,
				Data:   msg,
				Url:    urlString,
				Input:  reqData.InputJSON,
			}
		}
		err = db.Create(rows, dal.From(collector.table))
		if err != nil {
			return errors.Default.Wrap(err, fmt.Sprintf("error inserting raw rows into %s", collector.table))
		}
		logger.Debug("fetchAsync === total %d rows were saved into database", count)
		// One request has been fully processed.
		collector.args.Ctx.IncProgress(1)
		if handler != nil {
			// Trigger the follow-up step. The parser has consumed res.Body,
			// so give the handler a fresh reader over the buffered bytes.
			res.Body = io.NopCloser(bytes.NewBuffer(body))
			return handler(count, body, res)
		}
		return nil
	}
	if collector.args.Method == http.MethodPost {
		collector.args.ApiClient.DoPostAsync(apiUrl, apiQuery, reqBody, apiHeader, responseHandler)
	} else {
		collector.args.ApiClient.DoGetAsync(apiUrl, apiQuery, apiHeader, responseHandler)
	}
	logger.Debug("fetchAsync === enqueued for %s %v", apiUrl, apiQuery)
}