in backend/helpers/pluginhelper/api/api_collector_with_state.go [145:260]
// NewStatefulApiCollectorForFinalizableEntity builds one subtask backed by two
// collectors registered on the same stateful manager:
//
//  1. a list collector configured from args.CollectNewRecordsByList, and
//  2. a detail collector configured from args.CollectUnfinishedDetails, driven
//     by the iterator returned from its BuildInputIterator.
//
// The time boundary passed to every Query/Header callback is the manager's
// LatestState.LatestSuccessStart when the manager reports an incremental run,
// and the manager's TimeAfter otherwise. The list collector also uses that
// boundary to end collection early via ErrFinishCollect.
//
// An error from NewStatefulApiCollector, the first InitCollector call or
// BuildInputIterator yields (nil, err). The result of the second InitCollector
// call is returned together with the manager.
//
// NOTE(review): the early-stop logic only inspects the first and last record
// of a page, so it presumably expects pages ordered by creation time, newest
// first — confirm against callers.
func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArgs) (plugin.SubTask, errors.Error) {
	// a single manager runs several collectors while looking like one subtask to the caller
	mgr, err := NewStatefulApiCollector(RawDataSubTaskArgs{
		Ctx:     args.Ctx,
		Options: args.Options,
		Params:  args.Params,
		Table:   args.Table,
	}, args.TimeAfter)
	if err != nil {
		return nil, err
	}

	// pick the time boundary shared by both collectors
	incremental := mgr.IsIncremental()
	var createdAfter *time.Time
	switch {
	case incremental:
		createdAfter = mgr.LatestState.LatestSuccessStart
	default:
		createdAfter = mgr.TimeAfter
	}

	// step 1: collector for newly added records
	listArgs := args.CollectNewRecordsByList

	// createdBeforeBoundary reports whether the record was created before createdAfter;
	// it is only invoked once createdAfter and GetCreated are known to be non-nil
	createdBeforeBoundary := func(record json.RawMessage) (bool, errors.Error) {
		created, createdErr := listArgs.GetCreated(record)
		if createdErr != nil {
			return false, createdErr
		}
		return created.Before(*createdAfter), nil
	}

	if err = mgr.InitCollector(ApiCollectorArgs{
		ApiClient: args.ApiClient,
		// common
		Incremental: incremental,
		UrlTemplate: listArgs.UrlTemplate,
		Query: func(reqData *RequestData) (url.Values, errors.Error) {
			if listArgs.Query == nil {
				return nil, nil
			}
			return listArgs.Query(reqData, createdAfter)
		},
		Header: func(reqData *RequestData) (http.Header, errors.Error) {
			if listArgs.Header == nil {
				return nil, nil
			}
			return listArgs.Header(reqData, createdAfter)
		},
		MinTickInterval: listArgs.MinTickInterval,
		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
			records, parseErr := listArgs.ResponseParser(res)
			if parseErr != nil {
				return nil, parseErr
			}
			if len(records) == 0 {
				return nil, nil
			}
			// without a boundary or a way to read the creation time, pass the page through
			if createdAfter == nil || listArgs.GetCreated == nil {
				return records, nil
			}
			// first record already older than the boundary: return an empty set and stop
			tooOld, checkErr := createdBeforeBoundary(records[0])
			if checkErr != nil {
				return nil, checkErr
			}
			if tooOld {
				return nil, ErrFinishCollect
			}
			// last record older than the boundary: keep this page, then stop
			tooOld, checkErr = createdBeforeBoundary(records[len(records)-1])
			if checkErr != nil {
				return nil, checkErr
			}
			if tooOld {
				return records, ErrFinishCollect
			}
			return records, nil
		},
		AfterResponse: listArgs.AfterResponse,
		RequestBody:   listArgs.RequestBody,
		Method:        listArgs.Method,
		// pagination
		PageSize:              listArgs.PageSize,
		Concurrency:           listArgs.Concurrency,
		GetNextPageCustomData: listArgs.GetNextPageCustomData,
		GetTotalPages:         listArgs.GetTotalPages,
	}); err != nil {
		return nil, err
	}

	// step 2: collector for records that still need their details refreshed
	// TODO: the input iterator is built before step 1 has executed, which is too early; to be optimized
	detailArgs := args.CollectUnfinishedDetails
	detailInput, err := detailArgs.BuildInputIterator()
	if err != nil {
		return nil, err
	}
	err = mgr.InitCollector(ApiCollectorArgs{
		ApiClient: args.ApiClient,
		// common
		Incremental: true,
		Input:       detailInput,
		UrlTemplate: detailArgs.UrlTemplate,
		Query: func(reqData *RequestData) (url.Values, errors.Error) {
			if detailArgs.Query == nil {
				return nil, nil
			}
			return detailArgs.Query(reqData, createdAfter)
		},
		Header: func(reqData *RequestData) (http.Header, errors.Error) {
			if detailArgs.Header == nil {
				return nil, nil
			}
			return detailArgs.Header(reqData, createdAfter)
		},
		MinTickInterval: detailArgs.MinTickInterval,
		ResponseParser:  detailArgs.ResponseParser,
		AfterResponse:   detailArgs.AfterResponse,
		RequestBody:     detailArgs.RequestBody,
		Method:          detailArgs.Method,
	})
	// the manager is handed back alongside whatever the last InitCollector reported
	return mgr, err
}