cmd/integration_test/build/script/check_concurrent_pipelines.py (23 lines of code) (raw):
import boto3
# Module-level CodePipeline client used by every API call in this script.
# It is created with no explicit region or credential arguments.
client = boto3.client('codepipeline')
def is_pipeline_idle(pipeline_name):
    """Report whether the named CodePipeline pipeline is idle.

    The pipeline is considered idle when either:

    * a stage whose name starts with ``TestEKSA`` has its inbound
      transition disabled, or
    * no stage's latest execution has the status ``InProgress``.

    Progress messages are printed to stdout along the way.

    Args:
        pipeline_name: Pipeline name passed to ``client.get_pipeline_state``.

    Returns:
        ``True`` if the pipeline is idle, ``False`` if some stage is
        currently in progress.
    """
    state = client.get_pipeline_state(name=pipeline_name)
    stage_states = state.get('stageStates', [])

    print(f"Checking if test stage in {pipeline_name} is enabled.")
    for stage in stage_states:
        if not stage['stageName'].startswith('TestEKSA'):
            continue
        # A missing transition record is treated as "enabled", so only an
        # explicitly disabled transition short-circuits to idle.
        transition = stage.get('inboundTransitionState', {})
        if not transition.get('enabled', True):
            print(f"TestEKSA stage is disabled, pipeline {pipeline_name} is idle")
            return True

    print("TestEKSA stage is enabled, checking if any stage is in progress.")
    for stage in stage_states:
        # A stage that has never run has no 'latestExecution' entry; it
        # cannot be in progress, so it must not raise KeyError here.
        latest_execution = stage.get('latestExecution', {})
        if latest_execution.get('status') == 'InProgress':
            print(f"{stage['stageName']} is in progress, pipeline is not idle")
            return False

    print(f"No stage is in progress, pipeline {pipeline_name} is idle")
    return True
# Collect the names of all release pipelines. The paginator follows
# nextToken, so pipelines beyond the first page of list_pipelines results
# are checked as well.
ps = []
for page in client.get_paginator('list_pipelines').paginate():
    for pipeline in page['pipelines']:
        if pipeline['name'].startswith('aws-eks-anywhere-release'):
            ps.append(pipeline['name'])
ps.append('aws-eks-anywhere') # main

# any() stops at the first pipeline that is not idle, so later pipelines are
# not queried once a busy one has been found.
if any(not is_pipeline_idle(name) for name in ps):
    print("Some test pipelines are not idle")
    # Raise SystemExit directly: the bare exit() helper is injected by the
    # site module and is not available when Python runs with -S.
    raise SystemExit(1)
print("Check completed, all test piplines are idle")