in backend/helpers/pluginhelper/api/api_collector.go [150:230]
// Execute runs one collection pass. In order, it:
//
//  1. rejects a nil ApiClient;
//  2. makes sure the raw table exists;
//  3. unless args.Incremental is set, deletes the rows of the raw table whose
//     params match this collector;
//  4. if args.MinTickInterval is set, validates it and raises the client's
//     tick interval to that value when the current one is smaller, restoring
//     the previous interval when Execute returns;
//  5. calls exec once per item produced by args.Input, or exactly once with a
//     nil input when no iterator is configured;
//  6. calls WaitAsync on the client and reports its result.
//
// A non-nil error is returned when the client is nil, when the raw table
// cannot be prepared or flushed, when MinTickInterval is not positive, when
// fetching from the input iterator fails, or when WaitAsync fails.
func (collector *ApiCollector) Execute() errors.Error {
	logger := collector.args.Ctx.GetLogger()
	logger.Info("start api collection")

	// Every path below calls methods on the client (tick interval handling,
	// the input loop, the final WaitAsync), so validate it once, up front,
	// before any data is deleted.
	apiClient := collector.args.ApiClient
	if apiClient == nil {
		return errors.Default.New("api_collector can not Execute with nil apiClient")
	}

	// make sure table is created
	db := collector.args.Ctx.GetDal()
	if err := collector.ensureRawTable(collector.table); err != nil {
		return errors.Default.Wrap(err, "error auto-migrating collector")
	}

	// flush data if not incremental collection
	if !collector.args.Incremental {
		err := db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
		if err != nil {
			return errors.Default.Wrap(err, "error deleting data from collector")
		}
	}

	// if MinTickInterval was specified
	if collector.args.MinTickInterval != nil {
		minTickInterval := *collector.args.MinTickInterval
		if minTickInterval <= time.Duration(0) {
			// There is no underlying error to wrap here: this is a plain
			// validation failure, so a fresh error must be created.
			return errors.Default.New("MinTickInterval must be greater than 0")
		}
		oldTickInterval := apiClient.GetTickInterval()
		if oldTickInterval < minTickInterval {
			// Only ever lengthen the interval; a client that already ticks
			// slower than the requested minimum is left alone.
			logger.Info("set tick interval to %v", minTickInterval.String())
			apiClient.Reset(minTickInterval)
			defer func() {
				logger.Info("restore tick interval to %v", oldTickInterval.String())
				apiClient.Reset(oldTickInterval)
			}()
		}
	}

	collector.args.Ctx.SetProgress(0, -1)
	if collector.args.Input != nil {
		iterator := collector.args.Input
		defer iterator.Close()
		for {
			if !iterator.HasNext() || apiClient.HasError() {
				// The iterator looks exhausted or the client reported an
				// error: wait for outstanding async work first.
				if err := apiClient.WaitAsync(); err != nil {
					return err
				}
				// NOTE(review): the condition is re-evaluated after waiting,
				// presumably because finished async work can change either
				// answer — confirm against the iterator/client implementations.
				if !iterator.HasNext() || apiClient.HasError() {
					break
				}
			}
			input, err := iterator.Fetch()
			if err != nil {
				return errors.Default.Wrap(err, "error executing collector")
			}
			collector.exec(input)
		}
	} else {
		// or we just did it once
		collector.exec(nil)
	}

	logger.Debug("wait for all async api to be finished")
	if err := apiClient.WaitAsync(); err != nil {
		logger.Error(err, "end api collection error")
		return errors.Default.Wrap(err, "Error waiting for async Collector execution")
	}
	logger.Info("end api collection without error")
	return nil
}