catalog/sql/sql.go (630 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package sql import ( "context" "database/sql" "errors" "fmt" "iter" "maps" "slices" "strings" "sync" _ "unsafe" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/internal" "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/uptrace/bun" "github.com/uptrace/bun/dialect/feature" "github.com/uptrace/bun/dialect/mssqldialect" "github.com/uptrace/bun/dialect/mysqldialect" "github.com/uptrace/bun/dialect/oracledialect" "github.com/uptrace/bun/dialect/pgdialect" "github.com/uptrace/bun/dialect/sqlitedialect" "github.com/uptrace/bun/extra/bundebug" "github.com/uptrace/bun/schema" ) type SupportedDialect string const ( Postgres SupportedDialect = "postgres" MySQL SupportedDialect = "mysql" SQLite SupportedDialect = "sqlite" MSSQL SupportedDialect = "mssql" Oracle SupportedDialect = "oracle" ) const ( DialectKey = "sql.dialect" DriverKey = "sql.driver" initCatalogTablesKey = "init_catalog_tables" ) func init() { catalog.Register("sql", catalog.RegistrarFunc(func(ctx context.Context, name string, p iceberg.Properties) (c catalog.Catalog, err error) { driver, ok := p[DriverKey] if !ok { return nil, errors.New("must provide driver to pass to sql.Open") } dialect := strings.ToLower(p[DialectKey]) if dialect == "" { return nil, errors.New("must provide sql dialect to use") } uri := strings.TrimPrefix(p.Get("uri", ""), "sql://") sqldb, err := sql.Open(driver, uri) if err != nil { return nil, err } defer func() { if r := recover(); r != nil { err = fmt.Errorf("failed to create SQL catalog: %v", r) } }() return NewCatalog(p.Get(name, "sql"), sqldb, SupportedDialect(dialect), p) })) } var _ catalog.Catalog = (*Catalog)(nil) var ( minimalNamespaceProps = iceberg.Properties{"exists": "true"} dialects = map[SupportedDialect]schema.Dialect{} dialectMx sync.Mutex ) func createDialect(d SupportedDialect) schema.Dialect { switch d { case Postgres: return pgdialect.New() case MySQL: return mysqldialect.New() case SQLite: return sqlitedialect.New() case MSSQL: return mssqldialect.New() case Oracle: return oracledialect.New() default: panic("unsupported sql dialect") } } func getDialect(d SupportedDialect) schema.Dialect { dialectMx.Lock() defer dialectMx.Unlock() ret, ok := dialects[d] if !ok { ret = createDialect(d) dialects[d] = ret } return ret } type sqlIcebergTable struct { bun.BaseModel `bun:"table:iceberg_tables"` CatalogName string `bun:",pk"` TableNamespace string `bun:",pk"` TableName string `bun:",pk"` MetadataLocation sql.NullString PreviousMetadataLocation sql.NullString } type sqlIcebergNamespaceProps struct { bun.BaseModel `bun:"table:iceberg_namespace_properties"` CatalogName string `bun:",pk"` Namespace string `bun:",pk"` PropertyKey string `bun:",pk"` PropertyValue sql.NullString } func withReadTx[R any](ctx context.Context, db *bun.DB, fn func(context.Context, bun.Tx) (R, error)) (result R, err error) { db.RunInTx(ctx, &sql.TxOptions{ReadOnly: true}, func(ctx context.Context, tx bun.Tx) error { result, err = fn(ctx, tx) return err }) return } func withWriteTx(ctx context.Context, db *bun.DB, fn func(context.Context, bun.Tx) error) error { return db.RunInTx(ctx, &sql.TxOptions{Isolation: sql.LevelDefault}, func(ctx context.Context, tx bun.Tx) error { return fn(ctx, tx) }) } type Catalog struct { db *bun.DB name string props iceberg.Properties } // NewCatalog creates a new sql-based catalog using the provided sql.DB handle to perform any queries. // // The dialect parameter determines the SQL dialect to use for query generation and must be one of the // supported dialects, i.e. one of the exported SupportedDialect values. The separation here allows for // the use of different drivers/databases provided they support the chosen sql dialect (e.g. if a particular // database supports the MySQL dialect, then the database can still be used with this catalog even though // it's not explicitly implemented). // // If the "init_catalog_tables" property is set to "true", then creating the catalog will also attempt to // to verify whether the necessary tables (iceberg_tables and iceberg_namespace_properties) exist, creating // them if they do not already exist. // // The environment variable ICEBERG_SQL_DEBUG can be set to automatically log the sql queries to the terminal: // - ICEBERG_SQL_DEBUG=1 logs only failed queries // - ICEBERG_SQL_DEBUG=2 logs all queries // // All interactions with the db are performed within transactions to ensure atomicity and transactional isolation // of catalog changes. func NewCatalog(name string, db *sql.DB, dialect SupportedDialect, props iceberg.Properties) (*Catalog, error) { cat := &Catalog{db: bun.NewDB(db, getDialect(dialect)), name: name, props: props} cat.db.AddQueryHook(bundebug.NewQueryHook( bundebug.WithEnabled(false), // ICEBERG_SQL_DEBUG=1 logs only failed queries // ICEBERG_SQL_DEBUG=2 log all queries bundebug.FromEnv("ICEBERG_SQL_DEBUG"))) if cat.props.GetBool(initCatalogTablesKey, true) { return cat, cat.ensureTablesExist() } return cat, nil } func (c *Catalog) Name() string { return c.name } func (c *Catalog) CatalogType() catalog.Type { return catalog.SQL } func (c *Catalog) CreateSQLTables(ctx context.Context) error { _, err := c.db.NewCreateTable().Model((*sqlIcebergTable)(nil)). IfNotExists().Exec(ctx) if err != nil { return err } _, err = c.db.NewCreateTable().Model((*sqlIcebergNamespaceProps)(nil)). IfNotExists().Exec(ctx) return err } func (c *Catalog) DropSQLTables(ctx context.Context) error { _, err := c.db.NewDropTable().Model((*sqlIcebergTable)(nil)). IfExists().Exec(ctx) if err != nil { return err } _, err = c.db.NewDropTable().Model((*sqlIcebergNamespaceProps)(nil)). IfExists().Exec(ctx) return err } func (c *Catalog) ensureTablesExist() error { return c.CreateSQLTables(context.Background()) } func (c *Catalog) namespaceExists(ctx context.Context, ns string) (bool, error) { return withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) (bool, error) { exists, err := tx.NewSelect().Model((*sqlIcebergTable)(nil)). Where("catalog_name = ?", c.name). Where("table_namespace = ?", ns). Limit(1).Exists(ctx) if err != nil { return false, err } if exists { return true, nil } return tx.NewSelect().Model((*sqlIcebergNamespaceProps)(nil)). Where("catalog_name = ?", c.name).Where("namespace = ?", ns). Limit(1).Exists(ctx) }) } func checkValidNamespace(ident table.Identifier) error { if len(ident) < 1 { return fmt.Errorf("%w: empty namespace identifier", catalog.ErrNoSuchNamespace) } return nil } func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) { staged, err := internal.CreateStagedTable(ctx, c.props, c.LoadNamespaceProperties, ident, sc, opts...) if err != nil { return nil, err } nsIdent := catalog.NamespaceFromIdent(ident) tblIdent := catalog.TableNameFromIdent(ident) ns := strings.Join(nsIdent, ".") exists, err := c.namespaceExists(ctx, ns) if err != nil { return nil, err } if !exists { return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, ns) } wfs, ok := staged.FS().(io.WriteFileIO) if !ok { return nil, errors.New("loaded filesystem IO does not support writing") } if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation()); err != nil { return nil, err } err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { _, err := tx.NewInsert().Model(&sqlIcebergTable{ CatalogName: c.name, TableNamespace: ns, TableName: tblIdent, MetadataLocation: sql.NullString{String: staged.MetadataLocation(), Valid: true}, }).Exec(ctx) if err != nil { return fmt.Errorf("failed to create table: %w", err) } return nil }) if err != nil { return nil, err } return c.LoadTable(ctx, ident, staged.Properties()) } func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) { ns := catalog.NamespaceFromIdent(tbl.Identifier()) tblName := catalog.TableNameFromIdent(tbl.Identifier()) current, err := c.LoadTable(ctx, tbl.Identifier(), nil) if err != nil && !errors.Is(err, catalog.ErrNoSuchTable) { return nil, "", err } staged, err := internal.UpdateAndStageTable(ctx, current, tbl.Identifier(), reqs, updates, c) if err != nil { return nil, "", err } if current != nil && staged.Metadata().Equals(current.Metadata()) { // no changes, do nothing return current.Metadata(), current.MetadataLocation(), nil } if err := internal.WriteMetadata(ctx, staged.Metadata(), staged.MetadataLocation(), staged.Properties()); err != nil { return nil, "", err } err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { if current != nil { res, err := tx.NewUpdate().Model(&sqlIcebergTable{ CatalogName: c.name, TableNamespace: strings.Join(ns, "."), TableName: tblName, MetadataLocation: sql.NullString{Valid: true, String: staged.MetadataLocation()}, PreviousMetadataLocation: sql.NullString{Valid: true, String: current.MetadataLocation()}, }).WherePK().Where("metadata_location = ?", current.MetadataLocation()). Exec(ctx) if err != nil { return fmt.Errorf("error updating table information: %w", err) } n, err := res.RowsAffected() if err != nil { return fmt.Errorf("error updating table information: %w", err) } if n == 0 { return fmt.Errorf("table has been updated by another process: %s.%s", strings.Join(ns, "."), tblName) } return nil } _, err := tx.NewInsert().Model(&sqlIcebergTable{ CatalogName: c.name, TableNamespace: strings.Join(ns, "."), TableName: tblName, MetadataLocation: sql.NullString{Valid: true, String: staged.MetadataLocation()}, }).Exec(ctx) if err != nil { return fmt.Errorf("failed to create table: %w", err) } return nil }) if err != nil { return nil, "", err } return staged.Metadata(), staged.MetadataLocation(), nil } func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) { ns := catalog.NamespaceFromIdent(identifier) tbl := catalog.TableNameFromIdent(identifier) if props == nil { props = iceberg.Properties{} } result, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) (*sqlIcebergTable, error) { t := new(sqlIcebergTable) err := tx.NewSelect().Model(t). Where("catalog_name = ?", c.name). Where("table_namespace = ?", strings.Join(ns, ".")). Where("table_name = ?", tbl). Scan(ctx) if errors.Is(err, sql.ErrNoRows) { return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchTable, identifier) } if err != nil { return nil, fmt.Errorf("error encountered loading table %s: %w", identifier, err) } return t, nil }) if err != nil { return nil, err } if !result.MetadataLocation.Valid { return nil, fmt.Errorf("%w: %s, metadata location is missing", catalog.ErrNoSuchTable, identifier) } tblProps := maps.Clone(c.props) maps.Copy(props, tblProps) iofs, err := io.LoadFS(ctx, tblProps, result.MetadataLocation.String) if err != nil { return nil, err } return table.NewFromLocation(identifier, result.MetadataLocation.String, iofs, c) } func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error { ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".") tbl := catalog.TableNameFromIdent(identifier) return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { res, err := tx.NewDelete().Model(&sqlIcebergTable{ CatalogName: c.name, TableNamespace: ns, TableName: tbl, }).WherePK().Exec(ctx) if err != nil { return fmt.Errorf("failed to delete table entry: %w", err) } n, err := res.RowsAffected() if err != nil { return fmt.Errorf("error encountered when deleting table entry: %w", err) } if n == 0 { return fmt.Errorf("%w: %s", catalog.ErrNoSuchTable, identifier) } return nil }) } func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) { fromNs := strings.Join(catalog.NamespaceFromIdent(from), ".") fromTbl := catalog.TableNameFromIdent(from) toNs := strings.Join(catalog.NamespaceFromIdent(to), ".") toTbl := catalog.TableNameFromIdent(to) exists, err := c.namespaceExists(ctx, toNs) if err != nil { return nil, err } if !exists { return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, toNs) } err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { exists, err := tx.NewSelect().Model(&sqlIcebergTable{ CatalogName: c.name, TableNamespace: toNs, TableName: toTbl, }).WherePK().Exists(ctx) if err != nil { return fmt.Errorf("error encountered checking existence of table '%s': %w", to, err) } if exists { return catalog.ErrTableAlreadyExists } res, err := tx.NewUpdate().Model(&sqlIcebergTable{ CatalogName: c.name, TableNamespace: fromNs, TableName: fromTbl, }).WherePK(). Set("table_namespace = ?", toNs). Set("table_name = ?", toTbl). Exec(ctx) if err != nil { return fmt.Errorf("error renaming table from '%s' to %s': %w", from, to, err) } n, err := res.RowsAffected() if err != nil { return fmt.Errorf("error renaming table from '%s' to %s': %w", from, to, err) } if n == 0 { return fmt.Errorf("%w: %s", catalog.ErrNoSuchTable, from) } return nil }) if err != nil { return nil, err } return c.LoadTable(ctx, to, nil) } func (c *Catalog) CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error) { _, err := c.LoadTable(ctx, identifier, nil) if err != nil { if errors.Is(err, catalog.ErrNoSuchTable) { return false, nil } return false, err } return true, nil } func (c *Catalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error { if err := checkValidNamespace(namespace); err != nil { return err } exists, err := c.namespaceExists(ctx, strings.Join(namespace, ".")) if err != nil { return err } if exists { return fmt.Errorf("%w: %s", catalog.ErrNamespaceAlreadyExists, strings.Join(namespace, ".")) } if len(props) == 0 { props = minimalNamespaceProps } nsToCreate := strings.Join(namespace, ".") return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { toInsert := make([]sqlIcebergNamespaceProps, 0, len(props)) for k, v := range props { toInsert = append(toInsert, sqlIcebergNamespaceProps{ CatalogName: c.name, Namespace: nsToCreate, PropertyKey: k, PropertyValue: sql.NullString{String: v, Valid: true}, }) } _, err := tx.NewInsert().Model(&toInsert).Exec(ctx) if err != nil { return fmt.Errorf("error inserting namespace properties for namespace '%s': %w", namespace, err) } return nil }) } func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) error { if err := checkValidNamespace(namespace); err != nil { return err } nsToDelete := strings.Join(namespace, ".") exists, err := c.namespaceExists(ctx, nsToDelete) if err != nil { return err } if !exists { return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, nsToDelete) } tbls := make([]table.Identifier, 0) iter := c.ListTables(ctx, namespace) for tbl, err := range iter { tbls = append(tbls, tbl) if err != nil { return err } break // there is already at least a table } if len(tbls) > 0 { return fmt.Errorf("%w: %d tables exist in namespace %s", catalog.ErrNamespaceNotEmpty, len(tbls), nsToDelete) } return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { _, err := tx.NewDelete().Model((*sqlIcebergNamespaceProps)(nil)). Where("catalog_name = ?", c.name). Where("namespace = ?", nsToDelete).Exec(ctx) if err != nil { return fmt.Errorf("error deleting namespace '%s': %w", namespace, err) } return nil }) } func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) { if err := checkValidNamespace(namespace); err != nil { return nil, err } nsToLoad := strings.Join(namespace, ".") exists, err := c.namespaceExists(ctx, nsToLoad) if err != nil { return nil, err } if !exists { return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, nsToLoad) } return withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) (iceberg.Properties, error) { var props []sqlIcebergNamespaceProps err := tx.NewSelect().Model(&props). Where("catalog_name = ?", c.name). Where("namespace = ?", nsToLoad).Scan(ctx) if err != nil { return nil, fmt.Errorf("error loading namespace properties for '%s': %w", namespace, err) } result := make(iceberg.Properties) for _, p := range props { result[p.PropertyKey] = p.PropertyValue.String } return result, nil }) } func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] { tables, err := c.listTablesAll(ctx, namespace) if err != nil { return func(yield func(table.Identifier, error) bool) { yield(table.Identifier{}, err) } } return func(yield func(table.Identifier, error) bool) { for _, t := range tables { if !yield(t, nil) { return } } } } func (c *Catalog) listTablesAll(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) { if len(namespace) > 0 { exists, err := c.namespaceExists(ctx, strings.Join(namespace, ".")) if err != nil { return nil, err } if !exists { return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, strings.Join(namespace, ".")) } } ns := strings.Join(namespace, ".") tables, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) ([]sqlIcebergTable, error) { var tables []sqlIcebergTable err := tx.NewSelect().Model(&tables). Where("catalog_name = ?", c.name). Where("table_namespace = ?", ns). Scan(ctx) return tables, err }) if err != nil { return nil, fmt.Errorf("error listing tables for namespace '%s': %w", namespace, err) } ret := make([]table.Identifier, len(tables)) for i, t := range tables { ret[i] = append(strings.Split(t.TableNamespace, "."), t.TableName) } return ret, nil } func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) { tableQuery := c.db.NewSelect().Model((*sqlIcebergTable)(nil)). Column("table_namespace").Where("catalog_name = ?", c.name) nsQuery := c.db.NewSelect().Model((*sqlIcebergNamespaceProps)(nil)). Column("namespace").Where("catalog_name = ?", c.name) if len(parent) > 0 { ns := strings.Join(parent, ".") exists, err := c.namespaceExists(ctx, ns) if err != nil { return nil, err } if !exists { return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, strings.Join(parent, ".")) } ns += "%" tableQuery = tableQuery.Where("table_namespace like ?", ns) nsQuery = nsQuery.Where("namespace like ?", ns) } namespaces, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) ([]string, error) { var namespaces []string rows, err := tx.QueryContext(ctx, tableQuery.String()+" UNION "+nsQuery.String()) if err != nil { return nil, fmt.Errorf("error listing namespaces for '%s': %w", parent, err) } err = c.db.ScanRows(ctx, rows, &namespaces) return namespaces, err }) if err != nil { return nil, err } ret := make([]table.Identifier, len(namespaces)) for i, n := range namespaces { ret[i] = strings.Split(n, ".") } return ret, nil } // avoid circular dependency while still avoiding having to export the getUpdatedPropsAndUpdateSummary function // so that we can re-use it in the catalog implementations without duplicating the code. //go:linkname getUpdatedPropsAndUpdateSummary github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, catalog.PropertiesUpdateSummary, error) func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties) (catalog.PropertiesUpdateSummary, error) { var summary catalog.PropertiesUpdateSummary currentProps, err := c.LoadNamespaceProperties(ctx, namespace) if err != nil { return summary, err } _, summary, err = getUpdatedPropsAndUpdateSummary(currentProps, removals, updates) if err != nil { return summary, err } nsToUpdate := strings.Join(namespace, ".") return summary, withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { var m *sqlIcebergNamespaceProps if len(removals) > 0 { _, err := tx.NewDelete().Model(m). Where("catalog_name = ?", c.name). Where("namespace = ?", nsToUpdate). Where("property_key in (?)", bun.In(removals)).Exec(ctx) if err != nil { return fmt.Errorf("error deleting properties for '%s': %w", namespace, err) } } if len(updates) > 0 { props := make([]sqlIcebergNamespaceProps, 0, len(updates)) for k, v := range updates { props = append(props, sqlIcebergNamespaceProps{ CatalogName: c.name, Namespace: nsToUpdate, PropertyKey: k, PropertyValue: sql.NullString{String: v, Valid: true}, }) } q := tx.NewInsert().Model(&props) switch { case c.db.HasFeature(feature.InsertOnConflict): q = q.On("CONFLICT (catalog_name, namespace, property_key) DO UPDATE"). Set("property_value = EXCLUDED.property_value") case c.db.HasFeature(feature.InsertOnDuplicateKey): q = q.On("DUPLICATE KEY UPDATE") default: _, err := tx.NewDelete().Model(m). Where("catalog_name = ?", c.name). Where("namespace = ?", nsToUpdate). Where("property_key in (?)", bun.In(slices.Collect(maps.Keys(updates)))). Exec(ctx) if err != nil { return fmt.Errorf("error deleting properties for '%s': %w", namespace, err) } } _, err := q.Exec(ctx) if err != nil { return fmt.Errorf("error updating namespace properties for '%s': %w", namespace, err) } } return nil }) } func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) { return c.namespaceExists(ctx, strings.Join(namespace, ".")) }