in backend/plugins/customize/tasks/customized_fields_extractor.go [59:160]
func extractCustomizedFields(ctx context.Context, d dal.Dal, table, rawTable, rawDataParams string, extractor map[string]string) error {
pkFields, err := dal.GetPrimarykeyColumns(d, &models.Table{Name: table})
if err != nil {
return err
}
rawDataField := fmt.Sprintf("%s.data", rawTable)
// `fields` only include `_raw_data_id` and primary keys coming from the domain layer table, and `data` coming from the raw layer
fields := []string{fmt.Sprintf("%s.%s", table, "_raw_data_id")}
fields = append(fields, rawDataField)
for _, field := range pkFields {
fields = append(fields, fmt.Sprintf("%s.%s", table, field.Name()))
}
clauses := []dal.Clause{
dal.Select(strings.Join(fields, ", ")),
dal.From(table),
dal.Join(fmt.Sprintf(" LEFT JOIN %s ON %s._raw_data_id = %s.id", rawTable, table, rawTable)),
dal.Where("_raw_data_table = ? AND _raw_data_params LIKE ?", rawTable, rawDataParams),
}
rows, err := d.Cursor(clauses...)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
row := make(map[string]interface{})
updates := make(map[string]interface{})
err = d.Fetch(rows, &row)
if err != nil {
return err
}
switch blob := row["data"].(type) {
case []byte:
for field, path := range extractor {
result := gjson.GetBytes(blob, path)
fillInUpdates(result, field, updates)
}
case string:
for field, path := range extractor {
result := gjson.Get(blob, path)
// special case for issues custom_fields
rawDataId, ok := row["_raw_data_id"].(int64)
if !ok {
return errors.Default.New("_raw_data_id is not int64")
}
if table == "issues" && result.IsArray() {
issueId := row["id"].(string)
fieldId := field
// Delete existing records for the given issue and field
err = d.Delete(
&ticket.IssueCustomArrayField{},
dal.Where("issue_id = ? AND field_id = ?", issueId, fieldId),
)
if err != nil {
return err
}
result.ForEach(func(_, v gjson.Result) bool {
err1 := d.Create(&ticket.IssueCustomArrayField{
IssueId: issueId,
FieldId: fieldId,
FieldValue: v.String(),
NoPKModel: common.NoPKModel{
RawDataOrigin: common.RawDataOrigin{
RawDataParams: rawDataParams,
RawDataTable: rawTable,
RawDataId: uint64(rawDataId),
},
},
})
if err1 != nil {
err = err1
return false
}
return true
})
} else {
fillInUpdates(result, field, updates)
}
}
default:
return nil
}
if len(updates) > 0 {
// remove columns that are not primary key
delete(row, "_raw_data_id")
delete(row, "data")
query, params := mkUpdate(table, updates, row)
err = d.Exec(query, params...)
if err != nil {
return errors.Default.Wrap(err, "Exec SQL error")
}
}
}
return nil
}