in backend/helpers/pluginhelper/api/api_collector_stateful.go [127:269]
// NewStatefulApiCollectorForFinalizableEntity returns a single subtask backed by a
// StatefulApiCollector manager that runs up to two collectors sharing one collector state:
//
//  1. CollectNewRecordsByList pages through a list endpoint. When the state manager
//     reports a since-time (createdAfter) and GetCreated is provided, each page is
//     filtered by creation time, and collection stops with ErrFinishCollect at the first
//     record created before createdAfter.
//  2. CollectUnfinishedDetails is only initialised when it is non-nil and the run is
//     incremental. It iterates the input built by its BuildInputIterator, always in
//     incremental mode, to collect updated records.
//
// It returns a nil subtask and an error in any of these cases:
// CollectNewRecordsByList.ResponseParser is nil, the manager cannot be created, an
// input iterator cannot be built, or a collector fails to initialise.
func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArgs) (plugin.SubTask, errors.Error) {
	newRecords := args.CollectNewRecordsByList
	// The list parser is only invoked from inside the wrapping ResponseParser closure
	// below. That closure itself is never nil, so a nil parser would slip through
	// collector setup and panic on the first response. Reject it up front instead.
	if newRecords.ResponseParser == nil {
		return nil, errors.Default.New("CollectNewRecordsByList.ResponseParser is required")
	}

	// The manager can run several collectors but acts as a single subtask to callers.
	manager, err := NewStatefulApiCollector(RawDataSubTaskArgs{
		Ctx:     args.Ctx,
		Options: args.Options,
		Params:  args.Params,
		Table:   args.Table,
	})
	if err != nil {
		return nil, err
	}
	createdAfter := manager.CollectorStateManager.GetSince()
	isIncremental := manager.CollectorStateManager.IsIncremental()

	// The input iterator is optional; it stays nil when BuildInputIterator is not set.
	var inputIterator Iterator
	if newRecords.BuildInputIterator != nil {
		inputIterator, err = newRecords.BuildInputIterator(isIncremental, createdAfter)
		if err != nil {
			return nil, err
		}
	}

	// Step 1: collect newly added records from the list endpoint.
	err = manager.InitCollector(ApiCollectorArgs{
		ApiClient: args.ApiClient,
		// common
		Input:       inputIterator,
		Incremental: isIncremental,
		UrlTemplate: newRecords.UrlTemplate,
		Query: func(reqData *RequestData) (url.Values, errors.Error) {
			if newRecords.Query != nil {
				return newRecords.Query(reqData, createdAfter)
			}
			return nil, nil
		},
		Header: func(reqData *RequestData) (http.Header, errors.Error) {
			if newRecords.Header != nil {
				return newRecords.Header(reqData, createdAfter)
			}
			return nil, nil
		},
		MinTickInterval: newRecords.MinTickInterval,
		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
			items, err := newRecords.ResponseParser(res)
			if err != nil {
				return nil, err
			}
			if len(items) == 0 {
				return nil, nil
			}
			// Without a since-time or a way to read creation times there is nothing to filter.
			if createdAfter == nil || newRecords.GetCreated == nil {
				return items, nil
			}
			// The early stops below only make sense if the API returns records
			// newest-first (NOTE(review): confirm per plugin). A zero creation time means
			// the timestamp was null in the response; such records are always kept.
			firstCreated, err := newRecords.GetCreated(items[0])
			if err != nil {
				return nil, err
			}
			if !firstCreated.IsZero() && firstCreated.Before(*createdAfter) {
				// Even the first record predates the last successful collection: stop.
				return nil, ErrFinishCollect
			}
			lastCreated, err := newRecords.GetCreated(items[len(items)-1])
			if err != nil {
				return nil, err
			}
			if !lastCreated.Before(*createdAfter) {
				return items, nil
			}
			// The page may straddle createdAfter (or end with a null timestamp).
			// Keep records up to the first one created before createdAfter, so records
			// from a previous collection are not collected twice.
			validItems := make([]json.RawMessage, 0, len(items))
			for _, item := range items {
				itemCreated, err := newRecords.GetCreated(item)
				if err != nil {
					return nil, err
				}
				if !itemCreated.IsZero() && itemCreated.Before(*createdAfter) {
					return validItems, ErrFinishCollect
				}
				validItems = append(validItems, item)
			}
			return validItems, nil
		},
		AfterResponse: newRecords.AfterResponse,
		RequestBody:   newRecords.RequestBody,
		Method:        newRecords.Method,
		// pagination
		PageSize:              newRecords.PageSize,
		Concurrency:           newRecords.Concurrency,
		GetNextPageCustomData: newRecords.GetNextPageCustomData,
		GetTotalPages:         newRecords.GetTotalPages,
	})
	if err != nil {
		return nil, err
	}

	// Step 2 only applies to incremental runs that configured it.
	if args.CollectUnfinishedDetails == nil || !isIncremental {
		return manager, nil
	}
	details := args.CollectUnfinishedDetails
	// Step 2: create another collector to collect updated records.
	// TODO: this creates cursor before previous step gets executed, which is too early, to be optimized
	input, err := details.BuildInputIterator()
	if err != nil {
		return nil, err
	}
	err = manager.InitCollector(ApiCollectorArgs{
		ApiClient: args.ApiClient,
		// common
		Incremental: true,
		Input:       input,
		UrlTemplate: details.UrlTemplate,
		Query: func(reqData *RequestData) (url.Values, errors.Error) {
			if details.Query != nil {
				return details.Query(reqData, createdAfter)
			}
			return nil, nil
		},
		Header: func(reqData *RequestData) (http.Header, errors.Error) {
			if details.Header != nil {
				return details.Header(reqData, createdAfter)
			}
			return nil, nil
		},
		MinTickInterval: details.MinTickInterval,
		ResponseParser:  details.ResponseParser,
		AfterResponse:   details.AfterResponse,
		RequestBody:     details.RequestBody,
		Method:          details.Method,
	})
	if err != nil {
		// Match every other error path: never hand back a half-initialised subtask.
		return nil, err
	}
	return manager, nil
}