func()

in backend/helpers/pluginhelper/api/api_extractor_stateful.go [96:204]


// Execute extracts domain records from the raw data table of this subtask.
//
// It selects the IDs of raw rows whose params match the current subtask
// (restricted to created_at >= since when running incrementally and a since
// time is set, and always to created_at < until), then walks them in
// ascending ID order. For each ID it loads the full row, JSON-decodes its Data
// into a new InputType, runs the optional BeforeExtract hook and the plugin's
// Extract, stamps every result with its RawDataOrigin and adds it to a
// per-type batch. After all rows are processed the remaining batches are
// flushed and the subtask state manager is closed.
//
// It returns nil without doing anything when the raw table does not exist,
// and returns the context error if the subtask context is cancelled.
func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
	db := extractor.GetDal()
	logger := extractor.GetLogger()
	table := extractor.GetRawDataTable()
	params := extractor.GetRawDataParams()
	if !db.HasTable(table) {
		return nil
	}

	clauses := []dal.Clause{
		dal.Select("id"),
		dal.From(table),
		dal.Where("params = ?", params),
		dal.Orderby("id ASC"),
	}
	if extractor.IsIncremental() {
		if since := extractor.GetSince(); since != nil {
			clauses = append(clauses, dal.Where("created_at >= ? ", since))
		}
	}
	clauses = append(clauses, dal.Where("created_at < ? ", extractor.GetUntil()))

	// Fetch only the IDs up front; full rows are loaded one at a time below.
	// NOTE(review): presumably this avoids keeping a DB cursor open while
	// batches are being written — confirm.
	// len(ids) is the exact number of rows to process, so it serves both the
	// log line and the progress total without a separate COUNT query.
	var ids []uint64
	err := db.Pluck("id", &ids, clauses...)
	if err != nil {
		return errors.Default.Wrap(err, "error getting IDs")
	}
	total := len(ids)
	logger.Info("get data from %s where params=%s and got %d with clauses %+v", table, params, total, clauses)

	// batch save divider
	divider := NewBatchSaveDivider(extractor.SubTaskContext, extractor.GetBatchSize(), table, params)
	divider.SetIncrementalMode(extractor.IsIncremental())

	// The total is known here, so report it instead of -1 (unknown).
	extractor.SetProgress(0, total)
	ctx := extractor.GetContext()

	for _, id := range ids {
		// Stop promptly when the subtask is cancelled.
		select {
		case <-ctx.Done():
			return errors.Convert(ctx.Err())
		default:
		}

		// load full record by ID
		row := &RawData{}
		if err := db.First(row, dal.From(table), dal.Where("id = ?", id)); err != nil {
			return errors.Default.Wrap(err, "error loading full row by ID")
		}

		body := new(InputType)
		if err := errors.Convert(json.Unmarshal(row.Data, body)); err != nil {
			return errors.Default.Wrap(err, "error decoding raw data into input type")
		}

		if extractor.BeforeExtract != nil {
			if err := extractor.BeforeExtract(body, extractor.SubtaskStateManager); err != nil {
				return err
			}
		}

		results, err := extractor.Extract(body, row)
		if err != nil {
			return errors.Default.Wrap(err, "error calling plugin Extract implementation")
		}

		// Every result produced from one raw row shares the same origin.
		origin := common.RawDataOrigin{
			RawDataTable:  table,
			RawDataParams: params,
			RawDataId:     row.ID,
		}
		for _, result := range results {
			// get the batch operator for the specific type
			batch, err := divider.ForType(reflect.TypeOf(result))
			if err != nil {
				return errors.Default.Wrap(err, "error getting batch from result")
			}
			setRawDataOrigin(result, origin)
			// records get saved into db when slots were max outed
			err = batch.Add(result)
			if err != nil {
				return errors.Default.Wrap(err, "error adding result to batch")
			}
		}
		extractor.IncProgress(1)
	}

	// Flush whatever is still buffered in the per-type batches.
	err = divider.Close()
	if err != nil {
		return err
	}
	// save the incremental state
	return extractor.SubtaskStateManager.Close()
}