func()

in backend/helpers/pluginhelper/api/graphql_collector.go [360:449]


// fetchAsync performs a single GraphQL request described by reqData, persists
// the raw result, parses it and forwards the parsed records to the batch saver.
//
// Steps, in order:
//  1. If reqData has no pager, a CursorPager with the collector's configured
//     page size and no cursor is installed.
//  2. The query and variables are produced by args.BuildQuery and executed via
//     args.GraphqlClient.Query.
//  3. Data-level errors returned alongside the response are either reported
//     one by one (when no ResponseParserWithDataErrors is configured) or
//     handed to ResponseParserWithDataErrors for custom handling.
//  4. A RawData row is created in collector.table before parsing.
//  5. Parsed results are saved with BatchSaveWithOrigin, progress is
//     incremented by one, and handler (if still set) is invoked with the query
//     so the caller can schedule the next fetch.
//
// The function returns nothing: every failure is reported through
// collector.checkError and aborts the remaining steps. A parser returning
// ErrFinishCollect is not treated as a failure; the results of that call are
// still saved, but handler is skipped.
func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) {
	if reqData.Pager == nil {
		reqData.Pager = &CursorPager{
			SkipCursor: nil,
			Size:       collector.args.PageSize,
		}
	}
	query, variables, err := collector.args.BuildQuery(reqData)
	if err != nil {
		collector.checkError(errors.Default.Wrap(err, `graphql collector BuildQuery failed`))
		return
	}

	logger := collector.args.Ctx.GetLogger()
	dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
	if err != nil {
		// Forward cancellation untouched so it is reported with its direct
		// message when errors are combined. errors.Is (rather than ==) also
		// recognises a cancellation that was wrapped by an intermediate layer.
		if errors.Is(err, context.Canceled) {
			collector.checkError(err)
			return
		}
		collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
		return
	}
	// Without a data-error-aware parser every data error is fatal for this
	// request; otherwise ResponseParserWithDataErrors decides what to do.
	if len(dataErrors) > 0 && collector.args.ResponseParserWithDataErrors == nil {
		for _, dataError := range dataErrors {
			collector.checkError(errors.Default.Wrap(dataError, `graphql query got error`))
		}
		return
	}
	// Registered only after a successful query, so the message is emitted on
	// every exit path from here on (including the error returns below).
	defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)

	// The query value itself is what gets stored as the raw payload.
	// NOTE(review): presumably the client fills query with the response data
	// during Query — confirm against the GraphQL client implementation.
	paramsBytes, err := json.Marshal(query)
	if err != nil {
		collector.checkError(errors.Default.Wrap(err, `graphql collector marshal query failed`))
		return
	}
	db := collector.args.Ctx.GetDal()
	// NOTE(review): the second return value of ConstructQuery is discarded, so
	// Url may be empty if construction fails — confirm this best-effort
	// behaviour is intended.
	queryStr, _ := graphql.ConstructQuery(query, variables)
	variablesJson, err := json.Marshal(variables)
	if err != nil {
		collector.checkError(errors.Default.Wrap(err, `variables in graphql query can not marshal to json`))
		return
	}
	row := &RawData{
		Params: collector.params,
		Data:   paramsBytes,
		Url:    queryStr,
		Input:  variablesJson,
	}
	err = db.Create(row, dal.From(collector.table))
	if err != nil {
		collector.checkError(errors.Default.Wrap(err, `not created row table in graphql collector`))
		return
	}

	var results []interface{}
	if collector.args.ResponseParserWithDataErrors != nil {
		results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
	} else {
		results, err = collector.args.ResponseParser(query, variables)
	}
	if err != nil {
		if !errors.Is(err, ErrFinishCollect) {
			collector.checkError(errors.Default.Wrap(err, `not parsed response in graphql collector`))
			return
		}
		// The parser asked to stop collecting: keep the results of this call
		// but do not trigger any further fetch.
		logger.Info("collector finish by parser, rawId: #%d", row.ID)
		handler = nil
	}
	err = collector.BatchSaveWithOrigin(divider, results, row)
	if err != nil {
		collector.checkError(err)
		return
	}

	collector.args.Ctx.IncProgress(1)
	if handler == nil {
		return
	}
	// trigger next fetch (skipped when ErrFinishCollect came from the parser)
	err = handler(query)
	if err != nil {
		collector.checkError(errors.Default.Wrap(err, `handle failed in graphql collector`))
		return
	}
}