def _list_tables_in_dataset_for_regeneration()

in src/package/dataplexutils/metadata/wizard.py [0:0]


    def _list_tables_in_dataset_for_regeneration(self,dataset_fqn):
        """Lists all tables in a given dataset.
        """
        try:
            # Create Dataplex Catalog client
            client = self._cloud_clients[constants["CLIENTS"]["DATAPLEX_CATALOG"]]
            
            # Get project and dataset IDs
            project_id, dataset_id = self._split_dataset_fqn(dataset_fqn)
            
            

            # Build the search request
            name = f"projects/{project_id}/locations/global"
            query = f"""system=BIGQUERY  parent:{dataset_id}  aspect:{project_id}.global.{constants["ASPECT_TEMPLATE"]["name"]}.to-be-regenerated=true"""
            logger.info(f"Query: {query}")
            # Execute search request
            request = dataplex_v1.SearchEntriesRequest( 
                name=name,
                query=query
            )
            
            table_names = []
            try:
                # Get all pages of results
                search_results = client.search_entries(request=request)

                for result in search_results:
                    if result.dataplex_entry.fully_qualified_name.startswith("bigquery:"):
                        table_fqn = result.dataplex_entry.fully_qualified_name.replace("bigquery:", "")
                        table_names.append(table_fqn)
                        #logger.info(f"result: {result}")
                    
                return table_names        
                     
            except google.api_core.exceptions.PermissionDenied:
                logger.warning(f"Permission denied when searching for tables in dataset {dataset_fqn}")
                # Fall back to using BigQuery client
                return self._list_tables_in_dataset_bigquery(dataset_fqn)
            
        except Exception as e:
            logger.error(f"Error listing tables in dataset {dataset_fqn}: {e}")
            raise e
            
        return self._list_tables_in_dataset(dataset_fqn)