backend/plugins/customize/service/service.go (269 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package service import ( "fmt" "io" "regexp" "strings" "time" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/models/domainlayer" "github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain" "github.com/apache/incubator-devlake/core/models/domainlayer/ticket" "github.com/apache/incubator-devlake/helpers/pluginhelper" "github.com/apache/incubator-devlake/plugins/customize/models" ) // Service wraps database operations type Service struct { dal dal.Dal nameChecker *regexp.Regexp } func NewService(dal dal.Dal) *Service { return &Service{dal: dal, nameChecker: regexp.MustCompile(`^x_[a-zA-Z0-9_]{0,50}$`)} } // GetFields returns all the fields of the table func (s *Service) GetFields(table string) ([]models.CustomizedField, errors.Error) { // the customized fields created before v0.16.0 were not recorded in the table `_tool_customized_field`, we should take care of them columns, err := s.dal.GetColumns(&models.Table{Name: table}, func(columnMeta dal.ColumnMeta) bool { return true }) if err != nil { return nil, errors.Default.Wrap(err, "GetColumns error") } ff, err := s.getCustomizedFields(table) if err != nil { return nil, err } fieldMap := make(map[string]models.CustomizedField) for _, f := range ff { fieldMap[f.ColumnName] = f } var result []models.CustomizedField for _, col := range columns { // original fields if !strings.HasPrefix(col.Name(), "x_") { dataType, _ := col.ColumnType() result = append(result, models.CustomizedField{ TbName: table, ColumnName: col.Name(), DataType: dal.ColumnType(dataType), }) // customized fields } else { if field, ok := fieldMap[col.Name()]; ok { result = append(result, field) } else { result = append(result, models.CustomizedField{ ColumnName: col.Name(), DataType: dal.Varchar, }) } } } return result, nil } func (s *Service) checkField(table, field string) (bool, errors.Error) { if table == "" { return false, errors.Default.New("empty table name") } if !strings.HasPrefix(field, "x_") { return false, errors.Default.New("column name should start with `x_`") } if !s.nameChecker.MatchString(field) { return false, errors.Default.New("invalid column name") } fields, err := s.GetFields(table) if err != nil { return false, err } for _, fld := range fields { if fld.ColumnName == field { return true, nil } } return false, nil } // CreateField creates a new column for the table cf.TbName and creates a new record in the table `_tool_customized_fields` func (s *Service) CreateField(cf *models.CustomizedField) errors.Error { exists, err := s.checkField(cf.TbName, cf.ColumnName) if err != nil { return err } if exists { return errors.BadInput.New(fmt.Sprintf("the column %s already exists", cf.ColumnName)) } err = s.dal.Create(cf) if err != nil { return errors.Default.Wrap(err, "create customizedField") } err = s.dal.AddColumn(cf.TbName, cf.ColumnName, cf.DataType) if err != nil { return errors.Default.Wrap(err, "AddColumn error") } return nil } // DeleteField deletes the `field` form the `table` func (s *Service) DeleteField(table, field string) errors.Error { exists, err := s.checkField(table, field) if err != nil { return err } if !exists { return nil } err = s.dal.DropColumns(table, field) if err != nil { return errors.Default.Wrap(err, "DropColumn error") } return s.dal.Delete(&models.CustomizedField{}, dal.Where("tb_name = ? AND column_name = ?", table, field)) } func (s *Service) getCustomizedFields(table string) ([]models.CustomizedField, errors.Error) { var result []models.CustomizedField err := s.dal.All(&result, dal.Where("tb_name = ?", table)) return result, err } func (s *Service) ImportIssue(boardId string, file io.ReadCloser) errors.Error { err := s.dal.Delete(&ticket.Issue{}, dal.Where("_raw_data_params = ?", boardId)) if err != nil { return err } err = s.dal.Delete(&ticket.BoardIssue{}, dal.Where("board_id = ?", boardId)) if err != nil { return err } return s.importCSV(file, boardId, s.issueHandlerFactory(boardId)) } func (s *Service) SaveBoard(boardId, boardName string) errors.Error { return s.dal.CreateOrUpdate(&ticket.Board{ DomainEntity: domainlayer.DomainEntity{ Id: boardId, }, Name: boardName, Type: "csv", }) } func (s *Service) ImportIssueCommit(rawDataParams string, file io.ReadCloser) errors.Error { err := s.dal.Delete(&crossdomain.IssueCommit{}, dal.Where("_raw_data_params = ?", rawDataParams)) if err != nil { return err } return s.importCSV(file, rawDataParams, s.issueCommitHandler) } // ImportIssueRepoCommit imports data to the table `issue_repo_commits` and `issue_commits` func (s *Service) ImportIssueRepoCommit(rawDataParams string, file io.ReadCloser) errors.Error { fields := make(map[string]struct{}) // get all fields of the table `issue_repo_commit` columns, err := s.dal.GetColumns(&crossdomain.IssueCommit{}, func(columnMeta dal.ColumnMeta) bool { return true }) if err != nil { return err } for _, column := range columns { fields[column.Name()] = struct{}{} } // delete old records of the table `issue_repo_commit` and `issue_commit` err = s.dal.Delete(&crossdomain.IssueRepoCommit{}, dal.Where("_raw_data_params = ?", rawDataParams)) if err != nil { return err } err = s.dal.Delete(&crossdomain.IssueCommit{}, dal.Where("_raw_data_params = ?", rawDataParams)) if err != nil { return err } return s.importCSV(file, rawDataParams, s.issueRepoCommitHandlerFactory(fields)) } // importCSV imports the csv file to the database, // the rawDataParams is used to identify the data source, // the recordHandler is used to handle the record, it should return an error if the record is invalid // the `created_at` and `updated_at` will be set to the current time func (s *Service) importCSV(file io.ReadCloser, rawDataParams string, recordHandler func(map[string]interface{}) errors.Error) errors.Error { iterator, err := pluginhelper.NewCsvFileIteratorFromFile(file) if err != nil { return err } var hasNext bool var line int now := time.Now() for { line++ if hasNext, err = iterator.HasNextWithError(); !hasNext { return errors.BadInput.Wrap(err, fmt.Sprintf("error on processing the line:%d", line)) } else { record := iterator.Fetch() record["_raw_data_params"] = rawDataParams for k, v := range record { if v.(string) == "NULL" { record[k] = nil } } record["created_at"] = now record["updated_at"] = now err = recordHandler(record) if err != nil { return errors.BadInput.Wrap(err, fmt.Sprintf("error on processing the line:%d", line)) } } } } func (s *Service) issueHandlerFactory(boardId string) func(record map[string]interface{}) errors.Error { return func(record map[string]interface{}) errors.Error { var err errors.Error var id string if record["id"] == nil { return errors.Default.New("record without id") } id, _ = record["id"].(string) if id == "" { return errors.Default.New("empty id") } if record["labels"] != nil { labels, ok := record["labels"].(string) if !ok { return errors.Default.New("labels is not string") } var issueLabels []*ticket.IssueLabel appearedLabels := make(map[string]struct{}) // record the labels that have appeared for _, label := range strings.Split(labels, ",") { label = strings.TrimSpace(label) if label == "" { continue } if _, appeared := appearedLabels[label]; !appeared { issueLabel := &ticket.IssueLabel{ IssueId: id, LabelName: label, NoPKModel: common.NoPKModel{ RawDataOrigin: common.RawDataOrigin{ RawDataParams: boardId, }, }, } issueLabels = append(issueLabels, issueLabel) appearedLabels[label] = struct{}{} } } if len(issueLabels) > 0 { err = s.dal.CreateOrUpdate(issueLabels) if err != nil { return err } } } delete(record, "labels") err = s.dal.CreateWithMap(&ticket.Issue{}, record) if err != nil { return err } return s.dal.CreateOrUpdate(&ticket.BoardIssue{ BoardId: boardId, IssueId: id, }) } } func (s *Service) issueCommitHandler(record map[string]interface{}) errors.Error { return s.dal.CreateWithMap(&crossdomain.IssueCommit{}, record) } // issueRepoCommitHandlerFactory returns a handler that will populate the `issue_commits` and `issue_repo_commits` table // ths issueCommitsFields is used to filter the fields that should be inserted into the `issue_commits` table func (s *Service) issueRepoCommitHandlerFactory(issueCommitsFields map[string]struct{}) func(record map[string]interface{}) errors.Error { return func(record map[string]interface{}) errors.Error { err := s.dal.CreateWithMap(&crossdomain.IssueRepoCommit{}, record) if err != nil { return err } for head := range record { if _, exists := issueCommitsFields[head]; exists { continue } else { delete(record, head) } } return s.dal.CreateWithMap(&crossdomain.IssueCommit{}, record) } }