in bigquery/bigquery_storage_quickstart/main.go [76:185]
// main reads rows from a BigQuery public table (USA names) via the BigQuery
// Storage Read API. It creates a read session restricted to a few columns and
// a row filter, then streams and decodes the rows concurrently in either Avro
// or Arrow format, depending on the --format flag.
func main() {
	flag.Parse()
	ctx := context.Background()
	bqReadClient, err := bqStorage.NewBigQueryReadClient(ctx)
	if err != nil {
		log.Fatalf("NewBigQueryReadClient: %v", err)
	}
	defer bqReadClient.Close()
	// Verify we've been provided a parent project which will contain the read session. The
	// session may exist in a different project than the table being read.
	if *projectID == "" {
		log.Fatalf("No parent project ID specified, please supply using the --project_id flag.")
	}
	// This example uses baby name data from the public datasets.
	srcProjectID := "bigquery-public-data"
	srcDatasetID := "usa_names"
	srcTableID := "usa_1910_current"
	readTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s",
		srcProjectID,
		srcDatasetID,
		srcTableID,
	)
	// We limit the output columns to a subset of those allowed in the table,
	// and set a simple filter to only report names from the state of
	// Washington (WA).
	tableReadOptions := &storagepb.ReadSession_TableReadOptions{
		SelectedFields: []string{"name", "number", "state"},
		RowRestriction: `state = "WA"`,
	}
	dataFormat := storagepb.DataFormat_AVRO
	if *format == ARROW_FORMAT {
		dataFormat = storagepb.DataFormat_ARROW
	}
	createReadSessionRequest := &storagepb.CreateReadSessionRequest{
		Parent: fmt.Sprintf("projects/%s", *projectID),
		ReadSession: &storagepb.ReadSession{
			Table:       readTable,
			DataFormat:  dataFormat,
			ReadOptions: tableReadOptions,
		},
		MaxStreamCount: 1,
	}
	// Set a snapshot time if it's been specified.
	if *snapshotMillis > 0 {
		// time.Unix takes nanoseconds as its second argument, so scale
		// milliseconds by 1e6 (int64(time.Millisecond)) — not 1e3, which
		// would silently produce a timestamp 1000x too early.
		ts := timestamppb.New(time.Unix(0, *snapshotMillis*int64(time.Millisecond)))
		if !ts.IsValid() {
			log.Fatalf("Invalid snapshot millis (%d)", *snapshotMillis)
		}
		createReadSessionRequest.ReadSession.TableModifiers = &storagepb.ReadSession_TableModifiers{
			SnapshotTime: ts,
		}
	}
	// Create the session from the request.
	session, err := bqReadClient.CreateReadSession(ctx, createReadSessionRequest, rpcOpts)
	if err != nil {
		log.Fatalf("CreateReadSession: %v", err)
	}
	fmt.Printf("Read session: %s\n", session.GetName())
	if len(session.GetStreams()) == 0 {
		log.Fatalf("no streams in session. if this was a small query result, consider writing to output to a named table.")
	}
	// We'll use only a single stream for reading data from the table. Because
	// of dynamic sharding, this will yield all the rows in the table. However,
	// if you wanted to fan out multiple readers you could do so by
	// increasing the MaxStreamCount.
	readStream := session.GetStreams()[0].Name
	ch := make(chan *storagepb.ReadRowsResponse)
	// Use a waitgroup to coordinate the reading and decoding goroutines.
	var wg sync.WaitGroup
	// Start the reading in one goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := processStream(ctx, bqReadClient, readStream, ch); err != nil {
			log.Fatalf("processStream failure: %v", err)
		}
		// Only the sender closes the channel, signaling the decoder to finish.
		close(ch)
	}()
	// Start the decoding (Avro or Arrow) in another goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var err error
		switch *format {
		case ARROW_FORMAT:
			err = processArrow(ctx, session.GetArrowSchema().GetSerializedSchema(), ch)
		case AVRO_FORMAT:
			err = processAvro(ctx, session.GetAvroSchema().GetSchema(), ch)
		}
		if err != nil {
			log.Fatalf("error processing %s: %v", *format, err)
		}
	}()
	// Wait until both the reading and decoding goroutines complete.
	wg.Wait()
}