in backend/helpers/pluginhelper/api/api_collector.go [411:504]
func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int, []byte, *http.Response) errors.Error) {
if reqData.Pager == nil {
reqData.Pager = &Pager{
Page: 1,
Size: 100,
Skip: 0,
}
}
apiUrl, err := collector.generateUrl(reqData.Pager, reqData.Input)
if err != nil {
panic(err)
}
var apiQuery url.Values
if collector.args.Query != nil {
apiQuery, err = collector.args.Query(reqData)
if err != nil {
panic(err)
}
}
var reqBody interface{}
if collector.args.RequestBody != nil {
reqBody = collector.args.RequestBody(reqData)
if err != nil {
panic(err)
}
}
apiHeader := (http.Header)(nil)
if collector.args.Header != nil {
apiHeader, err = collector.args.Header(reqData)
if err != nil {
panic(err)
}
}
logger := collector.args.Ctx.GetLogger()
logger.Debug("fetchAsync <<< enqueueing for %s %v", apiUrl, apiQuery)
responseHandler := func(res *http.Response) errors.Error {
defer logger.Debug("fetchAsync >>> done for %s %v", apiUrl, apiQuery)
logger := collector.args.Ctx.GetLogger()
// read body to buffer
body, err := io.ReadAll(res.Body)
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error reading response from %s", apiUrl))
}
res.Body.Close()
res.Body = io.NopCloser(bytes.NewBuffer(body))
// convert body to array of RawJSON
items, err := collector.args.ResponseParser(res)
if err != nil {
if errors.Is(err, ErrFinishCollect) {
logger.Info("a fetch stop by parser, reqInput: #%s", reqData.Params)
handler = nil
} else {
return errors.Default.Wrap(err, fmt.Sprintf("error parsing response from %s", apiUrl))
}
}
// save to db
count := len(items)
if count == 0 {
collector.args.Ctx.IncProgress(1)
return nil
}
db := collector.args.Ctx.GetDal()
urlString := res.Request.URL.String()
rows := make([]*RawData, count)
for i, msg := range items {
rows[i] = &RawData{
Params: collector.params,
Data: msg,
Url: urlString,
Input: reqData.InputJSON,
}
}
err = db.Create(rows, dal.From(collector.table))
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error inserting raw rows into %s", collector.table))
}
logger.Debug("fetchAsync === total %d rows were saved into database", count)
// increase progress only when it was not nested
collector.args.Ctx.IncProgress(1)
if handler != nil {
// trigger next fetch, but return if ErrFinishCollect got from ResponseParser
res.Body = io.NopCloser(bytes.NewBuffer(body))
return handler(count, body, res)
}
return nil
}
if collector.args.Method == http.MethodPost {
collector.args.ApiClient.DoPostAsync(apiUrl, apiQuery, reqBody, apiHeader, responseHandler)
} else {
collector.args.ApiClient.DoGetAsync(apiUrl, apiQuery, apiHeader, responseHandler)
}
logger.Debug("fetchAsync === enqueued for %s %v", apiUrl, apiQuery)
}