func CollectIssues()

in backend/plugins/github_graphql/tasks/issue_collector.go [96:206]


func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error {
	data := taskCtx.GetData().(*githubTasks.GithubTaskData)
	apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
		Ctx: taskCtx,
		Params: githubTasks.GithubApiParams{
			ConnectionId: data.Options.ConnectionId,
			Name:         data.Options.Name,
		},
		Table: RAW_ISSUES_TABLE,
	})
	if err != nil {
		return err
	}

	// collect new issues since the previous run
	since := apiCollector.GetSince()
	err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
		GraphqlClient: data.GraphqlClient,
		PageSize:      10,
		BuildQuery: func(reqData *api.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
			query := &GraphqlQueryIssueWrapper{}
			if reqData == nil {
				return query, map[string]interface{}{}, nil
			}
			ownerName := strings.Split(data.Options.Name, "/")
			variables := map[string]interface{}{
				"pageSize":   graphql.Int(reqData.Pager.Size),
				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
				"owner":      graphql.String(ownerName[0]),
				"name":       graphql.String(ownerName[1]),
			}
			return query, variables, nil
		},
		GetPageInfo: func(iQuery interface{}, args *api.GraphqlCollectorArgs) (*api.GraphqlQueryPageInfo, error) {
			query := iQuery.(*GraphqlQueryIssueWrapper)
			return query.Repository.IssueList.PageInfo, nil
		},
		ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
			query := queryWrapper.(*GraphqlQueryIssueWrapper)
			issues := query.Repository.IssueList.Issues
			for _, rawL := range issues {
				if since != nil && since.After(rawL.UpdatedAt) {
					return messages, api.ErrFinishCollect
				}
				messages = append(messages, errors.Must1(json.Marshal(rawL)))
			}
			return
		},
	})
	if err != nil {
		return err
	}

	// refetch(refresh) for existing issues in the database that are still OPEN
	db := taskCtx.GetDal()
	cursor, err := db.Cursor(
		dal.From(models.GithubIssue{}.TableName()),
		dal.Where("state = ? AND repo_id = ? AND connection_id=?", "OPEN", data.Options.GithubId, data.Options.ConnectionId),
	)
	if err != nil {
		return err
	}
	iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.GithubIssue{}))
	if err != nil {
		return err
	}
	issueUpdatedAt := make(map[int]time.Time)
	err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
		GraphqlClient: data.GraphqlClient,
		Input:         iterator,
		InputStep:     100,
		Incremental:   true,
		BuildQuery: func(reqData *api.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
			query := &GraphqlQueryIssueDetailWrapper{}
			if reqData == nil {
				return query, map[string]interface{}{}, nil
			}
			ownerName := strings.Split(data.Options.Name, "/")
			inputIssues := reqData.Input.([]interface{})
			outputIssues := []map[string]interface{}{}
			for _, i := range inputIssues {
				inputIssue := i.(*models.GithubIssue)
				outputIssues = append(outputIssues, map[string]interface{}{
					`number`: graphql.Int(inputIssue.Number),
				})
				issueUpdatedAt[inputIssue.Number] = inputIssue.GithubUpdatedAt
			}
			variables := map[string]interface{}{
				"issue": outputIssues,
				"owner": graphql.String(ownerName[0]),
				"name":  graphql.String(ownerName[1]),
			}
			return query, variables, nil
		},
		ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
			query := queryWrapper.(*GraphqlQueryIssueDetailWrapper)
			issues := query.Repository.Issues
			for _, rawL := range issues {
				if rawL.UpdatedAt.After(issueUpdatedAt[rawL.Number]) {
					messages = append(messages, errors.Must1(json.Marshal(rawL)))
				}
			}
			return
		},
	})
	if err != nil {
		return err
	}

	return apiCollector.Execute()
}