in backend/plugins/github_graphql/tasks/job_collector.go [102:216]
func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
db := taskCtx.GetDal()
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: githubTasks.GithubApiParams{
ConnectionId: data.Options.ConnectionId,
Name: data.Options.Name,
},
Table: RAW_GRAPHQL_JOBS_TABLE,
}, data.TimeAfter)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("check_suite_node_id"),
dal.From(models.GithubRun{}.TableName()),
dal.Where("repo_id = ? and connection_id=?", data.Options.GithubId, data.Options.ConnectionId),
dal.Orderby("github_updated_at DESC"),
}
if incremental {
clauses = append(clauses, dal.Where("github_updated_at > ?", *collectorWithState.LatestState.LatestSuccessStart))
}
cursor, err := db.Cursor(
clauses...,
)
if err != nil {
return err
}
defer cursor.Close()
iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleWorkflowRun{}))
if err != nil {
return err
}
err = collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
Input: iterator,
InputStep: 20,
Incremental: incremental,
GraphqlClient: data.GraphqlClient,
BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
query := &GraphqlQueryCheckRunWrapper{}
if reqData == nil {
return query, map[string]interface{}{}, nil
}
workflowRuns := reqData.Input.([]interface{})
checkSuiteIds := []map[string]interface{}{}
for _, iWorkflowRuns := range workflowRuns {
workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
`id`: graphql.ID(workflowRun.CheckSuiteNodeID),
})
}
variables := map[string]interface{}{
"node": checkSuiteIds,
}
return query, variables, nil
},
ResponseParserWithDataErrors: func(iQuery interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error) {
for _, dataError := range dataErrors {
// log and ignore
taskCtx.GetLogger().Warn(dataError, `query check run get error but ignore`)
}
query := iQuery.(*GraphqlQueryCheckRunWrapper)
nodes := query.Node
results := make([]interface{}, 0, 1)
for _, node := range nodes {
for _, checkRun := range node.CheckSuite.CheckRuns.Nodes {
paramsBytes, err := json.Marshal(checkRun.Steps.Nodes)
if err != nil {
logger.Error(err, `Marshal checkRun.Steps.Nodes fail and ignore`)
}
githubJob := &models.GithubJob{
ConnectionId: data.Options.ConnectionId,
RunID: node.CheckSuite.WorkflowRun.DatabaseId,
RepoId: data.Options.GithubId,
ID: checkRun.DatabaseId,
NodeID: checkRun.Id,
HTMLURL: checkRun.DetailsUrl,
Status: strings.ToUpper(checkRun.Status),
Conclusion: strings.ToUpper(checkRun.Conclusion),
StartedAt: checkRun.StartedAt,
CompletedAt: checkRun.CompletedAt,
Name: checkRun.Name,
Steps: paramsBytes,
Type: data.RegexEnricher.ReturnNameIfMatched(devops.DEPLOYMENT, checkRun.Name),
Environment: data.RegexEnricher.ReturnNameIfOmittedOrMatched(devops.PRODUCTION, checkRun.Name),
// these columns can not fill by graphql
//HeadSha: ``, // use _tool_github_runs
//RunURL: ``,
//CheckRunURL: ``,
//Labels: ``, // not on use
//RunnerID: ``, // not on use
//RunnerName: ``, // not on use
//RunnerGroupID: ``, // not on use
}
results = append(results, githubJob)
}
}
return results, nil
},
})
if err != nil {
return err
}
return collectorWithState.Execute()
}