# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Queries related to Dataproc."""
import logging
import re
from typing import Iterable, List, Mapping, Optional
import googleapiclient.errors
import requests
from gcpdiag import caching, config, models, utils
from gcpdiag.lint import get_executor
from gcpdiag.queries import apis, crm, gce, network, web
class Cluster(models.Resource):
"""Represents Dataproc Cluster"""
name: str
_resource_data: Mapping
def __init__(self, name: str, project_id: str, resource_data: Mapping):
super().__init__(project_id)
self.name = name
self._resource_data = resource_data
def is_running(self) -> bool:
return self.status == 'RUNNING'
  def get_software_property(self, property_name: str) -> Optional[str]:
return self._resource_data['config']['softwareConfig']['properties'].get(
property_name)
def is_stackdriver_logging_enabled(self) -> bool:
    # Unless overridden at cluster creation, properties that keep their
    # default value are not returned by the API, so get_software_property
    # only returns a value here when the property was explicitly set to
    # 'false'.
    return (self.get_software_property(
        'dataproc:dataproc.logging.stackdriver.enable') != 'false')
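  # Illustrative sketch of the lookup semantics described above (values are
  # examples, not an API contract):
  #   get_software_property('dataproc:dataproc.logging.stackdriver.enable')
  #     -> None     when the property keeps its default (logging enabled)
  #     -> 'false'  only when logging was explicitly disabled at creation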
def is_stackdriver_monitoring_enabled(self) -> bool:
return (self.get_software_property(
'dataproc:dataproc.monitoring.stackdriver.enable') == 'true')
@property
def region(self) -> str:
"""biggest regions have a trailing '-d' at most in its zoneUri
https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
"""
return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
'/')[-1][0:-2]
@property
def zone(self) -> Optional[str]:
zone = (self._resource_data.get('config', {}).get('gceClusterConfig',
{}).get('zoneUri'))
if zone:
m = re.search(r'/zones/([^/]+)$', zone)
if m:
return m.group(1)
raise RuntimeError(f"can't determine zone for cluster {self.name}")
@property
def full_path(self) -> str:
return (
f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
)
@property
def short_path(self) -> str:
return f'{self.project_id}/{self.region}/{self.name}'
@property
def status(self) -> str:
return self._resource_data['status']['state']
def __str__(self) -> str:
return self.short_path
@property
def cluster_uuid(self) -> str:
return self._resource_data['clusterUuid']
@property
def image_version(self):
return self._resource_data['config']['softwareConfig']['imageVersion']
@property
def vm_service_account_email(self):
sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
if sa is None:
sa = crm.get_project(self.project_id).default_compute_service_account
return sa
@property
def is_custom_gcs_connector(self) -> bool:
return bool(
self._resource_data.get('config', {}).get('gceClusterConfig', {}).get(
'metadata', {}).get('GCS_CONNECTOR_VERSION'))
@property
def cluster_provided_bq_connector(self):
"""Check user-supplied BigQuery connector on the cluster level"""
bigquery_connector = (self._resource_data.get('config', {}).get(
'gceClusterConfig', {}).get('metadata',
{}).get('SPARK_BQ_CONNECTOR_VERSION'))
if not bigquery_connector:
bigquery_connector = (self._resource_data.get('config', {}).get(
'gceClusterConfig', {}).get('metadata',
{}).get('SPARK_BQ_CONNECTOR_URL'))
if bigquery_connector:
if bigquery_connector == 'spark-bigquery-latest.jar':
return 'spark-bigquery-latest'
else:
match = re.search(
r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
bigquery_connector)
if match:
return match.group(1)
    # A None return value means the cluster is using the default,
    # pre-installed BigQuery connector for its image version.
return bigquery_connector
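  # Hedged examples of what this property yields, assuming the metadata keys
  # above are set (version numbers are placeholders):
  #   SPARK_BQ_CONNECTOR_VERSION='0.36.1'            -> '0.36.1'
  #   ..._URL ending in 'spark-bigquery-with-dependencies_2.12-0.36.1.jar'
  #                                                  -> '0.36.1'
  #   value 'spark-bigquery-latest.jar'              -> 'spark-bigquery-latest'
  #   neither key set                                -> None (image default)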
@property
def is_gce_cluster(self) -> bool:
return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))
@property
def gce_network_uri(self) -> Optional[str]:
"""Get network uri from cluster network or subnetwork"""
if not self.is_gce_cluster:
      raise RuntimeError(
          'Cannot return network URI for a Dataproc on GKE cluster')
network_uri = (self._resource_data.get('config',
{}).get('gceClusterConfig',
{}).get('networkUri'))
if not network_uri:
subnetwork_uri = (self._resource_data.get('config', {}).get(
'gceClusterConfig', {}).get('subnetworkUri'))
network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
return network_uri
@property
def gce_subnetwork_uri(self) -> Optional[str]:
"""Get subnetwork uri from cluster subnetwork."""
if not self.is_gce_cluster:
      raise RuntimeError(
          'Cannot return subnetwork URI for a Dataproc on GKE cluster')
subnetwork_uri = (self._resource_data.get('config',
{}).get('gceClusterConfig',
{}).get('subnetworkUri'))
if not subnetwork_uri:
subnetwork_uri = ('https://www.googleapis.com/compute/v1/projects/' +
self.project_id + '/regions/' + self.region +
'/subnetworks/default')
return subnetwork_uri
@property
def is_single_node_cluster(self) -> bool:
workers = (self._resource_data.get('config',
{}).get('workerConfig',
{}).get('numInstances', 0))
return workers == 0
@property
def is_ha_cluster(self) -> bool:
masters = (self._resource_data.get('config',
{}).get('masterConfig',
{}).get('numInstances', 1))
return masters != 1
@property
def is_internal_ip_only(self) -> bool:
    # internalIpOnly defaults to true when creating a Dataproc 2.2 image
    # version cluster; on older image versions the default is false.
internal_ip_only = self._resource_data['config']['gceClusterConfig'][
'internalIpOnly']
return internal_ip_only
@property
def has_autoscaling_policy(self) -> bool:
"""Checks if an autoscaling policy is configured for the cluster."""
return bool(self._resource_data['config'].get('autoscalingConfig', {}))
@property
def autoscaling_policy_id(self) -> str:
"""Returns the autoscaling policy ID for the cluster."""
if self.has_autoscaling_policy:
return (self._resource_data['config'].get('autoscalingConfig',
{}).get('policyUri',
'').split('/')[-1])
else:
return ''
@property
  def number_of_primary_workers(self) -> int:
"""Gets the number of primary worker nodes in the cluster."""
return (self._resource_data['config'].get('workerConfig',
{}).get('numInstances', 0))
@property
  def number_of_secondary_workers(self) -> int:
"""Gets the number of secondary worker nodes in the cluster."""
return (self._resource_data['config'].get('secondaryWorkerConfig',
{}).get('numInstances', 0))
@property
def is_preemptible_primary_workers(self) -> bool:
"""Checks if the primary worker nodes in the cluster are preemptible."""
return (self._resource_data['config'].get('workerConfig',
{}).get('isPreemptible', False))
@property
def is_preemptible_secondary_workers(self) -> bool:
"""Checks if the secondary worker nodes in the cluster are preemptible."""
return (self._resource_data['config'].get('secondaryWorkerConfig',
{}).get('isPreemptible', False))
@property
def initialization_actions(self) -> List[str]:
return self._resource_data['config'].get('initializationActions', [])
class Region:
"""Represents Dataproc region"""
project_id: str
region: str
def __init__(self, project_id: str, region: str):
self.project_id = project_id
self.region = region
def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
clusters = []
for cluster in self.query_api():
if not context.match_project_resource(resource=cluster.get('clusterName'),
labels=cluster.get('labels', {})):
continue
c = Cluster(
name=cluster['clusterName'],
project_id=self.project_id,
resource_data=cluster,
)
clusters.append(c)
return clusters
def query_api(self) -> Iterable[dict]:
try:
api = apis.get_api('dataproc', 'v1', self.project_id)
query = (api.projects().regions().clusters().list(
projectId=self.project_id, region=self.region))
# be careful not to retry too many times because querying all regions
# sometimes causes requests to fail permanently
resp = query.execute(num_retries=1)
return resp.get('clusters', [])
except googleapiclient.errors.HttpError as err:
# b/371526148 investigate permission denied error
logging.error(err)
return []
# raise utils.GcpApiError(err) from err
class Dataproc:
"""Represents Dataproc product"""
project_id: str
def __init__(self, project_id: str):
self.project_id = project_id
def get_regions(self) -> Iterable[Region]:
return [
Region(self.project_id, r.name)
for r in gce.get_all_regions(self.project_id)
]
def is_api_enabled(self) -> bool:
return apis.is_enabled(self.project_id, 'dataproc')
@caching.cached_api_call
def get_clusters(context: models.Context) -> Iterable[Cluster]:
r: List[Cluster] = []
dataproc = Dataproc(context.project_id)
if not dataproc.is_api_enabled():
return r
executor = get_executor()
  for clusters in executor.map(lambda region: region.get_clusters(context),
                               dataproc.get_regions()):
r += clusters
return r
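# Usage sketch (hedged): a lint rule would typically pass its models.Context;
# the project id below is a placeholder.
#
#   context = models.Context(project_id='my-project')
#   for cluster in get_clusters(context):
#     if not cluster.is_stackdriver_logging_enabled():
#       print(f'Cloud Logging disabled on {cluster.short_path}')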
@caching.cached_api_call
def get_cluster(cluster_name: str, region: str,
                project: str) -> Optional[Cluster]:
api = apis.get_api('dataproc', 'v1', project)
request = api.projects().regions().clusters().get(projectId=project,
clusterName=cluster_name,
region=region)
try:
r = request.execute(num_retries=config.API_RETRIES)
except (googleapiclient.errors.HttpError,
requests.exceptions.RequestException):
#logging.error(err)
return None
return Cluster(r['clusterName'], project_id=r['projectId'], resource_data=r)
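# Usage sketch (placeholder names): get_cluster returns None when the API call
# fails, so callers should guard against that.
#
#   cluster = get_cluster('my-cluster', 'us-central1', 'my-project')
#   if cluster and cluster.is_running():
#     print(cluster.image_version)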
class AutoScalingPolicy(models.Resource):
"""AutoScalingPolicy."""
_resource_data: dict
def __init__(self, project_id, resource_data, region):
super().__init__(project_id=project_id)
self._resource_data = resource_data
self.region = region
@property
def policy_id(self) -> str:
return self._resource_data['id']
@property
def full_path(self) -> str:
return self._resource_data['name']
@property
def short_path(self) -> str:
return f'{self.project_id}/{self.region}/{self.policy_id}'
@property
def name(self) -> str:
return self._resource_data['name']
@property
def scale_down_factor(self) -> float:
return self._resource_data['basicAlgorithm']['yarnConfig'].get(
'scaleDownFactor', 0.0)
@property
def has_graceful_decommission_timeout(self) -> bool:
"""Checks if a graceful decommission timeout is configured in the autoscaling policy."""
return bool(
self._resource_data.get('basicAlgorithm',
{}).get('yarnConfig',
{}).get('gracefulDecommissionTimeout',
{}))
@property
def graceful_decommission_timeout(self) -> float:
"""Gets the configured graceful decommission timeout in the autoscaling policy."""
return (self._resource_data.get('basicAlgorithm',
{}).get('yarnConfig', {}).get(
'gracefulDecommissionTimeout', -1))
@caching.cached_api_call
def get_auto_scaling_policy(project_id: str, region: str,
policy_id: str) -> AutoScalingPolicy:
  logging.debug('fetching autoscaling policy: %s', project_id)
dataproc = apis.get_api('dataproc', 'v1', project_id)
name = (
f'projects/{project_id}/regions/{region}/autoscalingPolicies/{policy_id}')
try:
request = dataproc.projects().regions().autoscalingPolicies().get(name=name)
response = request.execute(num_retries=config.API_RETRIES)
return AutoScalingPolicy(project_id, response, region)
except googleapiclient.errors.HttpError as err:
raise utils.GcpApiError(err) from err
@caching.cached_api_call
def list_auto_scaling_policies(project_id: str,
region: str) -> List[AutoScalingPolicy]:
"""Lists all autoscaling policies in the given project and region."""
dataproc = apis.get_api('dataproc', 'v1', project_id)
parent = f'projects/{project_id}/regions/{region}'
try:
request = (dataproc.projects().regions().autoscalingPolicies().list(
parent=parent))
response = request.execute(num_retries=config.API_RETRIES)
return [
AutoScalingPolicy(project_id, policy_data, region)
for policy_data in response.get('policies', [])
]
except googleapiclient.errors.HttpError as err:
raise utils.GcpApiError(err) from err
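# Usage sketch (hypothetical project/region): list the policies in a region
# and flag those without a graceful decommission timeout.
#
#   for policy in list_auto_scaling_policies('my-project', 'us-central1'):
#     if not policy.has_graceful_decommission_timeout:
#       print(f'no graceful decommission timeout: {policy.short_path}')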
class Job(models.Resource):
"""Job."""
_resource_data: dict
def __init__(self, project_id, job_id, region, resource_data):
super().__init__(project_id=project_id)
self._resource_data = resource_data
self.region = region
self.job_id = job_id
@property
def full_path(self) -> str:
return (
f'projects/{self.project_id}/regions/{self.region}/jobs/{self.job_id}')
@property
def short_path(self) -> str:
return f'{self.project_id}/{self.region}/{self.job_id}'
@property
def cluster_name(self) -> str:
return self._resource_data['placement']['clusterName']
@property
def cluster_uuid(self) -> str:
return self._resource_data['placement']['clusterUuid']
@property
def state(self):
return self._resource_data['status']['state']
@property
def details(self):
if self._resource_data['status']['state'] == 'ERROR':
return self._resource_data['status']['details']
return None
@property
def status_history(self):
status_history_dict = {}
for previous_status in self._resource_data['statusHistory']:
if previous_status['state'] not in status_history_dict:
status_history_dict[
previous_status['state']] = previous_status['stateStartTime']
return status_history_dict
@property
def yarn_applications(self):
return self._resource_data['yarnApplications']
@property
def driver_output_resource_uri(self):
return self._resource_data.get('driverOutputResourceUri')
@property
def job_uuid(self):
return self._resource_data.get('jobUuid')
@property
def job_provided_bq_connector(self):
"""Check user-supplied BigQuery connector on the job level"""
jar_file_uris = (self._resource_data.get('sparkJob', {}).get('jarFileUris'))
if jar_file_uris is not None:
for file in jar_file_uris:
if 'spark-bigquery-latest.jar' in file:
return 'spark-bigquery-latest'
else:
match = re.search(
r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
file)
if match:
return match.group(1)
return None
@caching.cached_api_call
def get_job_by_jobid(project_id: str, region: str, job_id: str) -> Job:
dataproc = apis.get_api('dataproc', 'v1', project_id)
try:
request = (dataproc.projects().regions().jobs().get(projectId=project_id,
region=region,
jobId=job_id))
response = request.execute(num_retries=config.API_RETRIES)
    return Job(project_id=project_id,
               job_id=job_id,
               region=region,
               resource_data=response)
except googleapiclient.errors.HttpError as err:
raise utils.GcpApiError(err) from err
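# Usage sketch (placeholder identifiers): inspect a failed job; note that
# get_job_by_jobid raises utils.GcpApiError when the lookup fails.
#
#   job = get_job_by_jobid('my-project', 'us-central1', 'my-job-id')
#   if job.state == 'ERROR':
#     print(job.details)
#     print(job.status_history)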
@caching.cached_api_call
def extract_dataproc_supported_version() -> list[str]:
"""Extract the supported Dataproc versions(use Debian as representative).
"""
page_url = 'https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-version-clusters'
try:
table = web.fetch_and_extract_table(page_url,
tag='h3',
tag_id='debian_images')
if table:
      rows = table.find_all('tr')[1:]  # skip the header row
version_list = []
for row in rows:
dp_version = row.find_all('td')[0].get_text().strip().split('-')[0]
version_list.append(dp_version)
return version_list
else:
return []
except (
requests.exceptions.RequestException,
AttributeError,
TypeError,
ValueError,
IndexError,
) as e:
logging.error(
'Error in extracting dataproc versions: %s',
e,
)
return []
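# Usage sketch (hedged assumption about how callers compare versions): combine
# the scraped list with a cluster's image version to flag unsupported images.
#
#   supported = extract_dataproc_supported_version()
#   minor = '.'.join(cluster.image_version.split('.')[:2])
#   if supported and minor not in supported:
#     print(f'{cluster.short_path} runs unsupported image {minor}')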
@caching.cached_api_call
def extract_dataproc_bigquery_version(image_version: str):
  """Extract the BigQuery connector version for a Dataproc image version
  from the GCP documentation.
  """
page_url = ('https://cloud.google.com/dataproc/docs/concepts/versioning/'
'dataproc-release-' + image_version)
try:
table = web.fetch_and_extract_table(page_url, tag='div')
bq_version = []
if table:
rows = table.find_all('tr')[1:]
for row in rows:
cells = row.find_all('td')
if 'BigQuery Connector' in cells[0].get_text(strip=True):
bq_version = cells[1].get_text(strip=True)
return bq_version
except (
requests.exceptions.RequestException,
AttributeError,
TypeError,
ValueError,
IndexError,
) as e:
    logging.error(
        'Error in extracting BigQuery connector versions: %s.'
        ' Please check the BigQuery connector version on %s',
        e,
        page_url,
    )
return []
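# Usage sketch (hedged; ties the helpers above together): compare a
# user-supplied connector version against the documented default for the
# image (the image version below is a placeholder).
#
#   documented = extract_dataproc_bigquery_version('2.1')
#   supplied = (job.job_provided_bq_connector or
#               cluster.cluster_provided_bq_connector)
#   if documented and supplied and supplied != documented:
#     print(f'connector {supplied} differs from documented {documented}')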