func extractCustomizedFields()

in backend/plugins/customize/tasks/customized_fields_extractor.go [59:160]


func extractCustomizedFields(ctx context.Context, d dal.Dal, table, rawTable, rawDataParams string, extractor map[string]string) error {
	pkFields, err := dal.GetPrimarykeyColumns(d, &models.Table{Name: table})
	if err != nil {
		return err
	}
	rawDataField := fmt.Sprintf("%s.data", rawTable)
	// `fields` only include `_raw_data_id` and primary keys coming from the domain layer table, and `data` coming from the raw layer
	fields := []string{fmt.Sprintf("%s.%s", table, "_raw_data_id")}
	fields = append(fields, rawDataField)
	for _, field := range pkFields {
		fields = append(fields, fmt.Sprintf("%s.%s", table, field.Name()))
	}
	clauses := []dal.Clause{
		dal.Select(strings.Join(fields, ", ")),
		dal.From(table),
		dal.Join(fmt.Sprintf(" LEFT JOIN %s ON %s._raw_data_id = %s.id", rawTable, table, rawTable)),
		dal.Where("_raw_data_table = ? AND _raw_data_params LIKE ?", rawTable, rawDataParams),
	}
	rows, err := d.Cursor(clauses...)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		row := make(map[string]interface{})
		updates := make(map[string]interface{})
		err = d.Fetch(rows, &row)
		if err != nil {
			return err
		}
		switch blob := row["data"].(type) {
		case []byte:
			for field, path := range extractor {
				result := gjson.GetBytes(blob, path)
				fillInUpdates(result, field, updates)
			}
		case string:
			for field, path := range extractor {
				result := gjson.Get(blob, path)
				// special case for issues custom_fields
				rawDataId, ok := row["_raw_data_id"].(int64)
				if !ok {
					return errors.Default.New("_raw_data_id is not int64")
				}
				if table == "issues" && result.IsArray() {
					issueId := row["id"].(string)
					fieldId := field
					// Delete existing records for the given issue and field
					err = d.Delete(
						&ticket.IssueCustomArrayField{},
						dal.Where("issue_id = ? AND field_id = ?", issueId, fieldId),
					)
					if err != nil {
						return err
					}

					result.ForEach(func(_, v gjson.Result) bool {
						err1 := d.Create(&ticket.IssueCustomArrayField{
							IssueId:    issueId,
							FieldId:    fieldId,
							FieldValue: v.String(),
							NoPKModel: common.NoPKModel{
								RawDataOrigin: common.RawDataOrigin{
									RawDataParams: rawDataParams,
									RawDataTable:  rawTable,
									RawDataId:     uint64(rawDataId),
								},
							},
						})
						if err1 != nil {
							err = err1
							return false
						}
						return true
					})
				} else {
					fillInUpdates(result, field, updates)
				}
			}
		default:
			return nil
		}

		if len(updates) > 0 {
			// remove columns that are not primary key
			delete(row, "_raw_data_id")
			delete(row, "data")
			query, params := mkUpdate(table, updates, row)
			err = d.Exec(query, params...)
			if err != nil {
				return errors.Default.Wrap(err, "Exec SQL error")
			}
		}
	}
	return nil
}