in images/orbit-controller/src/orbit_controller/operators/imagereplication_operator.py [0:0]
def replication_worker(queue: Queue, statuses: Dict[str, Any], logger: logging.Logger) -> None:  # type: ignore
    """Consume destinations from ``queue`` and drive each one through image replication.

    The worker loops forever. For every destination taken from ``queue`` it:

    1. Reads the recorded ``source`` and ``status`` for that destination from
       ``statuses`` while holding ``LOCK``.
    2. If no status is recorded yet, calls ``replication_checker``; when that
       returns ``"ECRImageExists"`` the item is finished immediately, otherwise
       an ImageReplication object is created in the ``orbit-system`` namespace
       and the returned namespace/name are stored in ``statuses``.
    3. Runs ``scheduler``, ``codebuild_runner`` and ``codebuild_monitor`` in that
       order. After each stage the stage's patch is merged into the working
       status and published with ``update_imagereplication_status``. A stage
       result other than the one needed to proceed stores the status, marks the
       queue item done and moves on to the next destination.
    4. While ``codebuild_monitor`` reports ``"IN_PROGRESS"`` it is polled again
       every 20 seconds.
    5. The destination is put back on the queue when ``_needs_rescheduling``
       says so.

    Args:
        queue: Work queue whose items are replication destinations.
        statuses: Shared mapping keyed by destination; each entry is read for
            ``source``/``status`` and written with ``namespace``, ``name`` and
            ``status``.
        logger: Logger used for progress messages and passed to the helpers.

    Returns:
        Never returns normally; the loop has no exit condition.
    """
    client = dynamic_client()

    def _publish(destination: str, status: Dict[str, Any]) -> None:
        # Namespace and name are looked up at call time, exactly as each
        # individual publish call site did before this helper existed.
        imagereplication_utils.update_imagereplication_status(
            namespace=statuses[destination]["namespace"],
            name=statuses[destination]["name"],
            status=status["status"],
            client=client,
            logger=logger,
        )

    def _remember(destination: str, status: Dict[str, Any]) -> None:
        # Writes of the per-destination status go through LOCK.
        with LOCK:
            statuses[destination]["status"] = status

    while True:
        destination = queue.get(block=True, timeout=None)
        with LOCK:
            entry = statuses.get(destination, {})
            source = entry.get("source", None)
            status = entry.get("status", None)

        patch: Dict[str, Any] = {}
        spec = {"source": source, "destination": destination}

        if status is None:
            # First time this destination is seen: check before creating anything.
            replication_status = replication_checker(status={}, spec=spec, patch=patch, logger=logger)  # type: ignore
            if replication_status == "ECRImageExists":
                logger.info("ECR Image already exists: %s", destination)
                queue.task_done()
                continue

            status = patch
            namespace, name = imagereplication_utils.create_imagereplication(
                namespace="orbit-system",
                source=source,
                destination=destination,
                client=client,
                logger=logger,
            )
            statuses[destination]["namespace"] = namespace
            statuses[destination]["name"] = name
            logger.debug("New Status: %s", status)
        else:
            # Resume from whatever replication status was recorded previously.
            replication_status = status.get("status", {}).get("replication", {}).get("replicationStatus", None)

        status = {**status, **patch}
        _publish(destination, status)
        logger.info("Replication Checker: %s Source: %s", replication_status, source)
        if replication_status not in ["Pending", "Failed"]:
            _remember(destination, status)
            queue.task_done()
            continue

        # Stage: scheduling.
        patch = {}
        replication_status = scheduler(status=status["status"], patch=patch, logger=logger)  # type: ignore
        status = {**status, **patch}
        _publish(destination, status)
        logger.info("Scheduler: %s Source: %s", replication_status, source)
        if replication_status != "Scheduled":
            _remember(destination, status)
            queue.task_done()
            continue

        # Stage: start the CodeBuild run.
        patch = {}
        replication_status = codebuild_runner(  # type: ignore
            spec=cast(kopf.Spec, spec),
            status=status["status"],
            patch=cast(kopf.Patch, patch),
            logger=logger,
        )
        status = {**status, **patch}
        _publish(destination, status)
        logger.info("CodeBuild Runner: %s Source: %s", replication_status, source)
        if replication_status != "Replicating":
            # NOTE(review): this passes the raw patch, whereas the check after
            # monitoring passes status["status"]; confirm which shape
            # _needs_rescheduling expects.
            if _needs_rescheduling(status=cast(kopf.Status, patch)):
                queue.put(destination)
            _remember(destination, status)
            queue.task_done()
            continue

        # Stage: monitor the CodeBuild run until it leaves IN_PROGRESS.
        patch = {}
        replication_status = codebuild_monitor(status=status["status"], patch=patch, logger=logger)  # type: ignore
        status = {**status, **patch}
        _publish(destination, status)
        logger.info("CodeBuild Monitor: %s Source: %s", replication_status, source)

        while replication_status == "IN_PROGRESS":
            time.sleep(20)
            patch = {}
            replication_status = codebuild_monitor(status=status["status"], patch=patch, logger=logger)  # type: ignore
            status = {**status, **patch}
            # Log the polled state itself (previously the replication_checker
            # function object was passed here by mistake).
            logger.info("CodeBuild Monitor: %s Source: %s", replication_status, source)

        if replication_status != "SUCCEEDED" and _needs_rescheduling(status=status["status"]):
            queue.put(destination)

        _remember(destination, status)
        _publish(destination, status)
        queue.task_done()
        continue