func buildTableReportBody()

in internal/reports/report_helpers.go [83:526]


func buildTableReportBody(conv *internal.Conv, tableId string, issues map[string][]internal.SchemaIssue, spSchema ddl.CreateTable, srcSchema schema.Table, syntheticPK *string, uniquePK []string, tableLevelIssues []internal.SchemaIssue) []tableReportBody {
	var body []tableReportBody
	for _, p := range []struct {
		heading  string
		severity Severity
	}{
		{"Warning", warning},
		{"Note", note},
		{"Suggestion", suggestion},
		{"Error", Errors},
	} {
		// Print out issues is alphabetical column order.
		var colNames []string
		for colId := range issues {
			colNames = append(colNames, conv.SpSchema[tableId].ColDefs[colId].Name)
		}
		sort.Strings(colNames)
		l := []Issue{}
		if p.severity == Errors && len(tableLevelIssues) != 0 {
			for _, issue := range tableLevelIssues {
				if issue == internal.RowLimitExceeded {
					toAppend := Issue{
						Category:    IssueDB[internal.RowLimitExceeded].Category,
						Description: IssueDB[internal.RowLimitExceeded].Brief,
					}
					l = append(l, toAppend)
				}
			}

		}

		// added if condition to add table level warnings
		if p.severity == warning && len(conv.InvalidCheckExp[tableId]) != 0 {
			for _, invalidExp := range conv.InvalidCheckExp[tableId] {
				var backtickMsg string = ""
				var dialectMsg string = "with the constraint logic"
				if conv.SpDialect == constants.DIALECT_POSTGRESQL && strings.ContainsAny(invalidExp.Expression, "`") {
					backtickMsg = "caused by backticks"
					dialectMsg = "with the PostgreSQL dialect"
				}
				switch invalidExp.IssueType {
				case internal.TypeMismatch:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s could not be applied. Please ensure the column type aligns with the constraint logic. As a result, the check constraint has not been applied and has been dropped", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)
				case internal.InvalidCondition:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s contains an invalid condition %s that is incompatible %s. As a result, the check constraint has not been applied and has been dropped", conv.SpSchema[tableId].Name, invalidExp.Expression, backtickMsg, dialectMsg),
					}
					l = append(l, toAppend)
				case internal.ColumnNotFound:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s references a column that was not found. Please verify that all referenced columns exist. As a result, the check constraint has not been applied and has been dropped", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)

				case internal.CheckConstraintFunctionNotFound:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s could not be applied due to the use of an unsupported function. As a result, the check constraint has not been applied and has been dropped", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)
				case internal.GenericWarning:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': An error occurred in the check constraint %s. Please verify the conditions and ensure the constraint logic is valid. As a result, the check constraint has not been applied and has been dropped", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)
				}
			}
		}

		// added if condition to add table level Errors
		if p.severity == Errors && len(conv.InvalidCheckExp[tableId]) != 0 {

			for _, invalidExp := range conv.InvalidCheckExp[tableId] {
				var backtickMsg string = ""
				var dialectMsg string = "with the constraint logic"
				if conv.SpDialect == constants.DIALECT_POSTGRESQL && strings.ContainsAny(invalidExp.Expression, "`") {
					backtickMsg = "caused by backticks"
					dialectMsg = "with the PostgreSQL dialect"
				}

				switch invalidExp.IssueType {
				case internal.TypeMismatchError:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s could not be applied. Please ensure the column type aligns with the constraint logic. Kindly address the errors related to the check constraint", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)
				case internal.InvalidConditionError:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s contains an invalid condition %s that is incompatible %s. Kindly address the errors related to the check constraint", conv.SpSchema[tableId].Name, invalidExp.Expression, backtickMsg, dialectMsg),
					}
					l = append(l, toAppend)
				case internal.ColumnNotFoundError:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s references a column that was not found. Please verify that all referenced columns exist. Kindly address the errors related to the check constraint", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)

				case internal.CheckConstraintFunctionNotFoundError:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': The check constraint %s could not be applied due to the use of an unsupported function. Kindly address the errors related to the check constraint", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)
				case internal.GenericError:
					toAppend := Issue{
						Category:    IssueDB[invalidExp.IssueType].Category,
						Description: fmt.Sprintf("Table '%s': An error occurred in the check constraint %s. Please verify the conditions and ensure the constraint logic is valid. Kindly address the errors related to the check constraint", conv.SpSchema[tableId].Name, invalidExp.Expression),
					}
					l = append(l, toAppend)
				}
			}

		}

		if p.severity == warning {
			flag := false
			for _, spFk := range conv.SpSchema[tableId].ForeignKeys {
				srcFk, err := internal.GetSrcFkFromId(conv.SrcSchema[tableId].ForeignKeys, spFk.Id)
				if err != nil {
					continue
				}
				if srcFk.OnDelete == "" && srcFk.OnUpdate == "" && !flag {
					flag = true
					issue := internal.ForeignKeyActionNotSupported
					toAppend := Issue{
						Category:    IssueDB[issue].Category,
						Description: fmt.Sprintf("Table '%s': %s", conv.SpSchema[tableId].Name, IssueDB[issue].Brief),
					}
					l = append(l, toAppend)
				}

				if srcFk.OnDelete != spFk.OnDelete {
					issue := internal.ForeignKeyOnDelete
					toAppend := Issue{
						Category:    IssueDB[issue].Category,
						Description: fmt.Sprintf("Table '%s': ON DELETE action of Foreign Key '%s' mapped from %s to %s - %s", conv.SpSchema[tableId].Name, srcFk.Name, srcFk.OnDelete, spFk.OnDelete, IssueDB[issue].Brief),
					}
					l = append(l, toAppend)
				}

				if srcFk.OnUpdate != spFk.OnUpdate {
					issue := internal.ForeignKeyOnUpdate
					toAppend := Issue{
						Category:    IssueDB[issue].Category,
						Description: fmt.Sprintf("Table '%s': ON UPDATE action of Foreign Key '%s' mapped from %s to %s - %s", conv.SpSchema[tableId].Name, srcFk.Name, srcFk.OnUpdate, spFk.OnUpdate, IssueDB[issue].Brief),
					}
					l = append(l, toAppend)
				}

				_, isChanged := internal.FixName(srcFk.Name)
				if isChanged && srcFk.Name != spFk.Name {
					toAppend := Issue{
						Category:    IssueDB[internal.IllegalName].Category,
						Description: fmt.Sprintf("%s, Foreign Key '%s' is mapped to '%s' for table '%s'", IssueDB[internal.IllegalName].Brief, srcFk.Name, spFk.Name, conv.SpSchema[tableId].Name),
					}
					l = append(l, toAppend)
				}
			}
			for _, spIdx := range conv.SpSchema[tableId].Indexes {
				srcIdx, err := internal.GetSrcIndexFromId(conv.SrcSchema[tableId].Indexes, spIdx.Id)
				if err != nil {
					continue
				}
				_, isChanged := internal.FixName(srcIdx.Name)
				if isChanged && srcIdx.Name != spIdx.Name {
					toAppend := Issue{
						Category:    IssueDB[internal.IllegalName].Category,
						Description: fmt.Sprintf("%s, Index '%s' is mapped to '%s' for table '%s'", IssueDB[internal.IllegalName].Brief, srcIdx.Name, spIdx.Name, conv.SpSchema[tableId].Name),
					}
					l = append(l, toAppend)
				}
			}

			_, isChanged := internal.FixName(srcSchema.Name)
			if isChanged && (spSchema.Name != srcSchema.Name) {
				toAppend := Issue{
					Category:    IssueDB[internal.IllegalName].Category,
					Description: fmt.Sprintf("%s, Table '%s' is mapped to '%s'", IssueDB[internal.IllegalName].Brief, srcSchema.Name, spSchema.Name),
				}
				l = append(l, toAppend)
			}
		}

		issueBatcher := make(map[internal.SchemaIssue]bool)
		for _, colName := range colNames {
			colId, _ := internal.GetColIdFromSpName(conv.SpSchema[tableId].ColDefs, colName)
			for _, i := range issues[colId] {
				if IssueDB[i].Severity != p.severity {
					continue
				}
				if IssueDB[i].batch {
					if issueBatcher[i] {
						// Have already reported a previous instance of this
						// (batched) issue, so skip this one.
						continue
					}
					issueBatcher[i] = true
				}
				srcColType := srcSchema.ColDefs[colId].Type.Print()
				spColType := spSchema.ColDefs[colId].T.PrintColumnDefType()
				if conv.SpDialect == constants.DIALECT_POSTGRESQL {
					spColType = spSchema.ColDefs[colId].T.PGPrintColumnDefType()
				}
				srcColName := srcSchema.ColDefs[colId].Name
				spColName := spSchema.ColDefs[colId].Name

				// A note on case: Spanner types are case insensitive, but
				// default to upper case. In particular, the Spanner AST uses
				// upper case, so spType is upper case. Many source DBs
				// default to lower case. When printing source DB and
				// Spanner types for comparison purposes, this can be distracting.
				// Hence we switch to lower-case for Spanner types here.
				// TODO: add logic to choose case for Spanner types based
				// on case of srcType.
				spColType = strings.ToLower(spColType)
				switch i {
				case internal.DefaultValue:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("%s for table '%s' e.g. column '%s'", IssueDB[i].Brief, conv.SpSchema[tableId].Name, spColName),
					}
					l = append(l, toAppend)
				case internal.ForeignKey:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Column '%s' in table '%s' uses foreign keys which Spanner migration tool does not support yet", conv.SpSchema[tableId].Name, spColName),
					}
					l = append(l, toAppend)
				case internal.AutoIncrement:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Column '%s' is an autoincrement column in table '%s'. %s", spColName, conv.SpSchema[tableId].Name, IssueDB[i].Brief),
					}
					l = append(l, toAppend)
				case internal.SequenceCreated:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Auto-Increment has been converted to Sequence '%s' for column '%s' in table '%s'. Set Skipped Range or Start with Counter to avoid duplicate value errors.", conv.SpSchema[tableId].ColDefs[colId].AutoGen.Name, spColName, conv.SpSchema[tableId].Name),
					}
					l = append(l, toAppend)
				case internal.Timestamp:
					// Avoid the confusing "timestamp is mapped to timestamp" message.
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Some columns have source DB type 'timestamp without timezone' which is mapped to Spanner type timestamp in table '%s' e.g. column '%s'. %s", conv.SpSchema[tableId].Name, spColName, IssueDB[i].Brief),
					}
					l = append(l, toAppend)
				case internal.Datetime:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Some columns have source DB type 'datetime' which is mapped to Spanner type timestamp in table '%s' e.g. column '%s'. %s", conv.SpSchema[tableId].Name, spColName, IssueDB[i].Brief),
					}
					l = append(l, toAppend)
				case internal.Widened:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Table '%s': %s e.g. for column '%s', source DB type %s is mapped to Spanner data type %s", conv.SpSchema[tableId].Name, IssueDB[i].Brief, spColName, srcColType, spColType),
					}
					l = append(l, toAppend)
				case internal.HotspotTimestamp:
					str := fmt.Sprintf(" %s for Table '%s' and Column  '%s'", IssueDB[i].Brief, spSchema.Name, spColName)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.HotspotAutoIncrement:
					str := fmt.Sprintf(" %s for Table '%s' and Column '%s'", IssueDB[i].Brief, spSchema.Name, spColName)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.InterleavedNotInOrder:
					parent, _, _ := getInterleaveDetail(conv, tableId, colId, i)
					str := fmt.Sprintf(" Table '%s' can be interleaved with table '%s' %s '%s' and Column '%s'", spSchema.Name, parent, IssueDB[i].Brief, spSchema.Name, spColName)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.InterleavedOrder:
					parent, _, _ := getInterleaveDetail(conv, tableId, colId, i)
					str := fmt.Sprintf("Table '%s' %s '%s' go to Interleave Table Tab", spSchema.Name, IssueDB[i].Brief, parent)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.InterleavedAddColumn:
					parent, _, _ := getInterleaveDetail(conv, tableId, colId, i)
					str := fmt.Sprintf("Table '%s' is %s '%s' add '%s' as a primary key in table '%s'", conv.SpSchema[tableId].Name, IssueDB[i].Brief, parent, spColName, spSchema.Name)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.InterleavedRenameColumn:
					parent, fkName, referColName := getInterleaveDetail(conv, tableId, colId, i)
					str := fmt.Sprintf(" %s '%s' rename '%s' primary key in table '%s' to match the foreign key '%s' refer column '%s'", IssueDB[i].Brief, parent, spColName, spSchema.Name, fkName, referColName)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.InterleavedChangeColumnSize:
					parent, fkName, referColName := getInterleaveDetail(conv, tableId, colId, i)
					str := fmt.Sprintf(" %s '%s' change column size of column '%s' primary key in table '%s' to match the foreign key '%s' refer column '%s'", IssueDB[i].Brief, parent, spColName, spSchema.Name, fkName, referColName)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.RedundantIndex:
					str := fmt.Sprintf(" %s for Table '%s' and Column  '%s'", IssueDB[i].Brief, spSchema.Name, spColName)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}

				case internal.AutoIncrementIndex:
					str := fmt.Sprintf(" %s for Table '%s' and Column '%s'", IssueDB[i].Brief, spSchema.Name, spColName)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}

				case internal.InterleaveIndex:
					str := fmt.Sprintf("Column '%s' of Table '%s' %s", spColName, spSchema.Name, IssueDB[i].Brief)

					if !Contains(l, str) {
						toAppend := Issue{
							Category:    IssueDB[i].Category,
							Description: str,
						}
						l = append(l, toAppend)
					}
				case internal.ShardIdColumnAdded:
					str := fmt.Sprintf("Table '%s': '%s' %s", conv.SpSchema[tableId].Name, conv.SpSchema[tableId].ColDefs[conv.SpSchema[tableId].ShardIdColumn].Name, IssueDB[i].Brief)
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: str,
					}
					l = append(l, toAppend)

				case internal.ShardIdColumnPrimaryKey:
					str := fmt.Sprintf("Table '%s': '%s' %s", conv.SpSchema[tableId].Name, conv.SpSchema[tableId].ColDefs[conv.SpSchema[tableId].ShardIdColumn].Name, IssueDB[i].Brief)
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: str,
					}
					l = append(l, toAppend)

				case internal.IllegalName:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("%s, Column '%s' is mapped to '%s' for table '%s'", IssueDB[i].Brief, srcColName, spColName, conv.SpSchema[tableId].Name),
					}
					l = append(l, toAppend)
				case internal.ArrayTypeNotSupported:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Table '%s': Column '%s', %s", conv.SpSchema[tableId].Name, spColName, IssueDB[i].Brief),
					}
					l = append(l, toAppend)
				case internal.MissingPrimaryKey:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Column '%s' was added because table '%s' didn't have a primary key. Spanner requires a primary key for every table", *syntheticPK, conv.SpSchema[tableId].Name),
					}
					l = append(l, toAppend)
				case internal.UniqueIndexPrimaryKey:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("UNIQUE constraint on column(s) '%s' replaced with primary key since table '%s' didn't have one. Spanner requires a primary key for every table", strings.Join(uniquePK, ", "), conv.SpSchema[tableId].Name),
					}
					l = append(l, toAppend)

				case internal.DefaultValueError:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("%s for table '%s' column '%s'", IssueDB[i].Brief, conv.SpSchema[tableId].Name, spColName),
					}
					l = append(l, toAppend)
				default:
					toAppend := Issue{
						Category:    IssueDB[i].Category,
						Description: fmt.Sprintf("Table '%s': Column '%s', type %s is mapped to %s. %s", conv.SpSchema[tableId].Name, spColName, srcColType, spColType, IssueDB[i].Brief),
					}
					l = append(l, toAppend)
				}
			}
		}
		if len(l) == 0 {
			continue
		}

		heading := p.heading
		if len(l) > 1 {
			heading = heading + "s"
		}
		body = append(body, tableReportBody{Heading: heading, IssueBody: l})
	}
	return body
}