def _scan_vulnerabilities()

in setup/setup-ci/security-scanner/amlsecscan.py [0:0]


def _scan_vulnerabilities(telemetry):

    start_time = time.time()
    _send_health(telemetry, "ScanVulnerabilities", "Started")

    try:
        shutil.rmtree(f"{_config_folder_path}/anaconda", ignore_errors=True)
        for env_name in (
            entry.name for entry in os.scandir("/anaconda/envs") if entry.is_dir()
        ):
            _logger.info(
                f"Saving pip freeze of conda environment {env_name} to {_config_folder_path}/anaconda/{env_name}/requirements.txt"
            )
            os.makedirs(f"{_config_folder_path}/anaconda/{env_name}", exist_ok=True)
            _run(
                f"/anaconda/envs/{env_name}/bin/python3 -m pip freeze > {_config_folder_path}/anaconda/{env_name}/requirements.txt"
            )

        _logger.info("Running Trivy scan")
        _run(
            f"/usr/local/bin/trivy filesystem --format json --output {_config_folder_path}/trivy.json --security-checks vuln --severity HIGH,CRITICAL --ignore-unfixed /"
        )

        findings_os, findings_python = _parse_trivy_results(
            f"{_config_folder_path}/trivy.json"
        )

        _send_assessment(
            telemetry,
            "OsVulnerabilities",
            len(findings_os),
            {"findings": _filter_trivy_results(findings_os)}
            if len(findings_os) > 0
            else None,
        )
        _send_assessment(
            telemetry,
            "PythonVulnerabilities",
            len(findings_python),
            {"findings": _filter_trivy_results(findings_python)}
            if len(findings_python) > 0
            else None,
        )
        _send_health(
            telemetry,
            "ScanVulnerabilities",
            "Succeeded",
            {"elapsedTimeInS": time.time() - start_time},
        )
        return True

    except subprocess.CalledProcessError as e:
        _send_health(
            telemetry,
            "ScanVulnerabilities",
            "Failed",
            {
                "error": str(e),
                "stdout": e.stdout,
                "stderr": e.stderr,
                "elapsedTimeInS": time.time() - start_time,
            },
        )
        return False
    except Exception as e:
        _logger.exception(f"Error: {e}")
        _send_health(
            telemetry,
            "ScanVulnerabilities",
            "Failed",
            {"error": str(e), "elapsedTimeInS": time.time() - start_time},
        )
        return False