in backend/plugins/github_graphql/tasks/pr_collector.go [169:283]
// CollectPrs collects raw pull request payloads for the repository identified
// by data.Options.Name ("owner/repo") through the GraphQL client held in the
// task data, using a stateful collector configured with RAW_PRS_TABLE.
//
// Two GraphQL collectors are registered on the same stateful collector before
// it is executed:
//
//  1. A paged listing of the repository's pull requests. When the collector
//     reports a previous run (GetSince() != nil), parsing stops at the first
//     pull request whose UpdatedAt is older than that timestamp.
//  2. A refetch of the pull requests already stored in the database with
//     state "OPEN" for this repo/connection; only those whose UpdatedAt is
//     newer than the stored GithubUpdatedAt are kept.
//
// It returns a bad-input error when data.Options.Name does not contain an
// owner and a repository separated by "/", and otherwise any error produced
// while building the collectors, opening the database cursor, or executing.
func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error {
	data := taskCtx.GetData().(*tasks.GithubTaskData)

	// Split "owner/repo" once and validate it up front: indexing the split
	// result blindly inside the query builders would panic on a malformed name.
	ownerName := strings.Split(data.Options.Name, "/")
	if len(ownerName) < 2 {
		return errors.BadInput.New("invalid repository name, expected owner/repo: " + data.Options.Name)
	}
	repoOwner := graphql.String(ownerName[0])
	repoName := graphql.String(ownerName[1])

	var err errors.Error
	apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
		Ctx: taskCtx,
		Params: tasks.GithubApiParams{
			ConnectionId: data.Options.ConnectionId,
			Name:         data.Options.Name,
		},
		Table: RAW_PRS_TABLE,
	})
	if err != nil {
		return err
	}

	// collect new PRs since the previous run
	since := apiCollector.GetSince()
	err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
		GraphqlClient: data.GraphqlClient,
		PageSize:      10,
		// BuildQuery returns the query struct to be filled in plus its variables.
		BuildQuery: func(reqData *api.GraphqlRequestData) (any, map[string]any, error) {
			query := &GraphqlQueryPrWrapper{}
			// Without request data only the query shape is returned, with no variables.
			if reqData == nil {
				return query, map[string]any{}, nil
			}
			variables := map[string]any{
				"pageSize":   graphql.Int(reqData.Pager.Size),
				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
				"owner":      repoOwner,
				"name":       repoName,
			}
			return query, variables, nil
		},
		// GetPageInfo exposes the pagination info carried by the response.
		GetPageInfo: func(iQuery any, args *api.GraphqlCollectorArgs) (*api.GraphqlQueryPageInfo, error) {
			query := iQuery.(*GraphqlQueryPrWrapper)
			return query.Repository.PullRequests.PageInfo, nil
		},
		ResponseParser: func(queryWrapper any) ([]json.RawMessage, errors.Error) {
			query := queryWrapper.(*GraphqlQueryPrWrapper)
			var messages []json.RawMessage
			for _, rawL := range query.Repository.PullRequests.Prs {
				// A nil since disables the cutoff. Otherwise stop at the first PR
				// last updated before since, handing back what was gathered so far.
				// NOTE(review): presumably the query returns PRs newest-updated
				// first — confirm against GraphqlQueryPrWrapper.
				if since != nil && since.After(rawL.UpdatedAt) {
					return messages, api.ErrFinishCollect
				}
				messages = append(messages, errors.Must1(json.Marshal(rawL)))
			}
			return messages, nil
		},
	})
	if err != nil {
		return err
	}

	// refetch(refresh) for existing PRs in the database that are still OPEN
	db := taskCtx.GetDal()
	cursor, err := db.Cursor(
		dal.From(models.GithubPullRequest{}.TableName()),
		dal.Where("state = ? AND repo_id = ? AND connection_id=?", "OPEN", data.Options.GithubId, data.Options.ConnectionId),
	)
	if err != nil {
		return err
	}
	iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.GithubPullRequest{}))
	if err != nil {
		return err
	}

	// prUpdatedAt remembers, per PR number, the update time stored in the
	// database so the parser can drop PRs that have not changed since.
	// NOTE(review): this map is written in BuildQuery and read in
	// ResponseParser; if the collector invokes these callbacks from several
	// goroutines it needs synchronization — confirm.
	prUpdatedAt := make(map[int]time.Time)
	err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
		GraphqlClient: data.GraphqlClient,
		Input:         iterator,
		InputStep:     100,
		Incremental:   true,
		BuildQuery: func(reqData *api.GraphqlRequestData) (any, map[string]any, error) {
			query := &GraphqlQueryPrDetailWrapper{}
			// Without request data only the query shape is returned, with no variables.
			if reqData == nil {
				return query, map[string]any{}, nil
			}
			inputPrs := reqData.Input.([]any)
			// Kept non-nil on purpose so an empty batch stays an empty list.
			outputPrs := make([]map[string]any, 0, len(inputPrs))
			for _, i := range inputPrs {
				inputPr := i.(*models.GithubPullRequest)
				outputPrs = append(outputPrs, map[string]any{
					`number`: graphql.Int(inputPr.Number),
				})
				prUpdatedAt[inputPr.Number] = inputPr.GithubUpdatedAt
			}
			variables := map[string]any{
				"pullRequest": outputPrs,
				"owner":       repoOwner,
				"name":        repoName,
			}
			return query, variables, nil
		},
		ResponseParser: func(queryWrapper any) ([]json.RawMessage, errors.Error) {
			query := queryWrapper.(*GraphqlQueryPrDetailWrapper)
			var messages []json.RawMessage
			for _, rawL := range query.Repository.PullRequests {
				// Keep only PRs updated after the stored timestamp. A number
				// missing from the map compares against the zero time.
				if rawL.UpdatedAt.After(prUpdatedAt[rawL.Number]) {
					messages = append(messages, errors.Must1(json.Marshal(rawL)))
				}
			}
			return messages, nil
		},
	})
	if err != nil {
		return err
	}

	return apiCollector.Execute()
}