cmd/iceberg/main.go (369 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"strings"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/catalog/glue"
"github.com/apache/iceberg-go/catalog/rest"
"github.com/apache/iceberg-go/config"
"github.com/apache/iceberg-go/table"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/docopt/docopt-go"
)
// usage is the docopt specification of the command line; docopt.ParseArgs
// parses it at runtime. Its layout is therefore significant: usage patterns
// and option lines must be indented under their section headers, and each
// option must be separated from its description by at least two spaces.
const usage = `iceberg.

Usage:
  iceberg list [options] [PARENT]
  iceberg describe [options] [namespace | table] IDENTIFIER
  iceberg (schema | spec | uuid | location) [options] TABLE_ID
  iceberg create [options] (namespace | table) IDENTIFIER
  iceberg drop [options] (namespace | table) IDENTIFIER
  iceberg files [options] TABLE_ID [--history]
  iceberg rename [options] <from> <to>
  iceberg properties [options] get (namespace | table) IDENTIFIER [PROPNAME]
  iceberg properties [options] set (namespace | table) IDENTIFIER PROPNAME VALUE
  iceberg properties [options] remove (namespace | table) IDENTIFIER PROPNAME
  iceberg -h | --help | --version

Commands:
  describe    Describe a namespace or a table.
  list        List tables or namespaces.
  schema      Get the schema of the table.
  spec        Return the partition spec of the table.
  uuid        Return the UUID of the table.
  location    Return the location of the table.
  create      Operations to create a namespace or table.
  drop        Operations to drop a namespace or table.
  files       List all the files of the table.
  rename      Rename a table.
  properties  Properties on tables/namespaces.

Arguments:
  PARENT      Catalog parent namespace
  IDENTIFIER  fully qualified namespace or table
  TABLE_ID    full path to a table
  PROPNAME    name of a property
  VALUE       value to set

Options:
  -h --help             show this help message and exit
  --catalog TEXT        specify the catalog type [default: rest]
  --uri TEXT            specify the catalog URI
  --output TYPE         output type (json/text) [default: text]
  --credential TEXT     specify credentials for the catalog
  --warehouse TEXT      specify the warehouse to use
  --config TEXT         specify the path to the configuration file
  --description TEXT    specify a description for the namespace
  --location-uri TEXT   specify a location URI for the namespace`
// Config is the parsed command line. docopt's Bind fills each field from the
// command, argument or option named by its `docopt` tag in usage.
type Config struct {
	// Top-level commands.
	List     bool `docopt:"list"`
	Describe bool `docopt:"describe"`
	Schema   bool `docopt:"schema"`
	Spec     bool `docopt:"spec"`
	Uuid     bool `docopt:"uuid"`
	Location bool `docopt:"location"`
	Props    bool `docopt:"properties"`
	Create   bool `docopt:"create"`
	Drop     bool `docopt:"drop"`
	Files    bool `docopt:"files"`
	Rename   bool `docopt:"rename"`

	// Operations of the "properties" command.
	Get    bool `docopt:"get"`
	Set    bool `docopt:"set"`
	Remove bool `docopt:"remove"`

	// Entity selectors for describe, create, drop and properties.
	Namespace bool `docopt:"namespace"`
	Table     bool `docopt:"table"`

	// Positional arguments.
	RenameFrom string `docopt:"<from>"`
	RenameTo   string `docopt:"<to>"`
	Parent     string `docopt:"PARENT"`
	Ident      string `docopt:"IDENTIFIER"`
	TableID    string `docopt:"TABLE_ID"`
	PropName   string `docopt:"PROPNAME"`
	Value      string `docopt:"VALUE"`

	// Options; mergeConf may fill some empty ones from the config file.
	Catalog     string `docopt:"--catalog"`
	URI         string `docopt:"--uri"`
	Output      string `docopt:"--output"`
	History     bool   `docopt:"--history"`
	Cred        string `docopt:"--credential"`
	Warehouse   string `docopt:"--warehouse"`
	Config      string `docopt:"--config"`
	Description string `docopt:"--description"`
	LocationURI string `docopt:"--location-uri"`
}
// main parses the command line against usage, fills unset settings from the
// "default" section of the optional config file, builds the requested output
// formatter and catalog client, and dispatches to the selected command.
// Failures before an Output exists go through log.Fatal; later failures are
// reported through the Output and end the process with exit status 1.
func main() {
	ctx := context.Background()
	args, err := docopt.ParseArgs(usage, os.Args[1:], iceberg.Version())
	if err != nil {
		log.Fatal(err)
	}

	cfg := Config{}
	if err := args.Bind(&cfg); err != nil {
		log.Fatal(err)
	}

	// Config-file values only fill settings left empty on the command line.
	fileCfg := config.ParseConfig(config.LoadConfig(cfg.Config), "default")
	if fileCfg != nil {
		mergeConf(fileCfg, &cfg)
	}

	var output Output
	switch strings.ToLower(cfg.Output) {
	case "text":
		output = textOutput{}
	case "json":
		output = jsonOutput{}
	default:
		log.Fatal("unimplemented output type")
	}

	var cat catalog.Catalog
	switch catalog.Type(cfg.Catalog) {
	case catalog.REST:
		opts := []rest.Option{}
		if len(cfg.Cred) > 0 {
			opts = append(opts, rest.WithCredential(cfg.Cred))
		}
		if len(cfg.Warehouse) > 0 {
			opts = append(opts, rest.WithWarehouseLocation(cfg.Warehouse))
		}
		if cat, err = rest.NewCatalog(ctx, "rest", cfg.URI, opts...); err != nil {
			log.Fatal(err)
		}
	case catalog.Glue:
		// The Glue client is configured only from awsconfig.LoadDefaultConfig;
		// --uri, --credential and --warehouse are not passed to it.
		awscfg, err := awsconfig.LoadDefaultConfig(ctx)
		if err != nil {
			log.Fatal(err)
		}
		opts := []glue.Option{
			glue.WithAwsConfig(awscfg),
		}
		cat = glue.NewCatalog(opts...)
	default:
		log.Fatal("unrecognized catalog type")
	}

	switch {
	case cfg.List:
		list(ctx, output, cat, cfg.Parent)
	case cfg.Describe:
		// Without an explicit selector, describe tries both kinds of entity.
		entityType := "any"
		if cfg.Namespace {
			entityType = "ns"
		} else if cfg.Table {
			entityType = "tbl"
		}
		describe(ctx, output, cat, cfg.Ident, entityType)
	case cfg.Schema:
		tbl := loadTable(ctx, output, cat, cfg.TableID)
		output.Schema(tbl.Schema())
	case cfg.Spec:
		tbl := loadTable(ctx, output, cat, cfg.TableID)
		output.Spec(tbl.Spec())
	case cfg.Location:
		tbl := loadTable(ctx, output, cat, cfg.TableID)
		output.Text(tbl.Location())
	case cfg.Uuid:
		tbl := loadTable(ctx, output, cat, cfg.TableID)
		output.Uuid(tbl.Metadata().TableUUID())
	case cfg.Props:
		properties(ctx, output, cat, propCmd{
			get: cfg.Get, set: cfg.Set, remove: cfg.Remove,
			namespace: cfg.Namespace, table: cfg.Table,
			identifier: cfg.Ident,
			propname:   cfg.PropName,
			value:      cfg.Value,
		})
	case cfg.Rename:
		_, err := cat.RenameTable(ctx,
			catalog.ToIdentifier(cfg.RenameFrom), catalog.ToIdentifier(cfg.RenameTo))
		if err != nil {
			output.Error(err)
			os.Exit(1)
		}
		output.Text("Renamed table from " + cfg.RenameFrom + " to " + cfg.RenameTo)
	case cfg.Drop:
		switch {
		case cfg.Namespace:
			err := cat.DropNamespace(ctx, catalog.ToIdentifier(cfg.Ident))
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
		case cfg.Table:
			err := cat.DropTable(ctx, catalog.ToIdentifier(cfg.Ident))
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
		}
	case cfg.Create:
		switch {
		case cfg.Namespace:
			props := iceberg.Properties{}
			if cfg.Description != "" {
				props["Description"] = cfg.Description
			}
			if cfg.LocationURI != "" {
				props["Location"] = cfg.LocationURI
			}
			err := cat.CreateNamespace(ctx, catalog.ToIdentifier(cfg.Ident), props)
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
		case cfg.Table:
			// Report the unsupported operation as a failure, like every other
			// error path, so scripts do not mistake it for success.
			output.Error(errors.New("not implemented: Create Table is WIP"))
			os.Exit(1)
		default:
			output.Error(errors.New("not implemented"))
			os.Exit(1)
		}
	case cfg.Files:
		tbl := loadTable(ctx, output, cat, cfg.TableID)
		output.Files(tbl, cfg.History)
	}
}
// list prints the namespaces directly under parent. When parent is non-empty
// and has no child namespaces, the tables in parent are listed instead.
// Any catalog error is reported through output and ends the process with
// exit status 1.
func list(ctx context.Context, output Output, cat catalog.Catalog, parent string) {
	prnt := catalog.ToIdentifier(parent)
	ids, err := cat.ListNamespaces(ctx, prnt)
	if err != nil {
		output.Error(err)
		os.Exit(1)
	}

	if len(ids) == 0 && parent != "" {
		for id, err := range cat.ListTables(ctx, prnt) {
			// Check err before using id, so a failed iteration does not add
			// its (presumably zero-valued) identifier to the listing.
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
			ids = append(ids, id)
		}
	}

	output.Identifiers(ids)
}
// describe prints the properties of the namespace and/or the metadata of the
// table named by id.
//
// entityType selects the lookup: "ns" (namespace only), "tbl" (table only)
// or "any" (try both). For "any", a missing namespace is tolerated when id
// has more than one part, so the table lookup can still run. A missing table
// is tolerated so that a namespace-only match still succeeds. If nothing is
// found, or any other catalog error occurs, the error is reported through
// output and the process exits with status 1.
func describe(ctx context.Context, output Output, cat catalog.Catalog, id string, entityType string) {
	ident := catalog.ToIdentifier(id)
	isNS, isTbl := false, false

	// Namespace lookup: only attempted for a non-empty identifier.
	if (entityType == "any" || entityType == "ns") && len(ident) > 0 {
		nsprops, err := cat.LoadNamespaceProperties(ctx, ident)
		if err != nil {
			// Keep going only for an "any" lookup whose identifier is long
			// enough for the table lookup below to run.
			if !errors.Is(err, catalog.ErrNoSuchNamespace) || entityType != "any" || len(ident) == 1 {
				output.Error(err)
				os.Exit(1)
			}
		} else {
			isNS = true
			output.DescribeProperties(nsprops)
		}
	}

	// Table lookup: only attempted for identifiers with two or more parts.
	if (entityType == "any" || entityType == "tbl") && len(ident) > 1 {
		tbl, err := cat.LoadTable(ctx, ident, nil)
		if err != nil {
			if !errors.Is(err, catalog.ErrNoSuchTable) || entityType != "any" {
				output.Error(err)
				os.Exit(1)
			}
		} else {
			isTbl = true
			output.DescribeTable(tbl)
		}
	}

	if !isNS && !isTbl {
		// Wrap the sentinel that matches what was requested, and print the
		// identifier parts joined with "." rather than Go's slice formatting.
		var notFound error = catalog.ErrNoSuchNamespace
		if entityType == "tbl" {
			notFound = catalog.ErrNoSuchTable
		}
		output.Error(fmt.Errorf("%w: table or namespace does not exist: %s",
			notFound, strings.Join(ident, ".")))
		os.Exit(1)
	}
}
// loadTable resolves id to a table through cat. It never returns on failure:
// the error is written to output and the process exits with status 1.
func loadTable(ctx context.Context, output Output, cat catalog.Catalog, id string) *table.Table {
	ident := catalog.ToIdentifier(id)
	loaded, loadErr := cat.LoadTable(ctx, ident, nil)
	if loadErr == nil {
		return loaded
	}

	output.Error(loadErr)
	os.Exit(1)
	return nil // unreachable; os.Exit does not return
}
// propCmd holds the parsed arguments of a "properties" command.
type propCmd struct {
	// Operation flags; usage allows only one of them per invocation.
	get    bool
	set    bool
	remove bool

	// Which kind of entity identifier refers to.
	namespace bool
	table     bool

	identifier string // namespace or table identifier from the command line
	propname   string // property name; may be empty for get
	value      string // value to assign; used by set
}
// properties implements "properties get|set|remove" on the namespace or
// table named by args.identifier:
//
//   - get prints all properties, or only args.propname when it is non-empty;
//   - set assigns args.value to args.propname;
//   - remove deletes args.propname (namespaces only; removing table
//     properties is not implemented yet).
//
// Errors are reported through output and end the process with exit status 1.
func properties(ctx context.Context, output Output, cat catalog.Catalog, args propCmd) {
	ident := catalog.ToIdentifier(args.identifier)

	// entity names the target in user-facing messages.
	entity := "namespace"
	if args.table {
		entity = "table"
	}

	switch {
	case args.get:
		var props iceberg.Properties
		switch {
		case args.namespace:
			var err error
			props, err = cat.LoadNamespaceProperties(ctx, ident)
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
		case args.table:
			tbl := loadTable(ctx, output, cat, args.identifier)
			props = tbl.Metadata().Properties()
		}

		if args.propname == "" {
			output.DescribeProperties(props)
			return
		}

		val, ok := props[args.propname]
		if !ok {
			output.Error(errors.New("could not find property " + args.propname + " on " + entity + " " + args.identifier))
			os.Exit(1)
		}
		output.Text(val)
	case args.set:
		switch {
		case args.namespace:
			_, err := cat.UpdateNamespaceProperties(ctx, ident,
				nil, iceberg.Properties{args.propname: args.value})
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
			output.Text("updated " + args.propname + " on " + args.identifier)
		case args.table:
			tbl := loadTable(ctx, output, cat, args.identifier)
			output.Text("Setting " + args.propname + "=" + args.value + " on " + args.identifier)
			// TODO: handle other Update operations
			_, _, err := cat.CommitTable(ctx, tbl, nil,
				[]table.Update{table.NewSetPropertiesUpdate(iceberg.Properties{args.propname: args.value})})
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
		}
	case args.remove:
		switch {
		case args.namespace:
			_, err := cat.UpdateNamespaceProperties(ctx, ident,
				[]string{args.propname}, nil)
			if err != nil {
				output.Error(err)
				os.Exit(1)
			}
			output.Text("removing " + args.propname + " from " + args.identifier)
		case args.table:
			// Still resolve the table so a missing table is reported as such,
			// then fail instead of printing a misleading "Setting ..." line.
			loadTable(ctx, output, cat, args.identifier)
			output.Error(errors.New("not implemented: Writing is WIP"))
			os.Exit(1)
		}
	}
}
// mergeConf copies catalog, URI, output, credential and warehouse settings
// from fileConf into resConfig, but only into fields that are still empty.
// Values given on the command line therefore take precedence.
//
// NOTE(review): --catalog and --output declare docopt defaults in usage, so
// they are normally non-empty here and the file's CatalogType/Output never
// apply. Confirm whether that is intended.
func mergeConf(fileConf *config.CatalogConfig, resConfig *Config) {
	fillIfEmpty := func(dst *string, fromFile string) {
		if *dst == "" {
			*dst = fromFile
		}
	}

	fillIfEmpty(&resConfig.Catalog, fileConf.CatalogType)
	fillIfEmpty(&resConfig.URI, fileConf.URI)
	fillIfEmpty(&resConfig.Output, fileConf.Output)
	fillIfEmpty(&resConfig.Cred, fileConf.Credential)
	fillIfEmpty(&resConfig.Warehouse, fileConf.Warehouse)
}