func()

in internal/enricher/enricher.go [210:361]


// GenerateCommentSQLs collects metadata for every selected table and column
// and returns the SQL comment statements generated from that metadata.
//
// Table selection: when mc.TableFilters is non-empty, its keys are the tables
// that get processed; otherwise every table returned by mc.db.ListTables is
// used. For a table with a non-nil filter entry, only the columns named in
// that entry are processed.
//
// Each table, and each of its columns, is processed in its own goroutine. The
// returned statements are sorted by table name; within a table the table
// comment comes first, followed by column comments ordered by column name.
// Column statements are prefixed with a tab character.
//
// Processing is best-effort: a failure for a single table or column is logged
// and that item is skipped. If collecting a table's metadata, generating its
// comment SQL, or listing its columns fails, none of that table's columns are
// processed. A summary of the failures is logged once all work has finished.
// The returned error is non-nil only when the table list cannot be retrieved.
func (mc *MetadataCollector) GenerateCommentSQLs(ctx context.Context) ([]string, error) {
	startTime := time.Now()
	log.Println("INFO: Starting metadata collection and SQL comment generation...")

	tables, err := mc.db.ListTables()
	if err != nil {
		return nil, fmt.Errorf("failed to list tables: %w", err)
	}

	// Explicit table filters take precedence over the discovered table list.
	if len(mc.TableFilters) > 0 {
		filtered := make([]string, 0, len(mc.TableFilters))
		for table := range mc.TableFilters {
			filtered = append(filtered, table)
		}
		tables = filtered
	}

	// Generate the schema context once; on failure continue with an empty one.
	schemaContext, err := mc.generateSchemaContext()
	if err != nil {
		log.Printf("WARN: Failed to generate schema context: %v", err)
		schemaContext = ""
	}
	mc.schemaContext = schemaContext

	var (
		wg          sync.WaitGroup
		mu          sync.Mutex // guards orderedSQLs, failures and firstErr
		orderedSQLs []OrderedSQL
		failures    int
		firstErr    error
	)

	// recordFailure notes a per-item failure. A mutex-guarded counter is used
	// instead of a fixed-size error channel: the number of failures is bounded
	// by tables plus columns, so a channel sized by the table count alone
	// could fill up and block the workers forever.
	recordFailure := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		failures++
		if firstErr == nil {
			firstErr = err
		}
	}

	// addSQL appends one generated statement to the shared result slice.
	addSQL := func(entry OrderedSQL) {
		mu.Lock()
		defer mu.Unlock()
		orderedSQLs = append(orderedSQLs, entry)
	}

	for _, table := range tables {
		wg.Add(1)
		go func(table string) {
			defer wg.Done()

			// Collect TABLE metadata and generate SQL.
			tableMetadata, err := withRetry[*TableMetadata](ctx, DefaultRetryOptions, func(ctx context.Context) (*TableMetadata, error) {
				return mc.CollectTableMetadata(ctx, table, mc.schemaContext)
			})
			if err != nil {
				log.Println("ERROR: Failed to collect metadata for table:", table, "error:", err)
				recordFailure(err)
				return
			}

			tableCommentData := &database.TableCommentData{
				TableName:   tableMetadata.Table,
				Description: tableMetadata.Description,
			}
			tableSQL, genTableErr := mc.db.GenerateTableCommentSQL(tableCommentData, mc.Enrichments)
			if genTableErr != nil {
				log.Printf("WARN: Failed to generate table comment SQL for %s: %v", table, genTableErr)
				recordFailure(genTableErr)
				return
			}

			if tableSQL != "" {
				addSQL(OrderedSQL{SQL: tableSQL, Table: table, IsTableComment: true})
			}

			columnInfos, err := mc.db.ListColumns(table)
			if err != nil {
				log.Println("ERROR: Failed to list columns for table:", table, "error:", err)
				recordFailure(err)
				return
			}

			// Apply column filtering: a non-nil filter entry restricts
			// processing to the listed columns.
			if columnFilters, ok := mc.TableFilters[table]; ok && columnFilters != nil {
				filteredColumnInfos := []database.ColumnInfo{}
				for _, colInfo := range columnInfos {
					for _, filteredCol := range columnFilters {
						if colInfo.Name == filteredCol {
							filteredColumnInfos = append(filteredColumnInfos, colInfo)
							break
						}
					}
				}
				columnInfos = filteredColumnInfos
			}

			for _, colInfo := range columnInfos {
				// Adding to the WaitGroup here is safe: this table goroutine
				// has not called Done yet, so the counter cannot reach zero
				// before the column goroutine is registered.
				wg.Add(1)
				go func(colInfo database.ColumnInfo) {
					defer wg.Done()

					metadata, err := withRetry[*ColumnMetadata](ctx, DefaultRetryOptions, func(ctx context.Context) (*ColumnMetadata, error) {
						return mc.CollectColumnMetadata(ctx, table, colInfo, mc.schemaContext)
					})
					if err != nil {
						log.Println("ERROR: Failed to collect metadata for column:", colInfo.Name, "in table:", table, "error:", err)
						recordFailure(err)
						return
					}

					commentData := &database.CommentData{
						TableName:      metadata.Table,
						ColumnName:     metadata.Column,
						ColumnDataType: metadata.DataType,
						ExampleValues:  metadata.ExampleValues,
						DistinctCount:  metadata.DistinctCount,
						NullCount:      metadata.NullCount,
						Description:    metadata.Description,
					}
					sql, genErr := mc.db.GenerateCommentSQL(commentData, mc.Enrichments)
					if genErr != nil {
						log.Printf("WARN: Failed to generate comment SQL for %s.%s: %v", metadata.Table, metadata.Column, genErr)
						recordFailure(genErr)
						return
					}

					if sql != "" {
						// Column statements are tab-prefixed and marked as
						// column comments so they sort after the table comment.
						addSQL(OrderedSQL{SQL: "\t" + sql, Table: table, Column: colInfo.Name, IsTableComment: false})
					}
				}(colInfo)
			}
		}(table)
	}

	wg.Wait()

	// All workers have finished, so the shared state can be read without the
	// mutex from here on.
	if failures > 0 {
		log.Printf("WARN: %d table/column task(s) failed during SQL comment generation; first error: %v", failures, firstErr)
	}

	// Goroutines finish in arbitrary order; sort to get a deterministic result:
	// by table, table comment first, then by column name.
	sort.Slice(orderedSQLs, func(i, j int) bool {
		if orderedSQLs[i].Table != orderedSQLs[j].Table {
			return orderedSQLs[i].Table < orderedSQLs[j].Table
		}
		if orderedSQLs[i].IsTableComment != orderedSQLs[j].IsTableComment {
			return orderedSQLs[i].IsTableComment
		}
		return orderedSQLs[i].Column < orderedSQLs[j].Column
	})

	allSQLs := make([]string, 0, len(orderedSQLs))
	for _, osql := range orderedSQLs {
		allSQLs = append(allSQLs, osql.SQL)
	}

	log.Println("INFO: Metadata collection and SQL comment generation completed in:", time.Since(startTime))
	return allSQLs, nil
}