def replication_worker()

in images/orbit-controller/src/orbit_controller/operators/imagereplication_operator.py [0:0]


def replication_worker(queue: Queue, statuses: Dict[str, Any], logger: logging.Logger) -> None:  # type: ignore
    client = dynamic_client()

    while True:
        destination = queue.get(block=True, timeout=None)
        with LOCK:
            source = statuses.get(destination, {}).get("source", None)
            status = statuses.get(destination, {}).get("status", None)

        patch: Dict[str, Any] = {}
        spec = {"source": source, "destination": destination}

        if status is None:
            replication_status = replication_checker(status={}, spec=spec, patch=patch, logger=logger)  # type: ignore
            if replication_status == "ECRImageExists":
                logger.info("ECR Image already exists: %s", destination)
                queue.task_done()
                continue
            status = patch
            namespace, name = imagereplication_utils.create_imagereplication(
                namespace="orbit-system",
                source=source,
                destination=destination,
                client=client,
                logger=logger,
            )
            statuses[destination]["namespace"] = namespace
            statuses[destination]["name"] = name
            logger.debug("New Status: %s", status)
        else:
            replication_status = status.get("status", {}).get("replication", {}).get("replicationStatus", None)

        status = {**status, **patch}
        imagereplication_utils.update_imagereplication_status(
            namespace=statuses[destination]["namespace"],
            name=statuses[destination]["name"],
            status=status["status"],
            client=client,
            logger=logger,
        )
        logger.info("Replication Checker: %s Source: %s", replication_status, source)
        if replication_status not in ["Pending", "Failed"]:
            with LOCK:
                statuses[destination]["status"] = status
            queue.task_done()
            continue

        patch = {}
        replication_status = scheduler(status=status["status"], patch=patch, logger=logger)  # type: ignore
        status = {**status, **patch}
        imagereplication_utils.update_imagereplication_status(
            namespace=statuses[destination]["namespace"],
            name=statuses[destination]["name"],
            status=status["status"],
            client=client,
            logger=logger,
        )
        logger.info("Scheduler: %s Source: %s", replication_status, source)
        if replication_status != "Scheduled":
            with LOCK:
                statuses[destination]["status"] = status
            queue.task_done()
            continue

        patch = {}
        replication_status = codebuild_runner(  # type: ignore
            spec=cast(kopf.Spec, spec),
            status=status["status"],
            patch=cast(kopf.Patch, patch),
            logger=logger,
        )
        status = {**status, **patch}
        imagereplication_utils.update_imagereplication_status(
            namespace=statuses[destination]["namespace"],
            name=statuses[destination]["name"],
            status=status["status"],
            client=client,
            logger=logger,
        )
        logger.info("CodeBuild Runner: %s Source: %s", replication_status, source)
        if replication_status != "Replicating":
            if _needs_rescheduling(status=cast(kopf.Status, patch)):
                queue.put(destination)

            with LOCK:
                statuses[destination]["status"] = status
            queue.task_done()
            continue

        patch = {}
        replication_status = codebuild_monitor(status=status["status"], patch=patch, logger=logger)  # type: ignore
        status = {**status, **patch}
        imagereplication_utils.update_imagereplication_status(
            namespace=statuses[destination]["namespace"],
            name=statuses[destination]["name"],
            status=status["status"],
            client=client,
            logger=logger,
        )
        logger.info("CodeBuild Monitor: %s Source: %s", replication_status, source)
        while replication_status == "IN_PROGRESS":
            time.sleep(20)
            patch = {}
            replication_status = codebuild_monitor(status=status["status"], patch=patch, logger=logger)  # type: ignore
            status = {**status, **patch}

        logger.info("CodeBuild Monitor: %s Source: %s", replication_checker, source)
        if replication_status != "SUCCEEDED" and _needs_rescheduling(status=status["status"]):
            queue.put(destination)

        with LOCK:
            statuses[destination]["status"] = status
        imagereplication_utils.update_imagereplication_status(
            namespace=statuses[destination]["namespace"],
            name=statuses[destination]["name"],
            status=status["status"],
            client=client,
            logger=logger,
        )
        queue.task_done()
        continue