analytics-hub/snippets/create_listing_golang/main.go (451 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"encoding/base64"
"flag"
"fmt"
"os"
"text/template"
"time"
"cloud.google.com/go/bigquery"
analyticshub "cloud.google.com/go/bigquery/analyticshub/apiv1"
"cloud.google.com/go/bigquery/analyticshub/apiv1/analyticshubpb"
iampb "cloud.google.com/go/iam/apiv1/iampb"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func print_iam_policy(policy *iampb.Policy) {
println("IAMPolicy: ")
encodedEtag := base64.StdEncoding.EncodeToString([]byte(policy.Etag))
println(" Etag: " + encodedEtag)
for _, binding := range policy.Bindings {
println(fmt.Sprintf(" Role: %s", binding.Role))
for _, member := range binding.Members {
println(fmt.Sprintf(" Member: %s", member))
}
}
}
func listing_get_iam_policy(ctx context.Context, client *analyticshub.Client, listing_id string) (*iampb.Policy, error) {
req := &iampb.GetIamPolicyRequest{
Resource: listing_id,
// See https://pkg.go.dev/cloud.google.com/go/iam/apiv1/iampb#GetIamPolicyRequest.
}
resp, err := client.GetIamPolicy(ctx, req)
if err != nil {
println(err.Error())
return nil, err
} else {
return resp, nil
}
}
func create_set_iam_policy_request(ctx context.Context, client *analyticshub.Client, listing_id string, role string, member string) (*iampb.SetIamPolicyRequest, error) {
existingPolicy, err := listing_get_iam_policy(ctx, client, listing_id)
if err == nil {
setIamPolicyRequest := &iampb.SetIamPolicyRequest{}
setIamPolicyRequest.Resource = listing_id
setIamPolicyRequest.Policy = &iampb.Policy{}
setIamPolicyRequest.Policy.Etag = existingPolicy.Etag
setIamPolicyRequest.Policy.Bindings = []*iampb.Binding{}
setIamPolicyRequest.Policy.Bindings = append(
setIamPolicyRequest.Policy.Bindings,
existingPolicy.GetBindings()...)
// Look for existing binding for the role
addToBinding := &iampb.Binding{Role: role, Members: []string{member}}
existingBindingFoundForRole := false
for _, binding := range setIamPolicyRequest.Policy.Bindings {
if binding.Role == role {
addToBinding = binding
existingBindingFoundForRole = true
}
}
// If there is an existing binding, add a new member to it
if existingBindingFoundForRole {
addToBinding.Members = append(
addToBinding.Members,
member)
// Else add a new binding with the role/member
} else {
setIamPolicyRequest.Policy.Bindings = append(
setIamPolicyRequest.Policy.Bindings,
addToBinding)
}
return setIamPolicyRequest, nil
} else {
println(err.Error())
return nil, err
}
}
func listing_add_iam_policy_member(ctx context.Context, client *analyticshub.Client, listing_id string, role string, member string) *iampb.Policy {
exitLoop := false
var newPolicy *iampb.Policy
for !exitLoop {
exitLoop = true
setIamPolicyRequest, err := create_set_iam_policy_request(ctx, client, listing_id, role, member)
if err == nil {
resp, err := client.SetIamPolicy(ctx, setIamPolicyRequest)
if err != nil {
// Aborted == concurrent modification / Etag mismatch
if status.Code(err) == codes.Aborted {
// Add delay (should be exponential backoff instead of fixed time)
time.Sleep(5 * time.Second)
println("add_iam_policy_member: concurrent modification (Etag mismatch), retrying")
exitLoop = false
// TODO: handle UserNotFound error (e.g. the user to be added does not exist)
} else {
println(err.Error())
}
} else {
newPolicy = resp
}
}
}
return newPolicy
}
func list_listings(ctx context.Context, client *analyticshub.Client, exchange_id string) {
req := &analyticshubpb.ListListingsRequest{
Parent: exchange_id,
// TODO: Fill request struct fields.
// See https://pkg.go.dev/cloud.google.com/go/bigquery/analyticshub/apiv1/analyticshubpb#ListListingsRequest.
}
it := client.ListListings(ctx, req)
for {
resp, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
println(err.Error())
break
} else {
println(fmt.Sprintf(" ListListingsResponse: [%s] %s", resp.Name, resp.DisplayName))
policy, err := listing_get_iam_policy(ctx, client, resp.Name)
if err == nil {
print_iam_policy(policy)
}
}
}
}
func list_exchanges(ctx context.Context, client *analyticshub.Client, project_id string, location string) {
req := &analyticshubpb.ListDataExchangesRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s", project_id, location),
// See https://pkg.go.dev/cloud.google.com/go/bigquery/analyticshub/apiv1/analyticshubpb#ListDataExchangesRequest.
}
it := client.ListDataExchanges(ctx, req)
for {
resp, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
println(err.Error())
break
} else {
println(fmt.Sprintf("ListDataExchangesResponse: [%s] %s", resp.Name, resp.DisplayName))
list_listings(ctx, client, resp.Name)
}
}
}
// createOrGetDataExchange creates an example data exchange, or returns information about the exchange already bearing
// the example identifier.
func create_or_get_exchange(ctx context.Context, client *analyticshub.Client, projectID, location, exchangeID string, isDCR bool) (*analyticshubpb.DataExchange, error) {
req := &analyticshubpb.GetDataExchangeRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/dataExchanges/%s", projectID, location, exchangeID),
}
resp, err := client.GetDataExchange(ctx, req)
if err != nil {
println(err.Error())
// Default: create regular Data Exchange / DefaultExchangeConfig
sharingEnvironmentConfig := &analyticshubpb.SharingEnvironmentConfig{
Environment: &analyticshubpb.SharingEnvironmentConfig_DefaultExchangeConfig_{},
}
exTitleTag := "Data Exchange"
// if DataCleanRoom: create a Data Clean Room Data Exchange / DcrExchangeConfig
if isDCR {
sharingEnvironmentConfig = &analyticshubpb.SharingEnvironmentConfig{
Environment: &analyticshubpb.SharingEnvironmentConfig_DcrExchangeConfig_{},
}
exTitleTag = "Data Clean Room"
}
req := &analyticshubpb.CreateDataExchangeRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
DataExchangeId: exchangeID,
DataExchange: &analyticshubpb.DataExchange{
DisplayName: fmt.Sprintf("Example %s - created using golang API", exTitleTag),
Description: fmt.Sprintf("Example %s - created using golang API", exTitleTag),
PrimaryContact: "",
Documentation: "https://link.to.optional.documentation/",
SharingEnvironmentConfig: sharingEnvironmentConfig,
},
}
resp, err := client.CreateDataExchange(ctx, req)
if err != nil {
println(err.Error())
return nil, err
} else {
return resp, nil
}
} else {
return resp, nil
}
}
// create_or_get_dcr_listing creates an example listing within the specified exchange using the authorized view with analysis rules to represent DCR
func create_or_get_dcr_listing(ctx context.Context, client *analyticshub.Client, projectID, location, exchangeID, listingID string, sharedDataset string, sourceView string) (*analyticshubpb.Listing, error) {
getReq := &analyticshubpb.GetListingRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/dataExchanges/%s/listings/%s", projectID, location, exchangeID, listingID),
}
resp, err := client.GetListing(ctx, getReq)
if err != nil {
println(err.Error())
restrictedExportConfig := &analyticshubpb.Listing_RestrictedExportConfig{}
restrictedExportConfig.Enabled = true
restrictedExportConfig.RestrictDirectTableAccess = true
restrictedExportConfig.RestrictQueryResult = false
req := &analyticshubpb.CreateListingRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s/dataExchanges/%s", projectID, location, exchangeID),
ListingId: listingID,
Listing: &analyticshubpb.Listing{
DisplayName: sourceView,
Categories: []analyticshubpb.Listing_Category{
analyticshubpb.Listing_CATEGORY_OTHERS,
},
PrimaryContact: "primary@contact.co",
Source: &analyticshubpb.Listing_BigqueryDataset{
BigqueryDataset: &analyticshubpb.Listing_BigQueryDatasetSource{
Dataset: fmt.Sprintf("projects/%s/datasets/%s", projectID, sharedDataset),
SelectedResources: []*analyticshubpb.Listing_BigQueryDatasetSource_SelectedResource{
{
Resource: &analyticshubpb.Listing_BigQueryDatasetSource_SelectedResource_Table{
Table: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, sharedDataset, sourceView),
},
},
},
},
},
RestrictedExportConfig: restrictedExportConfig,
},
}
resp, err := client.CreateListing(ctx, req)
if err != nil {
println(err.Error())
return nil, err
}
return resp, nil
} else {
return resp, nil
}
}
// create_or_get_listing creates an example listing within the specified exchange using the provided source dataset.
func create_or_get_listing(ctx context.Context, client *analyticshub.Client, projectID, location, exchangeID, listingID string, restrictEgress bool, sharedDataset string) (*analyticshubpb.Listing, error) {
getReq := &analyticshubpb.GetListingRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/dataExchanges/%s/listings/%s", projectID, location, exchangeID, listingID),
}
resp, err := client.GetListing(ctx, getReq)
if err != nil {
println(err.Error())
restrictedExportConfig := &analyticshubpb.Listing_RestrictedExportConfig{}
if restrictEgress {
restrictedExportConfig.Enabled = true
restrictedExportConfig.RestrictDirectTableAccess = true
restrictedExportConfig.RestrictQueryResult = true
} else {
restrictedExportConfig.Enabled = false
restrictedExportConfig.RestrictDirectTableAccess = false
restrictedExportConfig.RestrictQueryResult = false
}
req := &analyticshubpb.CreateListingRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s/dataExchanges/%s", projectID, location, exchangeID),
ListingId: listingID,
Listing: &analyticshubpb.Listing{
DisplayName: "Example Exchange Listing - created using golang API",
Description: "Example Exchange Listing - created using golang API",
Categories: []analyticshubpb.Listing_Category{
analyticshubpb.Listing_CATEGORY_OTHERS,
},
Source: &analyticshubpb.Listing_BigqueryDataset{
BigqueryDataset: &analyticshubpb.Listing_BigQueryDatasetSource{
Dataset: fmt.Sprintf("projects/%s/datasets/%s", projectID, sharedDataset),
},
},
RestrictedExportConfig: restrictedExportConfig,
},
}
resp, err := client.CreateListing(ctx, req)
if err != nil {
println(err.Error())
return nil, err
}
return resp, nil
} else {
return resp, nil
}
}
type Flags struct {
project_id string
location string
exchange_id string
listing_id string
restrict_egress bool
shared_ds string
subscriber_iam_member string
subscription_viewer_iam_member string
dcr_exchange_id string
dcr_listing_id string
dcr_view string
dcr_shared_table string
dcr_privacy_column string
}
func parse_args() Flags {
// Define command-line flags
flags := Flags{}
project_id := flag.String("project_id", "", "Google Cloud project ID (required)")
location := flag.String("location", "", "Location for the BigQuery dataset (required)")
exchange_id := flag.String("exchange_id", "", "Exchange ID (required)")
listing_id := flag.String("listing_id", "", "Listing ID (required)")
restrict_egress := flag.Bool("restrict_egress", false, "Egress controls enabled")
shared_ds := flag.String("shared_ds", "", "Shared dataset ID (required)")
subscriber_iam_member := flag.String("subscriber_iam_member", "", "IAM member who can subscribe - requires either user: or serviceAccount: prefix")
subscription_viewer_iam_member := flag.String("subscription_viewer_iam_member", "", "IAM member who can see subscription and request access - requires either user: or serviceAccount: prefix")
dcr_exchange_id := flag.String("dcr_exchange_id", "", "Privacy column for Data Clean Room")
dcr_listing_id := flag.String("dcr_listing_id", "", "Privacy column for Data Clean Room")
dcr_view := flag.String("dcr_view", "", "View with analysis rules to create for Data Clean Room")
dcr_shared_table := flag.String("dcr_shared_table", "", "Table to share in Data Clean Room")
dcr_privacy_column := flag.String("dcr_privacy_column", "", "Privacy column for Data Clean Room")
// Parse the command-line flags
flag.Parse()
// Use the parsed values
flags.project_id = *project_id
flags.location = *location
flags.exchange_id = *exchange_id
flags.listing_id = *listing_id
flags.restrict_egress = *restrict_egress
flags.shared_ds = *shared_ds
flags.subscriber_iam_member = *subscriber_iam_member
flags.subscription_viewer_iam_member = *subscription_viewer_iam_member
flags.dcr_exchange_id = *dcr_exchange_id
flags.dcr_listing_id = *dcr_listing_id
flags.dcr_view = *dcr_view
flags.dcr_shared_table = *dcr_shared_table
flags.dcr_privacy_column = *dcr_privacy_column
fmt.Print(flags)
// Check if required flags are provided
if *project_id == "" || *location == "" || *exchange_id == "" || *listing_id == "" || *shared_ds == "" || *subscriber_iam_member == "" || *subscription_viewer_iam_member == "" || *dcr_shared_table == "" || *dcr_privacy_column == "" {
flag.Usage()
os.Exit(1)
}
return flags
}
func bq_view_prep_ddl(projectID string, datasetID string, sourceTableID string, dstTableID string, privacyUnitColumn string) string {
createViewDDLTemplate := `CREATE OR REPLACE VIEW {{.projectID}}.{{.datasetID}}.{{.dstTableID}}
OPTIONS(
privacy_policy= '{"aggregation_threshold_policy": {"threshold": 3, "privacy_unit_column": "{{.privacyUnitColumn}}"}}'
)
AS ( SELECT * FROM {{.projectID}}.{{.datasetID}}.{{.sourceTableID}} );`
t := template.Must(template.New("createViewDDL").Parse(createViewDDLTemplate))
buf := &bytes.Buffer{}
data := map[string]interface{}{
"projectID": projectID,
"datasetID": datasetID,
"sourceTableID": sourceTableID,
"dstTableID": dstTableID,
"privacyUnitColumn": privacyUnitColumn,
}
if err := t.Execute(buf, data); err != nil {
panic(err)
}
s := buf.String()
return s
}
func create_bq_view_with_analysis_rules(ctx context.Context, client *bigquery.Client, projectID string, datasetID string, sourceTableID string, dstTableID string, privacyUnitColumn string) error {
q := client.Query(bq_view_prep_ddl(projectID, datasetID, sourceTableID, dstTableID, privacyUnitColumn))
it, err := q.Read(ctx)
_ = it
if err != nil {
println(err)
// TODO: Handle error.
return err
}
return nil
}
func bq_dataset_add_authorization(ctx context.Context, client *bigquery.Client, projectID string, datasetID string, tableID string) error {
ds := client.DatasetInProject(projectID, datasetID)
dsMetadata, err := ds.Metadata(ctx)
if err == nil {
dsMetadataToUpdate := &bigquery.DatasetMetadataToUpdate{}
dsMetadataToUpdate.Access = append(dsMetadataToUpdate.Access, dsMetadata.Access...)
needsUpdate := true
for _, bqAccess := range dsMetadataToUpdate.Access {
if bqAccess.EntityType == bigquery.ViewEntity &&
bqAccess.View.ProjectID == projectID &&
bqAccess.View.DatasetID == datasetID &&
bqAccess.View.TableID == tableID {
needsUpdate = false
}
}
if needsUpdate {
dsMetadataToUpdate.Access = append(dsMetadataToUpdate.Access,
&bigquery.AccessEntry{
EntityType: bigquery.ViewEntity,
View: &bigquery.Table{
ProjectID: projectID,
DatasetID: datasetID,
TableID: tableID,
},
})
_, err := ds.Update(ctx, *dsMetadataToUpdate, dsMetadata.ETag)
if err != nil {
println(err)
return err
}
}
return nil
} else {
return err
}
}
func get_bq_table_metadata(ctx context.Context, client *bigquery.Client, datasetID string, tableID string) (*bigquery.TableMetadata, error) {
table := client.Dataset(datasetID).Table(tableID)
tableMetadata, err := table.Metadata(ctx, bigquery.WithMetadataView(bigquery.FullMetadataView))
return tableMetadata, err
}
func main() {
flags := parse_args()
ctx := context.Background()
client, err := analyticshub.NewClient(ctx)
if err != nil {
println(err)
}
defer client.Close()
bqClient, err := bigquery.NewClient(ctx, flags.project_id)
if err != nil {
println(fmt.Errorf("bigquery.NewClient: %v", err))
}
defer bqClient.Close()
println("Creating Data Exchange")
list_exchanges(ctx, client, flags.project_id, flags.location)
exchg, err := create_or_get_exchange(ctx, client, flags.project_id, flags.location, flags.exchange_id, false)
if err == nil {
println(fmt.Sprintf("Exchange: [%s] %s", exchg.Name, exchg.DisplayName))
listing, err := create_or_get_listing(ctx, client, flags.project_id, flags.location, flags.exchange_id, flags.listing_id, flags.restrict_egress, flags.shared_ds)
if err == nil {
println(fmt.Sprintf("Listing: [%s] %s", listing.Name, listing.DisplayName))
println("GetIamPolicy before setIamPolicy")
policy, err := listing_get_iam_policy(ctx, client, listing.Name)
if err == nil {
print_iam_policy(policy)
listing_add_iam_policy_member(ctx, client, listing.Name, "roles/analyticshub.subscriber", flags.subscriber_iam_member)
listing_add_iam_policy_member(ctx, client, listing.Name, "roles/analyticshub.viewer", flags.subscription_viewer_iam_member)
println("GetIamPolicy after setIamPolicy")
policy, err := listing_get_iam_policy(ctx, client, listing.Name)
if err == nil {
print_iam_policy(policy)
}
}
}
}
println("\nCreating Data Clean Room")
exchgDCR, err := create_or_get_exchange(ctx, client, flags.project_id, flags.location, flags.dcr_exchange_id, true)
if err == nil {
println(fmt.Sprintf("Exchange(DCR): [%s] %s", exchgDCR.Name, exchgDCR.DisplayName))
println("Creating BigQuery view with analysis rules")
create_bq_view_with_analysis_rules(ctx, bqClient, flags.project_id, flags.shared_ds, flags.dcr_shared_table, flags.dcr_view, flags.dcr_privacy_column)
tableMetadata, err := get_bq_table_metadata(ctx, bqClient, flags.shared_ds, flags.dcr_view)
if err == nil {
print(tableMetadata)
bq_dataset_add_authorization(ctx, bqClient, flags.project_id, flags.shared_ds, flags.dcr_view)
listingDCR, err := create_or_get_dcr_listing(
ctx,
client,
flags.project_id,
flags.location,
flags.dcr_exchange_id,
flags.dcr_listing_id,
flags.shared_ds,
flags.dcr_view,
)
if err == nil {
println(fmt.Sprintf("Listing(DCR): [%s] %s", listingDCR.Name, listingDCR.DisplayName))
}
}
}
}