def find_latest_compatible_version()

in detection_rules/integrations.py [0:0]


def find_latest_compatible_version(package: str, integration: str,
                                   rule_stack_version: Version,
                                   packages_manifest: dict) -> Tuple[str, Union[str, Tuple[str, str]]]:
    """Find the latest integration package version usable with the supplied stack version.

    The versions listed for ``package`` in ``packages_manifest`` are walked from newest to
    oldest. For each one, the Kibana version condition
    (``manifest["conditions"]["kibana"]["version"]``) is reduced to bare version numbers by
    removing the ``>``, ``<``, ``=``, ``^`` and ``~`` operators and splitting on ``||``.
    A package version is selected as soon as either:

    * the highest stack version listed in its condition has the same major as
      ``rule_stack_version``, or
    * any stack version listed in its condition is less than or equal to ``rule_stack_version``.

    Args:
        package: Name of the integration package; used as the key into ``packages_manifest``.
        integration: Integration name; only used in the notice text and in error messages.
        rule_stack_version: Stack version of the rule that the package must be matched against.
        packages_manifest: Mapping of package name -> {package version string -> manifest dict}.

    Returns:
        A ``(version, notice)`` pair. ``version`` is the selected package version key.
        ``notice`` is ``""`` when no inspected package version lists a stack version higher
        than ``rule_stack_version``; otherwise it is a 2-tuple of message lines built for the
        most recently inspected such package version (later iterations overwrite earlier ones).

    Raises:
        ValueError: If ``package`` is empty, is missing from ``packages_manifest``, a manifest
            has no Kibana version condition or no usable version in it, or no package version
            is selected. ``Version.parse`` may also raise on malformed version strings
            (presumably ``ValueError`` -- confirm against the Version implementation in use).
    """

    if not package:
        raise ValueError("Package must be specified")

    package_manifest = packages_manifest.get(package)
    if package_manifest is None:
        raise ValueError(f"Package {package} not found in manifest.")

    # Label used only inside the notice text. It is kept separate from the ``integration``
    # argument so that the error messages below always report the caller's value verbatim.
    integration_label = f" {integration.strip()}" if integration else ""

    # Converts the dict keys (version numbers) to Version objects for proper sorting (descending)
    integration_manifests = sorted(package_manifest.items(), key=lambda item: Version.parse(item[0]), reverse=True)
    notice = ""

    for version, manifest in integration_manifests:
        version_requirement = manifest.get("conditions", {}).get("kibana", {}).get("version")
        if not version_requirement:
            raise ValueError(f"Manifest for {package}:{integration} version {version} is missing conditions.")

        # Strip the range operators, then split the "||" alternatives. Empty alternatives are
        # dropped so that a condition made only of operators is reported by the check below.
        bare_requirement = re.sub(r"[><=^~]", "", version_requirement)
        compatible_versions = [Version.parse(part.strip())
                               for part in bare_requirement.split("||") if part.strip()]

        if not compatible_versions:
            raise ValueError(f"Manifest for {package}:{integration} version {version} is missing compatible versions")

        highest_compatible_version = max(compatible_versions)

        if highest_compatible_version > rule_stack_version:
            # generate notice message that a later integration version is available
            # (a 2-tuple of lines; overwritten if an older package version also qualifies)
            notice = (f"There is a new integration {package}{integration_label} version {version} available!",
                      f"Update the rule min_stack version from {rule_stack_version} to "
                      f"{highest_compatible_version} if using new features in this latest version.")

        if highest_compatible_version.major == rule_stack_version.major:
            return version, notice

        # Check for rules that cross majors: accept this package version if any of the
        # listed stack versions is not newer than the rule's stack version.
        if any(candidate <= rule_stack_version for candidate in compatible_versions):
            return version, notice

    raise ValueError(f"no compatible version for integration {package}:{integration}")