catalog/rest/rest.go (997 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package rest import ( "bytes" "context" "crypto/sha256" "crypto/tls" "encoding/hex" "encoding/json" "errors" "fmt" "hash" "io" "iter" "maps" "net/http" "net/url" "strconv" "strings" "time" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" iceio "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/config" ) var _ catalog.Catalog = (*Catalog)(nil) const ( pageSizeKey contextKey = "page_size" defaultPageSize = 20 keyOauthToken = "token" keyWarehouseLocation = "warehouse" keyMetadataLocation = "metadata_location" keyOauthCredential = "credential" authorizationHeader = "Authorization" bearerPrefix = "Bearer" namespaceSeparator = "\x1F" keyPrefix = "prefix" icebergRestSpecVersion = "0.14.1" keyRestSigV4 = "rest.sigv4-enabled" keyRestSigV4Region = "rest.signing-region" keyRestSigV4Service = "rest.signing-name" keyAuthUrl = "rest.authorization-url" keyTlsSkipVerify = "rest.tls.skip-verify" ) var ( ErrRESTError = errors.New("REST error") ErrBadRequest = fmt.Errorf("%w: bad request", ErrRESTError) ErrForbidden = fmt.Errorf("%w: forbidden", ErrRESTError) ErrUnauthorized = fmt.Errorf("%w: unauthorized", ErrRESTError) ErrAuthorizationExpired = fmt.Errorf("%w: authorization expired", ErrRESTError) ErrServiceUnavailable = fmt.Errorf("%w: service unavailable", ErrRESTError) ErrServerError = fmt.Errorf("%w: server error", ErrRESTError) ErrCommitFailed = fmt.Errorf("%w: commit failed, refresh and try again", ErrRESTError) ErrCommitStateUnknown = fmt.Errorf("%w: commit failed due to unknown reason", ErrRESTError) ErrOAuthError = fmt.Errorf("%w: oauth error", ErrRESTError) ) func init() { reg := catalog.RegistrarFunc(func(ctx context.Context, name string, p iceberg.Properties) (catalog.Catalog, error) { return newCatalogFromProps(ctx, name, p.Get("uri", ""), p) }) catalog.Register(string(catalog.REST), reg) catalog.Register("http", reg) catalog.Register("https", reg) } type errorResponse struct { Message string `json:"message"` Type string `json:"type"` Code int `json:"code"` wrapping error } type contextKey string func (e errorResponse) Unwrap() error { return e.wrapping } func (e errorResponse) Error() string { return e.Type + ": " + e.Message } type identifier struct { Namespace []string `json:"namespace"` Name string `json:"name"` } type commitTableResponse struct { MetadataLoc string `json:"metadata-location"` RawMetadata json.RawMessage `json:"metadata"` Metadata table.Metadata `json:"-"` } func (t *commitTableResponse) UnmarshalJSON(b []byte) (err error) { type Alias commitTableResponse if err = json.Unmarshal(b, (*Alias)(t)); err != nil { return err } t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata) return } type loadTableResponse struct { MetadataLoc string `json:"metadata-location"` RawMetadata json.RawMessage `json:"metadata"` Config iceberg.Properties `json:"config"` Metadata table.Metadata `json:"-"` } func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) { type Alias loadTableResponse if err = json.Unmarshal(b, (*Alias)(t)); err != nil { return err } t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata) return } type createTableRequest struct { Name string `json:"name"` Schema *iceberg.Schema `json:"schema"` Location string `json:"location,omitempty"` PartitionSpec *iceberg.PartitionSpec `json:"partition-spec,omitempty"` WriteOrder *table.SortOrder `json:"write-order,omitempty"` StageCreate bool `json:"stage-create"` Props iceberg.Properties `json:"properties,omitempty"` } type oauthTokenResponse struct { AccessToken string `json:"access_token"` TokenType string `json:"token_type"` ExpiresIn int `json:"expires_in"` Scope string `json:"scope"` RefreshToken string `json:"refresh_token"` } type oauthErrorResponse struct { Err string `json:"error"` ErrDesc string `json:"error_description"` ErrURI string `json:"error_uri"` } func (o oauthErrorResponse) Unwrap() error { return ErrOAuthError } func (o oauthErrorResponse) Error() string { msg := o.Err if o.ErrDesc != "" { msg += ": " + o.ErrDesc } if o.ErrURI != "" { msg += " (" + o.ErrURI + ")" } return msg } type configResponse struct { Defaults iceberg.Properties `json:"defaults"` Overrides iceberg.Properties `json:"overrides"` } type sessionTransport struct { http.Transport defaultHeaders http.Header signer v4.HTTPSigner cfg aws.Config service string newHash func() hash.Hash } // from https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/aws/signer/v4#Signer.SignHTTP const emptyStringHash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" func (s *sessionTransport) RoundTrip(r *http.Request) (*http.Response, error) { for k, v := range s.defaultHeaders { for _, hdr := range v { r.Header.Add(k, hdr) } } if s.signer != nil { var payloadHash string if r.Body == nil { payloadHash = emptyStringHash } else { rdr, err := r.GetBody() if err != nil { return nil, err } h := s.newHash() if _, err = io.Copy(h, rdr); err != nil { return nil, err } payloadHash = hex.EncodeToString(h.Sum(nil)) } creds, err := s.cfg.Credentials.Retrieve(r.Context()) if err != nil { return nil, err } // modifies the request in place err = s.signer.SignHTTP(r.Context(), creds, r, payloadHash, s.service, s.cfg.Region, time.Now()) if err != nil { return nil, err } } return s.Transport.RoundTrip(r) } func do[T any](ctx context.Context, method string, baseURI *url.URL, path []string, cl *http.Client, override map[int]error, allowNoContent bool) (ret T, err error) { var ( req *http.Request rsp *http.Response ) uri := baseURI.JoinPath(path...).String() if req, err = http.NewRequestWithContext(ctx, method, uri, nil); err != nil { return } if rsp, err = cl.Do(req); err != nil { return } if allowNoContent && rsp.StatusCode == http.StatusNoContent { return } if rsp.StatusCode != http.StatusOK { return ret, handleNon200(rsp, override) } if method == http.MethodHead || method == http.MethodDelete { return } defer rsp.Body.Close() if err = json.NewDecoder(rsp.Body).Decode(&ret); err != nil { return ret, fmt.Errorf("%w: error decoding json payload: `%s`", ErrRESTError, err.Error()) } return } func doGet[T any](ctx context.Context, baseURI *url.URL, path []string, cl *http.Client, override map[int]error) (ret T, err error) { return do[T](ctx, http.MethodGet, baseURI, path, cl, override, false) } func doDelete[T any](ctx context.Context, baseURI *url.URL, path []string, cl *http.Client, override map[int]error) (ret T, err error) { return do[T](ctx, http.MethodDelete, baseURI, path, cl, override, true) } func doHead(ctx context.Context, baseURI *url.URL, path []string, cl *http.Client, override map[int]error) error { _, err := do[struct{}](ctx, http.MethodHead, baseURI, path, cl, override, true) return err } func doPost[Payload, Result any](ctx context.Context, baseURI *url.URL, path []string, payload Payload, cl *http.Client, override map[int]error) (ret Result, err error) { var ( req *http.Request rsp *http.Response data []byte ) uri := baseURI.JoinPath(path...).String() data, err = json.Marshal(payload) if err != nil { return } req, err = http.NewRequestWithContext(ctx, http.MethodPost, uri, bytes.NewReader(data)) if err != nil { return } rsp, err = cl.Do(req) if err != nil { return } if rsp.StatusCode != http.StatusOK { return ret, handleNon200(rsp, override) } if rsp.ContentLength == 0 { return } defer rsp.Body.Close() if err = json.NewDecoder(rsp.Body).Decode(&ret); err != nil { return ret, fmt.Errorf("%w: error decoding json payload: `%s`", ErrRESTError, err.Error()) } return } func handleNon200(rsp *http.Response, override map[int]error) error { var e errorResponse dec := json.NewDecoder(rsp.Body) dec.Decode(&struct { Error *errorResponse `json:"error"` }{Error: &e}) if override != nil { if err, ok := override[rsp.StatusCode]; ok { e.wrapping = err return e } } switch rsp.StatusCode { case http.StatusBadRequest: e.wrapping = ErrBadRequest case http.StatusUnauthorized: e.wrapping = ErrUnauthorized case http.StatusForbidden: e.wrapping = ErrForbidden case http.StatusUnprocessableEntity: e.wrapping = ErrRESTError case 419: e.wrapping = ErrAuthorizationExpired case http.StatusNotImplemented: e.wrapping = iceberg.ErrNotImplemented case http.StatusServiceUnavailable: e.wrapping = ErrServiceUnavailable default: if 500 <= rsp.StatusCode && rsp.StatusCode < 600 { e.wrapping = ErrServerError } else { e.wrapping = ErrRESTError } } return e } func fromProps(props iceberg.Properties, o *options) { for k, v := range props { switch k { case keyOauthToken: o.oauthToken = v case keyWarehouseLocation: o.warehouseLocation = v case keyMetadataLocation: o.metadataLocation = v case keyRestSigV4: o.enableSigv4 = strings.ToLower(v) == "true" case keyRestSigV4Region: o.sigv4Region = v case keyRestSigV4Service: o.sigv4Service = v case keyAuthUrl: u, err := url.Parse(v) if err != nil { continue } o.authUri = u case keyOauthCredential: o.credential = v case keyPrefix: o.prefix = v case keyTlsSkipVerify: verify := strings.ToLower(v) == "true" if o.tlsConfig == nil { o.tlsConfig = &tls.Config{ InsecureSkipVerify: verify, } } else { o.tlsConfig.InsecureSkipVerify = verify } case "uri", "type": default: if v != "" { if o.additionalProps == nil { o.additionalProps = iceberg.Properties{} } o.additionalProps[k] = v } } } } func toProps(o *options) iceberg.Properties { props := iceberg.Properties{} maps.Copy(props, o.additionalProps) setIf := func(key, v string) { if v != "" { props[key] = v } } setIf(keyOauthCredential, o.credential) setIf(keyOauthToken, o.oauthToken) setIf(keyWarehouseLocation, o.warehouseLocation) setIf(keyMetadataLocation, o.metadataLocation) if o.enableSigv4 { props[keyRestSigV4] = "true" setIf(keyRestSigV4Region, o.sigv4Region) setIf(keyRestSigV4Service, o.sigv4Service) } setIf(keyPrefix, o.prefix) if o.authUri != nil { setIf(keyAuthUrl, o.authUri.String()) } return props } type Catalog struct { baseURI *url.URL cl *http.Client name string props iceberg.Properties } func newCatalogFromProps(ctx context.Context, name string, uri string, p iceberg.Properties) (*Catalog, error) { var ops options fromProps(p, &ops) r := &Catalog{name: name} if err := r.init(ctx, &ops, uri); err != nil { return nil, err } return r, nil } func NewCatalog(ctx context.Context, name, uri string, opts ...Option) (*Catalog, error) { ops := &options{} for _, o := range opts { o(ops) } r := &Catalog{name: name} if err := r.init(ctx, ops, uri); err != nil { return nil, err } return r, nil } func (r *Catalog) init(ctx context.Context, ops *options, uri string) error { baseuri, err := url.Parse(uri) if err != nil { return err } r.baseURI = baseuri.JoinPath("v1") if r.cl, ops, err = r.fetchConfig(ctx, ops); err != nil { return err } if ops.prefix != "" { r.baseURI = r.baseURI.JoinPath(ops.prefix) } r.props = toProps(ops) return nil } func (r *Catalog) fetchAccessToken(cl *http.Client, creds string, opts *options) (string, error) { clientID, clientSecret, hasID := strings.Cut(creds, ":") if !hasID { clientID, clientSecret = "", clientID } scope := "catalog" if opts.scope != "" { scope = opts.scope } data := url.Values{ "grant_type": {"client_credentials"}, "client_id": {clientID}, "client_secret": {clientSecret}, "scope": {scope}, } uri := opts.authUri if uri == nil { uri = r.baseURI.JoinPath("oauth/tokens") } rsp, err := cl.PostForm(uri.String(), data) if err != nil { return "", err } if rsp.StatusCode == http.StatusOK { defer rsp.Body.Close() dec := json.NewDecoder(rsp.Body) var tok oauthTokenResponse if err := dec.Decode(&tok); err != nil { return "", fmt.Errorf("failed to decode oauth token response: %w", err) } return tok.AccessToken, nil } switch rsp.StatusCode { case http.StatusUnauthorized, http.StatusBadRequest: defer rsp.Request.GetBody() dec := json.NewDecoder(rsp.Body) var oauthErr oauthErrorResponse if err := dec.Decode(&oauthErr); err != nil { return "", fmt.Errorf("failed to decode oauth error: %w", err) } return "", oauthErr default: return "", handleNon200(rsp, nil) } } func (r *Catalog) createSession(ctx context.Context, opts *options) (*http.Client, error) { session := &sessionTransport{ Transport: http.Transport{Proxy: http.ProxyFromEnvironment, TLSClientConfig: opts.tlsConfig}, defaultHeaders: http.Header{}, } cl := &http.Client{Transport: session} token := opts.oauthToken if token == "" && opts.credential != "" { var err error if token, err = r.fetchAccessToken(cl, opts.credential, opts); err != nil { return nil, fmt.Errorf("auth error: %w", err) } } if token != "" { session.defaultHeaders.Set(authorizationHeader, bearerPrefix+" "+token) } session.defaultHeaders.Set("X-Client-Version", icebergRestSpecVersion) session.defaultHeaders.Set("Content-Type", "application/json") session.defaultHeaders.Set("User-Agent", "GoIceberg/"+iceberg.Version()) session.defaultHeaders.Set("X-Iceberg-Access-Delegation", "vended-credentials") if opts.enableSigv4 { cfg := opts.awsConfig if !opts.awsConfigSet { // If no config provided, load defaults from environment. var err error cfg, err = config.LoadDefaultConfig(ctx) if err != nil { return nil, err } } if opts.sigv4Region != "" { cfg.Region = opts.sigv4Region } session.cfg, session.service = cfg, opts.sigv4Service session.signer, session.newHash = v4.NewSigner(), sha256.New } return cl, nil } func (r *Catalog) fetchConfig(ctx context.Context, opts *options) (*http.Client, *options, error) { params := url.Values{} if opts.warehouseLocation != "" { params.Set(keyWarehouseLocation, opts.warehouseLocation) } route := r.baseURI.JoinPath("config") route.RawQuery = params.Encode() sess, err := r.createSession(ctx, opts) if err != nil { return nil, nil, err } rsp, err := doGet[configResponse](ctx, route, []string{}, sess, nil) if err != nil { return nil, nil, err } cfg := rsp.Defaults if cfg == nil { cfg = iceberg.Properties{} } maps.Copy(cfg, toProps(opts)) maps.Copy(cfg, rsp.Overrides) o := *opts fromProps(cfg, &o) if uri, ok := cfg["uri"]; ok { r.baseURI, err = url.Parse(uri) if err != nil { return nil, nil, err } r.baseURI = r.baseURI.JoinPath("v1") } return sess, &o, nil } func (r *Catalog) Name() string { return r.name } func (r *Catalog) CatalogType() catalog.Type { return catalog.REST } func checkValidNamespace(ident table.Identifier) error { if len(ident) < 1 { return fmt.Errorf("%w: empty namespace identifier", catalog.ErrNoSuchNamespace) } return nil } func (r *Catalog) tableFromResponse(ctx context.Context, identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) { iofs, err := iceio.LoadFS(ctx, config, loc) if err != nil { return nil, err } return table.New(identifier, metadata, loc, iofs, r), nil } func (r *Catalog) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] { return func(yield func(table.Identifier, error) bool) { pageSize := r.getPageSize(ctx) var pageToken string for { tables, nextPageToken, err := r.listTablesPage(ctx, namespace, pageToken, pageSize) if err != nil { yield(table.Identifier{}, err) return } for _, tbl := range tables { if !yield(tbl, nil) { return } } if nextPageToken == "" { return } pageToken = nextPageToken } } } func (r *Catalog) listTablesPage(ctx context.Context, namespace table.Identifier, pageToken string, pageSize int) ([]table.Identifier, string, error) { if err := checkValidNamespace(namespace); err != nil { return nil, "", err } ns := strings.Join(namespace, namespaceSeparator) uri := r.baseURI.JoinPath("namespaces", ns, "tables") v := url.Values{} if pageSize >= 0 { v.Set("pageSize", strconv.Itoa(pageSize)) } if pageToken != "" { v.Set("pageToken", pageToken) } uri.RawQuery = v.Encode() type resp struct { Identifiers []identifier `json:"identifiers"` NextPageToken string `json:"next-page-token,omitempty"` } rsp, err := doGet[resp](ctx, uri, []string{}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace}) if err != nil { return nil, "", err } out := make([]table.Identifier, len(rsp.Identifiers)) for i, id := range rsp.Identifiers { out[i] = append(id.Namespace, id.Name) } return out, rsp.NextPageToken, nil } func splitIdentForPath(ident table.Identifier) (string, string, error) { if len(ident) < 1 { return "", "", fmt.Errorf("%w: missing namespace or invalid identifier %v", catalog.ErrNoSuchTable, strings.Join(ident, ".")) } return strings.Join(catalog.NamespaceFromIdent(ident), namespaceSeparator), catalog.TableNameFromIdent(ident), nil } func (r *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return nil, err } var cfg catalog.CreateTableCfg for _, o := range opts { o(&cfg) } freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil) if err != nil { return nil, err } freshPartitionSpec, err := iceberg.AssignFreshPartitionSpecIDs(cfg.PartitionSpec, schema, freshSchema) if err != nil { return nil, err } freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.SortOrder, schema, freshSchema) if err != nil { return nil, err } payload := createTableRequest{ Name: tbl, Schema: freshSchema, Location: cfg.Location, PartitionSpec: &freshPartitionSpec, WriteOrder: &freshSortOrder, StageCreate: false, Props: cfg.Properties, } ret, err := doPost[createTableRequest, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables"}, payload, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace, http.StatusConflict: catalog.ErrTableAlreadyExists}) if err != nil { return nil, err } config := maps.Clone(r.props) maps.Copy(config, ret.Metadata.Properties()) maps.Copy(config, ret.Config) return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config) } func (r *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) { ident := tbl.Identifier() ns, tblName, err := splitIdentForPath(ident) if err != nil { return nil, "", err } restIdentifier := identifier{ Namespace: catalog.NamespaceFromIdent(ident), Name: tblName, } type payload struct { Identifier identifier `json:"identifier"` Requirements []table.Requirement `json:"requirements"` Updates []table.Update `json:"updates"` } ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tblName}, payload{Identifier: restIdentifier, Requirements: requirements, Updates: updates}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable, http.StatusConflict: ErrCommitFailed}) if err != nil { return nil, "", err } config := maps.Clone(r.props) maps.Copy(config, ret.Metadata.Properties()) return ret.Metadata, ret.MetadataLoc, nil } func (r *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLoc string) (*table.Table, error) { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return nil, err } type payload struct { Name string `json:"name"` MetadataLoc string `json:"metadata-location"` } ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, map[int]error{ http.StatusNotFound: catalog.ErrNoSuchNamespace, http.StatusConflict: catalog.ErrTableAlreadyExists, }) if err != nil { return nil, err } config := maps.Clone(r.props) maps.Copy(config, ret.Metadata.Properties()) maps.Copy(config, ret.Config) return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config) } func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return nil, err } ret, err := doGet[loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable}) if err != nil { return nil, err } config := maps.Clone(r.props) maps.Copy(config, props) maps.Copy(config, ret.Metadata.Properties()) for k, v := range ret.Config { config[k] = v } return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config) } func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) { ns, tbl, err := splitIdentForPath(ident) if err != nil { return nil, err } restIdentifier := identifier{ Namespace: catalog.NamespaceFromIdent(ident), Name: tbl, } type payload struct { Identifier identifier `json:"identifier"` Requirements []table.Requirement `json:"requirements"` Updates []table.Update `json:"updates"` } ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, payload{Identifier: restIdentifier, Requirements: requirements, Updates: updates}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable, http.StatusConflict: ErrCommitFailed}) if err != nil { return nil, err } config := maps.Clone(r.props) maps.Copy(config, ret.Metadata.Properties()) return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc, config) } func (r *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return err } _, err = doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable}) return err } func (r *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) error { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return err } uri := r.baseURI.JoinPath("namespaces", ns, "tables", tbl) v := url.Values{} v.Set("purgeRequested", "true") uri.RawQuery = v.Encode() _, err = doDelete[struct{}](ctx, uri, []string{}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable}) return err } func (r *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) { type payload struct { Source identifier `json:"source"` Destination identifier `json:"destination"` } src := identifier{ Namespace: catalog.NamespaceFromIdent(from), Name: catalog.TableNameFromIdent(from), } dst := identifier{ Namespace: catalog.NamespaceFromIdent(to), Name: catalog.TableNameFromIdent(to), } _, err := doPost[payload, any](ctx, r.baseURI, []string{"tables", "rename"}, payload{Source: src, Destination: dst}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable}) if err != nil { return nil, err } return r.LoadTable(ctx, to, nil) } func (r *Catalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error { if err := checkValidNamespace(namespace); err != nil { return err } _, err := doPost[map[string]any, struct{}](ctx, r.baseURI, []string{"namespaces"}, map[string]any{"namespace": namespace, "properties": props}, r.cl, map[int]error{ http.StatusNotFound: catalog.ErrNoSuchNamespace, http.StatusConflict: catalog.ErrNamespaceAlreadyExists, }) return err } func (r *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) error { if err := checkValidNamespace(namespace); err != nil { return err } _, err := doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace}) return err } func (r *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) { uri := r.baseURI.JoinPath("namespaces") if len(parent) != 0 { v := url.Values{} v.Set("parent", strings.Join(parent, namespaceSeparator)) uri.RawQuery = v.Encode() } type rsptype struct { Namespaces []table.Identifier `json:"namespaces"` } rsp, err := doGet[rsptype](ctx, uri, []string{}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace}) if err != nil { return nil, err } return rsp.Namespaces, nil } func (r *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) { if err := checkValidNamespace(namespace); err != nil { return nil, err } type nsresponse struct { Namespace table.Identifier `json:"namespace"` Props iceberg.Properties `json:"properties"` } rsp, err := doGet[nsresponse](ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace}) if err != nil { return nil, err } return rsp.Props, nil } func (r *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties, ) (catalog.PropertiesUpdateSummary, error) { if err := checkValidNamespace(namespace); err != nil { return catalog.PropertiesUpdateSummary{}, err } type payload struct { Remove []string `json:"removals"` Updates iceberg.Properties `json:"updates"` } ns := strings.Join(namespace, namespaceSeparator) return doPost[payload, catalog.PropertiesUpdateSummary](ctx, r.baseURI, []string{"namespaces", ns, "properties"}, payload{Remove: removals, Updates: updates}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace}) } func (r *Catalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) { if err := checkValidNamespace(namespace); err != nil { return false, err } err := doHead(ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace}) if err != nil { if errors.Is(err, catalog.ErrNoSuchNamespace) { return false, nil } return false, err } return true, nil } func (r *Catalog) CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error) { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return false, err } err = doHead(ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable}) if err != nil { if errors.Is(err, catalog.ErrNoSuchTable) { return false, nil } return false, err } return true, nil } func (r *Catalog) ListViews(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] { return func(yield func(table.Identifier, error) bool) { pageSize := r.getPageSize(ctx) var pageToken string for { views, nextPageToken, err := r.listViewsPage(ctx, namespace, pageToken, pageSize) if err != nil { yield(table.Identifier{}, err) return } for _, view := range views { if !yield(view, nil) { return } } if nextPageToken == "" { return } pageToken = nextPageToken } } } func (r *Catalog) listViewsPage(ctx context.Context, namespace table.Identifier, pageToken string, pageSize int) ([]table.Identifier, string, error) { if err := checkValidNamespace(namespace); err != nil { return nil, "", err } ns := strings.Join(namespace, namespaceSeparator) uri := r.baseURI.JoinPath("namespaces", ns, "views") v := url.Values{} if pageSize >= 0 { v.Set("pageSize", strconv.Itoa(pageSize)) } if pageToken != "" { v.Set("pageToken", pageToken) } uri.RawQuery = v.Encode() type resp struct { Identifiers []identifier `json:"identifiers"` NextPageToken string `json:"next-page-token,omitempty"` } rsp, err := doGet[resp](ctx, uri, []string{}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace}) if err != nil { return nil, "", err } out := make([]table.Identifier, len(rsp.Identifiers)) for i, id := range rsp.Identifiers { out[i] = append(id.Namespace, id.Name) } return out, rsp.NextPageToken, nil } func (r *Catalog) getPageSize(ctx context.Context) int { if pageSize, ok := ctx.Value(pageSizeKey).(int); ok { return pageSize } return defaultPageSize } func (r *Catalog) SetPageSize(ctx context.Context, sz int) context.Context { return context.WithValue(ctx, pageSizeKey, sz) } func (r *Catalog) DropView(ctx context.Context, identifier table.Identifier) error { ns, view, err := splitIdentForPath(identifier) if err != nil { return err } _, err = doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", ns, "views", view}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchView}) return err } func (r *Catalog) CheckViewExists(ctx context.Context, identifier table.Identifier) (bool, error) { ns, view, err := splitIdentForPath(identifier) if err != nil { return false, err } err = doHead(ctx, r.baseURI, []string{"namespaces", ns, "views", view}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchView}) if err != nil { if errors.Is(err, catalog.ErrNoSuchView) { return false, nil } return false, err } return true, nil } type viewVersion struct { VersionID int64 `json:"version-id"` TimestampMs int64 `json:"timestamp-ms"` SchemaID int `json:"schema-id"` Summary map[string]string `json:"summary"` Representations []struct { Type string `json:"type"` SQL string `json:"sql"` Dialect string `json:"dialect"` } `json:"representations"` DefaultCatalog string `json:"default-catalog"` DefaultNamespace []string `json:"default-namespace"` } type createViewRequest struct { Name string `json:"name"` Schema *iceberg.Schema `json:"schema"` Location string `json:"location,omitempty"` Props iceberg.Properties `json:"properties,omitempty"` SQL string `json:"sql"` ViewVersion viewVersion `json:"view-version"` } type viewResponse struct { MetadataLoc string `json:"metadata-location"` RawMetadata json.RawMessage `json:"metadata"` Config iceberg.Properties `json:"config"` Metadata table.Metadata `json:"-"` } // CreateView creates a new view in the catalog. func (r *Catalog) CreateView(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, sql string, props iceberg.Properties) error { ns, view, err := splitIdentForPath(identifier) if err != nil { return err } freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil) if err != nil { return err } payload := createViewRequest{ Name: view, Schema: freshSchema, SQL: sql, Props: props, ViewVersion: viewVersion{ VersionID: 1, TimestampMs: time.Now().UnixMilli(), SchemaID: freshSchema.ID, Summary: map[string]string{"sql": sql}, Representations: []struct { Type string `json:"type"` SQL string `json:"sql"` Dialect string `json:"dialect"` }{ {Type: "sql", SQL: sql, Dialect: "default"}, }, DefaultCatalog: r.name, DefaultNamespace: strings.Split(ns, namespaceSeparator), }, } _, err = doPost[createViewRequest, viewResponse](ctx, r.baseURI, []string{"namespaces", ns, "views"}, payload, r.cl, map[int]error{ http.StatusNotFound: catalog.ErrNoSuchNamespace, http.StatusConflict: catalog.ErrViewAlreadyExists, }) if err != nil { return err } return nil }