in backend/plugins/github_graphql/tasks/issue_collector.go [96:206]
// CollectIssues collects raw GitHub issues for the repository named in the
// task options and stores them through a stateful collector bound to
// RAW_ISSUES_TABLE.
//
// Two GraphQL collectors are registered on the same stateful collector before
// it is executed:
//
//  1. A paged walk over the repository's issue list (10 issues per page). When
//     a previous collection time is available, the walk signals
//     api.ErrFinishCollect at the first issue whose UpdatedAt is older than it.
//  2. A refresh of the issues already stored with state "OPEN" for this
//     repo/connection, queried in batches of 100. Only issues whose UpdatedAt
//     is later than the stored GithubUpdatedAt are kept.
//
// It returns an error when the repository name is not of the form
// "owner/name", when any collector fails to initialize, when the database
// cursor cannot be opened, or when execution fails.
func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error {
	data := taskCtx.GetData().(*githubTasks.GithubTaskData)

	// Split "owner/name" once and validate it up front: indexing the split
	// result blindly inside the query builders would panic on a name that
	// contains no "/".
	ownerName := strings.Split(data.Options.Name, "/")
	if len(ownerName) < 2 {
		return errors.BadInput.New("invalid repository name \"" + data.Options.Name + "\": expected \"owner/name\"")
	}
	repoOwner := graphql.String(ownerName[0])
	repoName := graphql.String(ownerName[1])

	// marshalIssue serializes one issue node, reporting a failure as an error
	// rather than panicking inside a parser callback.
	marshalIssue := func(issue any) (json.RawMessage, errors.Error) {
		raw, marshalErr := json.Marshal(issue)
		if marshalErr != nil {
			return nil, errors.Default.Wrap(marshalErr, "marshaling github issue")
		}
		return raw, nil
	}

	apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
		Ctx: taskCtx,
		Params: githubTasks.GithubApiParams{
			ConnectionId: data.Options.ConnectionId,
			Name:         data.Options.Name,
		},
		Table: RAW_ISSUES_TABLE,
	})
	if err != nil {
		return err
	}

	// collect new issues since the previous run
	since := apiCollector.GetSince()
	err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
		GraphqlClient: data.GraphqlClient,
		PageSize:      10,
		BuildQuery: func(reqData *api.GraphqlRequestData) (any, map[string]any, error) {
			query := &GraphqlQueryIssueWrapper{}
			if reqData == nil {
				// No request data: hand back the query shape with no variables.
				return query, map[string]any{}, nil
			}
			variables := map[string]any{
				"pageSize":   graphql.Int(reqData.Pager.Size),
				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
				"owner":      repoOwner,
				"name":       repoName,
			}
			return query, variables, nil
		},
		GetPageInfo: func(iQuery any, args *api.GraphqlCollectorArgs) (*api.GraphqlQueryPageInfo, error) {
			query := iQuery.(*GraphqlQueryIssueWrapper)
			return query.Repository.IssueList.PageInfo, nil
		},
		ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
			query := queryWrapper.(*GraphqlQueryIssueWrapper)
			for _, issue := range query.Repository.IssueList.Issues {
				// Stop at the first issue last updated before the previous run,
				// keeping what was gathered from this page so far.
				// NOTE(review): this presumes the query returns issues ordered
				// by update time, newest first — confirm in GraphqlQueryIssueWrapper.
				if since != nil && since.After(issue.UpdatedAt) {
					return messages, api.ErrFinishCollect
				}
				raw, marshalErr := marshalIssue(issue)
				if marshalErr != nil {
					return nil, marshalErr
				}
				messages = append(messages, raw)
			}
			return messages, nil
		},
	})
	if err != nil {
		return err
	}

	// refetch(refresh) for existing issues in the database that are still OPEN
	db := taskCtx.GetDal()
	cursor, err := db.Cursor(
		dal.From(models.GithubIssue{}.TableName()),
		dal.Where("state = ? AND repo_id = ? AND connection_id=?", "OPEN", data.Options.GithubId, data.Options.ConnectionId),
	)
	if err != nil {
		return err
	}
	iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.GithubIssue{}))
	if err != nil {
		return err
	}

	// issueUpdatedAt remembers, per issue number, the update time stored in the
	// database so the parser can drop issues that have not changed.
	// NOTE(review): this map is written in BuildQuery and read in
	// ResponseParser; if the collector invokes these callbacks from multiple
	// goroutines it needs synchronization — confirm against the collector.
	issueUpdatedAt := make(map[int]time.Time)
	err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
		GraphqlClient: data.GraphqlClient,
		Input:         iterator,
		InputStep:     100,
		Incremental:   true,
		BuildQuery: func(reqData *api.GraphqlRequestData) (any, map[string]any, error) {
			query := &GraphqlQueryIssueDetailWrapper{}
			if reqData == nil {
				// No request data: hand back the query shape with no variables.
				return query, map[string]any{}, nil
			}
			inputIssues := reqData.Input.([]any)
			outputIssues := []map[string]any{}
			for _, input := range inputIssues {
				inputIssue := input.(*models.GithubIssue)
				outputIssues = append(outputIssues, map[string]any{
					`number`: graphql.Int(inputIssue.Number),
				})
				issueUpdatedAt[inputIssue.Number] = inputIssue.GithubUpdatedAt
			}
			variables := map[string]any{
				"issue": outputIssues,
				"owner": repoOwner,
				"name":  repoName,
			}
			return query, variables, nil
		},
		ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
			query := queryWrapper.(*GraphqlQueryIssueDetailWrapper)
			for _, issue := range query.Repository.Issues {
				// Keep only issues updated after the stored copy. A number
				// missing from the map compares against the zero time.
				if !issue.UpdatedAt.After(issueUpdatedAt[issue.Number]) {
					continue
				}
				raw, marshalErr := marshalIssue(issue)
				if marshalErr != nil {
					return nil, marshalErr
				}
				messages = append(messages, raw)
			}
			return messages, nil
		},
	})
	if err != nil {
		return err
	}

	return apiCollector.Execute()
}