tools/cloud-build/babysit/core.py (103 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from typing import Sequence, Dict, Callable, Protocol
from dataclasses import dataclass
from google.cloud.devtools import cloudbuild_v1 # pip install google-cloud-build
from google.cloud.devtools.cloudbuild_v1.types.cloudbuild import Build, ApproveBuildRequest, ApprovalResult, RetryBuildRequest
# Predicate over a build; Babysitter watches the builds that at least one of
# its selectors accepts.
Selector = Callable[[Build], bool]
# Short alias for Build.Status, used for the status checks below.
Status = Build.Status
@dataclass
class BuildAndCount:
    """Per-trigger summary: the latest build and how many builds the trigger has."""
    build: Build # latest build for this trigger
    count: int # total count of builds for this trigger
class UIProto(Protocol): # just an interface
    """Callbacks through which Babysitter reports progress and waits between polls."""
    # Called once by Babysitter.do() with the initially fetched builds
    # (also when that list is empty).
    def on_init(self, builds: Sequence[Build]) -> None: ...
    # Called once with the most recently fetched builds when babysitting stops;
    # not called if the initial list of builds was empty.
    def on_done(self, builds: Sequence[Build]) -> None: ...
    # Called with the freshly fetched builds after every poll.
    def on_update(self, builds: Sequence[Build]) -> None: ...
    # Called right before a request is sent for `build`;
    # `action` is either "approve" or "retry".
    def on_action(self, action: str, build: Build) -> None: ...
    # Called between polls to wait (Babysitter.do() passes 10);
    # presumably `sec` is a duration in seconds.
    def sleep(self, sec: int) -> None: ...
def latest_by_trigger(builds: Sequence[Build]) -> Dict[str, BuildAndCount]:
    """
    Groups builds by trigger name.

    Returns a dict: trigger name -> BuildAndCount holding the build with the
    greatest create_time seen for that trigger (the first one wins on a tie)
    and the number of builds of that trigger.
    """
    summary: Dict[str, BuildAndCount] = {}
    for build in builds:
        name = trig_name(build)
        entry = summary.get(name)
        if entry is None:
            # first build seen for this trigger; it is counted below
            entry = summary[name] = BuildAndCount(build, 0)
        if build.create_time > entry.build.create_time:
            entry.build = build
        entry.count += 1
    return summary
def trig_name(build: Build) -> str:
    """Returns the build's TRIGGER_NAME substitution, or "???" when it has none."""
    substitutions = build.substitutions
    return substitutions.get("TRIGGER_NAME", "???")
class Babysitter:
    """
    Watches the builds of a single commit and pushes them to completion.

    On every poll at most one action is taken: a PENDING build is approved or,
    when nothing is pending, a failed build is retried. No action is taken
    while `concurrency` or more watched builds are QUEUED or WORKING.
    """

    def __init__(self, ui: UIProto,
                 cb: cloudbuild_v1.services.cloud_build.CloudBuildClient,
                 project: str,
                 sha: str,
                 selectors: Sequence[Selector],
                 concurrency: int,
                 retries: int) -> None:
        """
        Args:
            ui: receives progress callbacks and performs the wait between polls.
            cb: Cloud Build client used to list, approve and retry builds.
            project: id of the project the builds are listed from.
            sha: commit SHA; only its first 7 characters are used for filtering.
            selectors: predicates; a build is watched if any of them accepts it.
            concurrency: nothing is approved or retried while at least this
                many watched builds are QUEUED or WORKING.
            retries: a failing trigger is given up on once the total number of
                its builds exceeds this value.
        """
        self.ui = ui
        self.cb = cb
        self.project = project
        self.sha = sha
        self.selectors = list(selectors)
        self.concurrency = concurrency
        self.retries = retries

    def _get_builds(self) -> Sequence[Build]:
        """Lists the builds of the commit and keeps those accepted by any selector."""
        req = cloudbuild_v1.ListBuildsRequest(
            project_id=self.project,
            # cloud build only recognizes SHORT_SHA of length 7
            filter=f"substitutions.SHORT_SHA={self.sha[:7]}",
            page_size=1000,
        )
        # NOTE(review): presumably `.builds` exposes a single page of results
        # (at most `page_size` builds) - confirm that this is sufficient.
        builds = self.cb.list_builds(req).builds
        return [b for b in builds if any(s(b) for s in self.selectors)]

    def _in_terminal_state(self, bc: BuildAndCount) -> bool:
        """
        Returns bool - whether nothing more will be done for this trigger.

        Failed builds count as terminal only once the trigger has more builds
        than `self.retries`.

        Raises:
            AssertionError: the latest build has a status that is not handled.
        """
        status = bc.build.status
        if status in (Status.STATUS_UNKNOWN, Status.CANCELLED, Status.EXPIRED, Status.SUCCESS):
            return True
        if status in (Status.PENDING, Status.QUEUED, Status.WORKING):
            return False
        if status in (Status.FAILURE, Status.INTERNAL_ERROR, Status.TIMEOUT):
            return bc.count > self.retries
        # Raised explicitly instead of `assert False`: an assert is removed
        # under `python -O`, which would make this method return None and the
        # unexpected status would silently be treated as "not terminal".
        raise AssertionError(f"Unexpected status {status}")

    def _approve(self, build: Build) -> None:
        """Notifies the UI and sends an APPROVED decision for the build."""
        self.ui.on_action("approve", build)
        req = ApproveBuildRequest(
            name=f"projects/{build.project_id}/builds/{build.id}",
            approval_result=ApprovalResult(
                decision=ApprovalResult.Decision.APPROVED
            )
        )
        self.cb.approve_build(request=req)

    def _retry(self, build: Build) -> None:
        """Notifies the UI and requests a retry of the build."""
        self.ui.on_action("retry", build)
        req = RetryBuildRequest(project_id=build.project_id, id=build.id)
        self.cb.retry_build(request=req)

    def _take_action(self, builds: Sequence[Build]) -> bool:
        """
        Takes at most one action (approve or retry) based on the latest build
        of every trigger.

        Returns bool - whether "babysitting" should be continued.
        """
        latest = latest_by_trigger(builds).values()
        active = [bc.build for bc in latest if not self._in_terminal_state(bc)]
        if not active:
            return False  # all builds are in terminal state, done

        not_running = [b for b in active if b.status not in (Status.QUEUED, Status.WORKING)]
        num_running = len(active) - len(not_running)

        if num_running == len(active):
            return True  # waiting for results
        if num_running >= self.concurrency:
            return True  # waiting for "opening"

        pending = [b for b in not_running if b.status == Status.PENDING]
        if pending:  # approve one of pending builds
            self._approve(random.choice(pending))
            return True

        # Nothing is pending, so every remaining build must be a failed one
        # that still may be retried.
        assert not_running  # sanity check
        failed = random.choice(not_running)
        assert failed.status in [Status.FAILURE, Status.INTERNAL_ERROR, Status.TIMEOUT]  # sanity check
        self._retry(failed)  # retry failed build
        return True

    def do(self) -> None:
        """
        Polls the builds and acts on them until all are in a terminal state.

        Returns right after `ui.on_init` when there are no builds to watch;
        otherwise waits via `ui.sleep(10)` between polls and finishes with
        `ui.on_done`.
        """
        builds = self._get_builds()
        self.ui.on_init(builds)
        if not builds:
            return

        while self._take_action(builds):
            self.ui.sleep(10)
            builds = self._get_builds()
            self.ui.on_update(builds)

        self.ui.on_done(builds)