in servicecatalog_puppet/commands/misc.py [0:0]
def wait_for_parameterised_run_to_complete(on_complete_url: str) -> bool:
with betterboto_client.ClientContextManager("s3") as s3:
paginator = s3.get_paginator("list_object_versions")
pages = paginator.paginate(
Bucket=f"sc-puppet-parameterised-runs-{config.get_puppet_account_id()}",
)
for page in pages:
for version in page.get("Versions", []):
if version.get("Key") == "parameters.zip" and version.get("IsLatest"):
parameters_file_version_id = version.get("VersionId")
while True:
time.sleep(5)
with betterboto_client.ClientContextManager(
"codepipeline"
) as codepipeline:
click.echo(
f"looking for execution for {parameters_file_version_id}"
)
paginator = codepipeline.get_paginator(
"list_pipeline_executions"
)
pages = paginator.paginate(
pipelineName=constants.PIPELINE_NAME,
PaginationConfig={"PageSize": 100,},
)
for page in pages:
for pipeline_execution_summary in page.get(
"pipelineExecutionSummaries", []
):
if (
pipeline_execution_summary.get("trigger").get(
"triggerDetail"
)
== "ParameterisedSource"
):
for s in pipeline_execution_summary.get(
"sourceRevisions", []
):
if (
s.get("actionName")
== "ParameterisedSource"
and s.get("revisionId")
== parameters_file_version_id
):
pipeline_execution_id = pipeline_execution_summary.get(
"pipelineExecutionId"
)
click.echo(
f"Found execution id {pipeline_execution_id}"
)
while True:
time.sleep(10)
pipelineExecution = codepipeline.get_pipeline_execution(
pipelineName=constants.PIPELINE_NAME,
pipelineExecutionId=pipeline_execution_id,
).get(
"pipelineExecution"
)
status = pipelineExecution.get(
"status"
)
click.echo(
f"Current status (A): {status}"
)
if status in [
"Cancelled",
"Stopped",
"Succeeded",
"Superseded",
"Failed",
]:
succeeded = status in [
"Succeeded"
]
if on_complete_url:
logger.info(
f"About to post results"
)
if succeeded:
result = dict(
Status="SUCCESS",
Reason=f"All tasks run with success: {pipeline_execution_id}",
UniqueId=pipeline_execution_id.replace(
":", ""
).replace(
"-", ""
),
Data=f"{pipeline_execution_id}",
)
else:
result = dict(
Status="FAILURE",
Reason=f"All tasks did not run with success: {pipeline_execution_id}",
UniqueId=pipeline_execution_id.replace(
":", ""
).replace(
"-", ""
),
Data=f"{pipeline_execution_id}",
)
req = urllib.request.Request(
url=on_complete_url,
data=json.dumps(
result
).encode(),
method="PUT",
)
with urllib.request.urlopen(
req
) as f:
pass
logger.info(f.status)
logger.info(f.reason)
return succeeded