in backend/helpers/pluginhelper/api/api_extractor_stateful.go [96:204]
// Execute runs the extraction for this subtask.
//
// It selects the IDs of the raw records for the current params, always bounded
// by created_at < until and, in incremental mode with a non-nil since, also by
// created_at >= since. Each record is then processed in ascending id order:
//   - its Data is JSON-decoded into a new InputType;
//   - BeforeExtract (when set) is called with the SubtaskStateManager;
//   - Extract is called;
//   - every returned entity is stamped with its RawDataOrigin and added to a
//     per-type batch.
//
// Once all records are processed, the batch divider is closed to save the
// remaining batches. The SubtaskStateManager is closed last to persist the
// incremental state.
//
// It returns nil without doing anything when the raw data table does not
// exist. It returns the context error if the context is canceled mid-run.
func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
	db := extractor.GetDal()
	logger := extractor.GetLogger()
	table := extractor.GetRawDataTable()
	params := extractor.GetRawDataParams()
	if !db.HasTable(table) {
		return nil
	}
	clauses := []dal.Clause{
		dal.Select("id"),
		dal.From(table),
		dal.Where("params = ?", params),
		dal.Orderby("id ASC"),
	}
	if extractor.IsIncremental() {
		if since := extractor.GetSince(); since != nil {
			clauses = append(clauses, dal.Where("created_at >= ? ", since))
		}
	}
	clauses = append(clauses, dal.Where("created_at < ? ", extractor.GetUntil()))
	// Only the IDs are loaded up front. Each full record is fetched by ID inside
	// the loop. NOTE(review): presumably this avoids keeping a result cursor open
	// while batches are written; confirm before changing to a cursor or bulk load.
	var ids []uint64
	if err := db.Pluck("id", &ids, clauses...); err != nil {
		return errors.Default.Wrap(err, "error getting IDs")
	}
	// len(ids) is exactly the number of records this run will process, so no
	// separate COUNT query is needed.
	logger.Info("get data from %s where params=%s and got %d with clauses %+v", table, params, len(ids), clauses)
	// batch save divider
	divider := NewBatchSaveDivider(extractor.SubTaskContext, extractor.GetBatchSize(), table, params)
	divider.SetIncrementalMode(extractor.IsIncremental())
	// The total is known, so report determinate progress (IncProgress is called
	// once per record below).
	extractor.SetProgress(0, len(ids))
	ctx := extractor.GetContext()
	for _, id := range ids {
		// stop promptly when the subtask is canceled
		select {
		case <-ctx.Done():
			return errors.Convert(ctx.Err())
		default:
		}
		row := &RawData{}
		if err := db.First(row, dal.From(table), dal.Where("id = ?", id)); err != nil {
			return errors.Default.Wrap(err, "error loading full row by ID")
		}
		body := new(InputType)
		if err := errors.Convert(json.Unmarshal(row.Data, body)); err != nil {
			return err
		}
		if extractor.BeforeExtract != nil {
			if err := extractor.BeforeExtract(body, extractor.SubtaskStateManager); err != nil {
				return err
			}
		}
		results, err := extractor.Extract(body, row)
		if err != nil {
			return errors.Default.Wrap(err, "error calling plugin Extract implementation")
		}
		for _, result := range results {
			// get the batch operator for the specific type
			batch, err := divider.ForType(reflect.TypeOf(result))
			if err != nil {
				return errors.Default.Wrap(err, "error getting batch from result")
			}
			// link the entity back to the raw record it was extracted from
			setRawDataOrigin(result, common.RawDataOrigin{
				RawDataTable:  table,
				RawDataParams: params,
				RawDataId:     row.ID,
			})
			// records get saved into db when slots were max outed
			if err = batch.Add(result); err != nil {
				return errors.Default.Wrap(err, "error adding result to batch")
			}
		}
		extractor.IncProgress(1)
	}
	// save the last batches
	if err := divider.Close(); err != nil {
		return err
	}
	// save the incremental state
	return extractor.SubtaskStateManager.Close()
}