# data_validation/query_builder/partition_row_builder.py (34 lines of code) (raw):
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

import ibis

from data_validation import clients
from data_validation.query_builder.query_builder import QueryBuilder
class PartitionRowBuilder:
    """Hold the filtered Ibis table expression used for partition row queries.

    The expression is compiled once in ``__init__`` and stored on
    ``self.query``; ``get_count`` executes a row count against it.
    """

    def __init__(
        self,
        primary_keys: List[str],
        data_client: ibis.backends.base.BaseBackend,
        schema_name: str,
        table_name: str,
        custom_query: str,
        query_builder: QueryBuilder,
    ) -> None:
        """Build a PartitionRowBuilder which is ready to build a partition row filter query.

        Args:
            primary_keys (List[str]): Keys used to identify a row for validation.
            data_client (BaseBackend): The Backend used to query the table.
            schema_name (String): The name of the schema for the given table.
            table_name (String): The name of the table to query. When falsy,
                ``custom_query`` is used as the data source instead.
            custom_query (String): Custom query provided instead of a table;
                only consulted when ``table_name`` is falsy.
            query_builder (QueryBuilder): QueryBuilder object whose filter
                fields are applied to the source table.
        """
        self.primary_keys = primary_keys
        self.query = self._compile_query(
            data_client, schema_name, table_name, custom_query, query_builder
        )

    def _compile_query(
        self,
        data_client: ibis.backends.base.BaseBackend,
        schema_name: str,
        table_name: str,
        custom_query: str,
        query_builder: QueryBuilder,
    ) -> ibis.Expr:
        """Return an Ibis query object for the source table with filters applied.

        Args:
            data_client (BaseBackend): The Backend used to query the table.
            schema_name (String): The name of the schema for the given table.
            table_name (String): The name of the table to query.
            custom_query (String): Custom query provided instead of a table.
            query_builder (QueryBuilder): QueryBuilder object.

        Returns:
            The table expression, filtered by the compiled filter fields of
            ``query_builder`` when there are any, otherwise unfiltered.
        """
        # A table name takes precedence; the custom query is the fallback source.
        if table_name:
            table = clients.get_ibis_table(data_client, schema_name, table_name)
        else:
            table = clients.get_ibis_query(data_client, custom_query)

        compiled_filters = query_builder.compile_filter_fields(table)
        # Skip the filter call entirely when no filters were compiled.
        if not compiled_filters:
            return table
        return table.filter(compiled_filters)

    def get_count(self) -> int:
        """Return a count of rows of primary keys - they should be all distinct.

        The count is taken over the primary key columns of the filtered query
        and is executed against the backend on every call.
        """
        key_columns = self.query[self.primary_keys]
        # NOTE(review): force_cast is presumably supplied by the project's
        # Ibis extensions rather than stock Ibis - confirm before upgrading.
        return key_columns.count().force_cast("int64").execute()