backend/helpers/pluginhelper/api/api_extractor_stateful.go (113 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package api import ( "encoding/json" "reflect" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" plugin "github.com/apache/incubator-devlake/core/plugin" ) // StatefulApiExtractorArgs is a struct that contains the arguments for a stateful api extractor type StatefulApiExtractorArgs[InputType any] struct { *SubtaskCommonArgs BeforeExtract func(issue *InputType, stateManager *SubtaskStateManager) errors.Error Extract func(body *InputType, row *RawData) ([]any, errors.Error) } // StatefulApiExtractor is a struct that manages the stateful API extraction process. // It facilitates extracting data from a single _raw_data table and saving it into multiple Tool Layer tables. // By default, the extractor operates in Incremental Mode, processing only new records added to the raw table since the previous run. // This approach reduces the amount of data to process, significantly decreasing the execution time. // The extractor automatically detects if the configuration has changed since the last run. If a change is detected, // it will automatically switch to Full-Sync mode. // // Example: // // extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[apiv2models.Issue]{ // SubtaskCommonArgs: &api.SubtaskCommonArgs{ // SubTaskContext: subtaskCtx, // Table: RAW_ISSUE_TABLE, // Params: JiraApiParams{ // ConnectionId: data.Options.ConnectionId, // BoardId: data.Options.BoardId, // }, // SubtaskConfig: config, // The helper stores this configuration in the state and compares it with the previous one // // to determine the operating mode (Incremental/FullSync). // // Ensure that the configuration is serializable and contains only public fields. // // It is also recommended that the configuration includes only the necessary fields used by the extractor. // ..}, // BeforeExtract: func(body *IssuesResponse, stateManager *api.SubtaskStateManager) errors.Error { // if stateManager.IsIncremental() { // // It is important to delete all existing child-records under DiffSync Mode // err := db.Delete( // &models.JiraIssueLabel{}, // dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, body.Id), // ) // } // return nil // }, // Extract: func(apiIssue *apiv2models.Issue, row *api.RawData) ([]interface{}, errors.Error) { // }, // }) // // if err != nil { // return err // } // // return extractor.Execute() type StatefulApiExtractor[InputType any] struct { *StatefulApiExtractorArgs[InputType] *SubtaskStateManager } // NewStatefulApiExtractor creates a new StatefulApiExtractor func NewStatefulApiExtractor[InputType any](args *StatefulApiExtractorArgs[InputType]) (*StatefulApiExtractor[InputType], errors.Error) { stateManager, err := NewSubtaskStateManager(args.SubtaskCommonArgs) if err != nil { return nil, err } return &StatefulApiExtractor[InputType]{ StatefulApiExtractorArgs: args, SubtaskStateManager: stateManager, }, nil } // Execute sub-task func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error { // load data from database db := extractor.GetDal() logger := extractor.GetLogger() table := extractor.GetRawDataTable() params := extractor.GetRawDataParams() if !db.HasTable(table) { return nil } clauses := []dal.Clause{ dal.Select("id"), dal.From(table), dal.Where("params = ?", params), dal.Orderby("id ASC"), } if extractor.IsIncremental() { since := extractor.GetSince() if since != nil { clauses = append(clauses, dal.Where("created_at >= ? ", since)) } } clauses = append(clauses, dal.Where("created_at < ? ", extractor.GetUntil())) // first get total count for progress tracking count, err := db.Count(clauses...) if err != nil { return errors.Default.Wrap(err, "error getting count of records") } logger.Info("get data from %s where params=%s and got %d with clauses %+v", table, params, count, clauses) // get all IDs var ids []uint64 err = db.Pluck("id", &ids, clauses...) if err != nil { return errors.Default.Wrap(err, "error getting IDs") } // batch save divider divider := NewBatchSaveDivider(extractor.SubTaskContext, extractor.GetBatchSize(), table, params) divider.SetIncrementalMode(extractor.IsIncremental()) // progress extractor.SetProgress(0, -1) ctx := extractor.GetContext() // process each record individually by ID for _, id := range ids { select { case <-ctx.Done(): return errors.Convert(ctx.Err()) default: } // load full record by ID row := &RawData{} err := db.First(row, dal.From(table), dal.Where("id = ?", id)) if err != nil { return errors.Default.Wrap(err, "error loading full row by ID") } body := new(InputType) err = errors.Convert(json.Unmarshal(row.Data, body)) if err != nil { return err } if extractor.BeforeExtract != nil { err = extractor.BeforeExtract(body, extractor.SubtaskStateManager) if err != nil { return err } } results, err := extractor.Extract(body, row) if err != nil { return errors.Default.Wrap(err, "error calling plugin Extract implementation") } for _, result := range results { // get the batch operator for the specific type batch, err := divider.ForType(reflect.TypeOf(result)) if err != nil { return errors.Default.Wrap(err, "error getting batch from result") } // set raw data origin field setRawDataOrigin(result, common.RawDataOrigin{ RawDataTable: table, RawDataParams: params, RawDataId: row.ID, }) // records get saved into db when slots were max outed err = batch.Add(result) if err != nil { return errors.Default.Wrap(err, "error adding result to batch") } } extractor.IncProgress(1) } // save the last batches err = divider.Close() if err != nil { return err } // save the incremental state return extractor.SubtaskStateManager.Close() } var _ plugin.SubTask = (*StatefulApiExtractor[any])(nil)