backend/plugins/customize/service/service.go (426 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package service import ( "fmt" "io" "regexp" "strings" "time" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/models/domainlayer" "github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain" "github.com/apache/incubator-devlake/core/models/domainlayer/qa" "github.com/apache/incubator-devlake/core/models/domainlayer/ticket" "github.com/apache/incubator-devlake/helpers/pluginhelper" customizeModels "github.com/apache/incubator-devlake/plugins/customize/models" ) // Service wraps database operations type Service struct { dal dal.Dal nameChecker *regexp.Regexp } func NewService(dal dal.Dal) *Service { return &Service{dal: dal, nameChecker: regexp.MustCompile(`^x_[a-zA-Z0-9_]{0,50}$`)} } // GetFields returns all the fields of the table func (s *Service) GetFields(table string) ([]customizeModels.CustomizedField, errors.Error) { // the customized fields created before v0.16.0 were not recorded in the table `_tool_customized_field`, we should take care of them columns, err := s.dal.GetColumns(&customizeModels.Table{Name: table}, func(columnMeta dal.ColumnMeta) bool { return true }) if err != nil { return nil, errors.Default.Wrap(err, "GetColumns error") } ff, err := s.getCustomizedFields(table) if err != nil { return nil, err } fieldMap := make(map[string]customizeModels.CustomizedField) for _, f := range ff { fieldMap[f.ColumnName] = f } var result []customizeModels.CustomizedField for _, col := range columns { // original fields if !strings.HasPrefix(col.Name(), "x_") { dataType, _ := col.ColumnType() result = append(result, customizeModels.CustomizedField{ TbName: table, ColumnName: col.Name(), DataType: dal.ColumnType(dataType), }) // customized fields } else { if field, ok := fieldMap[col.Name()]; ok { result = append(result, field) } else { result = append(result, customizeModels.CustomizedField{ ColumnName: col.Name(), DataType: dal.Varchar, }) } } } return result, nil } // checkField checks if the field exist in table func (s *Service) checkField(table, field string) (bool, errors.Error) { if table == "" { return false, errors.Default.New("empty table name") } if !strings.HasPrefix(field, "x_") { return false, errors.Default.New("column name should start with `x_`") } if !s.nameChecker.MatchString(field) { return false, errors.Default.New("invalid column name") } fields, err := s.GetFields(table) if err != nil { return false, err } for _, fld := range fields { if fld.ColumnName == field { return true, nil } } return false, nil } // CreateField creates a new column for the table cf.TbName and creates a new record in the table `_tool_customized_fields` func (s *Service) CreateField(cf *customizeModels.CustomizedField) errors.Error { exists, err := s.checkField(cf.TbName, cf.ColumnName) if err != nil { return err } if exists { return errors.BadInput.New(fmt.Sprintf("the column %s already exists", cf.ColumnName)) } err = s.dal.Create(cf) if err != nil { return errors.Default.Wrap(err, "create customizedField") } err = s.dal.AddColumn(cf.TbName, cf.ColumnName, cf.DataType) if err != nil { return errors.Default.Wrap(err, "AddColumn error") } return nil } // DeleteField deletes the `field` form the `table` func (s *Service) DeleteField(table, field string) errors.Error { exists, err := s.checkField(table, field) if err != nil { return err } if !exists { return nil } err = s.dal.DropColumns(table, field) if err != nil { return errors.Default.Wrap(err, "DropColumn error") } return s.dal.Delete(&customizeModels.CustomizedField{}, dal.Where("tb_name = ? AND column_name = ?", table, field)) } // getCustomizedFields returns all the customized fields definitions of the table func (s *Service) getCustomizedFields(table string) ([]customizeModels.CustomizedField, errors.Error) { var result []customizeModels.CustomizedField err := s.dal.All(&result, dal.Where("tb_name = ?", table)) return result, err } // ImportIssue import csv file to the table `issues`, and create relations to boards // issue could exist in multiple boards, so we should only delete an old records when it doesn't belong to another board func (s *Service) ImportIssue(boardId string, file io.ReadCloser, incremental bool) errors.Error { if !incremental { // not delete accounts data since account may be referenced by others err := s.dal.Delete( &ticket.Issue{}, dal.Where("id IN (SELECT issue_id FROM board_issues WHERE board_id=? AND issue_id NOT IN (SELECT issue_id FROM board_issues WHERE board_id!=?))", boardId, boardId), ) if err != nil { return err } err = s.dal.Delete( &ticket.IssueLabel{}, dal.Where("issue_id IN (SELECT issue_id FROM board_issues WHERE board_id=? AND issue_id NOT IN (SELECT issue_id FROM board_issues WHERE board_id!=?))", boardId, boardId), ) if err != nil { return err } err = s.dal.Delete( &ticket.BoardIssue{}, dal.Where("board_id = ?", boardId), ) if err != nil { return err } } return s.importCSV(file, boardId, s.issueHandlerFactory(boardId, incremental)) } // SaveBoard make sure the board exists in table `boards` func (s *Service) SaveBoard(boardId, boardName string) errors.Error { return s.dal.CreateOrUpdate(&ticket.Board{ DomainEntity: domainlayer.DomainEntity{ Id: boardId, }, Name: boardName, Type: "csv", }) } // ImportIssueCommit imports csv file into the table `issue_commits` func (s *Service) ImportIssueCommit(boardId string, file io.ReadCloser) errors.Error { err := s.dal.Delete( &crossdomain.IssueCommit{}, dal.Where("issue_id IN (SELECT issue_id FROM board_issues WHERE board_id=? AND issue_id NOT IN (SELECT issue_id FROM board_issues WHERE board_id!=?))", boardId, boardId), ) if err != nil { return err } return s.importCSV(file, boardId, s.issueCommitHandler) } // ImportIssueRepoCommit imports data to the table `issue_repo_commits` and `issue_commits` func (s *Service) ImportIssueRepoCommit(boardId string, file io.ReadCloser, incremental bool) errors.Error { if !incremental { // delete old records of the table `issue_repo_commit` and `issue_commit` err := s.dal.Delete( &crossdomain.IssueRepoCommit{}, dal.Where("issue_id IN (SELECT issue_id FROM board_issues WHERE board_id=? AND issue_id NOT IN (SELECT issue_id FROM board_issues WHERE board_id!=?))", boardId, boardId), ) if err != nil { return err } err = s.dal.Delete( &crossdomain.IssueCommit{}, dal.Where("issue_id IN (SELECT issue_id FROM board_issues WHERE board_id=? AND issue_id NOT IN (SELECT issue_id FROM board_issues WHERE board_id!=?))", boardId, boardId), ) if err != nil { return err } } return s.importCSV(file, boardId, s.issueRepoCommitHandler) } // importCSV extract records from csv file, and save them to DB using recordHandler // the rawDataParams is used to identify the data source, // the recordHandler is used to handle the record, it should return an error if the record is invalid // the `created_at` and `updated_at` will be set to the current time func (s *Service) importCSV(file io.ReadCloser, rawDataParams string, recordHandler func(map[string]interface{}) errors.Error) errors.Error { iterator, err := pluginhelper.NewCsvFileIteratorFromFile(file) if err != nil { return err } var hasNext bool var line int now := time.Now() for { line++ if hasNext, err = iterator.HasNextWithError(); !hasNext { return errors.BadInput.Wrap(err, fmt.Sprintf("error on processing the line:%d", line)) } else { record := iterator.Fetch() record["_raw_data_params"] = rawDataParams for k, v := range record { if v.(string) == "NULL" { record[k] = nil } } record["created_at"] = now record["updated_at"] = now err = recordHandler(record) if err != nil { return errors.BadInput.Wrap(err, fmt.Sprintf("error on processing the line:%d", line)) } } } } // createOrUpdateAccount creates or updates an account based on the provided name. // It returns the account ID and an error if any occurred. func (s *Service) createOrUpdateAccount(accountName string, rawDataParams string) (string, errors.Error) { if accountName == "" { return "", nil // Return empty ID if name is empty, no error needed here. } now := time.Now() accountId := fmt.Sprintf("csv:CsvAccount:0:%s", accountName) account := &crossdomain.Account{ DomainEntity: domainlayer.DomainEntity{ Id: accountId, NoPKModel: common.NoPKModel{ RawDataOrigin: common.RawDataOrigin{ RawDataParams: rawDataParams, }, }, }, FullName: accountName, UserName: accountName, CreatedDate: &now, } err := s.dal.CreateOrUpdate(account) if err != nil { return "", errors.Default.Wrap(err, fmt.Sprintf("failed to create or update account for %s", accountName)) } return accountId, nil } // getStringField extracts a string field from a record map. // If required is true, it returns an error if the field is missing, nil, empty, or not a string. // If required is false, it returns an empty string without error if the field is missing or nil, // but returns an error if the field exists and is not a string. func getStringField(record map[string]interface{}, fieldName string, required bool) (string, errors.Error) { value, ok := record[fieldName] if !ok || value == nil { if required { return "", errors.Default.New(fmt.Sprintf("record without required field %s", fieldName)) } return "", nil // Field missing or nil, but not required } strValue, ok := value.(string) if !ok { return "", errors.Default.New(fmt.Sprintf("%s is not a string", fieldName)) } if required && strValue == "" { return "", errors.Default.New(fmt.Sprintf("invalid or empty required field %s", fieldName)) } return strValue, nil } // issueHandlerFactory returns a handler that save record into `issues`, `board_issues` and `issue_labels` table func (s *Service) issueHandlerFactory(boardId string, incremental bool) func(record map[string]interface{}) errors.Error { return func(record map[string]interface{}) errors.Error { var err errors.Error id, err := getStringField(record, "id", true) if err != nil { return err } // Handle labels labels, err := getStringField(record, "labels", false) if err != nil { return err } if labels != "" { var issueLabels []*ticket.IssueLabel appearedLabels := make(map[string]struct{}) // record the labels that have appeared for _, label := range strings.Split(labels, ",") { label = strings.TrimSpace(label) if label == "" { continue } if _, appeared := appearedLabels[label]; !appeared { issueLabel := &ticket.IssueLabel{ IssueId: id, LabelName: label, NoPKModel: common.NoPKModel{ RawDataOrigin: common.RawDataOrigin{ RawDataParams: boardId, }, }, } issueLabels = append(issueLabels, issueLabel) appearedLabels[label] = struct{}{} } } if len(issueLabels) > 0 { err = s.dal.CreateOrUpdate(issueLabels) if err != nil { return err } } } delete(record, "labels") // Remove labels from record map as it's handled // Handle creator and assignee accounts rawDataParams, err := getStringField(record, "_raw_data_params", true) if err != nil { // This should ideally not happen as it's set in importCSV, but good to check return err } // Handle creator creatorName, err := getStringField(record, "creator_name", false) if err != nil { return err } creatorId, err := s.createOrUpdateAccount(creatorName, rawDataParams) if err != nil { return err } if creatorId != "" { record["creator_id"] = creatorId } // Handle assignee assigneeName, err := getStringField(record, "assignee_name", false) if err != nil { return err } assigneeId, err := s.createOrUpdateAccount(assigneeName, rawDataParams) if err != nil { return err } if assigneeId != "" { record["assignee_id"] = assigneeId } // Handle issues err = s.dal.CreateWithMap(&ticket.Issue{}, record) if err != nil { return err } // Handle board_issues err = s.dal.CreateOrUpdate(&ticket.BoardIssue{ BoardId: boardId, IssueId: id, }) if err != nil { return err } return nil } } // issueCommitHandler save record into `issue_commits` table func (s *Service) issueCommitHandler(record map[string]interface{}) errors.Error { return s.dal.CreateWithMap(&crossdomain.IssueCommit{}, record) } // ImportQaApis imports csv file to the table `qa_apis` func (s *Service) ImportQaApis(qaProjectId string, file io.ReadCloser, incremental bool) errors.Error { if !incremental { // delete old data associated with this qaProjectId err := s.dal.Delete(&qa.QaApi{}, dal.Where("qa_project_id = ?", qaProjectId)) if err != nil { return errors.Default.Wrap(err, fmt.Sprintf("failed to delete old qa_apis for qaProjectId %s", qaProjectId)) } } return s.importCSV(file, qaProjectId, s.qaApiHandler(qaProjectId)) } // qaApiHandler saves a record into the `qa_apis` table func (s *Service) qaApiHandler(qaProjectId string) func(record map[string]interface{}) errors.Error { return func(record map[string]interface{}) errors.Error { creatorName, err := getStringField(record, "creator_name", false) if err != nil { return err } if creatorName != "" { creatorId, _ := s.createOrUpdateAccount(creatorName, qaProjectId) if creatorId != "" { record["creator_id"] = creatorId } } delete(record, "creator_name") record["qa_project_id"] = qaProjectId return s.dal.CreateWithMap(&qa.QaApi{}, record) } } // ImportQaTestCases imports csv file to the table `qa_test_cases` func (s *Service) ImportQaTestCases(qaProjectId, qaProjectName string, file io.ReadCloser, incremental bool) errors.Error { if !incremental { // delete old data associated with this qaProjectId err := s.dal.Delete(&qa.QaTestCase{}, dal.Where("qa_project_id = ?", qaProjectId)) if err != nil { return errors.Default.Wrap(err, fmt.Sprintf("failed to delete old qa_test_cases for qaProjectId %s", qaProjectId)) } // using ImportQaApis to delete data in qa_apis // never delete data in qa_projects } // create or update qa_projects err := s.dal.CreateOrUpdate(&qa.QaProject{ DomainEntityExtended: domainlayer.DomainEntityExtended{ Id: qaProjectId, }, Name: qaProjectName, }) if err != nil { return err } return s.importCSV(file, qaProjectId, s.qaTestCaseHandler(qaProjectId)) } // qaTestCaseHandler saves a record into the `qa_test_cases` table func (s *Service) qaTestCaseHandler(qaProjectId string) func(record map[string]interface{}) errors.Error { return func(record map[string]interface{}) errors.Error { creatorName, _ := getStringField(record, "creator_name", false) if creatorName != "" { creatorId, _ := s.createOrUpdateAccount(creatorName, qaProjectId) record["creator_id"] = creatorId } // remove fields delete(record, "creator_name") record["qa_project_id"] = qaProjectId return s.dal.CreateWithMap(&qa.QaTestCase{}, record) } } // ImportQaTestCaseExecutions imports csv file to the table `qa_test_case_executions` func (s *Service) ImportQaTestCaseExecutions(qaProjectId string, file io.ReadCloser, incremental bool) errors.Error { if !incremental { // delete old data associated with this qaProjectId err := s.dal.Delete(&qa.QaTestCaseExecution{}, dal.Where("qa_project_id = ?", qaProjectId)) if err != nil { return errors.Default.Wrap(err, fmt.Sprintf("failed to delete old qa_test_case_executions for qaProjectId %s", qaProjectId)) } } return s.importCSV(file, qaProjectId, s.qaTestCaseExecutionHandler(qaProjectId)) } // qaTestCaseExecutionHandler saves a record into the `qa_test_case_executions` table func (s *Service) qaTestCaseExecutionHandler(qaProjectId string) func(record map[string]interface{}) errors.Error { // Assuming qa.QaTestCaseExecution model exists and CreateWithMap is suitable return func(record map[string]interface{}) errors.Error { creatorName, _ := getStringField(record, "creator_name", false) if creatorName != "" { creatorId, _ := s.createOrUpdateAccount(creatorName, qaProjectId) record["creator_id"] = creatorId } delete(record, "creator_name") record["qa_project_id"] = qaProjectId return s.dal.CreateWithMap(&qa.QaTestCaseExecution{}, record) } } // issueRepoCommitHandlerFactory returns a handler that will populate the `issue_commits` and `issue_repo_commits` table // ths issueCommitsFields is used to filter the fields that should be inserted into the `issue_commits` table func (s *Service) issueRepoCommitHandler(record map[string]interface{}) errors.Error { err := s.dal.CreateWithMap(&crossdomain.IssueRepoCommit{}, record) if err != nil { return err } // remove fields that not in table `issue_commits` delete(record, "host") delete(record, "namespace") delete(record, "repo_name") delete(record, "repo_url") return s.dal.CreateWithMap(&crossdomain.IssueCommit{}, record) }