From d6ac41aed069ada8db8a9be82ed268ab83d1ff45 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 17:14:53 +0200 Subject: [PATCH 1/5] refactor(engagement): extract Engagement/Engagement_Presets models into dojo/engagement/ Phase 1 of module reorg per AGENTS.md. Move Engagement, Engagement_Presets + admin registrations into dojo/engagement/{models,admin}.py. Cross-module FKs use string refs to avoid circular imports; ENGAGEMENT_STATUS_CHOICES single-sourced with re-export. No migration change. --- dojo/engagement/__init__.py | 1 + dojo/engagement/admin.py | 15 +++ dojo/engagement/models.py | 184 ++++++++++++++++++++++++++++++++++++ dojo/models.py | 173 +-------------------------------- 4 files changed, 205 insertions(+), 168 deletions(-) create mode 100644 dojo/engagement/admin.py create mode 100644 dojo/engagement/models.py diff --git a/dojo/engagement/__init__.py b/dojo/engagement/__init__.py index e69de29bb2d..4dd8749c6cf 100644 --- a/dojo/engagement/__init__.py +++ b/dojo/engagement/__init__.py @@ -0,0 +1 @@ +import dojo.engagement.admin # noqa: F401 diff --git a/dojo/engagement/admin.py b/dojo/engagement/admin.py new file mode 100644 index 00000000000..921b7593b64 --- /dev/null +++ b/dojo/engagement/admin.py @@ -0,0 +1,15 @@ +from django.contrib import admin + +from dojo.engagement.models import Engagement, Engagement_Presets + + +@admin.register(Engagement_Presets) +class EngagementPresetsAdmin(admin.ModelAdmin): + + """Admin support for the Engagement_Presets model.""" + + +@admin.register(Engagement) +class EngagementAdmin(admin.ModelAdmin): + + """Admin support for the Engagement model.""" diff --git a/dojo/engagement/models.py b/dojo/engagement/models.py new file mode 100644 index 00000000000..ec658e4f6f7 --- /dev/null +++ b/dojo/engagement/models.py @@ -0,0 +1,184 @@ +import logging +from contextlib import suppress + +from dateutil.relativedelta import relativedelta +from django.db import models +from django.urls import reverse +from django.utils import timezone +from django.utils.translation import gettext as _ +from tagulous.models import TagField + +from dojo.base_models.base import BaseModel + +logger = logging.getLogger(__name__) + + +class Engagement_Presets(models.Model): + title = models.CharField(max_length=500, default=None, help_text=_("Brief description of preset.")) + test_type = models.ManyToManyField("dojo.Test_Type", default=None, blank=True) + network_locations = models.ManyToManyField("dojo.Network_Locations", default=None, blank=True) + notes = models.CharField(max_length=2000, help_text=_("Description of what needs to be tested or setting up environment for testing"), null=True, blank=True) + scope = models.CharField(max_length=800, help_text=_("Scope of Engagement testing, IP's/Resources/URL's)"), default=None, blank=True) + product = models.ForeignKey("dojo.Product", on_delete=models.CASCADE) + created = models.DateTimeField(auto_now_add=True, null=False) + + class Meta: + ordering = ["title"] + + def __str__(self): + return self.title + + +ENGAGEMENT_STATUS_CHOICES = (("Not Started", "Not Started"), + ("Blocked", "Blocked"), + ("Cancelled", "Cancelled"), + ("Completed", "Completed"), + ("In Progress", "In Progress"), + ("On Hold", "On Hold"), + ("Scheduled", "Scheduled"), + ("Waiting for Resource", "Waiting for Resource")) + + +class Engagement(BaseModel): + name = models.CharField(max_length=300, null=True, blank=True) + description = models.CharField(max_length=2000, null=True, blank=True) + version = models.CharField(max_length=100, null=True, blank=True, help_text=_("Version of the product the engagement tested.")) + first_contacted = models.DateField(null=True, blank=True) + target_start = models.DateField(null=False, blank=False) + target_end = models.DateField(null=False, blank=False) + lead = models.ForeignKey("dojo.Dojo_User", editable=True, null=True, blank=True, on_delete=models.RESTRICT) + requester = models.ForeignKey("dojo.Contact", null=True, blank=True, on_delete=models.CASCADE) + preset = models.ForeignKey("dojo.Engagement_Presets", null=True, blank=True, help_text=_("Settings and notes for performing this engagement."), on_delete=models.CASCADE) + reason = models.CharField(max_length=2000, null=True, blank=True) + report_type = models.ForeignKey("dojo.Report_Type", null=True, blank=True, on_delete=models.CASCADE) + product = models.ForeignKey("dojo.Product", on_delete=models.CASCADE) + active = models.BooleanField(default=True, editable=False) + tracker = models.URLField(max_length=200, help_text=_("Link to epic or ticket system with changes to version."), editable=True, blank=True, null=True) + test_strategy = models.URLField(editable=True, blank=True, null=True) + threat_model = models.BooleanField(default=True) + api_test = models.BooleanField(default=True) + pen_test = models.BooleanField(default=True) + check_list = models.BooleanField(default=True) + notes = models.ManyToManyField("dojo.Notes", blank=True, editable=False) + files = models.ManyToManyField("dojo.FileUpload", blank=True, editable=False) + status = models.CharField(editable=True, max_length=2000, default="Not Started", + null=True, + choices=ENGAGEMENT_STATUS_CHOICES) + progress = models.CharField(max_length=100, + default="threat_model", editable=False) + tmodel_path = models.CharField(max_length=1000, default="none", + editable=False, blank=True, null=True) + risk_acceptance = models.ManyToManyField("dojo.Risk_Acceptance", + default=None, + editable=False, + blank=True) + done_testing = models.BooleanField(default=False, editable=False) + engagement_type = models.CharField(editable=True, max_length=30, default="Interactive", + null=True, + choices=(("Interactive", "Interactive"), + ("CI/CD", "CI/CD"))) + build_id = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Build ID of the product the engagement tested."), verbose_name=_("Build ID")) + commit_hash = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Commit hash from repo"), verbose_name=_("Commit Hash")) + branch_tag = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Tag or branch of the product the engagement tested."), verbose_name=_("Branch/Tag")) + build_server = models.ForeignKey("dojo.Tool_Configuration", verbose_name=_("Build Server"), help_text=_("Build server responsible for CI/CD test"), null=True, blank=True, related_name="build_server", on_delete=models.CASCADE) + source_code_management_server = models.ForeignKey("dojo.Tool_Configuration", null=True, blank=True, verbose_name=_("SCM Server"), help_text=_("Source code server for CI/CD test"), related_name="source_code_management_server", on_delete=models.CASCADE) + source_code_management_uri = models.URLField(max_length=600, null=True, blank=True, editable=True, verbose_name=_("Repo"), help_text=_("Resource link to source code")) + orchestration_engine = models.ForeignKey("dojo.Tool_Configuration", verbose_name=_("Orchestration Engine"), help_text=_("Orchestration service responsible for CI/CD test"), null=True, blank=True, related_name="orchestration", on_delete=models.CASCADE) + deduplication_on_engagement = models.BooleanField(default=False, verbose_name=_("Deduplication within this engagement only"), help_text=_("If enabled deduplication will only mark a finding in this engagement as duplicate of another finding if both findings are in this engagement. If disabled, deduplication is on the product level.")) + + tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this engagement. Choose from the list or add new tags. Press Enter key to add.")) + inherited_tags = TagField(blank=True, force_lowercase=True, help_text=_("Internal use tags sepcifically for maintaining parity with product. This field will be present as a subset in the tags field")) + + class Meta: + ordering = ["-target_start"] + indexes = [ + models.Index(fields=["product", "active"]), + ] + + def __str__(self): + return "Engagement {}: {} ({})".format(self.id if id else 0, self.name or "", + self.target_start.strftime( + "%b %d, %Y")) + + def get_absolute_url(self): + return reverse("view_engagement", args=[str(self.id)]) + + def copy(self): + from dojo.models import Test, copy_model_util # noqa: PLC0415 -- lazy import, avoids circular dependency + copy = copy_model_util(self) + # Save the necessary ManyToMany relationships + old_notes = list(self.notes.all()) + old_files = list(self.files.all()) + old_tags = list(self.tags.all()) + old_risk_acceptances = list(self.risk_acceptance.all()) + old_tests = list(Test.objects.filter(engagement=self)) + # Save the object before setting any ManyToMany relationships + copy.save() + # Copy the notes + for notes in old_notes: + copy.notes.add(notes.copy()) + # Copy the files + for files in old_files: + copy.files.add(files.copy()) + # Copy the tests + for test in old_tests: + test.copy(engagement=copy) + # Copy the risk_acceptances + for risk_acceptance in old_risk_acceptances: + copy.risk_acceptance.add(risk_acceptance.copy(engagement=copy)) + # Assign any tags + copy.tags.set(old_tags) + + return copy + + def is_overdue(self): + overdue_grace_days = 10 if self.engagement_type == "CI/CD" else 0 + + max_end_date = timezone.now() - relativedelta(days=overdue_grace_days) + + return self.target_end < max_end_date.date() + + def get_breadcrumbs(self): + bc = self.product.get_breadcrumbs() + bc += [{"title": str(self), + "url": reverse("view_engagement", args=(self.id,))}] + return bc + + # only used by bulk risk acceptance api + @property + def unaccepted_open_findings(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + from dojo.utils import get_system_setting # noqa: PLC0415 circular import + + findings = Finding.objects.filter(risk_accepted=False, active=True, duplicate=False, test__engagement=self) + if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True): + findings = findings.filter(verified=True) + + return findings + + def accept_risks(self, accepted_risks): + self.risk_acceptance.add(*accepted_risks) + + @property + def has_jira_issue(self): + from dojo.jira import services as jira_services # noqa: PLC0415 circular import + return jira_services.has_issue(self) + + @property + def is_ci_cd(self): + return self.engagement_type == "CI/CD" + + def delete(self, *args, **kwargs): + logger.debug("%d engagement delete", self.id) + from dojo.finding import helper as finding_helper # noqa: PLC0415 circular import + finding_helper.prepare_duplicates_for_delete(self) + super().delete(*args, **kwargs) + from dojo.models import Product # noqa: PLC0415 -- lazy import, avoids circular dependency + with suppress(Engagement.DoesNotExist, Product.DoesNotExist): + # Suppressing a potential issue created from async delete removing + # related objects in a separate task + from dojo.utils import perform_product_grading # noqa: PLC0415 circular import + perform_product_grading(self.product) diff --git a/dojo/models.py b/dojo/models.py index dd833e0a427..00d53ad3ba5 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -1313,172 +1313,11 @@ def __str__(self): return self.location -class Engagement_Presets(models.Model): - title = models.CharField(max_length=500, default=None, help_text=_("Brief description of preset.")) - test_type = models.ManyToManyField(Test_Type, default=None, blank=True) - network_locations = models.ManyToManyField(Network_Locations, default=None, blank=True) - notes = models.CharField(max_length=2000, help_text=_("Description of what needs to be tested or setting up environment for testing"), null=True, blank=True) - scope = models.CharField(max_length=800, help_text=_("Scope of Engagement testing, IP's/Resources/URL's)"), default=None, blank=True) - product = models.ForeignKey(Product, on_delete=models.CASCADE) - created = models.DateTimeField(auto_now_add=True, null=False) - - class Meta: - ordering = ["title"] - - def __str__(self): - return self.title - - -ENGAGEMENT_STATUS_CHOICES = (("Not Started", "Not Started"), - ("Blocked", "Blocked"), - ("Cancelled", "Cancelled"), - ("Completed", "Completed"), - ("In Progress", "In Progress"), - ("On Hold", "On Hold"), - ("Scheduled", "Scheduled"), - ("Waiting for Resource", "Waiting for Resource")) - - -class Engagement(BaseModel): - name = models.CharField(max_length=300, null=True, blank=True) - description = models.CharField(max_length=2000, null=True, blank=True) - version = models.CharField(max_length=100, null=True, blank=True, help_text=_("Version of the product the engagement tested.")) - first_contacted = models.DateField(null=True, blank=True) - target_start = models.DateField(null=False, blank=False) - target_end = models.DateField(null=False, blank=False) - lead = models.ForeignKey(Dojo_User, editable=True, null=True, blank=True, on_delete=models.RESTRICT) - requester = models.ForeignKey(Contact, null=True, blank=True, on_delete=models.CASCADE) - preset = models.ForeignKey(Engagement_Presets, null=True, blank=True, help_text=_("Settings and notes for performing this engagement."), on_delete=models.CASCADE) - reason = models.CharField(max_length=2000, null=True, blank=True) - report_type = models.ForeignKey(Report_Type, null=True, blank=True, on_delete=models.CASCADE) - product = models.ForeignKey(Product, on_delete=models.CASCADE) - active = models.BooleanField(default=True, editable=False) - tracker = models.URLField(max_length=200, help_text=_("Link to epic or ticket system with changes to version."), editable=True, blank=True, null=True) - test_strategy = models.URLField(editable=True, blank=True, null=True) - threat_model = models.BooleanField(default=True) - api_test = models.BooleanField(default=True) - pen_test = models.BooleanField(default=True) - check_list = models.BooleanField(default=True) - notes = models.ManyToManyField(Notes, blank=True, editable=False) - files = models.ManyToManyField(FileUpload, blank=True, editable=False) - status = models.CharField(editable=True, max_length=2000, default="Not Started", - null=True, - choices=ENGAGEMENT_STATUS_CHOICES) - progress = models.CharField(max_length=100, - default="threat_model", editable=False) - tmodel_path = models.CharField(max_length=1000, default="none", - editable=False, blank=True, null=True) - risk_acceptance = models.ManyToManyField("Risk_Acceptance", - default=None, - editable=False, - blank=True) - done_testing = models.BooleanField(default=False, editable=False) - engagement_type = models.CharField(editable=True, max_length=30, default="Interactive", - null=True, - choices=(("Interactive", "Interactive"), - ("CI/CD", "CI/CD"))) - build_id = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Build ID of the product the engagement tested."), verbose_name=_("Build ID")) - commit_hash = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Commit hash from repo"), verbose_name=_("Commit Hash")) - branch_tag = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Tag or branch of the product the engagement tested."), verbose_name=_("Branch/Tag")) - build_server = models.ForeignKey(Tool_Configuration, verbose_name=_("Build Server"), help_text=_("Build server responsible for CI/CD test"), null=True, blank=True, related_name="build_server", on_delete=models.CASCADE) - source_code_management_server = models.ForeignKey(Tool_Configuration, null=True, blank=True, verbose_name=_("SCM Server"), help_text=_("Source code server for CI/CD test"), related_name="source_code_management_server", on_delete=models.CASCADE) - source_code_management_uri = models.URLField(max_length=600, null=True, blank=True, editable=True, verbose_name=_("Repo"), help_text=_("Resource link to source code")) - orchestration_engine = models.ForeignKey(Tool_Configuration, verbose_name=_("Orchestration Engine"), help_text=_("Orchestration service responsible for CI/CD test"), null=True, blank=True, related_name="orchestration", on_delete=models.CASCADE) - deduplication_on_engagement = models.BooleanField(default=False, verbose_name=_("Deduplication within this engagement only"), help_text=_("If enabled deduplication will only mark a finding in this engagement as duplicate of another finding if both findings are in this engagement. If disabled, deduplication is on the product level.")) - - tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this engagement. Choose from the list or add new tags. Press Enter key to add.")) - inherited_tags = TagField(blank=True, force_lowercase=True, help_text=_("Internal use tags sepcifically for maintaining parity with product. This field will be present as a subset in the tags field")) - - class Meta: - ordering = ["-target_start"] - indexes = [ - models.Index(fields=["product", "active"]), - ] - - def __str__(self): - return "Engagement {}: {} ({})".format(self.id if id else 0, self.name or "", - self.target_start.strftime( - "%b %d, %Y")) - - def get_absolute_url(self): - return reverse("view_engagement", args=[str(self.id)]) - - def copy(self): - copy = copy_model_util(self) - # Save the necessary ManyToMany relationships - old_notes = list(self.notes.all()) - old_files = list(self.files.all()) - old_tags = list(self.tags.all()) - old_risk_acceptances = list(self.risk_acceptance.all()) - old_tests = list(Test.objects.filter(engagement=self)) - # Save the object before setting any ManyToMany relationships - copy.save() - # Copy the notes - for notes in old_notes: - copy.notes.add(notes.copy()) - # Copy the files - for files in old_files: - copy.files.add(files.copy()) - # Copy the tests - for test in old_tests: - test.copy(engagement=copy) - # Copy the risk_acceptances - for risk_acceptance in old_risk_acceptances: - copy.risk_acceptance.add(risk_acceptance.copy(engagement=copy)) - # Assign any tags - copy.tags.set(old_tags) - - return copy - - def is_overdue(self): - overdue_grace_days = 10 if self.engagement_type == "CI/CD" else 0 - - max_end_date = timezone.now() - relativedelta(days=overdue_grace_days) - - return self.target_end < max_end_date.date() - - def get_breadcrumbs(self): - bc = self.product.get_breadcrumbs() - bc += [{"title": str(self), - "url": reverse("view_engagement", args=(self.id,))}] - return bc - - # only used by bulk risk acceptance api - @property - def unaccepted_open_findings(self): - from dojo.utils import get_system_setting # noqa: PLC0415 circular import - - findings = Finding.objects.filter(risk_accepted=False, active=True, duplicate=False, test__engagement=self) - if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True): - findings = findings.filter(verified=True) - - return findings - - def accept_risks(self, accepted_risks): - self.risk_acceptance.add(*accepted_risks) - - @property - def has_jira_issue(self): - from dojo.jira import services as jira_services # noqa: PLC0415 circular import - return jira_services.has_issue(self) - - @property - def is_ci_cd(self): - return self.engagement_type == "CI/CD" - - def delete(self, *args, **kwargs): - logger.debug("%d engagement delete", self.id) - from dojo.finding import helper as finding_helper # noqa: PLC0415 circular import - finding_helper.prepare_duplicates_for_delete(self) - super().delete(*args, **kwargs) - with suppress(Engagement.DoesNotExist, Product.DoesNotExist): - # Suppressing a potential issue created from async delete removing - # related objects in a separate task - from dojo.utils import perform_product_grading # noqa: PLC0415 circular import - perform_product_grading(self.product) +from dojo.engagement.models import ( # noqa: E402 -- re-export; class-body FKs below reference these + ENGAGEMENT_STATUS_CHOICES, # noqa: F401 -- re-export + Engagement, + Engagement_Presets, # noqa: F401 -- re-export +) class CWE(models.Model): @@ -4095,7 +3934,6 @@ def __str__(self): admin.site.register(Testing_Guide_Category) admin.site.register(Testing_Guide) -admin.site.register(Engagement_Presets) admin.site.register(Network_Locations) admin.site.register(Objects_Product) admin.site.register(Objects_Review) @@ -4105,7 +3943,6 @@ def __str__(self): admin.site.register(Finding, FindingAdmin) admin.site.register(FileUpload) admin.site.register(FileAccessToken) -admin.site.register(Engagement) admin.site.register(Risk_Acceptance) admin.site.register(Check_List) admin.site.register(Endpoint_Params) From 3d829ff1c98e08fcf7c3f40afd3c78c6d47a2a6b Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 19:12:10 +0200 Subject: [PATCH 2/5] refactor(engagement): extract copy_engagement workflow into services.py [Phase 2 pilot] Phase 2 of module reorg per AGENTS.md. Move the engagement copy workflow (copy + product grade recalc + notification) out of the copy_engagement view into an HTTP-free copy_engagement(engagement, user) service, so both UI and (future) API can reuse it. The inline notification carried a TODO asking for exactly this. View is thinned to call the service. Add a unit test for the service (the workflow was previously untested). Notification URL uses relative reverse() to match the codebase convention. --- dojo/engagement/services.py | 26 ++++++++++++++++++++++++++ dojo/engagement/views.py | 21 ++++++++------------- unittests/test_copy_model.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 13 deletions(-) diff --git a/dojo/engagement/services.py b/dojo/engagement/services.py index b78844fc6bc..6a5b7570103 100644 --- a/dojo/engagement/services.py +++ b/dojo/engagement/services.py @@ -3,10 +3,14 @@ from django.db.models.signals import pre_save from django.dispatch import receiver +from django.urls import reverse +from django.utils.translation import gettext as _ from dojo.celery_dispatch import dojo_dispatch_task from dojo.jira import services as jira_services from dojo.models import Engagement +from dojo.notifications.helper import create_notification +from dojo.utils import calculate_grade logger = logging.getLogger(__name__) @@ -28,6 +32,28 @@ def reopen_engagement(eng): eng.save() +def copy_engagement(engagement, user): + """ + Copy an engagement (and its tests/findings) within the same product, recalculate the + product grade, and notify. Returns the new engagement. + + HTTP-free so both the UI view and (eventually) the API can call it. + """ + product = engagement.product + engagement_copy = engagement.copy() + dojo_dispatch_task(calculate_grade, product.id) + create_notification( + event="engagement_copied", + title=_("Copying of %s") % engagement.name, + description=f'The engagement "{engagement.name}" was copied by {user}', + product=product, + url=reverse("view_engagement", args=(engagement_copy.id,)), + recipients=[engagement.lead], + icon="exclamation-triangle", + ) + return engagement_copy + + @receiver(pre_save, sender=Engagement) def set_name_if_none(sender, instance, *args, **kwargs): if not instance.name: diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index 0154aa5d336..8ec3693aa4f 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -24,7 +24,6 @@ from django.shortcuts import get_object_or_404, render from django.urls import Resolver404, reverse from django.utils import timezone -from django.utils.translation import gettext as _ from django.views import View from django.views.decorators.cache import cache_page from django.views.decorators.http import require_POST @@ -34,10 +33,15 @@ import dojo.risk_acceptance.helper as ra_helper from dojo.authorization.authorization import user_has_permission_or_403 -from dojo.celery_dispatch import dojo_dispatch_task from dojo.endpoint.utils import save_endpoints_to_add from dojo.engagement.queries import get_authorized_engagements -from dojo.engagement.services import close_engagement, reopen_engagement +from dojo.engagement.services import ( + close_engagement, + reopen_engagement, +) +from dojo.engagement.services import ( + copy_engagement as copy_engagement_service, +) from dojo.filters import ( EngagementDirectFilter, EngagementDirectFilterWithoutObjectLookups, @@ -107,7 +111,6 @@ add_error_message_to_response, add_success_message_to_response, async_delete, - calculate_grade, generate_file_response_from_file_path, get_cal_event, get_page_items, @@ -391,20 +394,12 @@ def copy_engagement(request, eid): if request.method == "POST": form = DoneForm(request.POST) if form.is_valid(): - engagement_copy = engagement.copy() - dojo_dispatch_task(calculate_grade, product.id) + copy_engagement_service(engagement, request.user) messages.add_message( request, messages.SUCCESS, "Engagement Copied successfully.", extra_tags="alert-success") - create_notification(event="engagement_copied", # TODO: - if 'copy' functionality will be supported by API as well, 'create_notification' needs to be migrated to place where it will be able to cover actions from both interfaces - title=_("Copying of %s") % engagement.name, - description=f'The engagement "{engagement.name}" was copied by {request.user}', - product=product, - url=request.build_absolute_uri(reverse("view_engagement", args=(engagement_copy.id, ))), - recipients=[engagement.lead], - icon="exclamation-triangle") return redirect_to_return_url_or_else(request, reverse("view_engagements", args=(product.id, ))) messages.add_message( request, diff --git a/unittests/test_copy_model.py b/unittests/test_copy_model.py index d67246630cc..2b262847ac5 100644 --- a/unittests/test_copy_model.py +++ b/unittests/test_copy_model.py @@ -1,6 +1,7 @@ from unittest.mock import patch +from dojo.engagement.services import copy_engagement from dojo.location.models import Location, LocationFindingReference from dojo.models import Endpoint, Endpoint_Status, Engagement, Finding, Product, Test, User from dojo.test.services import copy_test @@ -389,3 +390,32 @@ def test_duplicate_engagement_with_tags_and_notes(self): self.assertQuerySetEqual(engagement.notes.all(), engagement_copy.notes.all()) # Do the tags match self.assertEqual(engagement.tags, engagement_copy.tags) + + +class TestCopyEngagementService(DojoTestCase): + + """Phase 2: the copy_engagement service holds the copy workflow extracted from the UI view.""" + + @patch("dojo.engagement.services.create_notification") + @patch("dojo.engagement.services.dojo_dispatch_task") + def test_copy_engagement_service(self, mock_dispatch, mock_notification): + user, _ = User.objects.get_or_create(username="admin") + product_type = self.create_product_type("svc_prod_type") + product = self.create_product("svc_copy_product", prod_type=product_type) + engagement = self.create_engagement("svc_eng", product) + test = self.create_test(engagement=engagement, scan_type="NPM Audit Scan", title="test") + _ = Finding.objects.create(test=test, reporter=user) + before = Engagement.objects.filter(product=product).count() + before_findings = Finding.objects.filter(test__engagement__product=product).count() + # Run the service + engagement_copy = copy_engagement(engagement, user) + # A new engagement was created under the same product + self.assertEqual(before + 1, Engagement.objects.filter(product=product).count()) + self.assertNotEqual(engagement.id, engagement_copy.id) + self.assertEqual(product, engagement_copy.product) + # Findings were duplicated along with the engagement + self.assertEqual(before_findings + 1, Finding.objects.filter(test__engagement__product=product).count()) + # Side effects: grade recalculation dispatched and a notification raised + mock_dispatch.assert_called_once() + mock_notification.assert_called_once() + self.assertEqual(mock_notification.call_args.kwargs["event"], "engagement_copied") From fd6d22246d7faba0395f1b907a0b0f059a09a06e Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 21:13:45 +0200 Subject: [PATCH 3/5] refactor(engagement): move forms + UI filters into dojo/engagement/ui/ [engagement Phase 3,4] --- dojo/engagement/ui/__init__.py | 0 dojo/engagement/ui/filters.py | 399 +++++++++++++++++++++++++++++++++ dojo/engagement/ui/forms.py | 151 +++++++++++++ dojo/engagement/views.py | 5 +- dojo/filters.py | 374 ------------------------------ dojo/forms.py | 153 +------------ dojo/product/views.py | 8 +- 7 files changed, 567 insertions(+), 523 deletions(-) create mode 100644 dojo/engagement/ui/__init__.py create mode 100644 dojo/engagement/ui/filters.py create mode 100644 dojo/engagement/ui/forms.py diff --git a/dojo/engagement/ui/__init__.py b/dojo/engagement/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/engagement/ui/filters.py b/dojo/engagement/ui/filters.py new file mode 100644 index 00000000000..055da964976 --- /dev/null +++ b/dojo/engagement/ui/filters.py @@ -0,0 +1,399 @@ +from django.conf import settings +from django_filters import ( + BooleanFilter, + CharFilter, + FilterSet, + ModelChoiceFilter, + ModelMultipleChoiceFilter, + MultipleChoiceFilter, + OrderingFilter, +) + +from dojo.filters import DateRangeFilter, DojoFilter +from dojo.labels import get_labels +from dojo.models import ( + ENGAGEMENT_STATUS_CHOICES, + Dojo_User, + Engagement, + Product, + Product_API_Scan_Configuration, + Product_Type, + Test, + Test_Type, +) +from dojo.product_type.queries import get_authorized_product_types +from dojo.user.queries import get_authorized_users + +labels = get_labels() + + +class EngagementDirectFilterHelper(FilterSet): + name = CharFilter(lookup_expr="icontains", label="Engagement name contains") + version = CharFilter(field_name="version", lookup_expr="icontains", label="Engagement version") + test__version = CharFilter(field_name="test__version", lookup_expr="icontains", label="Test version") + product__name = CharFilter(lookup_expr="icontains", label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL) + status = MultipleChoiceFilter(choices=ENGAGEMENT_STATUS_CHOICES, label="Status") + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + target_start = DateRangeFilter() + target_end = DateRangeFilter() + test__engagement__product__lifecycle = MultipleChoiceFilter( + choices=Product.LIFECYCLE_CHOICES, + label=labels.ASSET_LIFECYCLE_LABEL, + null_label="Empty") + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("target_start", "target_start"), + ("name", "name"), + ("product__name", "product__name"), + ("product__prod_type__name", "product__prod_type__name"), + ("lead__first_name", "lead__first_name"), + ), + field_labels={ + "target_start": "Start date", + "name": "Engagement", + "product__name": labels.ASSET_FILTERS_NAME_LABEL, + "product__prod_type__name": labels.ORG_FILTERS_LABEL, + "lead__first_name": "Lead", + }, + ) + + +class EngagementDirectFilter(EngagementDirectFilterHelper, DojoFilter): + lead = ModelChoiceFilter(queryset=Dojo_User.objects.none(), label="Lead") + product__prod_type = ModelMultipleChoiceFilter( + queryset=Product_Type.objects.none(), + label=labels.ORG_FILTERS_LABEL) + tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + not_tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + exclude=True, + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.form.fields["product__prod_type"].queryset = get_authorized_product_types("view") + self.form.fields["lead"].queryset = get_authorized_users("view") \ + .filter(engagement__lead__isnull=False).distinct() + + class Meta: + model = Engagement + fields = ["product__name", "product__prod_type"] + + +class EngagementDirectFilterWithoutObjectLookups(EngagementDirectFilterHelper): + lead = CharFilter( + field_name="lead__username", + lookup_expr="iexact", + label="Lead Username", + help_text="Search for Lead username that are an exact match") + lead_contains = CharFilter( + field_name="lead__username", + lookup_expr="icontains", + label="Lead Username Contains", + help_text="Search for Lead username that contain a given pattern") + product__prod_type__name = CharFilter( + field_name="product__prod_type__name", + lookup_expr="iexact", + label=labels.ORG_FILTERS_NAME_LABEL, + help_text=labels.ORG_FILTERS_NAME_HELP) + product__prod_type__name_contains = CharFilter( + field_name="product__prod_type__name", + lookup_expr="icontains", + label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, + help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) + + class Meta: + model = Engagement + fields = ["product__name"] + + +class EngagementFilterHelper(FilterSet): + name = CharFilter(lookup_expr="icontains", label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL) + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + engagement__name = CharFilter(lookup_expr="icontains", label="Engagement name contains") + engagement__version = CharFilter(field_name="engagement__version", lookup_expr="icontains", label="Engagement version") + engagement__test__version = CharFilter(field_name="engagement__test__version", lookup_expr="icontains", label="Test version") + engagement__product__lifecycle = MultipleChoiceFilter( + choices=Product.LIFECYCLE_CHOICES, + label=labels.ASSET_LIFECYCLE_LABEL, + null_label="Empty") + engagement__status = MultipleChoiceFilter( + choices=ENGAGEMENT_STATUS_CHOICES, + label="Status") + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("name", "name"), + ("prod_type__name", "prod_type__name"), + ), + field_labels={ + "name": labels.ASSET_FILTERS_NAME_LABEL, + "prod_type__name": labels.ORG_FILTERS_LABEL, + }, + ) + + +class EngagementFilter(EngagementFilterHelper, DojoFilter): + engagement__lead = ModelChoiceFilter( + queryset=Dojo_User.objects.none(), + label="Lead") + prod_type = ModelMultipleChoiceFilter( + queryset=Product_Type.objects.none(), + label=labels.ORG_FILTERS_LABEL) + tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + not_tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + exclude=True, + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.form.fields["prod_type"].queryset = get_authorized_product_types("view") + self.form.fields["engagement__lead"].queryset = get_authorized_users("view") \ + .filter(engagement__lead__isnull=False).distinct() + self.form.fields["tags"].help_text = labels.ASSET_FILTERS_TAGS_HELP + self.form.fields["not_tags"].help_text = labels.ASSET_FILTERS_NOT_TAGS_HELP + + class Meta: + model = Product + fields = ["name", "prod_type"] + + +class ProductEngagementsFilter(DojoFilter): + engagement__name = CharFilter(field_name="name", lookup_expr="icontains", label="Engagement name contains") + engagement__lead = ModelChoiceFilter(field_name="lead", queryset=Dojo_User.objects.none(), label="Lead") + engagement__version = CharFilter(field_name="version", lookup_expr="icontains", label="Engagement version") + engagement__test__version = CharFilter(field_name="test__version", lookup_expr="icontains", label="Test version") + engagement__status = MultipleChoiceFilter(field_name="status", choices=ENGAGEMENT_STATUS_CHOICES, + label="Status") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.form.fields["engagement__lead"].queryset = get_authorized_users("view") \ + .filter(engagement__lead__isnull=False).distinct() + + class Meta: + model = Engagement + fields = [] + + +class ProductEngagementsFilterWithoutObjectLookups(ProductEngagementsFilter): + engagement__lead = CharFilter( + field_name="lead__username", + lookup_expr="iexact", + label="Lead Username", + help_text="Search for Lead username that are an exact match") + + +class EngagementFilterWithoutObjectLookups(EngagementFilterHelper): + engagement__lead = CharFilter( + field_name="engagement__lead__username", + lookup_expr="iexact", + label="Lead Username", + help_text="Search for Lead username that are an exact match") + engagement__lead_contains = CharFilter( + field_name="engagement__lead__username", + lookup_expr="icontains", + label="Lead Username Contains", + help_text="Search for Lead username that contain a given pattern") + prod_type__name = CharFilter( + field_name="prod_type__name", + lookup_expr="iexact", + label=labels.ORG_FILTERS_LABEL, + help_text=labels.ORG_FILTERS_LABEL_HELP) + prod_type__name_contains = CharFilter( + field_name="prod_type__name", + lookup_expr="icontains", + label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, + help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) + + class Meta: + model = Product + fields = ["name"] + + +class ProductEngagementFilterHelper(FilterSet): + version = CharFilter(lookup_expr="icontains", label="Engagement version") + test__version = CharFilter(field_name="test__version", lookup_expr="icontains", label="Test version") + name = CharFilter(lookup_expr="icontains") + status = MultipleChoiceFilter(choices=ENGAGEMENT_STATUS_CHOICES, label="Status") + target_start = DateRangeFilter() + target_end = DateRangeFilter() + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("name", "name"), + ("version", "version"), + ("target_start", "target_start"), + ("target_end", "target_end"), + ("status", "status"), + ("lead", "lead"), + ), + field_labels={ + "name": "Engagement Name", + }, + ) + + class Meta: + model = Product + fields = ["name"] + + +class ProductEngagementFilter(ProductEngagementFilterHelper, DojoFilter): + lead = ModelChoiceFilter(queryset=Dojo_User.objects.none(), label="Lead") + tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + not_tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + exclude=True, + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.form.fields["lead"].queryset = get_authorized_users( + "view").filter(engagement__lead__isnull=False).distinct() + + +class ProductEngagementFilterWithoutObjectLookups(ProductEngagementFilterHelper, DojoFilter): + lead = CharFilter( + field_name="lead__username", + lookup_expr="iexact", + label="Lead Username", + help_text="Search for Lead username that are an exact match") + lead_contains = CharFilter( + field_name="lead__username", + lookup_expr="icontains", + label="Lead Username Contains", + help_text="Search for Lead username that contain a given pattern") + + +class EngagementTestFilterHelper(FilterSet): + version = CharFilter(lookup_expr="icontains", label="Version") + if settings.TRACK_IMPORT_HISTORY: + test_import__version = CharFilter(field_name="test_import__version", lookup_expr="icontains", label="Reimported Version") + target_start = DateRangeFilter() + target_end = DateRangeFilter() + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("title", "title"), + ("version", "version"), + ("target_start", "target_start"), + ("target_end", "target_end"), + ("lead", "lead"), + ("api_scan_configuration", "api_scan_configuration"), + ), + field_labels={ + "name": "Test Name", + }, + ) + + +class EngagementTestFilter(EngagementTestFilterHelper, DojoFilter): + lead = ModelChoiceFilter(queryset=Dojo_User.objects.none(), label="Lead") + api_scan_configuration = ModelChoiceFilter( + queryset=Product_API_Scan_Configuration.objects.none(), + label="API Scan Configuration") + tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + queryset=Test.tags.tag_model.objects.all().order_by("name")) + not_tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + exclude=True, + queryset=Test.tags.tag_model.objects.all().order_by("name")) + + class Meta: + model = Test + fields = [ + "title", "test_type", "target_start", + "target_end", "percent_complete", + "version", "api_scan_configuration", + ] + + def __init__(self, *args, **kwargs): + self.engagement = kwargs.pop("engagement") + super(DojoFilter, self).__init__(*args, **kwargs) + self.form.fields["test_type"].queryset = Test_Type.objects.filter(test__engagement=self.engagement).distinct().order_by("name") + self.form.fields["api_scan_configuration"].queryset = Product_API_Scan_Configuration.objects.filter(product=self.engagement.product).distinct() + self.form.fields["lead"].queryset = get_authorized_users("view") \ + .filter(test__lead__isnull=False).distinct() + + +class EngagementTestFilterWithoutObjectLookups(EngagementTestFilterHelper): + lead = CharFilter( + field_name="lead__username", + lookup_expr="iexact", + label="Lead Username", + help_text="Search for Lead username that are an exact match") + lead_contains = CharFilter( + field_name="lead__username", + lookup_expr="icontains", + label="Lead Username Contains", + help_text="Search for Lead username that contain a given pattern") + api_scan_configuration__tool_configuration__name = CharFilter( + field_name="api_scan_configuration__tool_configuration__name", + lookup_expr="iexact", + label="API Scan Configuration Name", + help_text="Search for Lead username that are an exact match") + api_scan_configuration__tool_configuration__name_contains = CharFilter( + field_name="api_scan_configuration__tool_configuration__name", + lookup_expr="icontains", + label="API Scan Configuration Name Contains", + help_text="Search for Lead username that contain a given pattern") + tags_contains = CharFilter( + label="Test Tag Contains", + field_name="tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Test that contain a given pattern") + tags = CharFilter( + label="Test Tag", + field_name="tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Test that are an exact match") + not_tags_contains = CharFilter( + label="Test Tag Does Not Contain", + field_name="tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Test that contain a given pattern, and exclude them", + exclude=True) + not_tags = CharFilter( + label="Not Test Tag", + field_name="tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Test that are an exact match, and exclude them", + exclude=True) + + class Meta: + model = Test + fields = [ + "title", "test_type", "target_start", + "target_end", "percent_complete", "version", + ] + + def __init__(self, *args, **kwargs): + self.engagement = kwargs.pop("engagement") + super().__init__(*args, **kwargs) + self.form.fields["test_type"].queryset = Test_Type.objects.filter(test__engagement=self.engagement).distinct().order_by("name") diff --git a/dojo/engagement/ui/forms.py b/dojo/engagement/ui/forms.py new file mode 100644 index 00000000000..a325e9a3d69 --- /dev/null +++ b/dojo/engagement/ui/forms.py @@ -0,0 +1,151 @@ +from django import forms + +from dojo.engagement.queries import get_authorized_engagements +from dojo.labels import get_labels +from dojo.models import Engagement, Engagement_Presets, Product +from dojo.product.queries import get_authorized_products +from dojo.user.queries import get_authorized_users, get_authorized_users_for_product_and_product_type +from dojo.utils import get_system_setting +from dojo.validators import tag_validator + +labels = get_labels() + + +class EngForm(forms.ModelForm): + name = forms.CharField( + max_length=300, required=False, + help_text=( + "Add a descriptive name to identify this engagement. " + "Without a name the target start date will be set." + )) + description = forms.CharField(widget=forms.Textarea(attrs={}), + required=False, help_text="Description of the engagement and details regarding the engagement.") + product = forms.ModelChoiceField(label=labels.ASSET_LABEL, + queryset=Product.objects.none(), + required=True) + target_start = forms.DateField(widget=forms.TextInput( + attrs={"class": "datepicker", "autocomplete": "off"})) + target_end = forms.DateField(widget=forms.TextInput( + attrs={"class": "datepicker", "autocomplete": "off"})) + lead = forms.ModelChoiceField( + queryset=None, + required=True, label="Testing Lead") + test_strategy = forms.URLField(required=False, label="Test Strategy URL") + + def __init__(self, *args, **kwargs): + cicd = False + product = None + if "cicd" in kwargs: + cicd = kwargs.pop("cicd") + + if "product" in kwargs: + product = kwargs.pop("product") + + self.user = None + if "user" in kwargs: + self.user = kwargs.pop("user") + + super().__init__(*args, **kwargs) + + if product: + self.fields["preset"] = forms.ModelChoiceField(help_text="Settings and notes for performing this engagement.", required=False, queryset=Engagement_Presets.objects.filter(product=product)) + self.fields["lead"].queryset = get_authorized_users_for_product_and_product_type(None, product, "view").filter(is_active=True) + else: + self.fields["lead"].queryset = get_authorized_users("view").filter(is_active=True) + + self.fields["product"].queryset = get_authorized_products("add") + + # Don't show CICD fields on a interactive engagement + if cicd is False: + del self.fields["build_id"] + del self.fields["commit_hash"] + del self.fields["branch_tag"] + del self.fields["build_server"] + del self.fields["source_code_management_server"] + # del self.fields['source_code_management_uri'] + del self.fields["orchestration_engine"] + else: + del self.fields["test_strategy"] + del self.fields["status"] + + def is_valid(self): + valid = super().is_valid() + + # we're done now if not valid + if not valid: + return valid + if self.cleaned_data["target_start"] > self.cleaned_data["target_end"]: + self.add_error("target_start", "Your target start date exceeds your target end date") + self.add_error("target_end", "Your target start date exceeds your target end date") + return False + return True + + def clean_tags(self): + tag_validator(self.cleaned_data.get("tags")) + return self.cleaned_data.get("tags") + + class Meta: + model = Engagement + exclude = ("first_contacted", "real_start", "engagement_type", "inherited_tags", + "real_end", "requester", "reason", "updated", "report_type", + "product", "threat_model", "api_test", "pen_test", "check_list") + + +class DeleteEngagementForm(forms.ModelForm): + id = forms.IntegerField(required=True, + widget=forms.widgets.HiddenInput()) + + class Meta: + model = Engagement + fields = ["id"] + + +class EngagementPresetsForm(forms.ModelForm): + + notes = forms.CharField(widget=forms.Textarea(attrs={}), + required=False, help_text="Description of what needs to be tested or setting up environment for testing") + + scope = forms.CharField(widget=forms.Textarea(attrs={}), + required=False, help_text="Scope of Engagement testing, IP's/Resources/URL's)") + + class Meta: + model = Engagement_Presets + exclude = ["product"] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if disclaimer := get_system_setting("disclaimer_notes"): + self.disclaimer = disclaimer.strip() + + +class DeleteEngagementPresetsForm(forms.ModelForm): + id = forms.IntegerField(required=True, + widget=forms.widgets.HiddenInput()) + + class Meta: + model = Engagement_Presets + fields = ["id"] + + +class AddEngagementForm(forms.Form): + product = forms.ModelChoiceField( + queryset=Product.objects.none(), + required=True, + widget=forms.widgets.Select(), + help_text="Select which product to attach Engagement") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields["product"].queryset = get_authorized_products("add") + + +class ExistingEngagementForm(forms.Form): + engagement = forms.ModelChoiceField( + queryset=Engagement.objects.none(), + required=True, + widget=forms.widgets.Select(), + help_text="Select which Engagement to link the Questionnaire to") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields["engagement"].queryset = get_authorized_engagements("edit").order_by("-target_start") diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index 8ec3693aa4f..4c0978dea56 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -42,7 +42,7 @@ from dojo.engagement.services import ( copy_engagement as copy_engagement_service, ) -from dojo.filters import ( +from dojo.engagement.ui.filters import ( EngagementDirectFilter, EngagementDirectFilterWithoutObjectLookups, EngagementFilter, @@ -52,15 +52,14 @@ ProductEngagementsFilter, ProductEngagementsFilterWithoutObjectLookups, ) +from dojo.engagement.ui.forms import DeleteEngagementForm, EngForm from dojo.finding.helper import NOT_ACCEPTED_FINDINGS_QUERY from dojo.finding.views import find_available_notetypes from dojo.forms import ( AddFindingsRiskAcceptanceForm, CheckForm, - DeleteEngagementForm, DoneForm, EditRiskAcceptanceForm, - EngForm, ImportScanForm, JIRAEngagementForm, JIRAImportScanForm, diff --git a/dojo/filters.py b/dojo/filters.py index 14af254bb69..21357ca5b9e 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -56,7 +56,6 @@ from dojo.location.status import FindingLocationStatus, ProductLocationStatus from dojo.models import ( EFFORT_FOR_FIXING_CHOICES, - ENGAGEMENT_STATUS_CHOICES, SEVERITY_CHOICES, App_Analysis, ChoiceQuestion, @@ -72,7 +71,6 @@ Finding_Template, Note_Type, Product, - Product_API_Scan_Configuration, Product_Type, Question, Risk_Acceptance, @@ -1058,264 +1056,6 @@ def __init__(self, *args, **kwargs): "test__engagement__product"].queryset = get_authorized_products("view") -class EngagementDirectFilterHelper(FilterSet): - name = CharFilter(lookup_expr="icontains", label="Engagement name contains") - version = CharFilter(field_name="version", lookup_expr="icontains", label="Engagement version") - test__version = CharFilter(field_name="test__version", lookup_expr="icontains", label="Test version") - product__name = CharFilter(lookup_expr="icontains", label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL) - status = MultipleChoiceFilter(choices=ENGAGEMENT_STATUS_CHOICES, label="Status") - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - target_start = DateRangeFilter() - target_end = DateRangeFilter() - test__engagement__product__lifecycle = MultipleChoiceFilter( - choices=Product.LIFECYCLE_CHOICES, - label=labels.ASSET_LIFECYCLE_LABEL, - null_label="Empty") - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("target_start", "target_start"), - ("name", "name"), - ("product__name", "product__name"), - ("product__prod_type__name", "product__prod_type__name"), - ("lead__first_name", "lead__first_name"), - ), - field_labels={ - "target_start": "Start date", - "name": "Engagement", - "product__name": labels.ASSET_FILTERS_NAME_LABEL, - "product__prod_type__name": labels.ORG_FILTERS_LABEL, - "lead__first_name": "Lead", - }, - ) - - -class EngagementDirectFilter(EngagementDirectFilterHelper, DojoFilter): - lead = ModelChoiceFilter(queryset=Dojo_User.objects.none(), label="Lead") - product__prod_type = ModelMultipleChoiceFilter( - queryset=Product_Type.objects.none(), - label=labels.ORG_FILTERS_LABEL) - tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - not_tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - exclude=True, - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.form.fields["product__prod_type"].queryset = get_authorized_product_types("view") - self.form.fields["lead"].queryset = get_authorized_users("view") \ - .filter(engagement__lead__isnull=False).distinct() - - class Meta: - model = Engagement - fields = ["product__name", "product__prod_type"] - - -class EngagementDirectFilterWithoutObjectLookups(EngagementDirectFilterHelper): - lead = CharFilter( - field_name="lead__username", - lookup_expr="iexact", - label="Lead Username", - help_text="Search for Lead username that are an exact match") - lead_contains = CharFilter( - field_name="lead__username", - lookup_expr="icontains", - label="Lead Username Contains", - help_text="Search for Lead username that contain a given pattern") - product__prod_type__name = CharFilter( - field_name="product__prod_type__name", - lookup_expr="iexact", - label=labels.ORG_FILTERS_NAME_LABEL, - help_text=labels.ORG_FILTERS_NAME_HELP) - product__prod_type__name_contains = CharFilter( - field_name="product__prod_type__name", - lookup_expr="icontains", - label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, - help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) - - class Meta: - model = Engagement - fields = ["product__name"] - - -class EngagementFilterHelper(FilterSet): - name = CharFilter(lookup_expr="icontains", label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL) - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - engagement__name = CharFilter(lookup_expr="icontains", label="Engagement name contains") - engagement__version = CharFilter(field_name="engagement__version", lookup_expr="icontains", label="Engagement version") - engagement__test__version = CharFilter(field_name="engagement__test__version", lookup_expr="icontains", label="Test version") - engagement__product__lifecycle = MultipleChoiceFilter( - choices=Product.LIFECYCLE_CHOICES, - label=labels.ASSET_LIFECYCLE_LABEL, - null_label="Empty") - engagement__status = MultipleChoiceFilter( - choices=ENGAGEMENT_STATUS_CHOICES, - label="Status") - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("name", "name"), - ("prod_type__name", "prod_type__name"), - ), - field_labels={ - "name": labels.ASSET_FILTERS_NAME_LABEL, - "prod_type__name": labels.ORG_FILTERS_LABEL, - }, - ) - - -class EngagementFilter(EngagementFilterHelper, DojoFilter): - engagement__lead = ModelChoiceFilter( - queryset=Dojo_User.objects.none(), - label="Lead") - prod_type = ModelMultipleChoiceFilter( - queryset=Product_Type.objects.none(), - label=labels.ORG_FILTERS_LABEL) - tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - not_tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - exclude=True, - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.form.fields["prod_type"].queryset = get_authorized_product_types("view") - self.form.fields["engagement__lead"].queryset = get_authorized_users("view") \ - .filter(engagement__lead__isnull=False).distinct() - self.form.fields["tags"].help_text = labels.ASSET_FILTERS_TAGS_HELP - self.form.fields["not_tags"].help_text = labels.ASSET_FILTERS_NOT_TAGS_HELP - - class Meta: - model = Product - fields = ["name", "prod_type"] - - -class ProductEngagementsFilter(DojoFilter): - engagement__name = CharFilter(field_name="name", lookup_expr="icontains", label="Engagement name contains") - engagement__lead = ModelChoiceFilter(field_name="lead", queryset=Dojo_User.objects.none(), label="Lead") - engagement__version = CharFilter(field_name="version", lookup_expr="icontains", label="Engagement version") - engagement__test__version = CharFilter(field_name="test__version", lookup_expr="icontains", label="Test version") - engagement__status = MultipleChoiceFilter(field_name="status", choices=ENGAGEMENT_STATUS_CHOICES, - label="Status") - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.form.fields["engagement__lead"].queryset = get_authorized_users("view") \ - .filter(engagement__lead__isnull=False).distinct() - - class Meta: - model = Engagement - fields = [] - - -class ProductEngagementsFilterWithoutObjectLookups(ProductEngagementsFilter): - engagement__lead = CharFilter( - field_name="lead__username", - lookup_expr="iexact", - label="Lead Username", - help_text="Search for Lead username that are an exact match") - - -class EngagementFilterWithoutObjectLookups(EngagementFilterHelper): - engagement__lead = CharFilter( - field_name="engagement__lead__username", - lookup_expr="iexact", - label="Lead Username", - help_text="Search for Lead username that are an exact match") - engagement__lead_contains = CharFilter( - field_name="engagement__lead__username", - lookup_expr="icontains", - label="Lead Username Contains", - help_text="Search for Lead username that contain a given pattern") - prod_type__name = CharFilter( - field_name="prod_type__name", - lookup_expr="iexact", - label=labels.ORG_FILTERS_LABEL, - help_text=labels.ORG_FILTERS_LABEL_HELP) - prod_type__name_contains = CharFilter( - field_name="prod_type__name", - lookup_expr="icontains", - label=labels.ORG_FILTERS_NAME_CONTAINS_LABEL, - help_text=labels.ORG_FILTERS_NAME_CONTAINS_HELP) - - class Meta: - model = Product - fields = ["name"] - - -class ProductEngagementFilterHelper(FilterSet): - version = CharFilter(lookup_expr="icontains", label="Engagement version") - test__version = CharFilter(field_name="test__version", lookup_expr="icontains", label="Test version") - name = CharFilter(lookup_expr="icontains") - status = MultipleChoiceFilter(choices=ENGAGEMENT_STATUS_CHOICES, label="Status") - target_start = DateRangeFilter() - target_end = DateRangeFilter() - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("name", "name"), - ("version", "version"), - ("target_start", "target_start"), - ("target_end", "target_end"), - ("status", "status"), - ("lead", "lead"), - ), - field_labels={ - "name": "Engagement Name", - }, - ) - - class Meta: - model = Product - fields = ["name"] - - -class ProductEngagementFilter(ProductEngagementFilterHelper, DojoFilter): - lead = ModelChoiceFilter(queryset=Dojo_User.objects.none(), label="Lead") - tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - not_tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - exclude=True, - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.form.fields["lead"].queryset = get_authorized_users( - "view").filter(engagement__lead__isnull=False).distinct() - - -class ProductEngagementFilterWithoutObjectLookups(ProductEngagementFilterHelper, DojoFilter): - lead = CharFilter( - field_name="lead__username", - lookup_expr="iexact", - label="Lead Username", - help_text="Search for Lead username that are an exact match") - lead_contains = CharFilter( - field_name="lead__username", - lookup_expr="icontains", - label="Lead Username Contains", - help_text="Search for Lead username that contain a given pattern") - - class ApiEngagementFilter(DojoFilter): product__prod_type = NumberInFilter(field_name="product__prod_type", lookup_expr="in") tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") @@ -3168,120 +2908,6 @@ class Meta: } -class EngagementTestFilterHelper(FilterSet): - version = CharFilter(lookup_expr="icontains", label="Version") - if settings.TRACK_IMPORT_HISTORY: - test_import__version = CharFilter(field_name="test_import__version", lookup_expr="icontains", label="Reimported Version") - target_start = DateRangeFilter() - target_end = DateRangeFilter() - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("title", "title"), - ("version", "version"), - ("target_start", "target_start"), - ("target_end", "target_end"), - ("lead", "lead"), - ("api_scan_configuration", "api_scan_configuration"), - ), - field_labels={ - "name": "Test Name", - }, - ) - - -class EngagementTestFilter(EngagementTestFilterHelper, DojoFilter): - lead = ModelChoiceFilter(queryset=Dojo_User.objects.none(), label="Lead") - api_scan_configuration = ModelChoiceFilter( - queryset=Product_API_Scan_Configuration.objects.none(), - label="API Scan Configuration") - tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - queryset=Test.tags.tag_model.objects.all().order_by("name")) - not_tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - exclude=True, - queryset=Test.tags.tag_model.objects.all().order_by("name")) - - class Meta: - model = Test - fields = [ - "title", "test_type", "target_start", - "target_end", "percent_complete", - "version", "api_scan_configuration", - ] - - def __init__(self, *args, **kwargs): - self.engagement = kwargs.pop("engagement") - super(DojoFilter, self).__init__(*args, **kwargs) - self.form.fields["test_type"].queryset = Test_Type.objects.filter(test__engagement=self.engagement).distinct().order_by("name") - self.form.fields["api_scan_configuration"].queryset = Product_API_Scan_Configuration.objects.filter(product=self.engagement.product).distinct() - self.form.fields["lead"].queryset = get_authorized_users("view") \ - .filter(test__lead__isnull=False).distinct() - - -class EngagementTestFilterWithoutObjectLookups(EngagementTestFilterHelper): - lead = CharFilter( - field_name="lead__username", - lookup_expr="iexact", - label="Lead Username", - help_text="Search for Lead username that are an exact match") - lead_contains = CharFilter( - field_name="lead__username", - lookup_expr="icontains", - label="Lead Username Contains", - help_text="Search for Lead username that contain a given pattern") - api_scan_configuration__tool_configuration__name = CharFilter( - field_name="api_scan_configuration__tool_configuration__name", - lookup_expr="iexact", - label="API Scan Configuration Name", - help_text="Search for Lead username that are an exact match") - api_scan_configuration__tool_configuration__name_contains = CharFilter( - field_name="api_scan_configuration__tool_configuration__name", - lookup_expr="icontains", - label="API Scan Configuration Name Contains", - help_text="Search for Lead username that contain a given pattern") - tags_contains = CharFilter( - label="Test Tag Contains", - field_name="tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Test that contain a given pattern") - tags = CharFilter( - label="Test Tag", - field_name="tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Test that are an exact match") - not_tags_contains = CharFilter( - label="Test Tag Does Not Contain", - field_name="tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Test that contain a given pattern, and exclude them", - exclude=True) - not_tags = CharFilter( - label="Not Test Tag", - field_name="tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Test that are an exact match, and exclude them", - exclude=True) - - class Meta: - model = Test - fields = [ - "title", "test_type", "target_start", - "target_end", "percent_complete", "version", - ] - - def __init__(self, *args, **kwargs): - self.engagement = kwargs.pop("engagement") - super().__init__(*args, **kwargs) - self.form.fields["test_type"].queryset = Test_Type.objects.filter(test__engagement=self.engagement).distinct().order_by("name") - - class ApiAppAnalysisFilter(DojoFilter): tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") tags = CharFieldInFilter( diff --git a/dojo/forms.py b/dojo/forms.py index cb11e819313..bab612749b0 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -29,7 +29,6 @@ from dojo.authorization.authorization import user_has_configuration_permission, user_is_superuser_or_global_owner from dojo.endpoint.utils import endpoint_filter, endpoint_get_or_create, validate_endpoints_to_add -from dojo.engagement.queries import get_authorized_engagements from dojo.finding.queries import get_authorized_findings from dojo.github.ui.forms import ( # noqa: F401 -- backward compat DeleteGITHUBConfForm, @@ -73,8 +72,6 @@ Dojo_User, DojoMeta, Endpoint, - Engagement, - Engagement_Presets, Engagement_Survey, FileUpload, Finding, @@ -971,95 +968,16 @@ class Meta: "sensitive_data", "sensitive_issues", "other", "other_issues"] -class EngForm(forms.ModelForm): - name = forms.CharField( - max_length=300, required=False, - help_text=( - "Add a descriptive name to identify this engagement. " - "Without a name the target start date will be set." - )) - description = forms.CharField(widget=forms.Textarea(attrs={}), - required=False, help_text="Description of the engagement and details regarding the engagement.") - product = forms.ModelChoiceField(label=labels.ASSET_LABEL, - queryset=Product.objects.none(), - required=True) - target_start = forms.DateField(widget=forms.TextInput( - attrs={"class": "datepicker", "autocomplete": "off"})) - target_end = forms.DateField(widget=forms.TextInput( - attrs={"class": "datepicker", "autocomplete": "off"})) - lead = forms.ModelChoiceField( - queryset=None, - required=True, label="Testing Lead") - test_strategy = forms.URLField(required=False, label="Test Strategy URL") - - def __init__(self, *args, **kwargs): - cicd = False - product = None - if "cicd" in kwargs: - cicd = kwargs.pop("cicd") - - if "product" in kwargs: - product = kwargs.pop("product") - - self.user = None - if "user" in kwargs: - self.user = kwargs.pop("user") - - super().__init__(*args, **kwargs) - - if product: - self.fields["preset"] = forms.ModelChoiceField(help_text="Settings and notes for performing this engagement.", required=False, queryset=Engagement_Presets.objects.filter(product=product)) - self.fields["lead"].queryset = get_authorized_users_for_product_and_product_type(None, product, "view").filter(is_active=True) - else: - self.fields["lead"].queryset = get_authorized_users("view").filter(is_active=True) - - self.fields["product"].queryset = get_authorized_products("add") - - # Don't show CICD fields on a interactive engagement - if cicd is False: - del self.fields["build_id"] - del self.fields["commit_hash"] - del self.fields["branch_tag"] - del self.fields["build_server"] - del self.fields["source_code_management_server"] - # del self.fields['source_code_management_uri'] - del self.fields["orchestration_engine"] - else: - del self.fields["test_strategy"] - del self.fields["status"] - - def is_valid(self): - valid = super().is_valid() - - # we're done now if not valid - if not valid: - return valid - if self.cleaned_data["target_start"] > self.cleaned_data["target_end"]: - self.add_error("target_start", "Your target start date exceeds your target end date") - self.add_error("target_end", "Your target start date exceeds your target end date") - return False - return True - - def clean_tags(self): - tag_validator(self.cleaned_data.get("tags")) - return self.cleaned_data.get("tags") - - class Meta: - model = Engagement - exclude = ("first_contacted", "real_start", "engagement_type", "inherited_tags", - "real_end", "requester", "reason", "updated", "report_type", - "product", "threat_model", "api_test", "pen_test", "check_list") - - -class DeleteEngagementForm(forms.ModelForm): - id = forms.IntegerField(required=True, - widget=forms.widgets.HiddenInput()) - - class Meta: - model = Engagement - fields = ["id"] - - +# Engagement forms live in dojo/engagement/ui/forms.py. Re-exported here for +# backward compat. DeleteEngagementForm has no external consumers, so it is not +# re-exported (imported directly from dojo.engagement.ui.forms by its only user). +from dojo.engagement.ui.forms import ( # noqa: E402, F401 -- backward compat + AddEngagementForm, + DeleteEngagementPresetsForm, + EngagementPresetsForm, + EngForm, + ExistingEngagementForm, +) from dojo.test.ui.forms import TestForm # noqa: E402, F401 -- backward compat @@ -2635,33 +2553,6 @@ def clean(self): return self.cleaned_data -class EngagementPresetsForm(forms.ModelForm): - - notes = forms.CharField(widget=forms.Textarea(attrs={}), - required=False, help_text="Description of what needs to be tested or setting up environment for testing") - - scope = forms.CharField(widget=forms.Textarea(attrs={}), - required=False, help_text="Scope of Engagement testing, IP's/Resources/URL's)") - - class Meta: - model = Engagement_Presets - exclude = ["product"] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if disclaimer := get_system_setting("disclaimer_notes"): - self.disclaimer = disclaimer.strip() - - -class DeleteEngagementPresetsForm(forms.ModelForm): - id = forms.IntegerField(required=True, - widget=forms.widgets.HiddenInput()) - - class Meta: - model = Engagement_Presets - fields = ["id"] - - class SystemSettingsForm(forms.ModelForm): jira_webhook_secret = forms.CharField(required=False) @@ -3140,30 +3031,6 @@ class Meta: exclude = ["engagement", "survey", "responder", "completed", "answered_on"] -class AddEngagementForm(forms.Form): - product = forms.ModelChoiceField( - queryset=Product.objects.none(), - required=True, - widget=forms.widgets.Select(), - help_text="Select which product to attach Engagement") - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.fields["product"].queryset = get_authorized_products("add") - - -class ExistingEngagementForm(forms.Form): - engagement = forms.ModelChoiceField( - queryset=Engagement.objects.none(), - required=True, - widget=forms.widgets.Select(), - help_text="Select which Engagement to link the Questionnaire to") - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.fields["engagement"].queryset = get_authorized_engagements("edit").order_by("-target_start") - - class ConfigurationPermissionsForm(forms.Form): def __init__(self, *args, **kwargs): diff --git a/dojo/product/views.py b/dojo/product/views.py index 6f5afb4e9fa..f9dab849576 100644 --- a/dojo/product/views.py +++ b/dojo/product/views.py @@ -29,16 +29,18 @@ from dojo.authorization.authorization import user_has_permission_or_403 from dojo.authorization.roles_permissions import Permissions from dojo.components.sql_group_concat import Sql_GroupConcat -from dojo.filters import ( +from dojo.engagement.ui.filters import ( EngagementFilter, EngagementFilterWithoutObjectLookups, + ProductEngagementFilter, + ProductEngagementFilterWithoutObjectLookups, +) +from dojo.filters import ( MetricsEndpointFilter, MetricsEndpointFilterWithoutObjectLookups, MetricsFindingFilter, MetricsFindingFilterWithoutObjectLookups, ProductComponentFilter, - ProductEngagementFilter, - ProductEngagementFilterWithoutObjectLookups, ProductFilter, ProductFilterWithoutObjectLookups, ) From 712971d9ddda78532ca247785c04587bc112012e Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 21:15:46 +0200 Subject: [PATCH 4/5] refactor(engagement): move views + urls into dojo/engagement/ui/ [engagement Phase 5] --- dojo/asset/urls.py | 2 +- dojo/authorization/url_permissions.py | 2 +- dojo/engagement/{ => ui}/urls.py | 2 +- dojo/engagement/{ => ui}/views.py | 0 dojo/urls.py | 2 +- unittests/test_query_utils.py | 2 +- 6 files changed, 5 insertions(+), 5 deletions(-) rename dojo/engagement/{ => ui}/urls.py (98%) rename dojo/engagement/{ => ui}/views.py (100%) diff --git a/dojo/asset/urls.py b/dojo/asset/urls.py index 3f4c5019fcc..55474a155d2 100644 --- a/dojo/asset/urls.py +++ b/dojo/asset/urls.py @@ -1,7 +1,7 @@ from django.conf import settings from django.urls import re_path -from dojo.engagement import views as dojo_engagement_views +from dojo.engagement.ui import views as dojo_engagement_views from dojo.product import views from dojo.utils import redirect_view diff --git a/dojo/authorization/url_permissions.py b/dojo/authorization/url_permissions.py index 65807ee52c2..d6c3ba64e0e 100644 --- a/dojo/authorization/url_permissions.py +++ b/dojo/authorization/url_permissions.py @@ -59,7 +59,7 @@ "delete_api_scan_configuration": [("object", Product_API_Scan_Configuration, "delete", "pascid")], # ----------------------------------------------------------------------- - # Engagement (dojo/engagement/views.py -> dojo/engagement/urls.py) + # Engagement (dojo/engagement/ui/views.py -> dojo/engagement/ui/urls.py) # ----------------------------------------------------------------------- "edit_engagement": [("object", Engagement, "edit", "eid")], "delete_engagement": [("object", Engagement, "delete", "eid")], diff --git a/dojo/engagement/urls.py b/dojo/engagement/ui/urls.py similarity index 98% rename from dojo/engagement/urls.py rename to dojo/engagement/ui/urls.py index 0f33c3aa697..0af9f481a87 100644 --- a/dojo/engagement/urls.py +++ b/dojo/engagement/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from dojo.engagement import views +from dojo.engagement.ui import views urlpatterns = [ # engagements and calendar diff --git a/dojo/engagement/views.py b/dojo/engagement/ui/views.py similarity index 100% rename from dojo/engagement/views.py rename to dojo/engagement/ui/views.py diff --git a/dojo/urls.py b/dojo/urls.py index 2930b601b11..6e3a60d2426 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -60,7 +60,7 @@ from dojo.components.urls import urlpatterns as component_urls from dojo.development_environment.urls import urlpatterns as dev_env_urls from dojo.endpoint.urls import urlpatterns as endpoint_urls -from dojo.engagement.urls import urlpatterns as eng_urls +from dojo.engagement.ui.urls import urlpatterns as eng_urls from dojo.finding.urls import urlpatterns as finding_urls from dojo.finding_group.urls import urlpatterns as finding_group_urls from dojo.github.ui.urls import urlpatterns as github_urls diff --git a/unittests/test_query_utils.py b/unittests/test_query_utils.py index e953efd1df9..5e98b507fac 100644 --- a/unittests/test_query_utils.py +++ b/unittests/test_query_utils.py @@ -1,6 +1,6 @@ from django.db.models import Count -from dojo.engagement.views import prefetch_for_view_tests +from dojo.engagement.ui.views import prefetch_for_view_tests from dojo.models import Finding, Test from unittests.dojo_test_case import DojoTestCase, versioned_fixtures From bc82e746a5956c7aebc67c22f41452fcf2c16756 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sun, 7 Jun 2026 21:30:31 +0200 Subject: [PATCH 5/5] refactor(engagement): extract API layer into dojo/engagement/api/ [engagement Phase 6,7,8,9] --- dojo/api_v2/serializers.py | 82 +------ dojo/api_v2/views.py | 337 --------------------------- dojo/engagement/api/__init__.py | 1 + dojo/engagement/api/filters.py | 69 ++++++ dojo/engagement/api/serializer.py | 101 ++++++++ dojo/engagement/api/urls.py | 7 + dojo/engagement/api/views.py | 371 ++++++++++++++++++++++++++++++ dojo/filters.py | 53 ----- dojo/urls.py | 6 +- unittests/test_rest_framework.py | 2 +- 10 files changed, 557 insertions(+), 472 deletions(-) create mode 100644 dojo/engagement/api/__init__.py create mode 100644 dojo/engagement/api/filters.py create mode 100644 dojo/engagement/api/serializer.py create mode 100644 dojo/engagement/api/urls.py create mode 100644 dojo/engagement/api/views.py diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 73ba9db408c..0992f6c6ac3 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -48,7 +48,6 @@ Announcement, App_Analysis, BurpRawRequestResponse, - Check_List, Development_Environment, Dojo_User, DojoMeta, @@ -56,7 +55,6 @@ Endpoint_Params, Endpoint_Status, Engagement, - Engagement_Presets, FileUpload, Finding, Finding_Group, @@ -719,50 +717,14 @@ class Meta: fields = ["path"] +# Engagement serializers live in dojo/engagement/api/serializer.py. +# EngagementSerializer is re-exported here because ReportGenerateSerializer and +# RiskAcceptanceSerializer (below) still reference it. The other engagement +# serializers are imported directly from dojo.engagement.api by their consumers. +from dojo.engagement.api.serializer import EngagementSerializer # noqa: E402 -- backward compat from dojo.product_type.api.serializer import ProductTypeSerializer # noqa: E402 -class EngagementSerializer(serializers.ModelSerializer): - tags = TagListSerializerField(required=False) - - class Meta: - model = Engagement - exclude = ("inherited_tags",) - - def validate(self, data): - if self.context["request"].method == "POST": - if data.get("target_start") > data.get("target_end"): - msg = "Your target start date exceeds your target end date" - raise serializers.ValidationError(msg) - if ( - self.instance is not None - and "product" in data - and data.get("product") != self.instance.product - and not user_has_permission( - self.context["request"].user, - data.get("product"), - "edit", - ) - ): - msg = "You are not permitted to edit engagements in the destination product" - raise PermissionDenied(msg) - return data - - def build_relational_field(self, field_name, relation_info): - if field_name == "notes": - return NoteSerializer, {"many": True, "read_only": True} - if field_name == "files": - return FileSerializer, {"many": True, "read_only": True} - return super().build_relational_field(field_name, relation_info) - - -class EngagementToNotesSerializer(serializers.Serializer): - engagement_id = serializers.PrimaryKeyRelatedField( - queryset=Engagement.objects.all(), many=False, allow_null=True, - ) - notes = NoteSerializer(many=True) - - class RiskAcceptanceToNotesSerializer(serializers.Serializer): risk_acceptance_id = serializers.PrimaryKeyRelatedField( queryset=Risk_Acceptance.objects.all(), many=False, allow_null=True, @@ -770,34 +732,6 @@ class RiskAcceptanceToNotesSerializer(serializers.Serializer): notes = NoteSerializer(many=True) -class EngagementToFilesSerializer(serializers.Serializer): - engagement_id = serializers.PrimaryKeyRelatedField( - queryset=Engagement.objects.all(), many=False, allow_null=True, - ) - files = FileSerializer(many=True) - - def to_representation(self, data): - engagement = data.get("engagement_id") - files = data.get("files") - new_files = [{ - "id": file.id, - "file": "{site_url}/{file_access_url}".format( - site_url=settings.SITE_URL, - file_access_url=file.get_accessible_url( - engagement, engagement.id, - ), - ), - "title": file.title, - } for file in files] - return {"engagement_id": engagement.id, "files": new_files} - - -class EngagementCheckListSerializer(serializers.ModelSerializer): - class Meta: - model = Check_List - fields = "__all__" - - class AppAnalysisSerializer(serializers.ModelSerializer): tags = TagListSerializerField(required=False) @@ -2537,12 +2471,6 @@ class FindingNoteSerializer(serializers.Serializer): from dojo.notifications.api.serializer import NotificationsSerializer # noqa: E402, F401 -- backward compat -class EngagementPresetsSerializer(serializers.ModelSerializer): - class Meta: - model = Engagement_Presets - fields = "__all__" - - class NetworkLocationsSerializer(serializers.ModelSerializer): class Meta: model = Network_Locations diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 55826031ea5..11c728dd435 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -44,22 +44,17 @@ prefetch, serializers, ) -from dojo.api_v2.prefetch.prefetcher import _Prefetcher from dojo.authorization import api_permissions as permissions from dojo.authorization.authorization import user_has_permission_or_403 -from dojo.celery_dispatch import dojo_dispatch_task from dojo.endpoint.queries import ( get_authorized_endpoint_status, get_authorized_endpoints, ) from dojo.endpoint.views import get_endpoint_ids -from dojo.engagement.queries import get_authorized_engagements -from dojo.engagement.services import close_engagement, reopen_engagement from dojo.filters import ( ApiAppAnalysisFilter, ApiDojoMetaFilter, ApiEndpointFilter, - ApiEngagementFilter, ApiFindingFilter, ApiProductFilter, ApiRiskAcceptanceFilter, @@ -83,14 +78,11 @@ Announcement, App_Analysis, BurpRawRequestResponse, - Check_List, Development_Environment, Dojo_User, DojoMeta, Endpoint, Endpoint_Status, - Engagement, - Engagement_Presets, FileUpload, Finding, Finding_Template, @@ -118,7 +110,6 @@ from dojo.product.queries import ( get_authorized_app_analysis, get_authorized_dojo_meta, - get_authorized_engagement_presets, get_authorized_languages, get_authorized_product_api_scan_configurations, get_authorized_products, @@ -318,317 +309,6 @@ def get_queryset(self): ).distinct() -# Authorization: object-based -# @extend_schema_view(**schema_with_prefetch()) -# Nested models with prefetch make the response schema too long for Swagger UI -class EngagementViewSet( - # PrefetchDojoModelViewSet, - DojoModelViewSet, - ra_api.AcceptedRisksMixin, -): - serializer_class = serializers.EngagementSerializer - queryset = Engagement.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_class = ApiEngagementFilter - - permission_classes = ( - IsAuthenticated, - permissions.UserHasEngagementPermission, - ) - - @property - def risk_application_model_class(self): - return Engagement - - def destroy(self, request, *args, **kwargs): - instance = self.get_object() - if get_setting("ASYNC_OBJECT_DELETE"): - async_del = async_delete() - async_del.delete(instance) - else: - instance.delete() - return Response(status=status.HTTP_204_NO_CONTENT) - - def get_queryset(self): - return ( - get_authorized_engagements("view") - .prefetch_related("notes", "risk_acceptance", "files") - .distinct() - ) - - @extend_schema( - request=OpenApiTypes.NONE, responses={status.HTTP_200_OK: ""}, - ) - @action(detail=True, methods=["post"], permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission)) - def close(self, request, pk=None): - eng = self.get_object() - close_engagement(eng) - return Response({}, status=status.HTTP_200_OK) - - @extend_schema( - request=OpenApiTypes.NONE, responses={status.HTTP_200_OK: ""}, - ) - @action(detail=True, methods=["post"], permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission)) - def reopen(self, request, pk=None): - eng = self.get_object() - reopen_engagement(eng) - return Response({}, status=status.HTTP_200_OK) - - @extend_schema( - request=serializers.ReportGenerateOptionSerializer, - responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, - ) - @action( - detail=True, methods=["post"], - # IsAuthenticated only: report generation requires View permission, - # enforced by the permission-filtered get_queryset(). The viewset's - # permission_classes would check Edit (POST), which is too restrictive. - permission_classes=[IsAuthenticated], - ) - def generate_report(self, request, pk=None): - engagement = self.get_object() - - options = {} - # prepare post data - report_options = serializers.ReportGenerateOptionSerializer( - data=request.data, - ) - if report_options.is_valid(): - options["include_finding_notes"] = report_options.validated_data[ - "include_finding_notes" - ] - options["include_finding_images"] = report_options.validated_data[ - "include_finding_images" - ] - options[ - "include_executive_summary" - ] = report_options.validated_data["include_executive_summary"] - options[ - "include_table_of_contents" - ] = report_options.validated_data["include_table_of_contents"] - else: - return Response( - report_options.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - data = report_generate(request, engagement, options) - report = serializers.ReportGenerateSerializer(data) - return Response(report.data) - - @extend_schema( - methods=["GET"], - responses={ - status.HTTP_200_OK: serializers.EngagementToNotesSerializer, - }, - ) - @extend_schema( - methods=["POST"], - request=serializers.AddNewNoteOptionSerializer, - responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, - ) - @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementNotePermission]) - def notes(self, request, pk=None): - engagement = self.get_object() - if request.method == "POST": - new_note = serializers.AddNewNoteOptionSerializer( - data=request.data, - ) - if new_note.is_valid(): - entry = new_note.validated_data["entry"] - private = new_note.validated_data.get("private", False) - note_type = new_note.validated_data.get("note_type", None) - else: - return Response( - new_note.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - notes = engagement.notes.filter(note_type=note_type).first() - if notes and note_type and note_type.is_single: - return Response("Only one instance of this note_type allowed on an engagement.", status=status.HTTP_400_BAD_REQUEST) - - author = request.user - note = Notes( - entry=entry, - author=author, - private=private, - note_type=note_type, - ) - note.save() - # Add an entry to the note history - history = NoteHistory.objects.create(data=note.entry, time=note.date, current_editor=note.author) - note.history.add(history) - # Now add the note to the object - engagement.notes.add(note) - # Determine if we need to send any notifications for user mentioned - process_tag_notifications( - request=request, - note=note, - parent_url=request.build_absolute_uri( - reverse("view_engagement", args=(engagement.id,)), - ), - parent_title=f"Engagement: {engagement.name}", - ) - - serialized_note = serializers.NoteSerializer( - {"author": author, "entry": entry, "private": private}, - ) - return Response( - serialized_note.data, status=status.HTTP_201_CREATED, - ) - notes = engagement.notes.all() - - serialized_notes = serializers.EngagementToNotesSerializer( - {"engagement_id": engagement, "notes": notes}, - ) - return Response(serialized_notes.data, status=status.HTTP_200_OK) - - @extend_schema( - methods=["GET"], - responses={ - status.HTTP_200_OK: serializers.EngagementToFilesSerializer, - }, - ) - @extend_schema( - methods=["POST"], - request=serializers.AddNewFileOptionSerializer, - responses={status.HTTP_201_CREATED: serializers.FileSerializer}, - ) - @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], - ) - def files(self, request, pk=None): - engagement = self.get_object() - if request.method == "POST": - new_file = serializers.FileSerializer(data=request.data) - if new_file.is_valid(): - title = new_file.validated_data["title"] - file = new_file.validated_data["file"] - else: - return Response( - new_file.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - file = FileUpload(title=title, file=file) - file.save() - engagement.files.add(file) - - serialized_file = serializers.FileSerializer(file) - return Response( - serialized_file.data, status=status.HTTP_201_CREATED, - ) - - files = engagement.files.all() - serialized_files = serializers.EngagementToFilesSerializer( - {"engagement_id": engagement, "files": files}, - ) - return Response(serialized_files.data, status=status.HTTP_200_OK) - - @extend_schema( - methods=["POST"], - request=serializers.EngagementCheckListSerializer, - responses={ - status.HTTP_201_CREATED: serializers.EngagementCheckListSerializer, - }, - ) - @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission]) - def complete_checklist(self, request, pk=None): - engagement = self.get_object() - check_lists = Check_List.objects.filter(engagement=engagement) - if request.method == "POST": - if check_lists.count() > 0: - return Response( - { - "message": "A completed checklist for this engagement already exists.", - }, - status=status.HTTP_400_BAD_REQUEST, - ) - check_list = serializers.EngagementCheckListSerializer( - data=request.data, - ) - if not check_list.is_valid(): - return Response( - check_list.errors, status=status.HTTP_400_BAD_REQUEST, - ) - check_list = Check_List(**check_list.data) - check_list.engagement = engagement - check_list.save() - serialized_check_list = serializers.EngagementCheckListSerializer( - check_list, - ) - return Response( - serialized_check_list.data, status=status.HTTP_201_CREATED, - ) - prefetch_params = request.GET.get("prefetch", "").split(",") - prefetcher = _Prefetcher() - entry = check_lists.first() - # Get the queried object representation - result = serializers.EngagementCheckListSerializer(entry).data - prefetcher._prefetch(entry, prefetch_params) - result["prefetch"] = prefetcher.prefetched_data - return Response(result, status=status.HTTP_200_OK) - - @extend_schema( - methods=["GET"], - responses={ - status.HTTP_200_OK: serializers.RawFileSerializer, - }, - ) - @action( - detail=True, - methods=["get"], - url_path=r"files/download/(?P\d+)", - permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], - ) - def download_file(self, request, file_id, pk=None): - engagement = self.get_object() - # Get the file object - file_object_qs = engagement.files.filter(id=file_id) - file_object = ( - file_object_qs.first() if len(file_object_qs) > 0 else None - ) - if file_object is None: - return Response( - {"error": "File ID not associated with Engagement"}, - status=status.HTTP_404_NOT_FOUND, - ) - # send file - return generate_file_response(file_object) - - @extend_schema( - request=serializers.EngagementUpdateJiraEpicSerializer, - responses={status.HTTP_200_OK: serializers.EngagementUpdateJiraEpicSerializer}, - ) - @action( - detail=True, methods=["post"], - permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission), - ) - def update_jira_epic(self, request, pk=None): - engagement = self.get_object() - try: - if engagement.has_jira_issue: - task = jira_services.get_epic_task("update_epic") - if task: - dojo_dispatch_task(task, engagement.id, **request.data) - response = Response( - {"info": "Jira Epic update query sent"}, - status=status.HTTP_200_OK, - ) - else: - task = jira_services.get_epic_task("add_epic") - if task: - dojo_dispatch_task(task, engagement.id, **request.data) - response = Response( - {"info": "Jira Epic create query sent"}, - status=status.HTTP_200_OK, - ) - except ValidationError: - return Response( - {"error": "Bad Request!"}, - status=status.HTTP_400_BAD_REQUEST, - ) - return response - - # @extend_schema_view(**schema_with_prefetch()) # Nested models with prefetch make the response schema too long for Swagger UI class RiskAcceptanceViewSet( @@ -2593,23 +2273,6 @@ def queue_task_purge(self, request): return Response({"purged": purged}) -@extend_schema_view(**schema_with_prefetch()) -class EngagementPresetsViewset( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.EngagementPresetsSerializer - queryset = Engagement_Presets.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = ["id", "title", "product"] - permission_classes = ( - IsAuthenticated, - permissions.UserHasEngagementPresetPermission, - ) - - def get_queryset(self): - return get_authorized_engagement_presets("view") - - class NetworkLocationsViewset( DojoModelViewSet, ): diff --git a/dojo/engagement/api/__init__.py b/dojo/engagement/api/__init__.py new file mode 100644 index 00000000000..60e35ed2e10 --- /dev/null +++ b/dojo/engagement/api/__init__.py @@ -0,0 +1 @@ +path = "engagements" # noqa: RUF067 diff --git a/dojo/engagement/api/filters.py b/dojo/engagement/api/filters.py new file mode 100644 index 00000000000..1015b019fd0 --- /dev/null +++ b/dojo/engagement/api/filters.py @@ -0,0 +1,69 @@ +from django_filters import ( + BooleanFilter, + CharFilter, + OrderingFilter, +) + +from dojo.filters import ( + CharFieldFilterANDExpression, + CharFieldInFilter, + DojoFilter, + NumberInFilter, +) +from dojo.labels import get_labels +from dojo.models import Engagement + +labels = get_labels() + + +class ApiEngagementFilter(DojoFilter): + product__prod_type = NumberInFilter(field_name="product__prod_type", lookup_expr="in") + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") + tags = CharFieldInFilter( + field_name="tags__name", + lookup_expr="in", + help_text="Comma separated list of exact tags (uses OR for multiple values)") + tags__and = CharFieldFilterANDExpression( + field_name="tags__name", + help_text="Comma separated list of exact tags to match with an AND expression") + product__tags = CharFieldInFilter( + field_name="product__tags__name", + lookup_expr="in", + help_text=labels.ASSET_FILTERS_CSV_TAGS_OR_HELP) + product__tags__and = CharFieldFilterANDExpression( + field_name="product__tags__name", + help_text=labels.ASSET_FILTERS_CSV_TAGS_AND_HELP) + + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") + not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", + help_text="Comma separated list of exact tags not present on model", exclude="True") + not_product__tags = CharFieldInFilter(field_name="product__tags__name", + lookup_expr="in", + help_text=labels.ASSET_FILTERS_CSV_TAGS_NOT_HELP, + exclude="True") + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("name", "name"), + ("version", "version"), + ("target_start", "target_start"), + ("target_end", "target_end"), + ("status", "status"), + ("lead", "lead"), + ("created", "created"), + ("updated", "updated"), + ), + field_labels={ + "name": "Engagement Name", + }, + + ) + + class Meta: + model = Engagement + fields = ["id", "active", "target_start", + "target_end", "requester", "report_type", + "updated", "threat_model", "api_test", + "pen_test", "status", "product", "name", "version", "tags"] diff --git a/dojo/engagement/api/serializer.py b/dojo/engagement/api/serializer.py new file mode 100644 index 00000000000..1ebe2857f13 --- /dev/null +++ b/dojo/engagement/api/serializer.py @@ -0,0 +1,101 @@ +from django.conf import settings +from django.core.exceptions import PermissionDenied +from rest_framework import serializers + +from dojo.authorization.authorization import user_has_permission +from dojo.models import Check_List, Engagement, Engagement_Presets + + +class EngagementSerializer(serializers.ModelSerializer): + class Meta: + model = Engagement + exclude = ("inherited_tags",) + + def get_fields(self): + from dojo.api_v2.serializers import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + TagListSerializerField, + ) + fields = super().get_fields() + fields["tags"] = TagListSerializerField(required=False) + return fields + + def validate(self, data): + if self.context["request"].method == "POST": + if data.get("target_start") > data.get("target_end"): + msg = "Your target start date exceeds your target end date" + raise serializers.ValidationError(msg) + if ( + self.instance is not None + and "product" in data + and data.get("product") != self.instance.product + and not user_has_permission( + self.context["request"].user, + data.get("product"), + "edit", + ) + ): + msg = "You are not permitted to edit engagements in the destination product" + raise PermissionDenied(msg) + return data + + def build_relational_field(self, field_name, relation_info): + from dojo.api_v2.serializers import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + FileSerializer, + NoteSerializer, + ) + if field_name == "notes": + return NoteSerializer, {"many": True, "read_only": True} + if field_name == "files": + return FileSerializer, {"many": True, "read_only": True} + return super().build_relational_field(field_name, relation_info) + + +class EngagementToNotesSerializer(serializers.Serializer): + engagement_id = serializers.PrimaryKeyRelatedField( + queryset=Engagement.objects.all(), many=False, allow_null=True, + ) + + def get_fields(self): + from dojo.api_v2.serializers import NoteSerializer # noqa: PLC0415 -- lazy import, avoids circular dependency + fields = super().get_fields() + fields["notes"] = NoteSerializer(many=True) + return fields + + +class EngagementToFilesSerializer(serializers.Serializer): + engagement_id = serializers.PrimaryKeyRelatedField( + queryset=Engagement.objects.all(), many=False, allow_null=True, + ) + + def get_fields(self): + from dojo.api_v2.serializers import FileSerializer # noqa: PLC0415 -- lazy import, avoids circular dependency + fields = super().get_fields() + fields["files"] = FileSerializer(many=True) + return fields + + def to_representation(self, data): + engagement = data.get("engagement_id") + files = data.get("files") + new_files = [{ + "id": file.id, + "file": "{site_url}/{file_access_url}".format( + site_url=settings.SITE_URL, + file_access_url=file.get_accessible_url( + engagement, engagement.id, + ), + ), + "title": file.title, + } for file in files] + return {"engagement_id": engagement.id, "files": new_files} + + +class EngagementCheckListSerializer(serializers.ModelSerializer): + class Meta: + model = Check_List + fields = "__all__" + + +class EngagementPresetsSerializer(serializers.ModelSerializer): + class Meta: + model = Engagement_Presets + fields = "__all__" diff --git a/dojo/engagement/api/urls.py b/dojo/engagement/api/urls.py new file mode 100644 index 00000000000..7c5ba0c2758 --- /dev/null +++ b/dojo/engagement/api/urls.py @@ -0,0 +1,7 @@ +from dojo.engagement.api.views import EngagementPresetsViewset, EngagementViewSet + + +def add_engagement_urls(router): + router.register("engagements", EngagementViewSet, basename="engagement") + router.register("engagement_presets", EngagementPresetsViewset, basename="engagement_presets") + return router diff --git a/dojo/engagement/api/views.py b/dojo/engagement/api/views.py new file mode 100644 index 00000000000..34ed278a358 --- /dev/null +++ b/dojo/engagement/api/views.py @@ -0,0 +1,371 @@ +from django.core.exceptions import ValidationError +from django.urls import reverse +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import extend_schema, extend_schema_view +from rest_framework import status +from rest_framework.decorators import action +from rest_framework.parsers import MultiPartParser +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from dojo.api_v2 import serializers as api_v2_serializers +from dojo.api_v2.prefetch.prefetcher import _Prefetcher +from dojo.api_v2.views import DojoModelViewSet, PrefetchDojoModelViewSet, report_generate, schema_with_prefetch +from dojo.authorization import api_permissions as permissions +from dojo.celery_dispatch import dojo_dispatch_task +from dojo.engagement.api.filters import ApiEngagementFilter +from dojo.engagement.api.serializer import ( + EngagementCheckListSerializer, + EngagementPresetsSerializer, + EngagementSerializer, + EngagementToFilesSerializer, + EngagementToNotesSerializer, +) +from dojo.engagement.queries import get_authorized_engagements +from dojo.engagement.services import close_engagement, reopen_engagement +from dojo.jira import services as jira_services +from dojo.models import ( + Check_List, + Engagement, + Engagement_Presets, + FileUpload, + NoteHistory, + Notes, +) +from dojo.product.queries import get_authorized_engagement_presets +from dojo.risk_acceptance import api as ra_api +from dojo.utils import ( + async_delete, + generate_file_response, + get_setting, + process_tag_notifications, +) + + +# Authorization: object-based +# @extend_schema_view(**schema_with_prefetch()) +# Nested models with prefetch make the response schema too long for Swagger UI +class EngagementViewSet( + # PrefetchDojoModelViewSet, + DojoModelViewSet, + ra_api.AcceptedRisksMixin, +): + serializer_class = EngagementSerializer + queryset = Engagement.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_class = ApiEngagementFilter + + permission_classes = ( + IsAuthenticated, + permissions.UserHasEngagementPermission, + ) + + @property + def risk_application_model_class(self): + return Engagement + + def destroy(self, request, *args, **kwargs): + instance = self.get_object() + if get_setting("ASYNC_OBJECT_DELETE"): + async_del = async_delete() + async_del.delete(instance) + else: + instance.delete() + return Response(status=status.HTTP_204_NO_CONTENT) + + def get_queryset(self): + return ( + get_authorized_engagements("view") + .prefetch_related("notes", "risk_acceptance", "files") + .distinct() + ) + + @extend_schema( + request=OpenApiTypes.NONE, responses={status.HTTP_200_OK: ""}, + ) + @action(detail=True, methods=["post"], permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission)) + def close(self, request, pk=None): + eng = self.get_object() + close_engagement(eng) + return Response({}, status=status.HTTP_200_OK) + + @extend_schema( + request=OpenApiTypes.NONE, responses={status.HTTP_200_OK: ""}, + ) + @action(detail=True, methods=["post"], permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission)) + def reopen(self, request, pk=None): + eng = self.get_object() + reopen_engagement(eng) + return Response({}, status=status.HTTP_200_OK) + + @extend_schema( + request=api_v2_serializers.ReportGenerateOptionSerializer, + responses={status.HTTP_200_OK: api_v2_serializers.ReportGenerateSerializer}, + ) + @action( + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], + ) + def generate_report(self, request, pk=None): + engagement = self.get_object() + + options = {} + # prepare post data + report_options = api_v2_serializers.ReportGenerateOptionSerializer( + data=request.data, + ) + if report_options.is_valid(): + options["include_finding_notes"] = report_options.validated_data[ + "include_finding_notes" + ] + options["include_finding_images"] = report_options.validated_data[ + "include_finding_images" + ] + options[ + "include_executive_summary" + ] = report_options.validated_data["include_executive_summary"] + options[ + "include_table_of_contents" + ] = report_options.validated_data["include_table_of_contents"] + else: + return Response( + report_options.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + data = report_generate(request, engagement, options) + report = api_v2_serializers.ReportGenerateSerializer(data) + return Response(report.data) + + @extend_schema( + methods=["GET"], + responses={ + status.HTTP_200_OK: EngagementToNotesSerializer, + }, + ) + @extend_schema( + methods=["POST"], + request=api_v2_serializers.AddNewNoteOptionSerializer, + responses={status.HTTP_201_CREATED: api_v2_serializers.NoteSerializer}, + ) + @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementNotePermission]) + def notes(self, request, pk=None): + engagement = self.get_object() + if request.method == "POST": + new_note = api_v2_serializers.AddNewNoteOptionSerializer( + data=request.data, + ) + if new_note.is_valid(): + entry = new_note.validated_data["entry"] + private = new_note.validated_data.get("private", False) + note_type = new_note.validated_data.get("note_type", None) + else: + return Response( + new_note.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + notes = engagement.notes.filter(note_type=note_type).first() + if notes and note_type and note_type.is_single: + return Response("Only one instance of this note_type allowed on an engagement.", status=status.HTTP_400_BAD_REQUEST) + + author = request.user + note = Notes( + entry=entry, + author=author, + private=private, + note_type=note_type, + ) + note.save() + # Add an entry to the note history + history = NoteHistory.objects.create(data=note.entry, time=note.date, current_editor=note.author) + note.history.add(history) + # Now add the note to the object + engagement.notes.add(note) + # Determine if we need to send any notifications for user mentioned + process_tag_notifications( + request=request, + note=note, + parent_url=request.build_absolute_uri( + reverse("view_engagement", args=(engagement.id,)), + ), + parent_title=f"Engagement: {engagement.name}", + ) + + serialized_note = api_v2_serializers.NoteSerializer( + {"author": author, "entry": entry, "private": private}, + ) + return Response( + serialized_note.data, status=status.HTTP_201_CREATED, + ) + notes = engagement.notes.all() + + serialized_notes = EngagementToNotesSerializer( + {"engagement_id": engagement, "notes": notes}, + ) + return Response(serialized_notes.data, status=status.HTTP_200_OK) + + @extend_schema( + methods=["GET"], + responses={ + status.HTTP_200_OK: EngagementToFilesSerializer, + }, + ) + @extend_schema( + methods=["POST"], + request=api_v2_serializers.AddNewFileOptionSerializer, + responses={status.HTTP_201_CREATED: api_v2_serializers.FileSerializer}, + ) + @action( + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], + ) + def files(self, request, pk=None): + engagement = self.get_object() + if request.method == "POST": + new_file = api_v2_serializers.FileSerializer(data=request.data) + if new_file.is_valid(): + title = new_file.validated_data["title"] + file = new_file.validated_data["file"] + else: + return Response( + new_file.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + file = FileUpload(title=title, file=file) + file.save() + engagement.files.add(file) + + serialized_file = api_v2_serializers.FileSerializer(file) + return Response( + serialized_file.data, status=status.HTTP_201_CREATED, + ) + + files = engagement.files.all() + serialized_files = EngagementToFilesSerializer( + {"engagement_id": engagement, "files": files}, + ) + return Response(serialized_files.data, status=status.HTTP_200_OK) + + @extend_schema( + methods=["POST"], + request=EngagementCheckListSerializer, + responses={ + status.HTTP_201_CREATED: EngagementCheckListSerializer, + }, + ) + @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission]) + def complete_checklist(self, request, pk=None): + engagement = self.get_object() + check_lists = Check_List.objects.filter(engagement=engagement) + if request.method == "POST": + if check_lists.count() > 0: + return Response( + { + "message": "A completed checklist for this engagement already exists.", + }, + status=status.HTTP_400_BAD_REQUEST, + ) + check_list = EngagementCheckListSerializer( + data=request.data, + ) + if not check_list.is_valid(): + return Response( + check_list.errors, status=status.HTTP_400_BAD_REQUEST, + ) + check_list = Check_List(**check_list.data) + check_list.engagement = engagement + check_list.save() + serialized_check_list = EngagementCheckListSerializer( + check_list, + ) + return Response( + serialized_check_list.data, status=status.HTTP_201_CREATED, + ) + prefetch_params = request.GET.get("prefetch", "").split(",") + prefetcher = _Prefetcher() + entry = check_lists.first() + # Get the queried object representation + result = EngagementCheckListSerializer(entry).data + prefetcher._prefetch(entry, prefetch_params) + result["prefetch"] = prefetcher.prefetched_data + return Response(result, status=status.HTTP_200_OK) + + @extend_schema( + methods=["GET"], + responses={ + status.HTTP_200_OK: api_v2_serializers.RawFileSerializer, + }, + ) + @action( + detail=True, + methods=["get"], + url_path=r"files/download/(?P\d+)", + permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], + ) + def download_file(self, request, file_id, pk=None): + engagement = self.get_object() + # Get the file object + file_object_qs = engagement.files.filter(id=file_id) + file_object = ( + file_object_qs.first() if len(file_object_qs) > 0 else None + ) + if file_object is None: + return Response( + {"error": "File ID not associated with Engagement"}, + status=status.HTTP_404_NOT_FOUND, + ) + # send file + return generate_file_response(file_object) + + @extend_schema( + request=api_v2_serializers.EngagementUpdateJiraEpicSerializer, + responses={status.HTTP_200_OK: api_v2_serializers.EngagementUpdateJiraEpicSerializer}, + ) + @action( + detail=True, methods=["post"], + permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission), + ) + def update_jira_epic(self, request, pk=None): + engagement = self.get_object() + try: + if engagement.has_jira_issue: + task = jira_services.get_epic_task("update_epic") + if task: + dojo_dispatch_task(task, engagement.id, **request.data) + response = Response( + {"info": "Jira Epic update query sent"}, + status=status.HTTP_200_OK, + ) + else: + task = jira_services.get_epic_task("add_epic") + if task: + dojo_dispatch_task(task, engagement.id, **request.data) + response = Response( + {"info": "Jira Epic create query sent"}, + status=status.HTTP_200_OK, + ) + except ValidationError: + return Response( + {"error": "Bad Request!"}, + status=status.HTTP_400_BAD_REQUEST, + ) + return response + + +@extend_schema_view(**schema_with_prefetch()) +class EngagementPresetsViewset( + PrefetchDojoModelViewSet, +): + serializer_class = EngagementPresetsSerializer + queryset = Engagement_Presets.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = ["id", "title", "product"] + permission_classes = ( + IsAuthenticated, + permissions.UserHasEngagementPresetPermission, + ) + + def get_queryset(self): + return get_authorized_engagement_presets("view") diff --git a/dojo/filters.py b/dojo/filters.py index 21357ca5b9e..46ca7e0927b 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -1056,59 +1056,6 @@ def __init__(self, *args, **kwargs): "test__engagement__product"].queryset = get_authorized_products("view") -class ApiEngagementFilter(DojoFilter): - product__prod_type = NumberInFilter(field_name="product__prod_type", lookup_expr="in") - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") - tags = CharFieldInFilter( - field_name="tags__name", - lookup_expr="in", - help_text="Comma separated list of exact tags (uses OR for multiple values)") - tags__and = CharFieldFilterANDExpression( - field_name="tags__name", - help_text="Comma separated list of exact tags to match with an AND expression") - product__tags = CharFieldInFilter( - field_name="product__tags__name", - lookup_expr="in", - help_text=labels.ASSET_FILTERS_CSV_TAGS_OR_HELP) - product__tags__and = CharFieldFilterANDExpression( - field_name="product__tags__name", - help_text=labels.ASSET_FILTERS_CSV_TAGS_AND_HELP) - - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") - not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", - help_text="Comma separated list of exact tags not present on model", exclude="True") - not_product__tags = CharFieldInFilter(field_name="product__tags__name", - lookup_expr="in", - help_text=labels.ASSET_FILTERS_CSV_TAGS_NOT_HELP, - exclude="True") - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("name", "name"), - ("version", "version"), - ("target_start", "target_start"), - ("target_end", "target_end"), - ("status", "status"), - ("lead", "lead"), - ("created", "created"), - ("updated", "updated"), - ), - field_labels={ - "name": "Engagement Name", - }, - - ) - - class Meta: - model = Engagement - fields = ["id", "active", "target_start", - "target_end", "requester", "report_type", - "updated", "threat_model", "api_test", - "pen_test", "status", "product", "name", "version", "tags"] - - class ProductFilterHelper(FilterSet): name = CharFilter(lookup_expr="icontains", label=labels.ASSET_FILTERS_NAME_LABEL) name_exact = CharFilter(field_name="name", lookup_expr="iexact", label=labels.ASSET_FILTERS_NAME_EXACT_LABEL) diff --git a/dojo/urls.py b/dojo/urls.py index 6e3a60d2426..cfccb1152d1 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -22,8 +22,6 @@ EndpointMetaImporterView, EndpointStatusViewSet, EndPointViewSet, - EngagementPresetsViewset, - EngagementViewSet, FindingTemplatesViewSet, FindingViewSet, ImportLanguagesView, @@ -60,6 +58,7 @@ from dojo.components.urls import urlpatterns as component_urls from dojo.development_environment.urls import urlpatterns as dev_env_urls from dojo.endpoint.urls import urlpatterns as endpoint_urls +from dojo.engagement.api.urls import add_engagement_urls from dojo.engagement.ui.urls import urlpatterns as eng_urls from dojo.finding.urls import urlpatterns as finding_urls from dojo.finding_group.urls import urlpatterns as finding_group_urls @@ -111,8 +110,6 @@ # RBAC endpoints moved to Pro under legacy authorization: # dojo_groups, dojo_group_members → pro/groups, pro/group_members v2_api.register(r"endpoint_meta_import", EndpointMetaImporterView, basename="endpointmetaimport") -v2_api.register(r"engagements", EngagementViewSet, basename="engagement") -v2_api.register(r"engagement_presets", EngagementPresetsViewset, basename="engagement_presets") v2_api.register(r"finding_templates", FindingTemplatesViewSet, basename="finding_template") v2_api.register(r"findings", FindingViewSet, basename="finding") # RBAC endpoint moved to Pro under legacy authorization: global_roles → pro/global_roles @@ -135,6 +132,7 @@ # RBAC endpoints moved to Pro under legacy authorization: # product_groups, product_members → pro/product_groups, pro/product_members v2_api = add_product_type_urls(v2_api) +v2_api = add_engagement_urls(v2_api) # RBAC endpoints moved to Pro under legacy authorization: # product_type_members, product_type_groups → pro/product_type_members, pro/product_type_groups v2_api.register(r"regulations", RegulationsViewSet, basename="regulations") diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 4b16e26d358..b195500889e 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -44,7 +44,6 @@ DevelopmentEnvironmentViewSet, EndpointStatusViewSet, EndPointViewSet, - EngagementViewSet, FindingTemplatesViewSet, FindingViewSet, ImportLanguagesView, @@ -71,6 +70,7 @@ AssetViewSet, ) from dojo.authorization.roles_permissions import Permissions, permission_to_action +from dojo.engagement.api.views import EngagementViewSet from dojo.location.api.endpoint_compat import V3EndpointCompatibleViewSet, V3EndpointStatusCompatibleViewSet from dojo.location.api.views import LocationFindingReferenceViewSet, LocationProductReferenceViewSet, LocationViewSet from dojo.location.models import Location, LocationFindingReference, LocationProductReference