in gcpdiag/lint/gce/warn_2022_012_windows_kms.py [0:0]
def run_rule(context: models.Context, report: lint.LintReportRuleInterface):
# skip entire rule if no instances
instances = gce.get_instances(context).values()
if len(instances) == 0:
report.add_skipped(None, 'No instances found')
return
# Load gcp non-byol licenses
licenses = gce.get_gce_public_licences('windows-cloud')
payg_licenses = [x for x in licenses if not x.endswith('-byol')]
# Add windows to new list and skip entire rule if no Windows instances
for instance in sorted(instances,
key=op.attrgetter('project_id', 'full_path')):
fault_list = []
is_faulty = False
# Skip non-Windows machines
if not instance.is_windows_machine():
continue
# Skip BYOL instances
if not instance.check_license(payg_licenses):
report.add_skipped(instance, 'No PAYG licence attached to this instance')
continue
# Check for public IP instances
if instance.is_public_machine():
# Firewall rule check
result = instance.network.firewall.check_connectivity_egress(
src_ip=KMS_FW_RULE,
ip_protocol='tcp',
port=KMS_PORT,
target_service_account=instance.service_account,
target_tags=instance.tags)
if result.action == 'deny':
# Implied deny is a pass for external IP instances
if result.matched_by_str is not None:
fault_list.append(
f'connections from {KMS_FW_RULE} to tcp:{KMS_PORT} blocked by '
f'{result.matched_by_str}')
is_faulty = True
# Check for private IP instances
else:
# PGA check
for subnetwork in instance.subnetworks:
if not subnetwork.is_private_ip_google_access():
fault_list.append(
f'Subnetwork {subnetwork.name} does not have Private Google Access enabled.'
)
is_faulty = True
# Firewall rule check
result = instance.network.firewall.check_connectivity_egress(
src_ip=KMS_FW_RULE,
ip_protocol='tcp',
port=KMS_PORT,
target_service_account=instance.service_account,
target_tags=instance.tags)
if result.action == 'deny':
if result.matched_by_str is None:
fault_list.append(
f'Connectivity to {KMS_FW_RULE} and port tcp:{KMS_PORT} not found '
f'in VPC.')
else:
fault_list.append(
f'connections from {KMS_FW_RULE} to tcp:{KMS_PORT} blocked by '
f'{result.matched_by_str}.')
is_faulty = True
# Routes Check
if not kms_route_access(instance):
fault_list.append(
f'Route {KMS_ROUTE} with next hop {NEXT_HOP} not found in VPC.')
is_faulty = True
if is_faulty:
report.add_failed(instance, utils.format_fault_list(fault_list))
else:
report.add_ok(instance)