def check_concurrency()

in gha_scanner/checks.py [0:0]


def check_concurrency(wdata):
    """Check that no matrix job can exceed the concurrency limit unchecked.

    For every job in ``wdata["jobs"]`` that defines ``strategy.matrix`` as a
    mapping, the potential concurrency is estimated as the product of the
    lengths of all matrix entries. A job fails the check when that product
    reaches ``GHA_MAX_CONCURRENCY`` and its strategy does not set
    ``max-parallel``.

    Args:
        wdata: Parsed workflow mapping; must contain a ``"jobs"`` mapping of
            job name to job definition.

    Returns:
        ``False`` as soon as one job fails the check; ``True`` when every job
        passes (including when there are no jobs or no matrix jobs).
    """
    log.debug("Checking workflow for max concurrency")
    # Inspect every job: returning from inside the loop on a passing job
    # would hide oversized matrices defined in later jobs.
    for job in wdata["jobs"].values():
        strategy = job.get("strategy") or {}
        matrix = strategy.get("matrix") if isinstance(strategy, dict) else None
        if not isinstance(matrix, dict):
            # No matrix, or one that is not a mapping (e.g. an expression
            # string evaluated at run time) -- its size cannot be computed
            # here, so this job is skipped.
            continue

        concurrency = 1
        for values in matrix.values():
            concurrency *= len(values)

        if concurrency >= GHA_MAX_CONCURRENCY and "max-parallel" not in strategy:
            log.debug("max-concurrency check Failed")
            return False

    log.debug("max-concurrency check Passed")
    return True