in internal/enricher/enricher.go [210:361]
func (mc *MetadataCollector) GenerateCommentSQLs(ctx context.Context) ([]string, error) {
startTime := time.Now()
log.Println("INFO: Starting metadata collection and SQL comment generation...")
tables, err := mc.db.ListTables()
if err != nil {
return nil, fmt.Errorf("failed to list tables: %w", err)
}
// Apply table filtering
filteredTables := []string{}
if len(mc.TableFilters) > 0 {
for table := range mc.TableFilters {
filteredTables = append(filteredTables, table)
}
} else {
filteredTables = tables
}
tables = filteredTables
// Generate schema context once
schemaContext, err := mc.generateSchemaContext()
if err != nil {
log.Printf("WARN: Failed to generate schema context: %v", err)
schemaContext = ""
}
mc.schemaContext = schemaContext
var orderedSQLs []OrderedSQL
var wg sync.WaitGroup
errorChannel := make(chan error, len(tables))
var mu sync.Mutex
for _, table := range tables {
wg.Add(1)
go func(table string) {
defer wg.Done()
// Collect TABLE metadata and generate SQL
tableMetadata, err := withRetry[*TableMetadata](ctx, DefaultRetryOptions, func(ctx context.Context) (*TableMetadata, error) {
return mc.CollectTableMetadata(ctx, table, mc.schemaContext)
})
if err != nil {
log.Println("ERROR: Failed to collect metadata for table:", table, "error:", err)
errorChannel <- err
return
}
tableCommentData := &database.TableCommentData{
TableName: tableMetadata.Table,
Description: tableMetadata.Description,
}
tableSQL, genTableErr := mc.db.GenerateTableCommentSQL(tableCommentData, mc.Enrichments)
if genTableErr != nil {
log.Printf("WARN: Failed to generate table comment SQL for %s: %v", table, genTableErr)
errorChannel <- genTableErr
return
}
if tableSQL != "" {
mu.Lock()
orderedSQLs = append(orderedSQLs, OrderedSQL{SQL: tableSQL, Table: table, IsTableComment: true})
mu.Unlock()
}
columnInfos, err := mc.db.ListColumns(table)
if err != nil {
log.Println("ERROR: Failed to list columns for table:", table, "error:", err)
errorChannel <- err
return
}
// Apply column filtering
filteredColumnInfos := []database.ColumnInfo{}
if columnFilters, ok := mc.TableFilters[table]; ok && columnFilters != nil {
for _, colInfo := range columnInfos {
for _, filteredCol := range columnFilters {
if colInfo.Name == filteredCol {
filteredColumnInfos = append(filteredColumnInfos, colInfo)
break
}
}
}
} else {
filteredColumnInfos = columnInfos
}
columnInfos = filteredColumnInfos
for _, colInfo := range columnInfos {
wg.Add(1)
go func(colInfo database.ColumnInfo) {
defer wg.Done()
metadata, err := withRetry[*ColumnMetadata](ctx, DefaultRetryOptions, func(ctx context.Context) (*ColumnMetadata, error) {
return mc.CollectColumnMetadata(ctx, table, colInfo, mc.schemaContext)
})
if err != nil {
log.Println("ERROR: Failed to collect metadata for column:", colInfo.Name, "in table:", table, "error:", err)
errorChannel <- err
return
}
commentData := &database.CommentData{
TableName: metadata.Table,
ColumnName: metadata.Column,
ColumnDataType: metadata.DataType,
ExampleValues: metadata.ExampleValues,
DistinctCount: metadata.DistinctCount,
NullCount: metadata.NullCount,
Description: metadata.Description,
}
// Pass mc.Enrichments to GenerateCommentSQL
sql, genErr := mc.db.GenerateCommentSQL(commentData, mc.Enrichments)
if genErr != nil {
log.Printf("WARN: Failed to generate comment SQL for %s.%s: %v", metadata.Table, metadata.Column, genErr)
errorChannel <- genErr
return
}
if sql != "" {
mu.Lock()
sql = "\t" + sql // Lock before modifying orderedSQLs
orderedSQLs = append(orderedSQLs, OrderedSQL{SQL: sql, Table: table, Column: colInfo.Name, IsTableComment: false}) // Mark as column comment
mu.Unlock() // Unlock after modifying orderedSQLs
}
}(colInfo)
}
}(table)
}
wg.Wait()
close(errorChannel)
// Sort orderedSQLs to maintain order
sort.Slice(orderedSQLs, func(i, j int) bool {
if orderedSQLs[i].Table != orderedSQLs[j].Table {
return orderedSQLs[i].Table < orderedSQLs[j].Table
}
if orderedSQLs[i].IsTableComment != orderedSQLs[j].IsTableComment {
return orderedSQLs[i].IsTableComment
}
return orderedSQLs[i].Column < orderedSQLs[j].Column
})
allSQLs := make([]string, 0, len(orderedSQLs))
for _, osql := range orderedSQLs {
allSQLs = append(allSQLs, osql.SQL)
}
log.Println("INFO: Metadata collection and SQL comment generation completed in:", time.Since(startTime))
return allSQLs, nil
}