in backend/helpers/pluginhelper/api/graphql_collector.go [360:449]
func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) {
if reqData.Pager == nil {
reqData.Pager = &CursorPager{
SkipCursor: nil,
Size: collector.args.PageSize,
}
}
query, variables, err := collector.args.BuildQuery(reqData)
if err != nil {
collector.checkError(errors.Default.Wrap(err, `graphql collector BuildQuery failed`))
return
}
logger := collector.args.Ctx.GetLogger()
dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
if err != nil {
if err == context.Canceled {
// direct error message for error combine
collector.checkError(err)
} else {
collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
}
return
}
if len(dataErrors) > 0 {
if collector.args.ResponseParserWithDataErrors == nil {
for _, dataError := range dataErrors {
collector.checkError(errors.Default.Wrap(dataError, `graphql query got error`))
}
return
}
// else: error will deal by ResponseParserWithDataErrors
}
defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)
paramsBytes, err := json.Marshal(query)
if err != nil {
collector.checkError(errors.Default.Wrap(err, `graphql collector marshal query failed`))
return
}
db := collector.args.Ctx.GetDal()
queryStr, _ := graphql.ConstructQuery(query, variables)
variablesJson, err := json.Marshal(variables)
if err != nil {
collector.checkError(errors.Default.Wrap(err, `variables in graphql query can not marshal to json`))
return
}
row := &RawData{
Params: collector.params,
Data: paramsBytes,
Url: queryStr,
Input: variablesJson,
}
err = db.Create(row, dal.From(collector.table))
if err != nil {
collector.checkError(errors.Default.Wrap(err, `not created row table in graphql collector`))
return
}
var results []interface{}
if collector.args.ResponseParserWithDataErrors != nil {
results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
} else {
results, err = collector.args.ResponseParser(query, variables)
}
if err != nil {
if errors.Is(err, ErrFinishCollect) {
logger.Info("collector finish by parser, rawId: #%d", row.ID)
handler = nil
} else {
collector.checkError(errors.Default.Wrap(err, `not parsed response in graphql collector`))
return
}
}
err = collector.BatchSaveWithOrigin(divider, results, row)
if err != nil {
collector.checkError(err)
return
}
collector.args.Ctx.IncProgress(1)
if handler != nil {
// trigger next fetch, but return if ErrFinishCollect got from ResponseParser
err = handler(query)
if err != nil {
collector.checkError(errors.Default.Wrap(err, `handle failed in graphql collector`))
return
}
}
}