bigtable/filters/filters.go (117 lines of code) (raw):

// Copyright 2019 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package filters contains snippets related to reading data from Cloud Bigtable // with various filters. package filters // [START bigtable_filters_imports] import ( "context" "fmt" "io" "sort" "strings" "time" "cloud.google.com/go/bigtable" ) // [END bigtable_filters_imports] // [START bigtable_filters_limit_row_sample] func filterLimitRowSample(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.RowSampleFilter(.75) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_row_sample] // [START bigtable_filters_limit_row_regex] func filterLimitRowRegex(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.RowKeyFilter(".*#20190501$") return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_row_regex] // [START bigtable_filters_limit_cells_per_col] func filterLimitCellsPerCol(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.LatestNFilter(2) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_cells_per_col] // [START bigtable_filters_limit_cells_per_row] func filterLimitCellsPerRow(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.CellsPerRowLimitFilter(2) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_cells_per_row] // [START bigtable_filters_limit_cells_per_row_offset] func filterLimitCellsPerRowOffset(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.CellsPerRowOffsetFilter(2) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_cells_per_row_offset] // [START bigtable_filters_limit_col_family_regex] func filterLimitColFamilyRegex(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.FamilyFilter("stats_.*$") return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_col_family_regex] // [START bigtable_filters_limit_col_qualifier_regex] func filterLimitColQualifierRegex(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.ColumnFilter("connected_.*$") return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_col_qualifier_regex] // [START bigtable_filters_limit_col_range] func filterLimitColRange(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.ColumnRangeFilter("cell_plan", "data_plan_01gb", "data_plan_10gb") return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_col_range] // [START bigtable_filters_limit_value_range] func filterLimitValueRange(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.ValueRangeFilter([]byte("PQ2A.190405"), []byte("PQ2A.190406")) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_value_range] // [START bigtable_filters_limit_value_regex] func filterLimitValueRegex(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.ValueFilter("PQ2A.*$") return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_value_regex] // [START bigtable_filters_limit_timestamp_range] func filterLimitTimestampRange(w io.Writer, projectID, instanceID string, tableName string) error { startTime := time.Unix(0, 0) endTime := time.Now().Add(-1 * time.Hour) filter := bigtable.TimestampRangeFilter(startTime, endTime) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_timestamp_range] // [START bigtable_filters_limit_block_all] func filterLimitBlockAll(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.BlockAllFilter() return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_block_all] // [START bigtable_filters_limit_pass_all] func filterLimitPassAll(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.PassAllFilter() return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_limit_pass_all] // [START bigtable_filters_modify_strip_value] func filterModifyStripValue(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.StripValueFilter() return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_modify_strip_value] // [START bigtable_filters_composing_chain] func filterComposingChain(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.ChainFilters(bigtable.LatestNFilter(1), bigtable.FamilyFilter("cell_plan")) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_composing_chain] // [START bigtable_filters_composing_interleave] func filterComposingInterleave(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.InterleaveFilters(bigtable.ValueFilter("true"), bigtable.ColumnFilter("os_build")) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_composing_interleave] // [START bigtable_filters_composing_condition] func filterComposingCondition(w io.Writer, projectID, instanceID string, tableName string) error { filter := bigtable.ConditionFilter( bigtable.ChainFilters(bigtable.ColumnFilter("data_plan_10gb"), bigtable.ValueFilter("true")), bigtable.StripValueFilter(), bigtable.PassAllFilter()) return readWithFilter(w, projectID, instanceID, tableName, filter) } // [END bigtable_filters_composing_condition] // [START bigtable_filters_print] func readWithFilter(w io.Writer, projectID, instanceID string, tableName string, filter bigtable.Filter) error { // projectID := "my-project-id" // instanceID := "my-instance-id" // tableName := "mobile-time-series" ctx := context.Background() client, err := bigtable.NewClient(ctx, projectID, instanceID) if err != nil { return fmt.Errorf("bigtable.NewAdminClient: %w", err) } tbl := client.Open(tableName) err = tbl.ReadRows(ctx, bigtable.RowRange{}, func(row bigtable.Row) bool { printRow(w, row) return true }, bigtable.RowFilter(filter)) if err = client.Close(); err != nil { return fmt.Errorf("client.Close(): %w", err) } return nil } func printRow(w io.Writer, row bigtable.Row) { fmt.Fprintf(w, "Reading data for %s:\n", row.Key()) var keys []string for k := range row { keys = append(keys, k) } sort.Strings(keys) for _, columnFamily := range keys { cols := row[columnFamily] fmt.Fprintf(w, "Column Family %s\n", columnFamily) for _, col := range cols { qualifier := col.Column[strings.IndexByte(col.Column, ':')+1:] fmt.Fprintf(w, "\t%s: %s @%d\n", qualifier, col.Value, col.Timestamp) } } fmt.Fprintln(w) } // [END bigtable_filters_print]