diff --git a/vulnerabilities/api_v3.py b/vulnerabilities/api_v3.py index 12f10ed1c..f262b1828 100644 --- a/vulnerabilities/api_v3.py +++ b/vulnerabilities/api_v3.py @@ -23,6 +23,7 @@ from rest_framework.reverse import reverse from rest_framework.throttling import AnonRateThrottle +from vulnerabilities.models import SSVC from vulnerabilities.models import AdvisoryAlias from vulnerabilities.models import AdvisoryReference from vulnerabilities.models import AdvisorySet @@ -30,13 +31,11 @@ from vulnerabilities.models import AdvisorySeverity from vulnerabilities.models import AdvisoryV2 from vulnerabilities.models import AdvisoryWeakness -from vulnerabilities.models import Group from vulnerabilities.models import GroupedAdvisory from vulnerabilities.models import ImpactedPackageAffecting from vulnerabilities.models import PackageV2 from vulnerabilities.throttling import PermissionBasedUserRateThrottle from vulnerabilities.utils import TYPES_WITH_MULTIPLE_IMPORTERS -from vulnerabilities.utils import get_advisories_from_groups from vulnerabilities.utils import merge_and_save_grouped_advisories @@ -48,6 +47,7 @@ class PackageQuerySerializer(serializers.Serializer): ) details = serializers.BooleanField(default=False) ignore_qualifiers_subpath = serializers.BooleanField(default=False) + max_advisories = serializers.IntegerField(default=100, min_value=1, max_value=10000) def validate(self, data): if not data["purls"]: @@ -113,7 +113,6 @@ class AdvisoryV3Serializer(serializers.ModelSerializer): weaknesses = AdvisoryWeaknessSerializer(many=True) references = AdvisoryReferenceSerializer(many=True) severities = AdvisorySeveritySerializer(many=True) - advisory_id = serializers.CharField(source="avid", read_only=True) related_ssvc_trees = serializers.SerializerMethodField() def get_related_ssvc_trees(self, obj): @@ -142,7 +141,7 @@ def get_related_ssvc_trees(self, obj): class Meta: model = AdvisoryV2 fields = [ - "advisory_id", + "avid", "url", "aliases", "summary", @@ -226,13 +225,18 @@ def get_affected_by_vulnerabilities(self, package): result = [] for adv in advisories: - fixed = impact_map.get(adv["avid"]) - adv.pop("avid", None) + fixed = impact_map.get(adv["avid"]) or [] + resource_url = None + + if request := self.context.get("request", None): + resource_url = adv.pop("resource_url", None) + resource_url = request.build_absolute_uri(location=resource_url) result.append( { **adv, "fixed_by_packages": fixed, + "resource_url": resource_url, } ) @@ -246,9 +250,20 @@ def get_affected_by_vulnerabilities(self, package): advisories_ids = advisories_qs.only("id") advisories_ids = list(advisories_ids[:101]) - if len(advisories_ids) > 100: + if len(advisories_ids) > self.context.get("max_advisories", 100): return None + advisories_qs = advisories_qs.prefetch_related( + "aliases", + Prefetch( + "related_ssvcs", + queryset=SSVC.objects.select_related("source_advisory") + .only("id", "decision", "options", "vector", "source_advisory__url") + .distinct("source_advisory__url"), + to_attr="prefetched_ssvc_trees", + ), + ) + advisory_by_avid = {adv.avid: adv for adv in advisories_qs} avids = advisory_by_avid.keys() @@ -264,18 +279,35 @@ def get_affected_by_vulnerabilities(self, package): for advisory in advisories_qs: impact = impact_by_avid.get(advisory.avid) - if not impact: - continue + fixed_by_packages = [] + if impact: + fixed_by_packages = [pkg.purl for pkg in impact.fixed_by_packages.all()] + + resource_url = None + + if request := self.context.get("request", None): + resource_url = request.build_absolute_uri(location=advisory.get_absolute_url()) result.append( { + "avid": advisory.avid, "advisory_id": advisory.advisory_id.split("/")[-1], "aliases": [alias.alias for alias in advisory.aliases.all()], "summary": advisory.summary, "severity": advisory.weighted_severity, "exploitability": advisory.exploitability, "risk_score": advisory.risk_score, - "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], + "fixed_by_packages": fixed_by_packages, + "resource_url": resource_url, + "ssvc_trees": [ + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "source_url": ssvc.source_advisory.url, + } + for ssvc in advisory.prefetched_ssvc_trees + ], } ) @@ -295,8 +327,19 @@ def get_affected_by_vulnerabilities(self, package): def get_fixing_vulnerabilities(self, package): advisories = self.context["fixing_advisory_map"].get(package.id, []) - if advisories: - return advisories + results = [] + for advisory in advisories: + if request := self.context.get("request", None): + resource_url = request.build_absolute_uri(location=advisory["resource_url"]) + results.append( + { + "advisory_id": advisory["advisory_id"], + "resource_url": resource_url, + "avid": advisory["avid"], + } + ) + if results: + return results advisories_qs = AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(package.package_url) @@ -304,15 +347,19 @@ def get_fixing_vulnerabilities(self, package): advisories_ids = advisories_qs.only("id") advisories_ids = list(advisories_ids[:101]) - if len(advisories_ids) > 100: + if len(advisories_ids) > self.context.get("max_advisories", 100): return None results = [] for advisory in advisories_qs: + if request := self.context.get("request", None): + resource_url = request.build_absolute_uri(location=advisory.get_absolute_url()) results.append( { "advisory_id": advisory.advisory_id.split("/")[-1], + "resource_url": resource_url, + "avid": advisory.avid, } ) return results @@ -334,9 +381,16 @@ def return_fixing_advisories_data(self, advisories): result = [] for advisory in advisories: assert isinstance(advisory, GroupedAdvisory) + resource_url = None + if request := self.context.get("request", None): + resource_url = request.build_absolute_uri( + location=advisory.advisory.get_absolute_url() + ) result.append( { "advisory_id": advisory.identifier, + "resource_url": resource_url, + "avid": advisory.advisory.avid, } ) @@ -357,21 +411,28 @@ def return_advisories_data(self, package, advisories_qs, advisories): result = [] for advisory in advisories: assert isinstance(advisory, GroupedAdvisory) + resource_url = None + fixed_by_packages = [] + if request := self.context.get("request", None): + resource_url = request.build_absolute_uri( + location=advisory.advisory.get_absolute_url() + ) impact = impact_by_avid.get(advisory.advisory.avid) - if not impact: - continue + if impact: + fixed_by_packages = list(set([pkg.purl for pkg in impact.fixed_by_packages.all()])) result.append( { "advisory_id": advisory.identifier, + "avid": advisory.advisory.avid, "aliases": [alias.alias for alias in advisory.aliases], "weighted_severity": advisory.weighted_severity, "exploitability": advisory.exploitability, "risk_score": advisory.risk_score, "summary": advisory.advisory.summary, - "fixed_by_packages": list( - set([pkg.purl for pkg in impact.fixed_by_packages.all()]) - ), + "fixed_by_packages": fixed_by_packages, + "resource_url": resource_url, + "ssvc_trees": advisory.ssvc_trees, } ) @@ -400,6 +461,7 @@ def create(self, request, *args, **kwargs): purls = serializer.validated_data["purls"] details = serializer.validated_data["details"] ignore_qualifiers_subpath = serializer.validated_data["ignore_qualifiers_subpath"] + max_advisories = serializer.validated_data["max_advisories"] if not purls: impacted = ImpactedPackageAffecting.objects.filter(package_id=OuterRef("id")) @@ -464,6 +526,7 @@ def create(self, request, *args, **kwargs): "advisory_map": affected_advisory_map, "impact_map": impact_map, "fixing_advisory_map": fixing_advisory_map, + "max_advisories": max_advisories, }, ) return self.get_paginated_response(serializer.data) @@ -482,7 +545,7 @@ def get_fixed_by_packages(self, obj): class Meta: model = AdvisoryV2 fields = [ - "advisory_id", + "avid", "url", "aliases", "summary", @@ -576,7 +639,27 @@ def get_affected_advisories_bulk(packages): relation_type="affecting", ) .select_related("primary_advisory") - .prefetch_related(Prefetch("aliases", queryset=AdvisoryAlias.objects.only("alias"))) + .prefetch_related( + Prefetch("aliases", queryset=AdvisoryAlias.objects.only("alias")), + Prefetch( + "members", + queryset=AdvisorySetMember.objects.select_related("advisory").prefetch_related( + Prefetch( + "advisory__related_ssvcs", + queryset=SSVC.objects.select_related("source_advisory") + .only( + "id", + "options", + "decision", + "vector", + "source_advisory__url", + ) + .distinct("source_advisory__url"), + to_attr="prefetched_ssvc_trees", + ) + ), + ), + ) .annotate( max_severity=Max( "members__advisory__weighted_severity", @@ -620,6 +703,22 @@ def get_affected_advisories_bulk(packages): identifier = primary.advisory_id.split("/")[-1] aliases = [a for a in adv._aliases_cache if a != identifier] + all_ssvc = [] + + for member in adv.members.all(): + all_ssvc.extend(member.advisory.prefetched_ssvc_trees) + + ssvcs = [] + + for ssvc in all_ssvc: + ssvcs.append( + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "source_url": ssvc.source_advisory.url, + } + ) grouped.append( { @@ -630,6 +729,8 @@ def get_affected_advisories_bulk(packages): "exploitability": exploitability, "risk_score": risk_score, "summary": primary.summary, + "resource_url": primary.get_absolute_url(), + "ssvc_trees": ssvcs, } ) @@ -690,7 +791,7 @@ def get_fixing_advisories_bulk(packages): package_map = defaultdict(list) for adv in advisory_sets: - package_map[adv.package_id].append(adv.primary_advisory.advisory_id) + package_map[adv.package_id].append(adv.primary_advisory) result = {} @@ -698,8 +799,14 @@ def get_fixing_advisories_bulk(packages): groups = package_map.get(package.id, []) grouped = [] - for adv_id in groups: - grouped.append({"advisory_id": adv_id.split("/")[-1]}) + for advisory in groups: + grouped.append( + { + "advisory_id": advisory.advisory_id.split("/")[-1], + "resource_url": advisory.get_absolute_url(), + "avid": advisory.avid, + } + ) result[package.id] = grouped diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index d7d7f5832..cfe3b879a 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -19,6 +19,7 @@ from itertools import groupby from operator import attrgetter from traceback import format_exc as traceback_format_exc +from typing import Dict from typing import List from typing import NamedTuple from typing import Optional @@ -3810,3 +3811,4 @@ class GroupedAdvisory(NamedTuple): weighted_severity: Optional[float] exploitability: Optional[float] risk_score: Optional[float] + ssvc_trees: List[Dict] diff --git a/vulnerabilities/utils.py b/vulnerabilities/utils.py index e8a13821e..1bee5869d 100644 --- a/vulnerabilities/utils.py +++ b/vulnerabilities/utils.py @@ -35,6 +35,7 @@ import urllib3 from cwe2.database import Database from cwe2.database import InvalidCWEError +from django.db.models import Prefetch from packageurl import PackageURL from packageurl.contrib.django.utils import without_empty_values from univers.version_range import RANGE_CLASS_BY_SCHEMES @@ -959,10 +960,12 @@ def get_merged_identifier_groups(advisories): return final_groups -def get_advisories_from_groups(groups): +def get_advisories_from_groups(groups, include_ssvc_trees=False): """ Return a list of advisories from the merged groups of advisories. """ + from vulnerabilities.models import SSVC + from vulnerabilities.models import AdvisoryV2 from vulnerabilities.models import Group from vulnerabilities.models import GroupedAdvisory @@ -996,6 +999,35 @@ def get_advisories_from_groups(groups): identifier = group.primary.advisory_id.split("/")[-1] filtered_aliases = [alias for alias in group.aliases if alias.alias != identifier] + ssvc_trees = [] + + if include_ssvc_trees: + + all_advs = [group.primary] + list(group.secondaries) + + advisories_qs = AdvisoryV2.objects.filter( + id__in=[adv.id for adv in all_advs] + ).prefetch_related( + Prefetch( + "related_ssvcs", + queryset=SSVC.objects.select_related("source_advisory") + .only("id", "vector", "decision", "options", "source_advisory__url") + .distinct(), + to_attr="ssvc_trees", + ) + ) + + ssvc_trees = [ + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "url": ssvc.source_advisory.url if ssvc.source_advisory else None, + } + for adv in advisories_qs + for ssvc in adv.ssvc_trees + ] + advisories.append( GroupedAdvisory( aliases=filtered_aliases, @@ -1004,6 +1036,7 @@ def get_advisories_from_groups(groups): weighted_severity=weighted_severity, exploitability=exploitability, risk_score=risk_score, + ssvc_trees=ssvc_trees or [], ) )