From 6da641ff6c4f91a3a233cd81949f33e8cfc40f16 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 17:22:38 +0200 Subject: [PATCH 1/4] refactor(product): extract Product/Product_Line/Product_API_Scan_Configuration into dojo/product/ Phase 1 of module reorg per AGENTS.md. Move Product, Product_Line, Product_API_Scan_Configuration + admin registrations into dojo/product/{models,admin}.py. Cross-module FKs use string refs to avoid circular imports. Product_Type re-export now pure backward-compat (F401). No migration change. --- dojo/models.py | 295 +------------------------------------ dojo/product/__init__.py | 1 + dojo/product/admin.py | 21 +++ dojo/product/models.py | 303 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 331 insertions(+), 289 deletions(-) create mode 100644 dojo/product/admin.py create mode 100644 dojo/product/models.py diff --git a/dojo/models.py b/dojo/models.py index 00d53ad3ba5..5f709f4b543 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -7,7 +7,6 @@ import warnings from contextlib import suppress from datetime import datetime, timedelta -from decimal import Decimal from pathlib import Path from typing import TYPE_CHECKING from urllib.parse import urlparse @@ -746,7 +745,12 @@ def clean(self): raise ValidationError(msg) -from dojo.product_type.models import Product_Type # noqa: E402 -- re-export; mid-file as Product FK uses it below +from dojo.product.models import ( # noqa: E402 -- re-export; class-body FKs below reference these + Product, + Product_API_Scan_Configuration, # noqa: F401 -- re-export + Product_Line, # noqa: F401 -- re-export +) +from dojo.product_type.models import Product_Type # noqa: E402, F401 -- re-export from dojo.test.models import ( # noqa: E402 -- re-export; class-body FKs below reference these IMPORT_ACTIONS, # noqa: F401 -- re-export IMPORT_CLOSED_FINDING, # noqa: F401 -- re-export @@ -760,14 +764,6 @@ def clean(self): ) -class Product_Line(models.Model): - name = models.CharField(max_length=300) - description = models.CharField(max_length=2000) - - def __str__(self): - return self.name - - class Report_Type(models.Model): name = models.CharField(max_length=255) @@ -957,257 +953,6 @@ def get_summary(self): return f"{self.name} - Critical: {self.critical}, High: {self.high}, Medium: {self.medium}, Low: {self.low}" -class Product(BaseModel): - WEB_PLATFORM = "web" - IOT = "iot" - DESKTOP_PLATFORM = "desktop" - MOBILE_PLATFORM = "mobile" - WEB_SERVICE_PLATFORM = "web service" - PLATFORM_CHOICES = ( - (WEB_SERVICE_PLATFORM, _("API")), - (DESKTOP_PLATFORM, _("Desktop")), - (IOT, _("Internet of Things")), - (MOBILE_PLATFORM, _("Mobile")), - (WEB_PLATFORM, _("Web")), - ) - - CONSTRUCTION = "construction" - PRODUCTION = "production" - RETIREMENT = "retirement" - LIFECYCLE_CHOICES = ( - (CONSTRUCTION, _("Construction")), - (PRODUCTION, _("Production")), - (RETIREMENT, _("Retirement")), - ) - - THIRD_PARTY_LIBRARY_ORIGIN = "third party library" - PURCHASED_ORIGIN = "purchased" - CONTRACTOR_ORIGIN = "contractor" - INTERNALLY_DEVELOPED_ORIGIN = "internal" - OPEN_SOURCE_ORIGIN = "open source" - OUTSOURCED_ORIGIN = "outsourced" - ORIGIN_CHOICES = ( - (THIRD_PARTY_LIBRARY_ORIGIN, _("Third Party Library")), - (PURCHASED_ORIGIN, _("Purchased")), - (CONTRACTOR_ORIGIN, _("Contractor Developed")), - (INTERNALLY_DEVELOPED_ORIGIN, _("Internally Developed")), - (OPEN_SOURCE_ORIGIN, _("Open Source")), - (OUTSOURCED_ORIGIN, _("Outsourced")), - ) - - VERY_HIGH_CRITICALITY = "very high" - HIGH_CRITICALITY = "high" - MEDIUM_CRITICALITY = "medium" - LOW_CRITICALITY = "low" - VERY_LOW_CRITICALITY = "very low" - NONE_CRITICALITY = "none" - BUSINESS_CRITICALITY_CHOICES = ( - (VERY_HIGH_CRITICALITY, _("Very High")), - (HIGH_CRITICALITY, _("High")), - (MEDIUM_CRITICALITY, _("Medium")), - (LOW_CRITICALITY, _("Low")), - (VERY_LOW_CRITICALITY, _("Very Low")), - (NONE_CRITICALITY, _("None")), - ) - - name = models.CharField(max_length=255, unique=True) - description = models.CharField(max_length=4000) - - product_manager = models.ForeignKey(Dojo_User, null=True, blank=True, - related_name="product_manager", on_delete=models.RESTRICT) - technical_contact = models.ForeignKey(Dojo_User, null=True, blank=True, - related_name="technical_contact", on_delete=models.RESTRICT) - team_manager = models.ForeignKey(Dojo_User, null=True, blank=True, - related_name="team_manager", on_delete=models.RESTRICT) - - prod_type = models.ForeignKey(Product_Type, related_name="prod_type", - null=False, blank=False, on_delete=models.CASCADE) - sla_configuration = models.ForeignKey(SLA_Configuration, - related_name="sla_config", - null=False, - blank=False, - default=1, - on_delete=models.RESTRICT) - tid = models.IntegerField(default=0, editable=False) - authorized_users = models.ManyToManyField(Dojo_User, related_name="authorized_products", blank=True) - prod_numeric_grade = models.IntegerField(null=True, blank=True) - - # Metadata - business_criticality = models.CharField(max_length=9, choices=BUSINESS_CRITICALITY_CHOICES, blank=True, null=True) - platform = models.CharField(max_length=11, choices=PLATFORM_CHOICES, blank=True, null=True) - lifecycle = models.CharField(max_length=12, choices=LIFECYCLE_CHOICES, blank=True, null=True) - origin = models.CharField(max_length=19, choices=ORIGIN_CHOICES, blank=True, null=True) - user_records = models.PositiveIntegerField(blank=True, null=True, help_text=_("Estimate the number of user records within the application.")) - revenue = models.DecimalField(max_digits=15, decimal_places=2, blank=True, null=True, validators=[MinValueValidator(Decimal("0.00"))], help_text=_("Estimate the application's revenue.")) - external_audience = models.BooleanField(default=False, help_text=_("Specify if the application is used by people outside the organization.")) - internet_accessible = models.BooleanField(default=False, help_text=_("Specify if the application is accessible from the public internet.")) - regulations = models.ManyToManyField(Regulation, blank=True) - - tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this product. Choose from the list or add new tags. Press Enter key to add.")) - enable_product_tag_inheritance = models.BooleanField( - default=False, - blank=False, - verbose_name=_("Enable Product Tag Inheritance"), - help_text=_("Enables product tag inheritance. Any tags added on a product will automatically be added to all Engagements, Tests, and Findings")) - enable_simple_risk_acceptance = models.BooleanField(default=False, help_text=_("Allows simple risk acceptance by checking/unchecking a checkbox.")) - enable_full_risk_acceptance = models.BooleanField(default=True, help_text=_("Allows full risk acceptance using a risk acceptance form, expiration date, uploaded proof, etc.")) - - disable_sla_breach_notifications = models.BooleanField( - default=False, - blank=False, - verbose_name=_("Disable SLA breach notifications"), - help_text=_("Disable SLA breach notifications if configured in the global settings")) - async_updating = models.BooleanField(default=False, - help_text=_("Findings under this Product or SLA configuration are asynchronously being updated")) - - class Meta: - ordering = ("name",) - - def __str__(self): - return self.name - - def save(self, *args, **kwargs): - # get the product's sla config before saving (if this is an existing product) - initial_sla_config = None - if self.pk is not None: - initial_sla_config = getattr(Product.objects.get(pk=self.pk), "sla_configuration", None) - # if initial sla config exists and async finding update is already running, revert sla config before saving - if initial_sla_config and self.async_updating: - self.sla_configuration = initial_sla_config - - super().save(*args, **kwargs) - - # if the initial sla config exists and async finding update is not running - if initial_sla_config is not None and not self.async_updating: - # get the new sla config from the saved product - new_sla_config = getattr(self, "sla_configuration", None) - # if the sla config has changed, update finding sla expiration dates within this product - if new_sla_config and (initial_sla_config != new_sla_config): - # set the async updating flag to true for this product - self.async_updating = True - super().save(*args, **kwargs) - # set the async updating flag to true for the sla config assigned to this product - sla_config = getattr(self, "sla_configuration", None) - if sla_config: - sla_config.async_updating = True - super(SLA_Configuration, sla_config).save() - # launch the async task to update all finding sla expiration dates - from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import - from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import - - dojo_dispatch_task( - async_update_sla_expiration_dates_sla_config_sync, - sla_config.id, - [self.id], - ) - # The async task refetches and resets async_updating on its own copies. - # Mirror that on this in-memory product and the in-memory sla_config so a - # subsequent save() on either does not trigger their lock-revert paths. - self.async_updating = False - if sla_config: - sla_config.async_updating = False - - def get_absolute_url(self): - return reverse("view_product", args=[str(self.id)]) - - @cached_property - def findings_count(self): - try: - # if prefetched, it's already there - return self.active_finding_count - except AttributeError: - # ideally it's always prefetched and we can remove this code in the future - self.active_finding_count = Finding.objects.filter(active=True, - test__engagement__product=self).count() - return self.active_finding_count - - @cached_property - def findings_active_verified_count(self): - try: - # if prefetched, it's already there - return self.active_verified_finding_count - except AttributeError: - # ideally it's always prefetched and we can remove this code in the future - self.active_verified_finding_count = Finding.objects.filter(active=True, - verified=True, - test__engagement__product=self).count() - return self.active_verified_finding_count - - # TODO: Delete this after the move to Locations - @cached_property - def endpoint_host_count(self): - # active_endpoints is (should be) prefetched - endpoints = getattr(self, "active_endpoints", None) - - hosts = [] - for e in endpoints: - if e.host in hosts: - continue - hosts.append(e.host) - - return len(hosts) - - # TODO: Delete this after the move to Locations - @cached_property - def endpoint_count(self): - # active_endpoints is (should be) prefetched - endpoints = getattr(self, "active_endpoints", None) - if endpoints: - return len(self.active_endpoints) - return 0 - - def open_findings(self, start_date=None, end_date=None): - if start_date is None or end_date is None: - return {} - - from dojo.utils import get_system_setting # noqa: PLC0415 circular import - findings = Finding.objects.filter(test__engagement__product=self, - mitigated__isnull=True, - false_p=False, - duplicate=False, - out_of_scope=False, - date__range=[start_date, - end_date]) - - if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True): - findings = findings.filter(verified=True) - - critical = findings.filter(severity="Critical").count() - high = findings.filter(severity="High").count() - medium = findings.filter(severity="Medium").count() - low = findings.filter(severity="Low").count() - - return {"Critical": critical, - "High": high, - "Medium": medium, - "Low": low, - "Total": (critical + high + medium + low)} - - def get_breadcrumbs(self): - return [{"title": str(self), - "url": reverse("view_product", args=(self.id,))}] - - @property - def get_product_type(self): - return self.prod_type if self.prod_type is not None else "unknown" - - # only used in APIv2 serializers.py, should be deprecated or at least prefetched - def open_findings_list(self): - findings = Finding.objects.filter(test__engagement__product=self, active=True).values_list("id", flat=True) - return list(findings) - - @property - def has_jira_configured(self): - from dojo.jira import services as jira_services # noqa: PLC0415 circular import - return jira_services.has_configured(self) - - def violates_sla(self): - findings = Finding.objects.filter(test__engagement__product=self, - active=True, - sla_expiration_date__lt=timezone.now().date()) - return findings.count() > 0 - - class Tool_Type(models.Model): name = models.CharField(max_length=200) description = models.CharField(max_length=2000, null=True, blank=True) @@ -1248,31 +993,6 @@ def __str__(self): return self.name -class Product_API_Scan_Configuration(models.Model): - product = models.ForeignKey(Product, null=False, blank=False, on_delete=models.CASCADE) - tool_configuration = models.ForeignKey(Tool_Configuration, null=False, blank=False, on_delete=models.CASCADE) - service_key_1 = models.CharField(max_length=200, null=True, blank=True) - service_key_2 = models.CharField(max_length=200, null=True, blank=True) - service_key_3 = models.CharField(max_length=200, null=True, blank=True) - - def __str__(self): - name = self.tool_configuration.name - if self.service_key_1 or self.service_key_2 or self.service_key_3: - name += f" ({self.details})" - return name - - @property - def details(self): - details = "" - if self.service_key_1: - details += f"{self.service_key_1}" - if self.service_key_2: - details += f" | {self.service_key_2}" - if self.service_key_3: - details += f" | {self.service_key_3}" - return details - - # declare form here as we can't import forms.py due to circular imports not even locally class ToolConfigForm_Admin(forms.ModelForm): password = forms.CharField(widget=forms.PasswordInput, required=False) @@ -3948,7 +3668,6 @@ def __str__(self): admin.site.register(Endpoint_Params) admin.site.register(Endpoint_Status) admin.site.register(Endpoint) -admin.site.register(Product) admin.site.register(UserContactInfo) admin.site.register(Notes) admin.site.register(Note_Type) @@ -3986,10 +3705,8 @@ def __str__(self): admin.site.register(Contact) admin.site.register(NoteHistory) -admin.site.register(Product_Line) admin.site.register(Report_Type) admin.site.register(DojoMeta) -admin.site.register(Product_API_Scan_Configuration) admin.site.register(Development_Environment) admin.site.register(Finding_Template) admin.site.register(Vulnerability_Id) diff --git a/dojo/product/__init__.py b/dojo/product/__init__.py index e69de29bb2d..df5e047d856 100644 --- a/dojo/product/__init__.py +++ b/dojo/product/__init__.py @@ -0,0 +1 @@ +import dojo.product.admin # noqa: F401 diff --git a/dojo/product/admin.py b/dojo/product/admin.py new file mode 100644 index 00000000000..e6a32855567 --- /dev/null +++ b/dojo/product/admin.py @@ -0,0 +1,21 @@ +from django.contrib import admin + +from dojo.product.models import Product, Product_API_Scan_Configuration, Product_Line + + +@admin.register(Product_Line) +class ProductLineAdmin(admin.ModelAdmin): + + """Admin support for the Product_Line model.""" + + +@admin.register(Product) +class ProductAdmin(admin.ModelAdmin): + + """Admin support for the Product model.""" + + +@admin.register(Product_API_Scan_Configuration) +class ProductAPIScanConfigurationAdmin(admin.ModelAdmin): + + """Admin support for the Product_API_Scan_Configuration model.""" diff --git a/dojo/product/models.py b/dojo/product/models.py new file mode 100644 index 00000000000..cd49e157ac4 --- /dev/null +++ b/dojo/product/models.py @@ -0,0 +1,303 @@ +from decimal import Decimal + +from django.core.validators import MinValueValidator +from django.db import models +from django.urls import reverse +from django.utils import timezone +from django.utils.functional import cached_property +from django.utils.translation import gettext as _ +from tagulous.models import TagField + +from dojo.base_models.base import BaseModel + + +class Product_Line(models.Model): + name = models.CharField(max_length=300) + description = models.CharField(max_length=2000) + + def __str__(self): + return self.name + + +class Product(BaseModel): + WEB_PLATFORM = "web" + IOT = "iot" + DESKTOP_PLATFORM = "desktop" + MOBILE_PLATFORM = "mobile" + WEB_SERVICE_PLATFORM = "web service" + PLATFORM_CHOICES = ( + (WEB_SERVICE_PLATFORM, _("API")), + (DESKTOP_PLATFORM, _("Desktop")), + (IOT, _("Internet of Things")), + (MOBILE_PLATFORM, _("Mobile")), + (WEB_PLATFORM, _("Web")), + ) + + CONSTRUCTION = "construction" + PRODUCTION = "production" + RETIREMENT = "retirement" + LIFECYCLE_CHOICES = ( + (CONSTRUCTION, _("Construction")), + (PRODUCTION, _("Production")), + (RETIREMENT, _("Retirement")), + ) + + THIRD_PARTY_LIBRARY_ORIGIN = "third party library" + PURCHASED_ORIGIN = "purchased" + CONTRACTOR_ORIGIN = "contractor" + INTERNALLY_DEVELOPED_ORIGIN = "internal" + OPEN_SOURCE_ORIGIN = "open source" + OUTSOURCED_ORIGIN = "outsourced" + ORIGIN_CHOICES = ( + (THIRD_PARTY_LIBRARY_ORIGIN, _("Third Party Library")), + (PURCHASED_ORIGIN, _("Purchased")), + (CONTRACTOR_ORIGIN, _("Contractor Developed")), + (INTERNALLY_DEVELOPED_ORIGIN, _("Internally Developed")), + (OPEN_SOURCE_ORIGIN, _("Open Source")), + (OUTSOURCED_ORIGIN, _("Outsourced")), + ) + + VERY_HIGH_CRITICALITY = "very high" + HIGH_CRITICALITY = "high" + MEDIUM_CRITICALITY = "medium" + LOW_CRITICALITY = "low" + VERY_LOW_CRITICALITY = "very low" + NONE_CRITICALITY = "none" + BUSINESS_CRITICALITY_CHOICES = ( + (VERY_HIGH_CRITICALITY, _("Very High")), + (HIGH_CRITICALITY, _("High")), + (MEDIUM_CRITICALITY, _("Medium")), + (LOW_CRITICALITY, _("Low")), + (VERY_LOW_CRITICALITY, _("Very Low")), + (NONE_CRITICALITY, _("None")), + ) + + name = models.CharField(max_length=255, unique=True) + description = models.CharField(max_length=4000) + + product_manager = models.ForeignKey("dojo.Dojo_User", null=True, blank=True, + related_name="product_manager", on_delete=models.RESTRICT) + technical_contact = models.ForeignKey("dojo.Dojo_User", null=True, blank=True, + related_name="technical_contact", on_delete=models.RESTRICT) + team_manager = models.ForeignKey("dojo.Dojo_User", null=True, blank=True, + related_name="team_manager", on_delete=models.RESTRICT) + + prod_type = models.ForeignKey("dojo.Product_Type", related_name="prod_type", + null=False, blank=False, on_delete=models.CASCADE) + sla_configuration = models.ForeignKey("dojo.SLA_Configuration", + related_name="sla_config", + null=False, + blank=False, + default=1, + on_delete=models.RESTRICT) + tid = models.IntegerField(default=0, editable=False) + authorized_users = models.ManyToManyField("dojo.Dojo_User", related_name="authorized_products", blank=True) + prod_numeric_grade = models.IntegerField(null=True, blank=True) + + # Metadata + business_criticality = models.CharField(max_length=9, choices=BUSINESS_CRITICALITY_CHOICES, blank=True, null=True) + platform = models.CharField(max_length=11, choices=PLATFORM_CHOICES, blank=True, null=True) + lifecycle = models.CharField(max_length=12, choices=LIFECYCLE_CHOICES, blank=True, null=True) + origin = models.CharField(max_length=19, choices=ORIGIN_CHOICES, blank=True, null=True) + user_records = models.PositiveIntegerField(blank=True, null=True, help_text=_("Estimate the number of user records within the application.")) + revenue = models.DecimalField(max_digits=15, decimal_places=2, blank=True, null=True, validators=[MinValueValidator(Decimal("0.00"))], help_text=_("Estimate the application's revenue.")) + external_audience = models.BooleanField(default=False, help_text=_("Specify if the application is used by people outside the organization.")) + internet_accessible = models.BooleanField(default=False, help_text=_("Specify if the application is accessible from the public internet.")) + regulations = models.ManyToManyField("dojo.Regulation", blank=True) + + tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this product. Choose from the list or add new tags. Press Enter key to add.")) + enable_product_tag_inheritance = models.BooleanField( + default=False, + blank=False, + verbose_name=_("Enable Product Tag Inheritance"), + help_text=_("Enables product tag inheritance. Any tags added on a product will automatically be added to all Engagements, Tests, and Findings")) + enable_simple_risk_acceptance = models.BooleanField(default=False, help_text=_("Allows simple risk acceptance by checking/unchecking a checkbox.")) + enable_full_risk_acceptance = models.BooleanField(default=True, help_text=_("Allows full risk acceptance using a risk acceptance form, expiration date, uploaded proof, etc.")) + + disable_sla_breach_notifications = models.BooleanField( + default=False, + blank=False, + verbose_name=_("Disable SLA breach notifications"), + help_text=_("Disable SLA breach notifications if configured in the global settings")) + async_updating = models.BooleanField(default=False, + help_text=_("Findings under this Product or SLA configuration are asynchronously being updated")) + + class Meta: + ordering = ("name",) + + def __str__(self): + return self.name + + def save(self, *args, **kwargs): + # get the product's sla config before saving (if this is an existing product) + initial_sla_config = None + if self.pk is not None: + initial_sla_config = getattr(Product.objects.get(pk=self.pk), "sla_configuration", None) + # if initial sla config exists and async finding update is already running, revert sla config before saving + if initial_sla_config and self.async_updating: + self.sla_configuration = initial_sla_config + + super().save(*args, **kwargs) + + # if the initial sla config exists and async finding update is not running + if initial_sla_config is not None and not self.async_updating: + # get the new sla config from the saved product + new_sla_config = getattr(self, "sla_configuration", None) + # if the sla config has changed, update finding sla expiration dates within this product + if new_sla_config and (initial_sla_config != new_sla_config): + # set the async updating flag to true for this product + self.async_updating = True + super().save(*args, **kwargs) + # set the async updating flag to true for the sla config assigned to this product + sla_config = getattr(self, "sla_configuration", None) + if sla_config: + from dojo.models import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + SLA_Configuration, + ) + sla_config.async_updating = True + super(SLA_Configuration, sla_config).save() + # launch the async task to update all finding sla expiration dates + from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import + from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import + + dojo_dispatch_task( + async_update_sla_expiration_dates_sla_config_sync, + sla_config.id, + [self.id], + ) + # The async task refetches and resets async_updating on its own copies. + # Mirror that on this in-memory product and the in-memory sla_config so a + # subsequent save() on either does not trigger their lock-revert paths. + self.async_updating = False + if sla_config: + sla_config.async_updating = False + + def get_absolute_url(self): + return reverse("view_product", args=[str(self.id)]) + + @cached_property + def findings_count(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + try: + # if prefetched, it's already there + return self.active_finding_count + except AttributeError: + # ideally it's always prefetched and we can remove this code in the future + self.active_finding_count = Finding.objects.filter(active=True, + test__engagement__product=self).count() + return self.active_finding_count + + @cached_property + def findings_active_verified_count(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + try: + # if prefetched, it's already there + return self.active_verified_finding_count + except AttributeError: + # ideally it's always prefetched and we can remove this code in the future + self.active_verified_finding_count = Finding.objects.filter(active=True, + verified=True, + test__engagement__product=self).count() + return self.active_verified_finding_count + + # TODO: Delete this after the move to Locations + @cached_property + def endpoint_host_count(self): + # active_endpoints is (should be) prefetched + endpoints = getattr(self, "active_endpoints", None) + + hosts = [] + for e in endpoints: + if e.host in hosts: + continue + hosts.append(e.host) + + return len(hosts) + + # TODO: Delete this after the move to Locations + @cached_property + def endpoint_count(self): + # active_endpoints is (should be) prefetched + endpoints = getattr(self, "active_endpoints", None) + if endpoints: + return len(self.active_endpoints) + return 0 + + def open_findings(self, start_date=None, end_date=None): + if start_date is None or end_date is None: + return {} + + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + from dojo.utils import get_system_setting # noqa: PLC0415 circular import + findings = Finding.objects.filter(test__engagement__product=self, + mitigated__isnull=True, + false_p=False, + duplicate=False, + out_of_scope=False, + date__range=[start_date, + end_date]) + + if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True): + findings = findings.filter(verified=True) + + critical = findings.filter(severity="Critical").count() + high = findings.filter(severity="High").count() + medium = findings.filter(severity="Medium").count() + low = findings.filter(severity="Low").count() + + return {"Critical": critical, + "High": high, + "Medium": medium, + "Low": low, + "Total": (critical + high + medium + low)} + + def get_breadcrumbs(self): + return [{"title": str(self), + "url": reverse("view_product", args=(self.id,))}] + + @property + def get_product_type(self): + return self.prod_type if self.prod_type is not None else "unknown" + + # only used in APIv2 serializers.py, should be deprecated or at least prefetched + def open_findings_list(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + findings = Finding.objects.filter(test__engagement__product=self, active=True).values_list("id", flat=True) + return list(findings) + + @property + def has_jira_configured(self): + from dojo.jira import services as jira_services # noqa: PLC0415 circular import + return jira_services.has_configured(self) + + def violates_sla(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + findings = Finding.objects.filter(test__engagement__product=self, + active=True, + sla_expiration_date__lt=timezone.now().date()) + return findings.count() > 0 + + +class Product_API_Scan_Configuration(models.Model): + product = models.ForeignKey("dojo.Product", null=False, blank=False, on_delete=models.CASCADE) + tool_configuration = models.ForeignKey("dojo.Tool_Configuration", null=False, blank=False, on_delete=models.CASCADE) + service_key_1 = models.CharField(max_length=200, null=True, blank=True) + service_key_2 = models.CharField(max_length=200, null=True, blank=True) + service_key_3 = models.CharField(max_length=200, null=True, blank=True) + + def __str__(self): + name = self.tool_configuration.name + if self.service_key_1 or self.service_key_2 or self.service_key_3: + name += f" ({self.details})" + return name + + @property + def details(self): + details = "" + if self.service_key_1: + details += f"{self.service_key_1}" + if self.service_key_2: + details += f" | {self.service_key_2}" + if self.service_key_3: + details += f" | {self.service_key_3}" + return details From 5b8be7d3223ded5103ac7fcad154d9fbe23d8487 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 21:56:52 +0200 Subject: [PATCH 2/4] refactor(product): move forms + UI filters into dojo/product/ui/ [product Phase 3,4] --- dojo/components/views.py | 2 +- dojo/filters.py | 190 +----------------------------- dojo/forms.py | 144 ++--------------------- dojo/product/ui/__init__.py | 0 dojo/product/ui/filters.py | 213 ++++++++++++++++++++++++++++++++++ dojo/product/ui/forms.py | 154 ++++++++++++++++++++++++ dojo/product/views.py | 20 ++-- dojo/product_type/ui/views.py | 2 +- 8 files changed, 392 insertions(+), 333 deletions(-) create mode 100644 dojo/product/ui/__init__.py create mode 100644 dojo/product/ui/filters.py create mode 100644 dojo/product/ui/forms.py diff --git a/dojo/components/views.py b/dojo/components/views.py index 28e6f720ea8..96f6bcbf4a0 100644 --- a/dojo/components/views.py +++ b/dojo/components/views.py @@ -5,8 +5,8 @@ from django.shortcuts import render from dojo.components.sql_group_concat import Sql_GroupConcat -from dojo.filters import ComponentFilter, ComponentFilterWithoutObjectLookups from dojo.finding.queries import get_authorized_findings +from dojo.product.ui.filters import ComponentFilter, ComponentFilterWithoutObjectLookups from dojo.utils import add_breadcrumb, get_page_items, get_system_setting diff --git a/dojo/filters.py b/dojo/filters.py index 46ca7e0927b..38a4f673376 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -53,7 +53,7 @@ from dojo.finding.queries import get_authorized_findings_for_queryset from dojo.finding_group.queries import get_authorized_finding_groups_for_queryset from dojo.labels import get_labels -from dojo.location.status import FindingLocationStatus, ProductLocationStatus +from dojo.location.status import FindingLocationStatus from dojo.models import ( EFFORT_FOR_FIXING_CHOICES, SEVERITY_CHOICES, @@ -995,194 +995,6 @@ def filter(self, qs, value): return self.options[value][1](self, qs, self.field_name) -class ProductComponentFilter(DojoFilter): - component_name = CharFilter(lookup_expr="icontains", label="Module Name") - component_version = CharFilter(lookup_expr="icontains", label="Module Version") - - o = OrderingFilter( - fields=( - ("component_name", "component_name"), - ("component_version", "component_version"), - ("active", "active"), - ("duplicate", "duplicate"), - ("total", "total"), - ), - field_labels={ - "component_name": "Component Name", - "component_version": "Component Version", - "active": "Active", - "duplicate": "Duplicate", - "total": "Total", - }, - ) - - -class ComponentFilterWithoutObjectLookups(ProductComponentFilter): - test__engagement__product__prod_type__name = CharFilter( - field_name="test__engagement__product__prod_type__name", - lookup_expr="iexact", - label=labels.ORG_FILTERS_NAME_LABEL, - help_text=labels.ORG_FILTERS_NAME_HELP) - test__engagement__product__prod_type__name_contains = CharFilter( - field_name="test__engagement__product__prod_type__name", - lookup_expr="icontains", - label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, - help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) - test__engagement__product__name = CharFilter( - field_name="test__engagement__product__name", - lookup_expr="iexact", - label=labels.ASSET_FILTERS_NAME_LABEL, - help_text=labels.ASSET_FILTERS_NAME_HELP) - test__engagement__product__name_contains = CharFilter( - field_name="test__engagement__product__name", - lookup_expr="icontains", - label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL, - help_text=labels.ASSET_FILTERS_NAME_CONTAINS_HELP) - - -class ComponentFilter(ProductComponentFilter): - test__engagement__product__prod_type = ModelMultipleChoiceFilter( - queryset=Product_Type.objects.none(), - label=labels.ORG_FILTERS_LABEL) - test__engagement__product = ModelMultipleChoiceFilter( - queryset=Product.objects.none(), - label=labels.ASSET_FILTERS_LABEL) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.form.fields[ - "test__engagement__product__prod_type"].queryset = get_authorized_product_types("view") - self.form.fields[ - "test__engagement__product"].queryset = get_authorized_products("view") - - -class ProductFilterHelper(FilterSet): - name = CharFilter(lookup_expr="icontains", label=labels.ASSET_FILTERS_NAME_LABEL) - name_exact = CharFilter(field_name="name", lookup_expr="iexact", label=labels.ASSET_FILTERS_NAME_EXACT_LABEL) - business_criticality = MultipleChoiceFilter(choices=Product.BUSINESS_CRITICALITY_CHOICES, null_label="Empty") - platform = MultipleChoiceFilter(choices=Product.PLATFORM_CHOICES, null_label="Empty") - lifecycle = MultipleChoiceFilter(choices=Product.LIFECYCLE_CHOICES, null_label="Empty") - origin = MultipleChoiceFilter(choices=Product.ORIGIN_CHOICES, null_label="Empty") - external_audience = BooleanFilter(field_name="external_audience") - internet_accessible = BooleanFilter(field_name="internet_accessible") - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag contains") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) - outside_of_sla = ProductSLAFilter(label="Outside of SLA") - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - if settings.V3_FEATURE_LOCATIONS: - location_status = MultipleChoiceFilter( - field_name="locations__status", - choices=ProductLocationStatus.choices, - help_text="Status of the Location from the Products relationship", - ) - endpoints__host = CharFilter( - field_name="locations__location__url__host", method="filter_endpoints_host", label="Endpoint Host", - ) - endpoints = NumberFilter(field_name="locations__location", method="filter_endpoints", widget=HiddenInput()) - - def filter_endpoints_host(self, queryset, name, value): - return filter_endpoints_host_base( - queryset, - name, - value, - endpoint_id=self.data.get("endpoints"), - statuses=self.data.getlist("location_status"), - ) - - def filter_endpoints(self, queryset, name, value): - return filter_endpoints_base( - queryset, - name, - value, - statuses=self.data.getlist("location_status"), - host=self.data.get("endpoints__host"), - ) - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("name", "name"), - ("name_exact", "name_exact"), - ("prod_type__name", "prod_type__name"), - ("business_criticality", "business_criticality"), - ("platform", "platform"), - ("lifecycle", "lifecycle"), - ("origin", "origin"), - ("external_audience", "external_audience"), - ("internet_accessible", "internet_accessible"), - ("findings_count", "findings_count"), - ), - field_labels={ - "name": labels.ASSET_FILTERS_NAME_LABEL, - "name_exact": labels.ASSET_FILTERS_NAME_EXACT_LABEL, - "prod_type__name": labels.ORG_FILTERS_LABEL, - "business_criticality": "Business Criticality", - "platform": "Platform ", - "lifecycle": "Lifecycle ", - "origin": "Origin ", - "external_audience": "External Audience ", - "internet_accessible": "Internet Accessible ", - "findings_count": "Findings Count ", - }, - ) - - -class ProductFilter(ProductFilterHelper, DojoFilter): - prod_type = ModelMultipleChoiceFilter( - queryset=Product_Type.objects.none(), - label=labels.ORG_FILTERS_LABEL) - tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - queryset=Product.tags.tag_model.objects.all().order_by("name")) - not_tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - exclude=True, - queryset=Product.tags.tag_model.objects.all().order_by("name")) - - def __init__(self, *args, **kwargs): - self.user = None - if "user" in kwargs: - self.user = kwargs.pop("user") - super().__init__(*args, **kwargs) - self.form.fields["prod_type"].queryset = get_authorized_product_types("view") - self.form.fields["tags"].help_text = labels.ASSET_FILTERS_TAGS_HELP - self.form.fields["not_tags"].help_text = labels.ASSET_FILTERS_NOT_TAGS_HELP - - class Meta: - model = Product - fields = [ - "name", "name_exact", "prod_type", "business_criticality", - "platform", "lifecycle", "origin", "external_audience", - "internet_accessible", "tags", - ] - - -class ProductFilterWithoutObjectLookups(ProductFilterHelper): - prod_type__name = CharFilter( - field_name="prod_type__name", - lookup_expr="iexact", - label=labels.ORG_FILTERS_NAME_LABEL, - help_text=labels.ORG_FILTERS_NAME_HELP) - prod_type__name_contains = CharFilter( - field_name="prod_type__name", - lookup_expr="icontains", - label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, - help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) - - def __init__(self, *args, **kwargs): - kwargs.pop("user", None) - super().__init__(*args, **kwargs) - - class Meta: - model = Product - fields = [ - "name", "name_exact", "business_criticality", "platform", - "lifecycle", "origin", "external_audience", "internet_accessible", - ] - - class ApiDojoMetaFilter(DojoFilter): name_case_insensitive = CharFilter(field_name="name", lookup_expr="iexact") value_case_insensitive = CharFilter(field_name="value", lookup_expr="iexact") diff --git a/dojo/forms.py b/dojo/forms.py index bab612749b0..c2428988d4b 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -271,61 +271,6 @@ class Meta: fields = ["id"] -class ProductForm(forms.ModelForm): - name = forms.CharField(max_length=255, required=True) - description = forms.CharField(widget=forms.Textarea(attrs={}), - required=True) - - prod_type = forms.ModelChoiceField(label=labels.ORG_LABEL, - queryset=Product_Type.objects.none(), - required=True) - - sla_configuration = forms.ModelChoiceField(label="SLA Configuration", - queryset=SLA_Configuration.objects.all(), - required=True, - initial="Default") - - product_manager = forms.ModelChoiceField(label=labels.ASSET_MANAGER_LABEL, - queryset=Dojo_User.objects.exclude(is_active=False).order_by("first_name", "last_name"), required=False) - technical_contact = forms.ModelChoiceField(queryset=Dojo_User.objects.exclude(is_active=False).order_by("first_name", "last_name"), required=False) - team_manager = forms.ModelChoiceField(queryset=Dojo_User.objects.exclude(is_active=False).order_by("first_name", "last_name"), required=False) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.fields["prod_type"].queryset = get_authorized_product_types("add") - self.fields["enable_product_tag_inheritance"].label = labels.ASSET_TAG_INHERITANCE_ENABLE_LABEL - self.fields["enable_product_tag_inheritance"].help_text = labels.ASSET_TAG_INHERITANCE_ENABLE_HELP - if prod_type_id := kwargs.get("instance", Product()).prod_type_id: # we are editing existing instance - self.fields["prod_type"].queryset |= Product_Type.objects.filter(pk=prod_type_id) # even if user does not have permission for any other ProdType we need to add at least assign ProdType to make form submittable (otherwise empty list was here which generated invalid form) - - # if this product has findings being asynchronously updated, disable the sla config field - if self.instance.async_updating: - self.fields["sla_configuration"].disabled = True - self.fields["sla_configuration"].widget.attrs["message"] = ( - "Finding SLA expiration dates are currently being recalculated. " - "This field cannot be changed until the calculation is complete." - ) - - class Meta: - model = Product - fields = ["name", "description", "tags", "product_manager", "technical_contact", "team_manager", "prod_type", "sla_configuration", "regulations", - "business_criticality", "platform", "lifecycle", "origin", "user_records", "revenue", "external_audience", "enable_product_tag_inheritance", - "internet_accessible", "enable_simple_risk_acceptance", "enable_full_risk_acceptance", "disable_sla_breach_notifications"] - - def clean_tags(self): - tag_validator(self.cleaned_data.get("tags")) - return self.cleaned_data.get("tags") - - -class DeleteProductForm(forms.ModelForm): - id = forms.IntegerField(required=True, - widget=forms.widgets.HiddenInput()) - - class Meta: - model = Product - fields = ["id"] - - class EditFindingGroupForm(forms.ModelForm): name = forms.CharField(max_length=255, required=True, label="Finding Group Name") jira_issue = forms.CharField(max_length=255, required=False, label="Linked JIRA Issue", @@ -358,37 +303,6 @@ class Meta: fields = ["id"] -class Add_Product_AuthorizedUsersForm(forms.Form): - users = forms.ModelMultipleChoiceField( - queryset=Dojo_User.objects.none(), required=True, label="Users", - ) - - def __init__(self, *args, product=None, **kwargs): - super().__init__(*args, **kwargs) - self.product = product - current = product.authorized_users.values_list("pk", flat=True) - self.fields["users"].queryset = ( - Dojo_User.objects.filter(is_active=True) - .exclude(is_superuser=True) - .exclude(pk__in=current) - .order_by("first_name", "last_name") - ) - - -class Authorize_User_For_ProductsForm(forms.Form): - products = forms.ModelMultipleChoiceField( - queryset=Product.objects.none(), required=True, label=labels.ASSET_PLURAL_LABEL, - ) - - def __init__(self, *args, user=None, **kwargs): - super().__init__(*args, **kwargs) - self.user = user - # Show products the user is not already directly authorized for. - self.fields["products"].queryset = ( - Product.objects.exclude(authorized_users=user).order_by("name") - ) - - class Authorize_User_For_ProductTypesForm(forms.Form): product_types = forms.ModelMultipleChoiceField( queryset=Product_Type.objects.none(), required=True, label=labels.ORG_PLURAL_LABEL, @@ -2234,16 +2148,16 @@ def __init__(self, *args, **kwargs): self.fields.pop("reset_api_token", None) -def get_years(): - now = timezone.now() - return [(now.year, now.year), (now.year - 1, now.year - 1), (now.year - 2, now.year - 2)] - - -class ProductCountsFormBase(forms.Form): - month = forms.ChoiceField(choices=list(MONTHS.items()), required=True, error_messages={ - "required": "*"}) - year = forms.ChoiceField(choices=get_years, required=True, error_messages={ - "required": "*"}) +# Product forms live in dojo/product/ui/forms.py. Re-exported here for backward +# compat: ProductCountsFormBase is subclassed by ProductTypeCountsForm below, +# Authorize_User_For_ProductsForm by dojo/user/views.py, ProductTagCountsForm by +# dojo/metrics/views.py. The other product forms are imported directly from +# dojo.product.ui.forms by the product module's own views. +from dojo.product.ui.forms import ( # noqa: E402, F401 -- backward compat + Authorize_User_For_ProductsForm, + ProductCountsFormBase, + ProductTagCountsForm, +) class ProductTypeCountsForm(ProductCountsFormBase): @@ -2258,20 +2172,6 @@ def __init__(self, *args, **kwargs): self.fields["product_type"].queryset = get_authorized_product_types("view") -class ProductTagCountsForm(ProductCountsFormBase): - product_tag = forms.ModelChoiceField(required=True, - queryset=Product.tags.tag_model.objects.none().order_by("name"), - label=labels.ASSET_TAG_LABEL, - error_messages={ - "required": "*"}) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - prods = get_authorized_products("view") - tags_available_to_user = Product.tags.tag_model.objects.filter(product__in=prods) - self.fields["product_tag"].queryset = tags_available_to_user - - class APIKeyForm(forms.ModelForm): id = forms.IntegerField(required=True, widget=forms.widgets.HiddenInput()) @@ -2354,30 +2254,6 @@ class Meta: fields = ["id"] -class Product_API_Scan_ConfigurationForm(forms.ModelForm): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - tool_configuration = forms.ModelChoiceField( - label="Tool Configuration", - queryset=Tool_Configuration.objects.all().order_by("name"), - required=True, - ) - - class Meta: - model = Product_API_Scan_Configuration - exclude = ["product"] - - -class DeleteProduct_API_Scan_ConfigurationForm(forms.ModelForm): - id = forms.IntegerField(required=True, widget=forms.widgets.HiddenInput()) - - class Meta: - model = Product_API_Scan_Configuration - fields = ["id"] - - class ToolTypeForm(forms.ModelForm): class Meta: model = Tool_Type diff --git a/dojo/product/ui/__init__.py b/dojo/product/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/product/ui/filters.py b/dojo/product/ui/filters.py new file mode 100644 index 00000000000..f5deccf9a02 --- /dev/null +++ b/dojo/product/ui/filters.py @@ -0,0 +1,213 @@ +from django.conf import settings +from django.forms import HiddenInput +from django_filters import ( + BooleanFilter, + CharFilter, + FilterSet, + ModelMultipleChoiceFilter, + MultipleChoiceFilter, + NumberFilter, + OrderingFilter, +) + +from dojo.filters import ( + DojoFilter, + ProductSLAFilter, + filter_endpoints_base, + filter_endpoints_host_base, +) +from dojo.labels import get_labels +from dojo.location.status import ProductLocationStatus +from dojo.models import Product, Product_Type +from dojo.product.queries import get_authorized_products +from dojo.product_type.queries import get_authorized_product_types + +labels = get_labels() + + +class ProductComponentFilter(DojoFilter): + component_name = CharFilter(lookup_expr="icontains", label="Module Name") + component_version = CharFilter(lookup_expr="icontains", label="Module Version") + + o = OrderingFilter( + fields=( + ("component_name", "component_name"), + ("component_version", "component_version"), + ("active", "active"), + ("duplicate", "duplicate"), + ("total", "total"), + ), + field_labels={ + "component_name": "Component Name", + "component_version": "Component Version", + "active": "Active", + "duplicate": "Duplicate", + "total": "Total", + }, + ) + + +class ComponentFilterWithoutObjectLookups(ProductComponentFilter): + test__engagement__product__prod_type__name = CharFilter( + field_name="test__engagement__product__prod_type__name", + lookup_expr="iexact", + label=labels.ORG_FILTERS_NAME_LABEL, + help_text=labels.ORG_FILTERS_NAME_HELP) + test__engagement__product__prod_type__name_contains = CharFilter( + field_name="test__engagement__product__prod_type__name", + lookup_expr="icontains", + label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, + help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) + test__engagement__product__name = CharFilter( + field_name="test__engagement__product__name", + lookup_expr="iexact", + label=labels.ASSET_FILTERS_NAME_LABEL, + help_text=labels.ASSET_FILTERS_NAME_HELP) + test__engagement__product__name_contains = CharFilter( + field_name="test__engagement__product__name", + lookup_expr="icontains", + label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL, + help_text=labels.ASSET_FILTERS_NAME_CONTAINS_HELP) + + +class ComponentFilter(ProductComponentFilter): + test__engagement__product__prod_type = ModelMultipleChoiceFilter( + queryset=Product_Type.objects.none(), + label=labels.ORG_FILTERS_LABEL) + test__engagement__product = ModelMultipleChoiceFilter( + queryset=Product.objects.none(), + label=labels.ASSET_FILTERS_LABEL) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.form.fields[ + "test__engagement__product__prod_type"].queryset = get_authorized_product_types("view") + self.form.fields[ + "test__engagement__product"].queryset = get_authorized_products("view") + + +class ProductFilterHelper(FilterSet): + name = CharFilter(lookup_expr="icontains", label=labels.ASSET_FILTERS_NAME_LABEL) + name_exact = CharFilter(field_name="name", lookup_expr="iexact", label=labels.ASSET_FILTERS_NAME_EXACT_LABEL) + business_criticality = MultipleChoiceFilter(choices=Product.BUSINESS_CRITICALITY_CHOICES, null_label="Empty") + platform = MultipleChoiceFilter(choices=Product.PLATFORM_CHOICES, null_label="Empty") + lifecycle = MultipleChoiceFilter(choices=Product.LIFECYCLE_CHOICES, null_label="Empty") + origin = MultipleChoiceFilter(choices=Product.ORIGIN_CHOICES, null_label="Empty") + external_audience = BooleanFilter(field_name="external_audience") + internet_accessible = BooleanFilter(field_name="internet_accessible") + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag contains") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) + outside_of_sla = ProductSLAFilter(label="Outside of SLA") + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + if settings.V3_FEATURE_LOCATIONS: + location_status = MultipleChoiceFilter( + field_name="locations__status", + choices=ProductLocationStatus.choices, + help_text="Status of the Location from the Products relationship", + ) + endpoints__host = CharFilter( + field_name="locations__location__url__host", method="filter_endpoints_host", label="Endpoint Host", + ) + endpoints = NumberFilter(field_name="locations__location", method="filter_endpoints", widget=HiddenInput()) + + def filter_endpoints_host(self, queryset, name, value): + return filter_endpoints_host_base( + queryset, + name, + value, + endpoint_id=self.data.get("endpoints"), + statuses=self.data.getlist("location_status"), + ) + + def filter_endpoints(self, queryset, name, value): + return filter_endpoints_base( + queryset, + name, + value, + statuses=self.data.getlist("location_status"), + host=self.data.get("endpoints__host"), + ) + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("name", "name"), + ("name_exact", "name_exact"), + ("prod_type__name", "prod_type__name"), + ("business_criticality", "business_criticality"), + ("platform", "platform"), + ("lifecycle", "lifecycle"), + ("origin", "origin"), + ("external_audience", "external_audience"), + ("internet_accessible", "internet_accessible"), + ("findings_count", "findings_count"), + ), + field_labels={ + "name": labels.ASSET_FILTERS_NAME_LABEL, + "name_exact": labels.ASSET_FILTERS_NAME_EXACT_LABEL, + "prod_type__name": labels.ORG_FILTERS_LABEL, + "business_criticality": "Business Criticality", + "platform": "Platform ", + "lifecycle": "Lifecycle ", + "origin": "Origin ", + "external_audience": "External Audience ", + "internet_accessible": "Internet Accessible ", + "findings_count": "Findings Count ", + }, + ) + + +class ProductFilter(ProductFilterHelper, DojoFilter): + prod_type = ModelMultipleChoiceFilter( + queryset=Product_Type.objects.none(), + label=labels.ORG_FILTERS_LABEL) + tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + queryset=Product.tags.tag_model.objects.all().order_by("name")) + not_tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + exclude=True, + queryset=Product.tags.tag_model.objects.all().order_by("name")) + + def __init__(self, *args, **kwargs): + self.user = None + if "user" in kwargs: + self.user = kwargs.pop("user") + super().__init__(*args, **kwargs) + self.form.fields["prod_type"].queryset = get_authorized_product_types("view") + self.form.fields["tags"].help_text = labels.ASSET_FILTERS_TAGS_HELP + self.form.fields["not_tags"].help_text = labels.ASSET_FILTERS_NOT_TAGS_HELP + + class Meta: + model = Product + fields = [ + "name", "name_exact", "prod_type", "business_criticality", + "platform", "lifecycle", "origin", "external_audience", + "internet_accessible", "tags", + ] + + +class ProductFilterWithoutObjectLookups(ProductFilterHelper): + prod_type__name = CharFilter( + field_name="prod_type__name", + lookup_expr="iexact", + label=labels.ORG_FILTERS_NAME_LABEL, + help_text=labels.ORG_FILTERS_NAME_HELP) + prod_type__name_contains = CharFilter( + field_name="prod_type__name", + lookup_expr="icontains", + label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, + help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) + + def __init__(self, *args, **kwargs): + kwargs.pop("user", None) + super().__init__(*args, **kwargs) + + class Meta: + model = Product + fields = [ + "name", "name_exact", "business_criticality", "platform", + "lifecycle", "origin", "external_audience", "internet_accessible", + ] diff --git a/dojo/product/ui/forms.py b/dojo/product/ui/forms.py new file mode 100644 index 00000000000..9f86884bfea --- /dev/null +++ b/dojo/product/ui/forms.py @@ -0,0 +1,154 @@ +from django import forms +from django.utils import timezone +from django.utils.dates import MONTHS + +from dojo.labels import get_labels +from dojo.models import ( + Dojo_User, + Product, + Product_API_Scan_Configuration, + Product_Type, + SLA_Configuration, + Tool_Configuration, +) +from dojo.product.queries import get_authorized_products +from dojo.product_type.queries import get_authorized_product_types +from dojo.validators import tag_validator + +labels = get_labels() + + +class ProductForm(forms.ModelForm): + name = forms.CharField(max_length=255, required=True) + description = forms.CharField(widget=forms.Textarea(attrs={}), + required=True) + + prod_type = forms.ModelChoiceField(label=labels.ORG_LABEL, + queryset=Product_Type.objects.none(), + required=True) + + sla_configuration = forms.ModelChoiceField(label="SLA Configuration", + queryset=SLA_Configuration.objects.all(), + required=True, + initial="Default") + + product_manager = forms.ModelChoiceField(label=labels.ASSET_MANAGER_LABEL, + queryset=Dojo_User.objects.exclude(is_active=False).order_by("first_name", "last_name"), required=False) + technical_contact = forms.ModelChoiceField(queryset=Dojo_User.objects.exclude(is_active=False).order_by("first_name", "last_name"), required=False) + team_manager = forms.ModelChoiceField(queryset=Dojo_User.objects.exclude(is_active=False).order_by("first_name", "last_name"), required=False) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields["prod_type"].queryset = get_authorized_product_types("add") + self.fields["enable_product_tag_inheritance"].label = labels.ASSET_TAG_INHERITANCE_ENABLE_LABEL + self.fields["enable_product_tag_inheritance"].help_text = labels.ASSET_TAG_INHERITANCE_ENABLE_HELP + if prod_type_id := kwargs.get("instance", Product()).prod_type_id: # we are editing existing instance + self.fields["prod_type"].queryset |= Product_Type.objects.filter(pk=prod_type_id) # even if user does not have permission for any other ProdType we need to add at least assign ProdType to make form submittable (otherwise empty list was here which generated invalid form) + + # if this product has findings being asynchronously updated, disable the sla config field + if self.instance.async_updating: + self.fields["sla_configuration"].disabled = True + self.fields["sla_configuration"].widget.attrs["message"] = ( + "Finding SLA expiration dates are currently being recalculated. " + "This field cannot be changed until the calculation is complete." + ) + + class Meta: + model = Product + fields = ["name", "description", "tags", "product_manager", "technical_contact", "team_manager", "prod_type", "sla_configuration", "regulations", + "business_criticality", "platform", "lifecycle", "origin", "user_records", "revenue", "external_audience", "enable_product_tag_inheritance", + "internet_accessible", "enable_simple_risk_acceptance", "enable_full_risk_acceptance", "disable_sla_breach_notifications"] + + def clean_tags(self): + tag_validator(self.cleaned_data.get("tags")) + return self.cleaned_data.get("tags") + + +class DeleteProductForm(forms.ModelForm): + id = forms.IntegerField(required=True, + widget=forms.widgets.HiddenInput()) + + class Meta: + model = Product + fields = ["id"] + + +class Add_Product_AuthorizedUsersForm(forms.Form): + users = forms.ModelMultipleChoiceField( + queryset=Dojo_User.objects.none(), required=True, label="Users", + ) + + def __init__(self, *args, product=None, **kwargs): + super().__init__(*args, **kwargs) + self.product = product + current = product.authorized_users.values_list("pk", flat=True) + self.fields["users"].queryset = ( + Dojo_User.objects.filter(is_active=True) + .exclude(is_superuser=True) + .exclude(pk__in=current) + .order_by("first_name", "last_name") + ) + + +class Authorize_User_For_ProductsForm(forms.Form): + products = forms.ModelMultipleChoiceField( + queryset=Product.objects.none(), required=True, label=labels.ASSET_PLURAL_LABEL, + ) + + def __init__(self, *args, user=None, **kwargs): + super().__init__(*args, **kwargs) + self.user = user + # Show products the user is not already directly authorized for. + self.fields["products"].queryset = ( + Product.objects.exclude(authorized_users=user).order_by("name") + ) + + +def get_years(): + now = timezone.now() + return [(now.year, now.year), (now.year - 1, now.year - 1), (now.year - 2, now.year - 2)] + + +class ProductCountsFormBase(forms.Form): + month = forms.ChoiceField(choices=list(MONTHS.items()), required=True, error_messages={ + "required": "*"}) + year = forms.ChoiceField(choices=get_years, required=True, error_messages={ + "required": "*"}) + + +class ProductTagCountsForm(ProductCountsFormBase): + product_tag = forms.ModelChoiceField(required=True, + queryset=Product.tags.tag_model.objects.none().order_by("name"), + label=labels.ASSET_TAG_LABEL, + error_messages={ + "required": "*"}) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + prods = get_authorized_products("view") + tags_available_to_user = Product.tags.tag_model.objects.filter(product__in=prods) + self.fields["product_tag"].queryset = tags_available_to_user + + +class Product_API_Scan_ConfigurationForm(forms.ModelForm): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + tool_configuration = forms.ModelChoiceField( + label="Tool Configuration", + queryset=Tool_Configuration.objects.all().order_by("name"), + required=True, + ) + + class Meta: + model = Product_API_Scan_Configuration + exclude = ["product"] + + +class DeleteProduct_API_Scan_ConfigurationForm(forms.ModelForm): + id = forms.IntegerField(required=True, widget=forms.widgets.HiddenInput()) + + class Meta: + model = Product_API_Scan_Configuration + fields = ["id"] diff --git a/dojo/product/views.py b/dojo/product/views.py index f9dab849576..a5ef6acbd62 100644 --- a/dojo/product/views.py +++ b/dojo/product/views.py @@ -40,18 +40,12 @@ MetricsEndpointFilterWithoutObjectLookups, MetricsFindingFilter, MetricsFindingFilterWithoutObjectLookups, - ProductComponentFilter, - ProductFilter, - ProductFilterWithoutObjectLookups, ) from dojo.forms import ( - Add_Product_AuthorizedUsersForm, AdHocFindingForm, AppAnalysisForm, DeleteAppAnalysisForm, DeleteEngagementPresetsForm, - DeleteProduct_API_Scan_ConfigurationForm, - DeleteProductForm, DojoMetaFormSet, EngagementPresetsForm, EngForm, @@ -60,8 +54,6 @@ JIRAEngagementForm, JIRAFindingForm, JIRAProjectForm, - Product_API_Scan_ConfigurationForm, - ProductForm, ProductNotificationsForm, SLA_Configuration, ) @@ -94,6 +86,18 @@ from dojo.product.queries import ( get_authorized_products, ) +from dojo.product.ui.filters import ( + ProductComponentFilter, + ProductFilter, + ProductFilterWithoutObjectLookups, +) +from dojo.product.ui.forms import ( + Add_Product_AuthorizedUsersForm, + DeleteProduct_API_Scan_ConfigurationForm, + DeleteProductForm, + Product_API_Scan_ConfigurationForm, + ProductForm, +) from dojo.product_type.queries import ( get_authorized_product_types, ) diff --git a/dojo/product_type/ui/views.py b/dojo/product_type/ui/views.py index 44b7ce6ab6f..c454f6060cf 100644 --- a/dojo/product_type/ui/views.py +++ b/dojo/product_type/ui/views.py @@ -15,7 +15,6 @@ from dojo.authorization.authorization import user_has_permission_or_403 from dojo.authorization.roles_permissions import Permissions -from dojo.filters import ProductFilter, ProductFilterWithoutObjectLookups from dojo.forms import ( Add_Product_Type_AuthorizedUsersForm, Delete_Product_TypeForm, @@ -24,6 +23,7 @@ from dojo.labels import get_labels from dojo.models import Dojo_User, Finding, Product, Product_Type from dojo.product.queries import get_authorized_products +from dojo.product.ui.filters import ProductFilter, ProductFilterWithoutObjectLookups from dojo.product_type.queries import ( get_authorized_product_types, ) From 4f8761ab95e859fe32980f58555b2907ba4a0097 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 21:57:57 +0200 Subject: [PATCH 3/4] refactor(product): move views into dojo/product/ui/ [product Phase 5] --- dojo/asset/urls.py | 2 +- dojo/organization/urls.py | 2 +- dojo/product/{ => ui}/views.py | 0 unittests/test_product_metrics_closed_count.py | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) rename dojo/product/{ => ui}/views.py (100%) diff --git a/dojo/asset/urls.py b/dojo/asset/urls.py index 55474a155d2..22d29cdf39b 100644 --- a/dojo/asset/urls.py +++ b/dojo/asset/urls.py @@ -2,7 +2,7 @@ from django.urls import re_path from dojo.engagement.ui import views as dojo_engagement_views -from dojo.product import views +from dojo.product.ui import views from dojo.utils import redirect_view # TODO: remove the else: branch once v3 migration is complete diff --git a/dojo/organization/urls.py b/dojo/organization/urls.py index 1147f927ec3..3bfa6128b7e 100644 --- a/dojo/organization/urls.py +++ b/dojo/organization/urls.py @@ -1,7 +1,7 @@ from django.conf import settings from django.urls import re_path -from dojo.product import views as product_views +from dojo.product.ui import views as product_views from dojo.product_type.ui import views from dojo.utils import redirect_view diff --git a/dojo/product/views.py b/dojo/product/ui/views.py similarity index 100% rename from dojo/product/views.py rename to dojo/product/ui/views.py diff --git a/unittests/test_product_metrics_closed_count.py b/unittests/test_product_metrics_closed_count.py index 80b9c4dbd62..31b978b5b6a 100644 --- a/unittests/test_product_metrics_closed_count.py +++ b/unittests/test_product_metrics_closed_count.py @@ -18,7 +18,7 @@ from django.test import RequestFactory from dojo.models import Engagement, Finding, Product, Product_Type, Test, Test_Type, User -from dojo.product.views import finding_queries +from dojo.product.ui.views import finding_queries from .dojo_test_case import DojoTestCase From 2d773ada78ee9e2ab436e20300bd4d1627d52b80 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 22:06:08 +0200 Subject: [PATCH 4/4] refactor(product): extract API layer into dojo/product/api/ [product Phase 6,7,8,9] --- dojo/api_v2/serializers.py | 60 +++------------ dojo/api_v2/views.py | 109 -------------------------- dojo/filters.py | 75 ------------------ dojo/product/api/__init__.py | 1 + dojo/product/api/filters.py | 97 +++++++++++++++++++++++ dojo/product/api/serializer.py | 60 +++++++++++++++ dojo/product/api/urls.py | 7 ++ dojo/product/api/views.py | 128 +++++++++++++++++++++++++++++++ dojo/urls.py | 6 +- unittests/test_rest_framework.py | 3 +- 10 files changed, 306 insertions(+), 240 deletions(-) create mode 100644 dojo/product/api/__init__.py create mode 100644 dojo/product/api/filters.py create mode 100644 dojo/product/api/serializer.py create mode 100644 dojo/product/api/urls.py create mode 100644 dojo/product/api/views.py diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 0992f6c6ac3..6e57a8b370b 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -466,12 +466,6 @@ def validate(self, data): return data -class ProductMetaSerializer(serializers.ModelSerializer): - class Meta: - model = DojoMeta - fields = ("name", "value") - - class UserSerializer(serializers.ModelSerializer): date_joined = serializers.DateTimeField(read_only=True) last_login = serializers.DateTimeField(read_only=True, allow_null=True) @@ -722,6 +716,16 @@ class Meta: # RiskAcceptanceSerializer (below) still reference it. The other engagement # serializers are imported directly from dojo.engagement.api by their consumers. from dojo.engagement.api.serializer import EngagementSerializer # noqa: E402 -- backward compat + +# Product serializers live in dojo/product/api/serializer.py. ProductSerializer is +# re-exported because ReportGenerateSerializer (below) still references it; +# ProductMetaSerializer because dojo/asset/api/serializers.py imports it. +# ProductAPIScanConfigurationSerializer is imported directly from +# dojo.product.api.serializer by its only consumer (the viewset). +from dojo.product.api.serializer import ( # noqa: E402 -- backward compat + ProductMetaSerializer, # noqa: F401 -- backward compat + ProductSerializer, +) from dojo.product_type.api.serializer import ProductTypeSerializer # noqa: E402 @@ -940,12 +944,6 @@ class Meta: fields = "__all__" -class ProductAPIScanConfigurationSerializer(serializers.ModelSerializer): - class Meta: - model = Product_API_Scan_Configuration - fields = "__all__" - - class DevelopmentEnvironmentSerializer(serializers.ModelSerializer): class Meta: model = Development_Environment @@ -1616,44 +1614,6 @@ def update(self, instance, validated_data): return super().update(instance, validated_data) -class ProductSerializer(serializers.ModelSerializer): - findings_count = serializers.SerializerMethodField() - findings_list = serializers.SerializerMethodField() - - business_criticality = serializers.ChoiceField(choices=Product.BUSINESS_CRITICALITY_CHOICES, allow_blank=True, allow_null=True, required=False) - platform = serializers.ChoiceField(choices=Product.PLATFORM_CHOICES, allow_blank=True, allow_null=True, required=False) - lifecycle = serializers.ChoiceField(choices=Product.LIFECYCLE_CHOICES, allow_blank=True, allow_null=True, required=False) - origin = serializers.ChoiceField(choices=Product.ORIGIN_CHOICES, allow_blank=True, allow_null=True, required=False) - - tags = TagListSerializerField(required=False) - product_meta = ProductMetaSerializer(read_only=True, many=True) - - class Meta: - model = Product - exclude = ( - "tid", - "updated", - "async_updating", - ) - - def validate(self, data): - async_updating = getattr(self.instance, "async_updating", None) - if async_updating: - new_sla_config = data.get("sla_configuration", None) - old_sla_config = getattr(self.instance, "sla_configuration", None) - if new_sla_config and old_sla_config and new_sla_config != old_sla_config: - msg = "Finding SLA expiration dates are currently being recalculated. The SLA configuration for this product cannot be changed until the calculation is complete." - raise serializers.ValidationError(msg) - return data - - def get_findings_count(self, obj) -> int: - return obj.findings_count - - # TODO: maybe extend_schema_field is needed here? - def get_findings_list(self, obj) -> list[int]: - return obj.open_findings_list() - - class CommonImportScanSerializer(serializers.Serializer): scan_date = serializers.DateField( required=False, diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 11c728dd435..653dfaf2d8d 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -56,7 +56,6 @@ ApiDojoMetaFilter, ApiEndpointFilter, ApiFindingFilter, - ApiProductFilter, ApiRiskAcceptanceFilter, ApiTemplateFindingFilter, ApiUserFilter, @@ -93,7 +92,6 @@ NoteHistory, Notes, Product, - Product_API_Scan_Configuration, Regulation, Risk_Acceptance, SLA_Configuration, @@ -111,7 +109,6 @@ get_authorized_app_analysis, get_authorized_dojo_meta, get_authorized_languages, - get_authorized_product_api_scan_configurations, get_authorized_products, ) from dojo.query_utils import build_count_subquery @@ -127,12 +124,10 @@ from dojo.user.authentication import reset_token_for_user from dojo.user.utils import get_configuration_permissions_codenames from dojo.utils import ( - async_delete, generate_file_response, get_celery_queue_details, get_celery_queue_length, get_celery_worker_status, - get_setting, get_system_setting, process_tag_notifications, purge_celery_queue, @@ -1258,31 +1253,6 @@ def get_queryset(self): # Authorization: object-based @extend_schema_view(**schema_with_prefetch()) -class ProductAPIScanConfigurationViewSet( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.ProductAPIScanConfigurationSerializer - queryset = Product_API_Scan_Configuration.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = [ - "id", - "product", - "tool_configuration", - "service_key_1", - "service_key_2", - "service_key_3", - ] - permission_classes = ( - IsAuthenticated, - permissions.UserHasProductAPIScanConfigurationPermission, - ) - - def get_queryset(self): - return get_authorized_product_api_scan_configurations( - "view", - ) - - # Authorization: object-based # @extend_schema_view(**schema_with_prefetch()) # Nested models with prefetch make the response schema too long for Swagger UI @@ -1376,85 +1346,6 @@ def process_patch(self, request): raise ValidationError(msg) -@extend_schema_view(**schema_with_prefetch()) -class ProductViewSet( - prefetch.PrefetchListMixin, - prefetch.PrefetchRetrieveMixin, - mixins.CreateModelMixin, - mixins.DestroyModelMixin, - mixins.UpdateModelMixin, - viewsets.GenericViewSet, - dojo_mixins.DeletePreviewModelMixin, -): - serializer_class = serializers.ProductSerializer - queryset = Product.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_class = ApiProductFilter - permission_classes = ( - IsAuthenticated, - permissions.UserHasProductPermission, - ) - - def get_queryset(self): - return get_authorized_products("view").distinct() - - def destroy(self, request, *args, **kwargs): - instance = self.get_object() - if get_setting("ASYNC_OBJECT_DELETE"): - async_del = async_delete() - async_del.delete(instance) - else: - instance.delete() - return Response(status=status.HTTP_204_NO_CONTENT) - - # def list(self, request): - # # Note the use of `get_queryset()` instead of `self.queryset` - # queryset = self.get_queryset() - # serializer = self.serializer_class(queryset, many=True) - # return Response(serializer.data) - - @extend_schema( - request=serializers.ReportGenerateOptionSerializer, - responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, - ) - @action( - detail=True, methods=["post"], - # IsAuthenticated only: report generation requires View permission, - # enforced by the permission-filtered get_queryset(). The viewset's - # permission_classes would check Edit (POST), which is too restrictive. - permission_classes=[IsAuthenticated], - ) - def generate_report(self, request, pk=None): - product = self.get_object() - - options = {} - # prepare post data - report_options = serializers.ReportGenerateOptionSerializer( - data=request.data, - ) - if report_options.is_valid(): - options["include_finding_notes"] = report_options.validated_data[ - "include_finding_notes" - ] - options["include_finding_images"] = report_options.validated_data[ - "include_finding_images" - ] - options[ - "include_executive_summary" - ] = report_options.validated_data["include_executive_summary"] - options[ - "include_table_of_contents" - ] = report_options.validated_data["include_table_of_contents"] - else: - return Response( - report_options.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - data = report_generate(request, product, options) - report = serializers.ReportGenerateSerializer(data) - return Response(report.data) - - # Authorization: authenticated, configuration class DevelopmentEnvironmentViewSet( DojoModelViewSet, diff --git a/dojo/filters.py b/dojo/filters.py index 38a4f673376..55afc778196 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -1019,81 +1019,6 @@ class Meta: ] -class ApiProductFilter(DojoFilter): - # BooleanFilter - external_audience = BooleanFilter(field_name="external_audience") - internet_accessible = BooleanFilter(field_name="internet_accessible") - # CharFilter - name = CharFilter(lookup_expr="icontains") - name_exact = CharFilter(field_name="name", lookup_expr="iexact") - description = CharFilter(lookup_expr="icontains") - business_criticality = MultipleChoiceFilter(choices=Product.BUSINESS_CRITICALITY_CHOICES) - platform = MultipleChoiceFilter(choices=Product.PLATFORM_CHOICES) - lifecycle = MultipleChoiceFilter(choices=Product.LIFECYCLE_CHOICES) - origin = MultipleChoiceFilter(choices=Product.ORIGIN_CHOICES) - # NumberInFilter - id = NumberInFilter(field_name="id", lookup_expr="in") - product_manager = NumberInFilter(field_name="product_manager", lookup_expr="in") - technical_contact = NumberInFilter(field_name="technical_contact", lookup_expr="in") - team_manager = NumberInFilter(field_name="team_manager", lookup_expr="in") - prod_type = NumberInFilter(field_name="prod_type", lookup_expr="in") - tid = NumberInFilter(field_name="tid", lookup_expr="in") - prod_numeric_grade = NumberInFilter(field_name="prod_numeric_grade", lookup_expr="in") - user_records = NumberInFilter(field_name="user_records", lookup_expr="in") - regulations = NumberInFilter(field_name="regulations", lookup_expr="in") - - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") - tags = CharFieldInFilter( - field_name="tags__name", - lookup_expr="in", - help_text="Comma separated list of exact tags (uses OR for multiple values)") - tags__and = CharFieldFilterANDExpression( - field_name="tags__name", - help_text="Comma separated list of exact tags to match with an AND expression") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") - not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", - help_text=labels.ASSET_FILTERS_CSV_TAGS_NOT_HELP, exclude="True") - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - outside_of_sla = extend_schema_field(OpenApiTypes.NUMBER)(ProductSLAFilter()) - - # DateRangeFilter - created = DateRangeFilter() - updated = DateRangeFilter() - # NumberFilter - revenue = NumberFilter() - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("id", "id"), - ("tid", "tid"), - ("name", "name"), - ("created", "created"), - ("prod_numeric_grade", "prod_numeric_grade"), - ("business_criticality", "business_criticality"), - ("platform", "platform"), - ("lifecycle", "lifecycle"), - ("origin", "origin"), - ("revenue", "revenue"), - ("external_audience", "external_audience"), - ("internet_accessible", "internet_accessible"), - ("product_manager", "product_manager"), - ("product_manager__first_name", "product_manager__first_name"), - ("product_manager__last_name", "product_manager__last_name"), - ("technical_contact", "technical_contact"), - ("technical_contact__first_name", "technical_contact__first_name"), - ("technical_contact__last_name", "technical_contact__last_name"), - ("team_manager", "team_manager"), - ("team_manager__first_name", "team_manager__first_name"), - ("team_manager__last_name", "team_manager__last_name"), - ("prod_type", "prod_type"), - ("prod_type__name", "prod_type__name"), - ("updated", "updated"), - ("user_records", "user_records"), - ), - ) - - class PercentageRangeFilter(RangeFilter): def filter(self, qs, value): if value is not None: diff --git a/dojo/product/api/__init__.py b/dojo/product/api/__init__.py new file mode 100644 index 00000000000..36b10db0174 --- /dev/null +++ b/dojo/product/api/__init__.py @@ -0,0 +1 @@ +path = "products" # noqa: RUF067 diff --git a/dojo/product/api/filters.py b/dojo/product/api/filters.py new file mode 100644 index 00000000000..e75b95684ad --- /dev/null +++ b/dojo/product/api/filters.py @@ -0,0 +1,97 @@ +from django_filters import ( + BooleanFilter, + CharFilter, + MultipleChoiceFilter, + NumberFilter, + OrderingFilter, +) +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import extend_schema_field + +from dojo.filters import ( + CharFieldFilterANDExpression, + CharFieldInFilter, + DateRangeFilter, + DojoFilter, + NumberInFilter, + ProductSLAFilter, +) +from dojo.labels import get_labels +from dojo.models import Product + +labels = get_labels() + + +class ApiProductFilter(DojoFilter): + # BooleanFilter + external_audience = BooleanFilter(field_name="external_audience") + internet_accessible = BooleanFilter(field_name="internet_accessible") + # CharFilter + name = CharFilter(lookup_expr="icontains") + name_exact = CharFilter(field_name="name", lookup_expr="iexact") + description = CharFilter(lookup_expr="icontains") + business_criticality = MultipleChoiceFilter(choices=Product.BUSINESS_CRITICALITY_CHOICES) + platform = MultipleChoiceFilter(choices=Product.PLATFORM_CHOICES) + lifecycle = MultipleChoiceFilter(choices=Product.LIFECYCLE_CHOICES) + origin = MultipleChoiceFilter(choices=Product.ORIGIN_CHOICES) + # NumberInFilter + id = NumberInFilter(field_name="id", lookup_expr="in") + product_manager = NumberInFilter(field_name="product_manager", lookup_expr="in") + technical_contact = NumberInFilter(field_name="technical_contact", lookup_expr="in") + team_manager = NumberInFilter(field_name="team_manager", lookup_expr="in") + prod_type = NumberInFilter(field_name="prod_type", lookup_expr="in") + tid = NumberInFilter(field_name="tid", lookup_expr="in") + prod_numeric_grade = NumberInFilter(field_name="prod_numeric_grade", lookup_expr="in") + user_records = NumberInFilter(field_name="user_records", lookup_expr="in") + regulations = NumberInFilter(field_name="regulations", lookup_expr="in") + + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") + tags = CharFieldInFilter( + field_name="tags__name", + lookup_expr="in", + help_text="Comma separated list of exact tags (uses OR for multiple values)") + tags__and = CharFieldFilterANDExpression( + field_name="tags__name", + help_text="Comma separated list of exact tags to match with an AND expression") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") + not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", + help_text=labels.ASSET_FILTERS_CSV_TAGS_NOT_HELP, exclude="True") + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + outside_of_sla = extend_schema_field(OpenApiTypes.NUMBER)(ProductSLAFilter()) + + # DateRangeFilter + created = DateRangeFilter() + updated = DateRangeFilter() + # NumberFilter + revenue = NumberFilter() + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("id", "id"), + ("tid", "tid"), + ("name", "name"), + ("created", "created"), + ("prod_numeric_grade", "prod_numeric_grade"), + ("business_criticality", "business_criticality"), + ("platform", "platform"), + ("lifecycle", "lifecycle"), + ("origin", "origin"), + ("revenue", "revenue"), + ("external_audience", "external_audience"), + ("internet_accessible", "internet_accessible"), + ("product_manager", "product_manager"), + ("product_manager__first_name", "product_manager__first_name"), + ("product_manager__last_name", "product_manager__last_name"), + ("technical_contact", "technical_contact"), + ("technical_contact__first_name", "technical_contact__first_name"), + ("technical_contact__last_name", "technical_contact__last_name"), + ("team_manager", "team_manager"), + ("team_manager__first_name", "team_manager__first_name"), + ("team_manager__last_name", "team_manager__last_name"), + ("prod_type", "prod_type"), + ("prod_type__name", "prod_type__name"), + ("updated", "updated"), + ("user_records", "user_records"), + ), + ) diff --git a/dojo/product/api/serializer.py b/dojo/product/api/serializer.py new file mode 100644 index 00000000000..53b89033a28 --- /dev/null +++ b/dojo/product/api/serializer.py @@ -0,0 +1,60 @@ +from rest_framework import serializers + +from dojo.models import DojoMeta, Product, Product_API_Scan_Configuration + + +class ProductMetaSerializer(serializers.ModelSerializer): + class Meta: + model = DojoMeta + fields = ("name", "value") + + +class ProductAPIScanConfigurationSerializer(serializers.ModelSerializer): + class Meta: + model = Product_API_Scan_Configuration + fields = "__all__" + + +class ProductSerializer(serializers.ModelSerializer): + findings_count = serializers.SerializerMethodField() + findings_list = serializers.SerializerMethodField() + + business_criticality = serializers.ChoiceField(choices=Product.BUSINESS_CRITICALITY_CHOICES, allow_blank=True, allow_null=True, required=False) + platform = serializers.ChoiceField(choices=Product.PLATFORM_CHOICES, allow_blank=True, allow_null=True, required=False) + lifecycle = serializers.ChoiceField(choices=Product.LIFECYCLE_CHOICES, allow_blank=True, allow_null=True, required=False) + origin = serializers.ChoiceField(choices=Product.ORIGIN_CHOICES, allow_blank=True, allow_null=True, required=False) + + product_meta = ProductMetaSerializer(read_only=True, many=True) + + class Meta: + model = Product + exclude = ( + "tid", + "updated", + "async_updating", + ) + + def get_fields(self): + from dojo.api_v2.serializers import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + TagListSerializerField, + ) + fields = super().get_fields() + fields["tags"] = TagListSerializerField(required=False) + return fields + + def validate(self, data): + async_updating = getattr(self.instance, "async_updating", None) + if async_updating: + new_sla_config = data.get("sla_configuration", None) + old_sla_config = getattr(self.instance, "sla_configuration", None) + if new_sla_config and old_sla_config and new_sla_config != old_sla_config: + msg = "Finding SLA expiration dates are currently being recalculated. The SLA configuration for this product cannot be changed until the calculation is complete." + raise serializers.ValidationError(msg) + return data + + def get_findings_count(self, obj) -> int: + return obj.findings_count + + # TODO: maybe extend_schema_field is needed here? + def get_findings_list(self, obj) -> list[int]: + return obj.open_findings_list() diff --git a/dojo/product/api/urls.py b/dojo/product/api/urls.py new file mode 100644 index 00000000000..0e7e34974c0 --- /dev/null +++ b/dojo/product/api/urls.py @@ -0,0 +1,7 @@ +from dojo.product.api.views import ProductAPIScanConfigurationViewSet, ProductViewSet + + +def add_product_urls(router): + router.register("products", ProductViewSet, basename="product") + router.register("product_api_scan_configurations", ProductAPIScanConfigurationViewSet, basename="product_api_scan_configuration") + return router diff --git a/dojo/product/api/views.py b/dojo/product/api/views.py new file mode 100644 index 00000000000..ef993508698 --- /dev/null +++ b/dojo/product/api/views.py @@ -0,0 +1,128 @@ +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.utils import extend_schema, extend_schema_view +from rest_framework import mixins, status, viewsets +from rest_framework.decorators import action +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +import dojo.api_v2.mixins as dojo_mixins +from dojo.api_v2 import prefetch +from dojo.api_v2 import serializers as api_v2_serializers +from dojo.api_v2.views import PrefetchDojoModelViewSet, report_generate, schema_with_prefetch +from dojo.authorization import api_permissions as permissions +from dojo.models import Product, Product_API_Scan_Configuration +from dojo.product.api.filters import ApiProductFilter +from dojo.product.api.serializer import ( + ProductAPIScanConfigurationSerializer, + ProductSerializer, +) +from dojo.product.queries import ( + get_authorized_product_api_scan_configurations, + get_authorized_products, +) +from dojo.utils import async_delete, get_setting + + +# Authorization: object-based +class ProductAPIScanConfigurationViewSet( + PrefetchDojoModelViewSet, +): + serializer_class = ProductAPIScanConfigurationSerializer + queryset = Product_API_Scan_Configuration.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = [ + "id", + "product", + "tool_configuration", + "service_key_1", + "service_key_2", + "service_key_3", + ] + permission_classes = ( + IsAuthenticated, + permissions.UserHasProductAPIScanConfigurationPermission, + ) + + def get_queryset(self): + return get_authorized_product_api_scan_configurations( + "view", + ) + + +@extend_schema_view(**schema_with_prefetch()) +class ProductViewSet( + prefetch.PrefetchListMixin, + prefetch.PrefetchRetrieveMixin, + mixins.CreateModelMixin, + mixins.DestroyModelMixin, + mixins.UpdateModelMixin, + viewsets.GenericViewSet, + dojo_mixins.DeletePreviewModelMixin, +): + serializer_class = ProductSerializer + queryset = Product.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_class = ApiProductFilter + permission_classes = ( + IsAuthenticated, + permissions.UserHasProductPermission, + ) + + def get_queryset(self): + return get_authorized_products("view").distinct() + + def destroy(self, request, *args, **kwargs): + instance = self.get_object() + if get_setting("ASYNC_OBJECT_DELETE"): + async_del = async_delete() + async_del.delete(instance) + else: + instance.delete() + return Response(status=status.HTTP_204_NO_CONTENT) + + # def list(self, request): + # # Note the use of `get_queryset()` instead of `self.queryset` + # queryset = self.get_queryset() + # serializer = self.serializer_class(queryset, many=True) + # return Response(serializer.data) + + @extend_schema( + request=api_v2_serializers.ReportGenerateOptionSerializer, + responses={status.HTTP_200_OK: api_v2_serializers.ReportGenerateSerializer}, + ) + @action( + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], + ) + def generate_report(self, request, pk=None): + product = self.get_object() + + options = {} + # prepare post data + report_options = api_v2_serializers.ReportGenerateOptionSerializer( + data=request.data, + ) + if report_options.is_valid(): + options["include_finding_notes"] = report_options.validated_data[ + "include_finding_notes" + ] + options["include_finding_images"] = report_options.validated_data[ + "include_finding_images" + ] + options[ + "include_executive_summary" + ] = report_options.validated_data["include_executive_summary"] + options[ + "include_table_of_contents" + ] = report_options.validated_data["include_table_of_contents"] + else: + return Response( + report_options.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + data = report_generate(request, product, options) + report = api_v2_serializers.ReportGenerateSerializer(data) + return Response(report.data) diff --git a/dojo/urls.py b/dojo/urls.py index cfccb1152d1..c7f6ac68e52 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -34,8 +34,6 @@ NetworkLocationsViewset, NotesViewSet, NoteTypeViewSet, - ProductAPIScanConfigurationViewSet, - ProductViewSet, RegulationsViewSet, ReImportScanView, RiskAcceptanceViewSet, @@ -75,6 +73,7 @@ from dojo.object.urls import urlpatterns as object_urls from dojo.organization.api.urls import add_organization_urls from dojo.organization.urls import urlpatterns as organization_urls +from dojo.product.api.urls import add_product_urls from dojo.product_type.api.urls import add_product_type_urls from dojo.regulations.urls import urlpatterns as regulations from dojo.reports.urls import urlpatterns as reports_urls @@ -127,8 +126,7 @@ v2_api.register(r"notes", NotesViewSet, basename="notes") v2_api.register(r"note_type", NoteTypeViewSet, basename="note_type") add_notifications_urls(v2_api) -v2_api.register(r"products", ProductViewSet, basename="product") -v2_api.register(r"product_api_scan_configurations", ProductAPIScanConfigurationViewSet, basename="product_api_scan_configuration") +v2_api = add_product_urls(v2_api) # RBAC endpoints moved to Pro under legacy authorization: # product_groups, product_members → pro/product_groups, pro/product_members v2_api = add_product_type_urls(v2_api) diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index b195500889e..222f478c933 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -55,8 +55,6 @@ LanguageViewSet, NotesViewSet, NoteTypeViewSet, - ProductAPIScanConfigurationViewSet, - ProductViewSet, RiskAcceptanceViewSet, SonarqubeIssueViewSet, ToolConfigurationsViewSet, @@ -113,6 +111,7 @@ from dojo.organization.api.views import ( OrganizationViewSet, ) +from dojo.product.api.views import ProductAPIScanConfigurationViewSet, ProductViewSet from dojo.product_type.api.views import ProductTypeViewSet from dojo.test.api.views import TestsViewSet, TestTypesViewSet from dojo.url.api.views import URLViewSet