diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index ea1c7a651ff..73ba9db408c 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -77,8 +77,6 @@ Sonarqube_Issue_Transition, System_Settings, Test, - Test_Import, - Test_Import_Finding_Action, Test_Type, Tool_Configuration, Tool_Product_Settings, @@ -1028,96 +1026,7 @@ class Meta: fields = ("id", "name", "test", "jira_issue") -class TestSerializer(serializers.ModelSerializer): - tags = TagListSerializerField(required=False) - test_type_name = serializers.ReadOnlyField() - finding_groups = FindingGroupSerializer( - source="finding_group_set", many=True, read_only=True, - ) - - class Meta: - model = Test - exclude = ("inherited_tags",) - - def build_relational_field(self, field_name, relation_info): - if field_name == "notes": - return NoteSerializer, {"many": True, "read_only": True} - if field_name == "files": - return FileSerializer, {"many": True, "read_only": True} - return super().build_relational_field(field_name, relation_info) - - -class TestCreateSerializer(serializers.ModelSerializer): - engagement = serializers.PrimaryKeyRelatedField( - queryset=Engagement.objects.all(), - ) - notes = serializers.PrimaryKeyRelatedField( - allow_null=True, - queryset=Notes.objects.all(), - many=True, - required=False, - ) - tags = TagListSerializerField(required=False) - - class Meta: - model = Test - exclude = ("inherited_tags",) - - -class TestTypeCreateSerializer(serializers.ModelSerializer): - - class Meta: - model = Test_Type - exclude = ("dynamically_generated",) - - -class TestTypeSerializer(serializers.ModelSerializer): - name = serializers.ReadOnlyField() - - class Meta: - model = Test_Type - exclude = ("dynamically_generated",) - - -class TestToNotesSerializer(serializers.Serializer): - test_id = serializers.PrimaryKeyRelatedField( - queryset=Test.objects.all(), many=False, allow_null=True, - ) - notes = NoteSerializer(many=True) - - -class TestToFilesSerializer(serializers.Serializer): - test_id = serializers.PrimaryKeyRelatedField( - queryset=Test.objects.all(), many=False, allow_null=True, - ) - files = FileSerializer(many=True) - - def to_representation(self, data): - test = data.get("test_id") - files = data.get("files") - new_files = [{ - "id": file.id, - "file": f"{settings.SITE_URL}/{file.get_accessible_url(test, test.id)}", - "title": file.title, - } for file in files] - return {"test_id": test.id, "files": new_files} - - -class TestImportFindingActionSerializer(serializers.ModelSerializer): - class Meta: - model = Test_Import_Finding_Action - fields = "__all__" - - -class TestImportSerializer(serializers.ModelSerializer): - # findings = TestImportFindingActionSerializer(source='test_import_finding_action', many=True, read_only=True) - test_import_finding_action_set = TestImportFindingActionSerializer( - many=True, read_only=True, - ) - - class Meta: - model = Test_Import - fields = "__all__" +from dojo.test.api.serializer import TestSerializer # noqa: E402 -- backward compat re-export class RiskAcceptanceSerializer(serializers.ModelSerializer): diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index bdffb613d0e..55826031ea5 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -64,11 +64,9 @@ ApiProductFilter, ApiRiskAcceptanceFilter, ApiTemplateFindingFilter, - ApiTestFilter, ApiUserFilter, ReportFindingFilter, ReportFindingFilterWithoutObjectLookups, - TestImportAPIFilter, ) from dojo.finding.queries import ( get_authorized_findings, @@ -111,8 +109,6 @@ Sonarqube_Issue_Transition, System_Settings, Test, - Test_Import, - Test_Type, Tool_Configuration, Tool_Product_Settings, Tool_Type, @@ -135,7 +131,7 @@ from dojo.risk_acceptance import api as ra_api from dojo.risk_acceptance.helper import remove_finding_from_risk_acceptance from dojo.risk_acceptance.queries import get_authorized_risk_acceptances -from dojo.test.queries import get_authorized_test_imports, get_authorized_tests +from dojo.test.queries import get_authorized_tests from dojo.tool_product.queries import get_authorized_tool_product_settings from dojo.user.authentication import reset_token_for_user from dojo.user.utils import get_configuration_permissions_codenames @@ -1792,293 +1788,6 @@ def get_queryset(self): return Development_Environment.objects.all().order_by("id") -# Authorization: object-based -# @extend_schema_view(**schema_with_prefetch()) -# Nested models with prefetch make the response schema too long for Swagger UI -class TestsViewSet( - PrefetchDojoModelViewSet, - ra_api.AcceptedRisksMixin, -): - serializer_class = serializers.TestSerializer - queryset = Test.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_class = ApiTestFilter - permission_classes = (IsAuthenticated, permissions.UserHasTestPermission) - - @property - def risk_application_model_class(self): - return Test - - def get_queryset(self): - return ( - get_authorized_tests("view") - .prefetch_related("notes", "files") - .distinct() - ) - - def destroy(self, request, *args, **kwargs): - instance = self.get_object() - if get_setting("ASYNC_OBJECT_DELETE"): - async_del = async_delete() - async_del.delete(instance) - else: - instance.delete() - return Response(status=status.HTTP_204_NO_CONTENT) - - def get_serializer_class(self): - if self.request and self.request.method == "POST": - if self.action == "accept_risks": - return ra_api.AcceptedRiskSerializer - return serializers.TestCreateSerializer - return serializers.TestSerializer - - @extend_schema( - request=serializers.ReportGenerateOptionSerializer, - responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, - ) - @action( - detail=True, methods=["post"], - # IsAuthenticated only: report generation requires View permission, - # enforced by the permission-filtered get_queryset(). The viewset's - # permission_classes would check Edit (POST), which is too restrictive. - permission_classes=[IsAuthenticated], - ) - def generate_report(self, request, pk=None): - test = self.get_object() - - options = {} - # prepare post data - report_options = serializers.ReportGenerateOptionSerializer( - data=request.data, - ) - if report_options.is_valid(): - options["include_finding_notes"] = report_options.validated_data[ - "include_finding_notes" - ] - options["include_finding_images"] = report_options.validated_data[ - "include_finding_images" - ] - options[ - "include_executive_summary" - ] = report_options.validated_data["include_executive_summary"] - options[ - "include_table_of_contents" - ] = report_options.validated_data["include_table_of_contents"] - else: - return Response( - report_options.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - data = report_generate(request, test, options) - report = serializers.ReportGenerateSerializer(data) - return Response(report.data) - - @extend_schema( - methods=["GET"], - responses={status.HTTP_200_OK: serializers.TestToNotesSerializer}, - ) - @extend_schema( - methods=["POST"], - request=serializers.AddNewNoteOptionSerializer, - responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, - ) - @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasTestNotePermission)) - def notes(self, request, pk=None): - test = self.get_object() - if request.method == "POST": - new_note = serializers.AddNewNoteOptionSerializer( - data=request.data, - ) - if new_note.is_valid(): - entry = new_note.validated_data["entry"] - private = new_note.validated_data.get("private", False) - note_type = new_note.validated_data.get("note_type", None) - else: - return Response( - new_note.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - notes = test.notes.filter(note_type=note_type).first() - if notes and note_type and note_type.is_single: - return Response("Only one instance of this note_type allowed on a test.", status=status.HTTP_400_BAD_REQUEST) - - author = request.user - note = Notes( - entry=entry, - author=author, - private=private, - note_type=note_type, - ) - note.save() - # Add an entry to the note history - history = NoteHistory.objects.create(data=note.entry, time=note.date, current_editor=note.author) - note.history.add(history) - # Now add the note to the object - test.notes.add(note) - # Determine if we need to send any notifications for user mentioned - process_tag_notifications( - request=request, - note=note, - parent_url=request.build_absolute_uri( - reverse("view_test", args=(test.id,)), - ), - parent_title=f"Test: {test.title}", - ) - - serialized_note = serializers.NoteSerializer( - {"author": author, "entry": entry, "private": private}, - ) - return Response( - serialized_note.data, status=status.HTTP_201_CREATED, - ) - notes = test.notes.all() - - serialized_notes = serializers.TestToNotesSerializer( - {"test_id": test, "notes": notes}, - ) - return Response(serialized_notes.data, status=status.HTTP_200_OK) - - @extend_schema( - methods=["GET"], - responses={status.HTTP_200_OK: serializers.TestToFilesSerializer}, - ) - @extend_schema( - methods=["POST"], - request=serializers.AddNewFileOptionSerializer, - responses={status.HTTP_201_CREATED: serializers.FileSerializer}, - ) - @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), - ) - def files(self, request, pk=None): - test = self.get_object() - if request.method == "POST": - new_file = serializers.FileSerializer(data=request.data) - if new_file.is_valid(): - title = new_file.validated_data["title"] - file = new_file.validated_data["file"] - else: - return Response( - new_file.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - file = FileUpload(title=title, file=file) - file.save() - test.files.add(file) - - serialized_file = serializers.FileSerializer(file) - return Response( - serialized_file.data, status=status.HTTP_201_CREATED, - ) - - files = test.files.all() - serialized_files = serializers.TestToFilesSerializer( - {"test_id": test, "files": files}, - ) - return Response(serialized_files.data, status=status.HTTP_200_OK) - - @extend_schema( - methods=["GET"], - responses={ - status.HTTP_200_OK: serializers.RawFileSerializer, - }, - ) - @action( - detail=True, - methods=["get"], - url_path=r"files/download/(?P\d+)", - permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), - ) - def download_file(self, request, file_id, pk=None): - test = self.get_object() - # Get the file object - file_object_qs = test.files.filter(id=file_id) - file_object = ( - file_object_qs.first() if len(file_object_qs) > 0 else None - ) - if file_object is None: - return Response( - {"error": "File ID not associated with Test"}, - status=status.HTTP_404_NOT_FOUND, - ) - # send file - return generate_file_response(file_object) - - -# Authorization: authenticated, configuration -class TestTypesViewSet( - mixins.UpdateModelMixin, - mixins.CreateModelMixin, - viewsets.ReadOnlyModelViewSet, -): - serializer_class = serializers.TestTypeSerializer - queryset = Test_Type.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = [ - "name", - ] - permission_classes = (IsAuthenticated, DjangoModelPermissions) - - def get_queryset(self): - return Test_Type.objects.all().order_by("id") - - def get_serializer_class(self): - if self.action == "create": - return serializers.TestTypeCreateSerializer - return serializers.TestTypeSerializer - - -# @extend_schema_view(**schema_with_prefetch()) -# Nested models with prefetch make the response schema too long for Swagger UI -class TestImportViewSet( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.TestImportSerializer - queryset = Test_Import.objects.none() - filter_backends = (DjangoFilterBackend,) - - filterset_class = TestImportAPIFilter - - permission_classes = ( - IsAuthenticated, - permissions.UserHasTestImportPermission, - ) - - def get_queryset(self): - return get_authorized_test_imports( - "view", - ).prefetch_related( - "test_import_finding_action_set", - "findings_affected", - "findings_affected__endpoints", - "findings_affected__status_finding", - "findings_affected__finding_meta", - "findings_affected__jira_issue", - "findings_affected__burprawrequestresponse_set", - "findings_affected__jira_issue", - "findings_affected__jira_issue", - "findings_affected__jira_issue", - "findings_affected__reviewers", - "findings_affected__notes", - "findings_affected__notes__author", - "findings_affected__notes__history", - "findings_affected__files", - "findings_affected__found_by", - "findings_affected__tags", - "findings_affected__risk_acceptance_set", - "test", - "test__tags", - "test__notes", - "test__notes__author", - "test__files", - "test__test_type", - "test__engagement", - "test__environment", - "test__engagement__product", - "test__engagement__product__prod_type", - ) - - # Authorization: configurations @extend_schema_view(**schema_with_prefetch()) class ToolConfigurationsViewSet( diff --git a/dojo/filters.py b/dojo/filters.py index 8354e51b2f4..14af254bb69 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -57,7 +57,6 @@ from dojo.models import ( EFFORT_FOR_FIXING_CHOICES, ENGAGEMENT_STATUS_CHOICES, - IMPORT_ACTIONS, SEVERITY_CHOICES, App_Analysis, ChoiceQuestion, @@ -78,8 +77,6 @@ Question, Risk_Acceptance, Test, - Test_Import, - Test_Import_Finding_Action, Test_Type, TextQuestion, User, @@ -3285,74 +3282,6 @@ def __init__(self, *args, **kwargs): self.form.fields["test_type"].queryset = Test_Type.objects.filter(test__engagement=self.engagement).distinct().order_by("name") -class ApiTestFilter(DojoFilter): - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") - tags = CharFieldInFilter( - field_name="tags__name", - lookup_expr="in", - help_text="Comma separated list of exact tags (uses OR for multiple values)") - tags__and = CharFieldFilterANDExpression( - field_name="tags__name", - help_text="Comma separated list of exact tags to match with an AND expression") - engagement__tags = CharFieldInFilter( - field_name="engagement__tags__name", - lookup_expr="in", - help_text="Comma separated list of exact tags present on engagement (uses OR for multiple values)") - engagement__tags__and = CharFieldFilterANDExpression( - field_name="engagement__tags__name", - help_text="Comma separated list of exact tags to match with an AND expression present on engagement") - engagement__product__tags = CharFieldInFilter( - field_name="engagement__product__tags__name", - lookup_expr="in", - help_text=labels.ASSET_FILTERS_CSV_TAGS_OR_HELP) - engagement__product__tags__and = CharFieldFilterANDExpression( - field_name="engagement__product__tags__name", - help_text=labels.ASSET_FILTERS_CSV_TAGS_AND_HELP) - - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") - not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", - help_text="Comma separated list of exact tags not present on model", exclude="True") - not_engagement__tags = CharFieldInFilter(field_name="engagement__tags__name", lookup_expr="in", - help_text="Comma separated list of exact tags not present on engagement", - exclude="True") - not_engagement__product__tags = CharFieldInFilter(field_name="engagement__product__tags__name", - lookup_expr="in", - help_text=labels.ASSET_FILTERS_CSV_TAGS_NOT_HELP, - exclude="True") - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("title", "title"), - ("version", "version"), - ("target_start", "target_start"), - ("target_end", "target_end"), - ("test_type", "test_type"), - ("lead", "lead"), - ("version", "version"), - ("branch_tag", "branch_tag"), - ("build_id", "build_id"), - ("commit_hash", "commit_hash"), - ("api_scan_configuration", "api_scan_configuration"), - ("engagement", "engagement"), - ("created", "created"), - ("updated", "updated"), - ), - field_labels={ - "name": "Test Name", - }, - ) - - class Meta: - model = Test - fields = ["id", "title", "test_type", "target_start", - "target_end", "notes", "percent_complete", - "engagement", "version", - "branch_tag", "build_id", "commit_hash", - "api_scan_configuration", "scan_type"] - - class ApiAppAnalysisFilter(DojoFilter): tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") tags = CharFieldInFilter( @@ -3705,95 +3634,13 @@ class Meta: fields = ["is_superuser", "is_staff", "is_active", "first_name", "last_name", "username", "email"] -# This class is used exclusively by Findings -class TestImportFilter(DojoFilter): - version = CharFilter(field_name="version", lookup_expr="icontains") - version_exact = CharFilter(field_name="version", lookup_expr="iexact", label="Version Exact") - branch_tag = CharFilter(lookup_expr="icontains", label="Branch/Tag") - build_id = CharFilter(lookup_expr="icontains", label="Build ID") - commit_hash = CharFilter(lookup_expr="icontains", label="Commit hash") - - findings_affected = BooleanFilter(field_name="findings_affected", lookup_expr="isnull", exclude=True, label="Findings affected") - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("date", "date"), - ("version", "version"), - ("branch_tag", "branch_tag"), - ("build_id", "build_id"), - ("commit_hash", "commit_hash"), - - ), - ) - - class Meta: - model = Test_Import - fields = [] - - -# This class is used exclusively by Findings -class TestImportFindingActionFilter(DojoFilter): - action = MultipleChoiceFilter(choices=IMPORT_ACTIONS) - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("action", "action"), - ), - ) - - class Meta: - model = Test_Import_Finding_Action - fields = [] - - -# Used within the TestImport API -class TestImportAPIFilter(DojoFilter): - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("id", "id"), - ("created", "created"), - ("modified", "modified"), - ("version", "version"), - ("branch_tag", "branch_tag"), - ("build_id", "build_id"), - ("commit_hash", "commit_hash"), - - ), - ) - - class Meta: - model = Test_Import - fields = ["test", - "findings_affected", - "version", - "branch_tag", - "build_id", - "commit_hash", - "test_import_finding_action__action", - "test_import_finding_action__finding", - "test_import_finding_action__created"] +# TestImportFilter and TestImportFindingActionFilter live in dojo/test/ui/filters.py and are +# re-exported at the bottom of this module for backward compatibility. # LogEntryFilter and PgHistoryFilter live in dojo/auditlog/filters.py and are # re-exported at the bottom of this module for backward compatibility. - - -class TestTypeFilter(DojoFilter): - name = CharFilter(lookup_expr="icontains") - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("name", "name"), - ), - ) - - class Meta: - model = Test_Type - exclude = [] - include = ("name",) +# TestTypeFilter lives in dojo/test/ui/filters.py and is re-exported below. class DevelopmentEnvironmentFilter(DojoFilter): diff --git a/dojo/finding/views.py b/dojo/finding/views.py index d3cc025be26..192b46c2ab2 100644 --- a/dojo/finding/views.py +++ b/dojo/finding/views.py @@ -40,8 +40,6 @@ SimilarFindingFilter, SimilarFindingFilterWithoutObjectLookups, TemplateFindingFilter, - TestImportFilter, - TestImportFindingActionFilter, ) from dojo.finding.deduplication import ( _fetch_fp_candidates_for_batch, @@ -96,6 +94,7 @@ from dojo.notifications.helper import create_notification from dojo.tags.utils import bulk_add_tags_to_instances from dojo.test.queries import get_authorized_tests +from dojo.test.ui.filters import TestImportFilter, TestImportFindingActionFilter from dojo.tools import tool_issue_updater from dojo.utils import ( FileIterWrapper, diff --git a/dojo/forms.py b/dojo/forms.py index 89163ffb14a..cb11e819313 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -109,7 +109,6 @@ from dojo.user.utils import get_configuration_permissions_fields from dojo.utils import ( get_password_requirements_string, - get_product, get_system_setting, is_finding_groups_enabled, is_scan_file_too_large, @@ -1061,80 +1060,7 @@ class Meta: fields = ["id"] -class TestForm(forms.ModelForm): - title = forms.CharField(max_length=255, required=False) - description = forms.CharField(widget=forms.Textarea(attrs={"rows": "3"}), required=False) - test_type = forms.ModelChoiceField(queryset=Test_Type.objects.all().order_by("name")) - environment = forms.ModelChoiceField( - queryset=Development_Environment.objects.all().order_by("name")) - target_start = forms.DateTimeField(widget=forms.TextInput( - attrs={"class": "datepicker", "autocomplete": "off"})) - target_end = forms.DateTimeField(widget=forms.TextInput( - attrs={"class": "datepicker", "autocomplete": "off"})) - lead = forms.ModelChoiceField( - queryset=None, - required=False, label="Testing Lead") - - def __init__(self, *args, **kwargs): - obj = None - - if "engagement" in kwargs: - obj = kwargs.pop("engagement") - - if "instance" in kwargs: - obj = kwargs.get("instance") - - super().__init__(*args, **kwargs) - - if obj: - product = get_product(obj) - self.fields["lead"].queryset = get_authorized_users_for_product_and_product_type(None, product, "view").filter(is_active=True) - self.fields["api_scan_configuration"].queryset = Product_API_Scan_Configuration.objects.filter(product=product) - else: - self.fields["lead"].queryset = get_authorized_users("view").filter(is_active=True) - - def is_valid(self): - valid = super().is_valid() - - # we're done now if not valid - if not valid: - return valid - if self.cleaned_data["target_start"] > self.cleaned_data["target_end"]: - self.add_error("target_start", "Your target start date exceeds your target end date") - self.add_error("target_end", "Your target start date exceeds your target end date") - return False - return True - - class Meta: - model = Test - fields = ["title", "test_type", "target_start", "target_end", "description", - "environment", "percent_complete", "tags", "lead", "version", "branch_tag", "build_id", "commit_hash", - "api_scan_configuration"] - - def clean_tags(self): - tag_validator(self.cleaned_data.get("tags")) - return self.cleaned_data.get("tags") - - -class DeleteTestForm(forms.ModelForm): - id = forms.IntegerField(required=True, - widget=forms.widgets.HiddenInput()) - - class Meta: - model = Test - fields = ["id"] - - -class CopyTestForm(forms.Form): - engagement = forms.ModelChoiceField( - required=True, - queryset=Engagement.objects.none(), - error_messages={"required": "*"}) - - def __init__(self, *args, **kwargs): - authorized_lists = kwargs.pop("engagements", None) - super().__init__(*args, **kwargs) - self.fields["engagement"].queryset = authorized_lists +from dojo.test.ui.forms import TestForm # noqa: E402, F401 -- backward compat class AddFindingForm(forms.ModelForm): diff --git a/dojo/models.py b/dojo/models.py index eaf21960650..dd833e0a427 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -26,7 +26,7 @@ from django.core.files.base import ContentFile from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator, validate_ipv46_address from django.db import connection, models -from django.db.models import Count, F, JSONField, Q +from django.db.models import Count, F, Q from django.db.models.expressions import Case, When from django.db.models.functions import Lower from django.urls import reverse @@ -66,18 +66,6 @@ # default template with all values set to 0 DEFAULT_STATS = {sev.lower(): dict.fromkeys(STATS_FIELDS, 0) for sev in SEVERITIES} -IMPORT_CREATED_FINDING = "N" -IMPORT_CLOSED_FINDING = "C" -IMPORT_REACTIVATED_FINDING = "R" -IMPORT_UNTOUCHED_FINDING = "U" - -IMPORT_ACTIONS = [ - (IMPORT_CREATED_FINDING, "created"), - (IMPORT_CLOSED_FINDING, "closed"), - (IMPORT_REACTIVATED_FINDING, "reactivated"), - (IMPORT_UNTOUCHED_FINDING, "untouched"), -] - def _get_annotations_for_statistics(): annotations = {stats_field.lower(): Count(Case(When(**{stats_field: True}, then=1))) for stats_field in STATS_FIELDS if stats_field != "total"} @@ -759,6 +747,17 @@ def clean(self): from dojo.product_type.models import Product_Type # noqa: E402 -- re-export; mid-file as Product FK uses it below +from dojo.test.models import ( # noqa: E402 -- re-export; class-body FKs below reference these + IMPORT_ACTIONS, # noqa: F401 -- re-export + IMPORT_CLOSED_FINDING, # noqa: F401 -- re-export + IMPORT_CREATED_FINDING, # noqa: F401 -- re-export + IMPORT_REACTIVATED_FINDING, # noqa: F401 -- re-export + IMPORT_UNTOUCHED_FINDING, # noqa: F401 -- re-export + Test, + Test_Import, # noqa: F401 -- re-export + Test_Import_Finding_Action, # noqa: F401 -- re-export + Test_Type, +) class Product_Line(models.Model): @@ -773,26 +772,6 @@ class Report_Type(models.Model): name = models.CharField(max_length=255) -class Test_Type(models.Model): - name = models.CharField(max_length=200, unique=True) - static_tool = models.BooleanField(default=False) - dynamic_tool = models.BooleanField(default=False) - active = models.BooleanField(default=True) - dynamically_generated = models.BooleanField( - default=False, - help_text=_("Set to True for test types that are created at import time")) - - class Meta: - ordering = ("name",) - - def __str__(self): - return self.name - - def get_breadcrumbs(self): - return [{"title": str(self), - "url": None}] - - class DojoMeta(models.Model): name = models.CharField(max_length=120) value = models.CharField(max_length=300) @@ -1978,235 +1957,6 @@ class Meta: ordering = ("-created", ) -class Test(models.Model): - engagement = models.ForeignKey(Engagement, editable=False, on_delete=models.CASCADE) - lead = models.ForeignKey(Dojo_User, editable=True, null=True, blank=True, on_delete=models.RESTRICT) - test_type = models.ForeignKey(Test_Type, on_delete=models.CASCADE) - scan_type = models.TextField(null=True) - title = models.CharField(max_length=255, null=True, blank=True) - description = models.TextField(null=True, blank=True) - target_start = models.DateTimeField() - target_end = models.DateTimeField() - percent_complete = models.IntegerField(null=True, blank=True, - editable=True) - notes = models.ManyToManyField(Notes, blank=True, - editable=False) - files = models.ManyToManyField(FileUpload, blank=True, editable=False) - environment = models.ForeignKey(Development_Environment, null=True, - blank=False, on_delete=models.RESTRICT) - - updated = models.DateTimeField(auto_now=True, null=True) - created = models.DateTimeField(auto_now_add=True, null=True) - - tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this test. Choose from the list or add new tags. Press Enter key to add.")) - inherited_tags = TagField(blank=True, force_lowercase=True, help_text=_("Internal use tags sepcifically for maintaining parity with product. This field will be present as a subset in the tags field")) - - version = models.CharField(max_length=100, null=True, blank=True) - - build_id = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Build ID that was tested, a reimport may update this field."), verbose_name=_("Build ID")) - commit_hash = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Commit hash tested, a reimport may update this field."), verbose_name=_("Commit Hash")) - branch_tag = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Tag or branch that was tested, a reimport may update this field."), verbose_name=_("Branch/Tag")) - api_scan_configuration = models.ForeignKey(Product_API_Scan_Configuration, null=True, editable=True, blank=True, on_delete=models.CASCADE, verbose_name=_("API Scan Configuration")) - - class Meta: - indexes = [ - models.Index(fields=["engagement", "test_type"]), - ] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.unsaved_metadata: list = [] - - def __str__(self): - if self.title: - return f"{self.title} ({self.test_type})" - return str(self.test_type) - - def get_absolute_url(self): - return reverse("view_test", args=[str(self.id)]) - - def test_type_name(self) -> str: - return self.test_type.name - - def get_breadcrumbs(self): - bc = self.engagement.get_breadcrumbs() - bc += [{"title": str(self), - "url": reverse("view_test", args=(self.id,))}] - return bc - - def copy(self, engagement=None): - copy = copy_model_util(self) - # Save the necessary ManyToMany relationships - old_notes = list(self.notes.all()) - old_files = list(self.files.all()) - old_tags = list(self.tags.all()) - old_findings = list(Finding.objects.filter(test=self)) - if engagement: - copy.engagement = engagement - # Save the object before setting any ManyToMany relationships - copy.save() - # Copy the notes - for notes in old_notes: - copy.notes.add(notes.copy()) - # Copy the files - for files in old_files: - copy.files.add(files.copy()) - # Copy the Findings - for finding in old_findings: - finding.copy(test=copy) - # Assign any tags - copy.tags.set(old_tags) - - return copy - - # only used by bulk risk acceptance api - @property - def unaccepted_open_findings(self): - from dojo.utils import get_system_setting # noqa: PLC0415 circular import - findings = Finding.objects.filter(risk_accepted=False, active=True, duplicate=False, test=self) - if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True): - findings = findings.filter(verified=True) - - return findings - - def accept_risks(self, accepted_risks): - self.engagement.risk_acceptance.add(*accepted_risks) - - @property - def deduplication_algorithm(self): - deduplicationAlgorithm = settings.DEDUPE_ALGO_LEGACY - - if hasattr(settings, "DEDUPLICATION_ALGORITHM_PER_PARSER"): - if (self.test_type.name in settings.DEDUPLICATION_ALGORITHM_PER_PARSER): - deduplicationLogger.debug(f"using DEDUPLICATION_ALGORITHM_PER_PARSER for test_type.name: {self.test_type.name}") - deduplicationAlgorithm = settings.DEDUPLICATION_ALGORITHM_PER_PARSER[self.test_type.name] - elif (self.scan_type in settings.DEDUPLICATION_ALGORITHM_PER_PARSER): - deduplicationLogger.debug(f"using DEDUPLICATION_ALGORITHM_PER_PARSER for scan_type: {self.scan_type}") - deduplicationAlgorithm = settings.DEDUPLICATION_ALGORITHM_PER_PARSER[self.scan_type] - else: - deduplicationLogger.debug("Section DEDUPLICATION_ALGORITHM_PER_PARSER not found in settings.dist.py") - - deduplicationLogger.debug(f"DEDUPLICATION_ALGORITHM_PER_PARSER is: {deduplicationAlgorithm}") - return deduplicationAlgorithm - - @property - def hash_code_fields(self): - """Retrieve OS HASH_CODE_FIELDS_PER_SCANNER settings. Be aware when calling this to make sure Pro doesn't use these OS seetings""" - hashCodeFields = None - - if hasattr(settings, "HASHCODE_FIELDS_PER_SCANNER"): - if (self.test_type.name in settings.HASHCODE_FIELDS_PER_SCANNER): - deduplicationLogger.debug(f"using HASHCODE_FIELDS_PER_SCANNER for test_type.name: {self.test_type.name}") - hashCodeFields = settings.HASHCODE_FIELDS_PER_SCANNER[self.test_type.name] - elif (self.scan_type in settings.HASHCODE_FIELDS_PER_SCANNER): - deduplicationLogger.debug(f"using HASHCODE_FIELDS_PER_SCANNER for scan_type: {self.scan_type}") - hashCodeFields = settings.HASHCODE_FIELDS_PER_SCANNER[self.scan_type] - else: - deduplicationLogger.warning(f"test_type name {self.test_type.name} and scan_type {self.scan_type} not found in HASHCODE_FIELDS_PER_SCANNER") - else: - deduplicationLogger.debug("Section HASHCODE_FIELDS_PER_SCANNER not found in settings.dist.py") - - hash_code_fields_always = getattr(settings, "HASH_CODE_FIELDS_ALWAYS", []) - deduplicationLogger.debug(f"HASHCODE_FIELDS_PER_SCANNER is: {hashCodeFields} + HASH_CODE_FIELDS_ALWAYS: {hash_code_fields_always}") - - return hashCodeFields - - @property - def hash_code_allows_null_cwe(self): - hashCodeAllowsNullCwe = True - - if hasattr(settings, "HASHCODE_ALLOWS_NULL_CWE"): - if (self.test_type.name in settings.HASHCODE_ALLOWS_NULL_CWE): - deduplicationLogger.debug(f"using HASHCODE_ALLOWS_NULL_CWE for test_type.name: {self.test_type.name}") - hashCodeAllowsNullCwe = settings.HASHCODE_ALLOWS_NULL_CWE[self.test_type.name] - elif (self.scan_type in settings.HASHCODE_ALLOWS_NULL_CWE): - deduplicationLogger.debug(f"using HASHCODE_ALLOWS_NULL_CWE for scan_type: {self.scan_type}") - hashCodeAllowsNullCwe = settings.HASHCODE_ALLOWS_NULL_CWE[self.scan_type] - else: - deduplicationLogger.debug("Section HASHCODE_ALLOWS_NULL_CWE not found in settings.dist.py") - - deduplicationLogger.debug(f"HASHCODE_ALLOWS_NULL_CWE is: {hashCodeAllowsNullCwe}") - return hashCodeAllowsNullCwe - - def delete(self, *args, product_grading_option=True, **kwargs): - logger.debug("%d test delete", self.id) - super().delete(*args, **kwargs) - if product_grading_option: - with suppress(Test.DoesNotExist, Engagement.DoesNotExist, Product.DoesNotExist): - # Suppressing a potential issue created from async delete removing - # related objects in a separate task - from dojo.utils import perform_product_grading # noqa: PLC0415 circular import - perform_product_grading(self.engagement.product) - - @property - def statistics(self): - """Queries the database, no prefetching, so could be slow for lists of model instances""" - return _get_statistics_for_queryset(Finding.objects.filter(test=self), _get_annotations_for_statistics) - - -class Test_Import(TimeStampedModel): - - IMPORT_TYPE = "import" - REIMPORT_TYPE = "reimport" - - test = models.ForeignKey(Test, editable=False, null=False, blank=False, on_delete=models.CASCADE) - findings_affected = models.ManyToManyField("Finding", through="Test_Import_Finding_Action") - import_settings = JSONField(null=True) - type = models.CharField(max_length=64, null=False, blank=False, default="unknown") - - version = models.CharField(max_length=100, null=True, blank=True) - build_id = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Build ID that was tested, a reimport may update this field."), verbose_name=_("Build ID")) - commit_hash = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Commit hash tested, a reimport may update this field."), verbose_name=_("Commit Hash")) - branch_tag = models.CharField(editable=True, max_length=150, - null=True, blank=True, help_text=_("Tag or branch that was tested, a reimport may update this field."), verbose_name=_("Branch/Tag")) - - def get_queryset(self): - logger.debug("prefetch test_import counts") - super_query = super().get_queryset() - super_query = super_query.annotate(created_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_CREATED_FINDING))) - super_query = super_query.annotate(closed_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_CLOSED_FINDING))) - super_query = super_query.annotate(reactivated_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_REACTIVATED_FINDING))) - return super_query.annotate(untouched_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_UNTOUCHED_FINDING))) - - class Meta: - ordering = ("-id",) - indexes = [ - models.Index(fields=["created", "test", "type"]), - ] - - def __str__(self): - return self.created.strftime("%Y-%m-%d %H:%M:%S") - - @property - def statistics(self): - """Queries the database, no prefetching, so could be slow for lists of model instances""" - stats = {} - for action in IMPORT_ACTIONS: - stats[action[1].lower()] = _get_statistics_for_queryset(Finding.objects.filter(test_import_finding_action__test_import=self, test_import_finding_action__action=action[0]), _get_annotations_for_statistics) - return stats - - -class Test_Import_Finding_Action(TimeStampedModel): - test_import = models.ForeignKey(Test_Import, editable=False, null=False, blank=False, on_delete=models.CASCADE) - finding = models.ForeignKey("Finding", editable=False, null=False, blank=False, on_delete=models.CASCADE) - action = models.CharField(max_length=100, null=True, blank=True, choices=IMPORT_ACTIONS) - - class Meta: - indexes = [ - models.Index(fields=["finding", "action", "test_import"]), - ] - unique_together = (("test_import", "finding")) - ordering = ("test_import", "action", "finding") - - def __str__(self): - return f"{self.finding.id}: {self.action}" - - class Finding(BaseModel): # Fields loaded when performing deduplication (used by get_finding_models_for_deduplication # and build_candidate_scope_queryset to restrict the SELECT to only what is needed). @@ -4352,14 +4102,12 @@ def __str__(self): admin.site.register(Languages) admin.site.register(Language_Type) admin.site.register(App_Analysis) -admin.site.register(Test) admin.site.register(Finding, FindingAdmin) admin.site.register(FileUpload) admin.site.register(FileAccessToken) admin.site.register(Engagement) admin.site.register(Risk_Acceptance) admin.site.register(Check_List) -admin.site.register(Test_Type) admin.site.register(Endpoint_Params) admin.site.register(Endpoint_Status) admin.site.register(Endpoint) @@ -4414,6 +4162,4 @@ def __str__(self): admin.site.register(BannerConf) admin.site.register(Tool_Product_History) admin.site.register(General_Survey) -admin.site.register(Test_Import) -admin.site.register(Test_Import_Finding_Action) admin.site.register(Finding_Group) diff --git a/dojo/test/__init__.py b/dojo/test/__init__.py index e69de29bb2d..1a931c0dba9 100644 --- a/dojo/test/__init__.py +++ b/dojo/test/__init__.py @@ -0,0 +1 @@ +import dojo.test.admin # noqa: F401 diff --git a/dojo/test/admin.py b/dojo/test/admin.py new file mode 100644 index 00000000000..a6f18c4bb82 --- /dev/null +++ b/dojo/test/admin.py @@ -0,0 +1,21 @@ +from django.contrib import admin + +from dojo.test.models import Test, Test_Import, Test_Type + + +@admin.register(Test_Type) +class Test_TypeAdmin(admin.ModelAdmin): + + """Admin support for the Test_Type model.""" + + +@admin.register(Test) +class TestAdmin(admin.ModelAdmin): + + """Admin support for the Test model.""" + + +@admin.register(Test_Import) +class Test_ImportAdmin(admin.ModelAdmin): + + """Admin support for the Test_Import model.""" diff --git a/dojo/test/api/__init__.py b/dojo/test/api/__init__.py new file mode 100644 index 00000000000..ab9d3d2e082 --- /dev/null +++ b/dojo/test/api/__init__.py @@ -0,0 +1 @@ +path = "tests" # noqa: RUF067 diff --git a/dojo/test/api/filters.py b/dojo/test/api/filters.py new file mode 100644 index 00000000000..9d5d0614653 --- /dev/null +++ b/dojo/test/api/filters.py @@ -0,0 +1,114 @@ +from django_filters import ( + BooleanFilter, + CharFilter, + OrderingFilter, +) + +from dojo.filters import ( + CharFieldFilterANDExpression, + CharFieldInFilter, + DojoFilter, +) +from dojo.labels import get_labels +from dojo.models import ( + Test, + Test_Import, +) + +labels = get_labels() + + +class ApiTestFilter(DojoFilter): + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") + tags = CharFieldInFilter( + field_name="tags__name", + lookup_expr="in", + help_text="Comma separated list of exact tags (uses OR for multiple values)") + tags__and = CharFieldFilterANDExpression( + field_name="tags__name", + help_text="Comma separated list of exact tags to match with an AND expression") + engagement__tags = CharFieldInFilter( + field_name="engagement__tags__name", + lookup_expr="in", + help_text="Comma separated list of exact tags present on engagement (uses OR for multiple values)") + engagement__tags__and = CharFieldFilterANDExpression( + field_name="engagement__tags__name", + help_text="Comma separated list of exact tags to match with an AND expression present on engagement") + engagement__product__tags = CharFieldInFilter( + field_name="engagement__product__tags__name", + lookup_expr="in", + help_text=labels.ASSET_FILTERS_CSV_TAGS_OR_HELP) + engagement__product__tags__and = CharFieldFilterANDExpression( + field_name="engagement__product__tags__name", + help_text=labels.ASSET_FILTERS_CSV_TAGS_AND_HELP) + + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") + not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", + help_text="Comma separated list of exact tags not present on model", exclude="True") + not_engagement__tags = CharFieldInFilter(field_name="engagement__tags__name", lookup_expr="in", + help_text="Comma separated list of exact tags not present on engagement", + exclude="True") + not_engagement__product__tags = CharFieldInFilter(field_name="engagement__product__tags__name", + lookup_expr="in", + help_text=labels.ASSET_FILTERS_CSV_TAGS_NOT_HELP, + exclude="True") + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("title", "title"), + ("version", "version"), + ("target_start", "target_start"), + ("target_end", "target_end"), + ("test_type", "test_type"), + ("lead", "lead"), + ("version", "version"), + ("branch_tag", "branch_tag"), + ("build_id", "build_id"), + ("commit_hash", "commit_hash"), + ("api_scan_configuration", "api_scan_configuration"), + ("engagement", "engagement"), + ("created", "created"), + ("updated", "updated"), + ), + field_labels={ + "name": "Test Name", + }, + ) + + class Meta: + model = Test + fields = ["id", "title", "test_type", "target_start", + "target_end", "notes", "percent_complete", + "engagement", "version", + "branch_tag", "build_id", "commit_hash", + "api_scan_configuration", "scan_type"] + + +class TestImportAPIFilter(DojoFilter): + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("id", "id"), + ("created", "created"), + ("modified", "modified"), + ("version", "version"), + ("branch_tag", "branch_tag"), + ("build_id", "build_id"), + ("commit_hash", "commit_hash"), + + ), + ) + + class Meta: + model = Test_Import + fields = ["test", + "findings_affected", + "version", + "branch_tag", + "build_id", + "commit_hash", + "test_import_finding_action__action", + "test_import_finding_action__finding", + "test_import_finding_action__created"] diff --git a/dojo/test/api/serializer.py b/dojo/test/api/serializer.py new file mode 100644 index 00000000000..fcce90fdd82 --- /dev/null +++ b/dojo/test/api/serializer.py @@ -0,0 +1,132 @@ +from django.conf import settings +from rest_framework import serializers + +from dojo.models import ( + Engagement, + Notes, + Test, + Test_Import, + Test_Import_Finding_Action, + Test_Type, +) + + +class TestSerializer(serializers.ModelSerializer): + test_type_name = serializers.ReadOnlyField() + + class Meta: + model = Test + exclude = ("inherited_tags",) + + def get_fields(self): + from dojo.api_v2.serializers import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + FindingGroupSerializer, + TagListSerializerField, + ) + fields = super().get_fields() + fields["tags"] = TagListSerializerField(required=False) + fields["finding_groups"] = FindingGroupSerializer( + source="finding_group_set", many=True, read_only=True, + ) + return fields + + def build_relational_field(self, field_name, relation_info): + from dojo.api_v2.serializers import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + FileSerializer, + NoteSerializer, + ) + if field_name == "notes": + return NoteSerializer, {"many": True, "read_only": True} + if field_name == "files": + return FileSerializer, {"many": True, "read_only": True} + return super().build_relational_field(field_name, relation_info) + + +class TestCreateSerializer(serializers.ModelSerializer): + engagement = serializers.PrimaryKeyRelatedField( + queryset=Engagement.objects.all(), + ) + notes = serializers.PrimaryKeyRelatedField( + allow_null=True, + queryset=Notes.objects.all(), + many=True, + required=False, + ) + + class Meta: + model = Test + exclude = ("inherited_tags",) + + def get_fields(self): + from dojo.api_v2.serializers import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + TagListSerializerField, + ) + fields = super().get_fields() + fields["tags"] = TagListSerializerField(required=False) + return fields + + +class TestTypeCreateSerializer(serializers.ModelSerializer): + + class Meta: + model = Test_Type + exclude = ("dynamically_generated",) + + +class TestTypeSerializer(serializers.ModelSerializer): + name = serializers.ReadOnlyField() + + class Meta: + model = Test_Type + exclude = ("dynamically_generated",) + + +class TestToNotesSerializer(serializers.Serializer): + test_id = serializers.PrimaryKeyRelatedField( + queryset=Test.objects.all(), many=False, allow_null=True, + ) + + def get_fields(self): + from dojo.api_v2.serializers import NoteSerializer # noqa: PLC0415 -- lazy import, avoids circular dependency + fields = super().get_fields() + fields["notes"] = NoteSerializer(many=True) + return fields + + +class TestToFilesSerializer(serializers.Serializer): + test_id = serializers.PrimaryKeyRelatedField( + queryset=Test.objects.all(), many=False, allow_null=True, + ) + + def get_fields(self): + from dojo.api_v2.serializers import FileSerializer # noqa: PLC0415 -- lazy import, avoids circular dependency + fields = super().get_fields() + fields["files"] = FileSerializer(many=True) + return fields + + def to_representation(self, data): + test = data.get("test_id") + files = data.get("files") + new_files = [{ + "id": file.id, + "file": f"{settings.SITE_URL}/{file.get_accessible_url(test, test.id)}", + "title": file.title, + } for file in files] + return {"test_id": test.id, "files": new_files} + + +class TestImportFindingActionSerializer(serializers.ModelSerializer): + class Meta: + model = Test_Import_Finding_Action + fields = "__all__" + + +class TestImportSerializer(serializers.ModelSerializer): + # findings = TestImportFindingActionSerializer(source='test_import_finding_action', many=True, read_only=True) + test_import_finding_action_set = TestImportFindingActionSerializer( + many=True, read_only=True, + ) + + class Meta: + model = Test_Import + fields = "__all__" diff --git a/dojo/test/api/urls.py b/dojo/test/api/urls.py new file mode 100644 index 00000000000..b98f633d3bd --- /dev/null +++ b/dojo/test/api/urls.py @@ -0,0 +1,8 @@ +from dojo.test.api.views import TestImportViewSet, TestsViewSet, TestTypesViewSet + + +def add_test_urls(router): + router.register("tests", TestsViewSet, basename="test") + router.register("test_types", TestTypesViewSet, basename="test_type") + router.register("test_imports", TestImportViewSet, basename="test_imports") + return router diff --git a/dojo/test/api/views.py b/dojo/test/api/views.py new file mode 100644 index 00000000000..7c2066374c4 --- /dev/null +++ b/dojo/test/api/views.py @@ -0,0 +1,325 @@ +from django.urls import reverse +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.utils import extend_schema +from rest_framework import mixins, status, viewsets +from rest_framework.decorators import action +from rest_framework.parsers import MultiPartParser +from rest_framework.permissions import DjangoModelPermissions, IsAuthenticated +from rest_framework.response import Response + +from dojo.api_v2 import serializers as api_v2_serializers +from dojo.api_v2.views import PrefetchDojoModelViewSet, report_generate +from dojo.authorization import api_permissions as permissions +from dojo.models import ( + FileUpload, + NoteHistory, + Notes, + Test, + Test_Import, + Test_Type, +) +from dojo.risk_acceptance import api as ra_api +from dojo.test.api.filters import ApiTestFilter, TestImportAPIFilter +from dojo.test.api.serializer import ( + TestCreateSerializer, + TestImportSerializer, + TestSerializer, + TestToFilesSerializer, + TestToNotesSerializer, + TestTypeCreateSerializer, + TestTypeSerializer, +) +from dojo.test.queries import get_authorized_test_imports, get_authorized_tests +from dojo.utils import ( + async_delete, + generate_file_response, + get_setting, + process_tag_notifications, +) + + +# Authorization: object-based +# @extend_schema_view(**schema_with_prefetch()) +# Nested models with prefetch make the response schema too long for Swagger UI +class TestsViewSet( + PrefetchDojoModelViewSet, + ra_api.AcceptedRisksMixin, +): + serializer_class = TestSerializer + queryset = Test.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_class = ApiTestFilter + permission_classes = (IsAuthenticated, permissions.UserHasTestPermission) + + @property + def risk_application_model_class(self): + return Test + + def get_queryset(self): + return ( + get_authorized_tests("view") + .prefetch_related("notes", "files") + .distinct() + ) + + def destroy(self, request, *args, **kwargs): + instance = self.get_object() + if get_setting("ASYNC_OBJECT_DELETE"): + async_del = async_delete() + async_del.delete(instance) + else: + instance.delete() + return Response(status=status.HTTP_204_NO_CONTENT) + + def get_serializer_class(self): + if self.request and self.request.method == "POST": + if self.action == "accept_risks": + return ra_api.AcceptedRiskSerializer + return TestCreateSerializer + return TestSerializer + + @extend_schema( + request=api_v2_serializers.ReportGenerateOptionSerializer, + responses={status.HTTP_200_OK: api_v2_serializers.ReportGenerateSerializer}, + ) + @action( + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], + ) + def generate_report(self, request, pk=None): + test = self.get_object() + + options = {} + # prepare post data + report_options = api_v2_serializers.ReportGenerateOptionSerializer( + data=request.data, + ) + if report_options.is_valid(): + options["include_finding_notes"] = report_options.validated_data[ + "include_finding_notes" + ] + options["include_finding_images"] = report_options.validated_data[ + "include_finding_images" + ] + options[ + "include_executive_summary" + ] = report_options.validated_data["include_executive_summary"] + options[ + "include_table_of_contents" + ] = report_options.validated_data["include_table_of_contents"] + else: + return Response( + report_options.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + data = report_generate(request, test, options) + report = api_v2_serializers.ReportGenerateSerializer(data) + return Response(report.data) + + @extend_schema( + methods=["GET"], + responses={status.HTTP_200_OK: TestToNotesSerializer}, + ) + @extend_schema( + methods=["POST"], + request=api_v2_serializers.AddNewNoteOptionSerializer, + responses={status.HTTP_201_CREATED: api_v2_serializers.NoteSerializer}, + ) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasTestNotePermission)) + def notes(self, request, pk=None): + test = self.get_object() + if request.method == "POST": + new_note = api_v2_serializers.AddNewNoteOptionSerializer( + data=request.data, + ) + if new_note.is_valid(): + entry = new_note.validated_data["entry"] + private = new_note.validated_data.get("private", False) + note_type = new_note.validated_data.get("note_type", None) + else: + return Response( + new_note.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + notes = test.notes.filter(note_type=note_type).first() + if notes and note_type and note_type.is_single: + return Response("Only one instance of this note_type allowed on a test.", status=status.HTTP_400_BAD_REQUEST) + + author = request.user + note = Notes( + entry=entry, + author=author, + private=private, + note_type=note_type, + ) + note.save() + # Add an entry to the note history + history = NoteHistory.objects.create(data=note.entry, time=note.date, current_editor=note.author) + note.history.add(history) + # Now add the note to the object + test.notes.add(note) + # Determine if we need to send any notifications for user mentioned + process_tag_notifications( + request=request, + note=note, + parent_url=request.build_absolute_uri( + reverse("view_test", args=(test.id,)), + ), + parent_title=f"Test: {test.title}", + ) + + serialized_note = api_v2_serializers.NoteSerializer( + {"author": author, "entry": entry, "private": private}, + ) + return Response( + serialized_note.data, status=status.HTTP_201_CREATED, + ) + notes = test.notes.all() + + serialized_notes = TestToNotesSerializer( + {"test_id": test, "notes": notes}, + ) + return Response(serialized_notes.data, status=status.HTTP_200_OK) + + @extend_schema( + methods=["GET"], + responses={status.HTTP_200_OK: TestToFilesSerializer}, + ) + @extend_schema( + methods=["POST"], + request=api_v2_serializers.AddNewFileOptionSerializer, + responses={status.HTTP_201_CREATED: api_v2_serializers.FileSerializer}, + ) + @action( + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), + ) + def files(self, request, pk=None): + test = self.get_object() + if request.method == "POST": + new_file = api_v2_serializers.FileSerializer(data=request.data) + if new_file.is_valid(): + title = new_file.validated_data["title"] + file = new_file.validated_data["file"] + else: + return Response( + new_file.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + file = FileUpload(title=title, file=file) + file.save() + test.files.add(file) + + serialized_file = api_v2_serializers.FileSerializer(file) + return Response( + serialized_file.data, status=status.HTTP_201_CREATED, + ) + + files = test.files.all() + serialized_files = TestToFilesSerializer( + {"test_id": test, "files": files}, + ) + return Response(serialized_files.data, status=status.HTTP_200_OK) + + @extend_schema( + methods=["GET"], + responses={ + status.HTTP_200_OK: api_v2_serializers.RawFileSerializer, + }, + ) + @action( + detail=True, + methods=["get"], + url_path=r"files/download/(?P\d+)", + permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), + ) + def download_file(self, request, file_id, pk=None): + test = self.get_object() + # Get the file object + file_object_qs = test.files.filter(id=file_id) + file_object = ( + file_object_qs.first() if len(file_object_qs) > 0 else None + ) + if file_object is None: + return Response( + {"error": "File ID not associated with Test"}, + status=status.HTTP_404_NOT_FOUND, + ) + # send file + return generate_file_response(file_object) + + +# Authorization: authenticated, configuration +class TestTypesViewSet( + mixins.UpdateModelMixin, + mixins.CreateModelMixin, + viewsets.ReadOnlyModelViewSet, +): + serializer_class = TestTypeSerializer + queryset = Test_Type.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = [ + "name", + ] + permission_classes = (IsAuthenticated, DjangoModelPermissions) + + def get_queryset(self): + return Test_Type.objects.all().order_by("id") + + def get_serializer_class(self): + if self.action == "create": + return TestTypeCreateSerializer + return TestTypeSerializer + + +# @extend_schema_view(**schema_with_prefetch()) +# Nested models with prefetch make the response schema too long for Swagger UI +class TestImportViewSet( + PrefetchDojoModelViewSet, +): + serializer_class = TestImportSerializer + queryset = Test_Import.objects.none() + filter_backends = (DjangoFilterBackend,) + + filterset_class = TestImportAPIFilter + + permission_classes = ( + IsAuthenticated, + permissions.UserHasTestImportPermission, + ) + + def get_queryset(self): + return get_authorized_test_imports( + "view", + ).prefetch_related( + "test_import_finding_action_set", + "findings_affected", + "findings_affected__endpoints", + "findings_affected__status_finding", + "findings_affected__finding_meta", + "findings_affected__jira_issue", + "findings_affected__burprawrequestresponse_set", + "findings_affected__jira_issue", + "findings_affected__jira_issue", + "findings_affected__jira_issue", + "findings_affected__reviewers", + "findings_affected__notes", + "findings_affected__notes__author", + "findings_affected__notes__history", + "findings_affected__files", + "findings_affected__found_by", + "findings_affected__tags", + "findings_affected__risk_acceptance_set", + "test", + "test__tags", + "test__notes", + "test__notes__author", + "test__files", + "test__test_type", + "test__engagement", + "test__environment", + "test__engagement__product", + "test__engagement__product__prod_type", + ) diff --git a/dojo/test/models.py b/dojo/test/models.py new file mode 100644 index 00000000000..31cefaf52ac --- /dev/null +++ b/dojo/test/models.py @@ -0,0 +1,287 @@ +import logging +from contextlib import suppress + +from django.conf import settings +from django.db import models +from django.db.models import Count, Q +from django.urls import reverse +from django.utils.translation import gettext as _ +from django_extensions.db.models import TimeStampedModel +from tagulous.models import TagField + +logger = logging.getLogger(__name__) +deduplicationLogger = logging.getLogger("dojo.specific-loggers.deduplication") + +IMPORT_CREATED_FINDING = "N" +IMPORT_CLOSED_FINDING = "C" +IMPORT_REACTIVATED_FINDING = "R" +IMPORT_UNTOUCHED_FINDING = "U" + +IMPORT_ACTIONS = [ + (IMPORT_CREATED_FINDING, "created"), + (IMPORT_CLOSED_FINDING, "closed"), + (IMPORT_REACTIVATED_FINDING, "reactivated"), + (IMPORT_UNTOUCHED_FINDING, "untouched"), +] + + +class Test_Type(models.Model): + name = models.CharField(max_length=200, unique=True) + static_tool = models.BooleanField(default=False) + dynamic_tool = models.BooleanField(default=False) + active = models.BooleanField(default=True) + dynamically_generated = models.BooleanField( + default=False, + help_text=_("Set to True for test types that are created at import time")) + + class Meta: + ordering = ("name",) + + def __str__(self): + return self.name + + def get_breadcrumbs(self): + return [{"title": str(self), + "url": None}] + + +class Test(models.Model): + engagement = models.ForeignKey("dojo.Engagement", editable=False, on_delete=models.CASCADE) + lead = models.ForeignKey("dojo.Dojo_User", editable=True, null=True, blank=True, on_delete=models.RESTRICT) + test_type = models.ForeignKey("dojo.Test_Type", on_delete=models.CASCADE) + scan_type = models.TextField(null=True) + title = models.CharField(max_length=255, null=True, blank=True) + description = models.TextField(null=True, blank=True) + target_start = models.DateTimeField() + target_end = models.DateTimeField() + percent_complete = models.IntegerField(null=True, blank=True, + editable=True) + notes = models.ManyToManyField("dojo.Notes", blank=True, + editable=False) + files = models.ManyToManyField("dojo.FileUpload", blank=True, editable=False) + environment = models.ForeignKey("dojo.Development_Environment", null=True, + blank=False, on_delete=models.RESTRICT) + + updated = models.DateTimeField(auto_now=True, null=True) + created = models.DateTimeField(auto_now_add=True, null=True) + + tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this test. Choose from the list or add new tags. Press Enter key to add.")) + inherited_tags = TagField(blank=True, force_lowercase=True, help_text=_("Internal use tags sepcifically for maintaining parity with product. This field will be present as a subset in the tags field")) + + version = models.CharField(max_length=100, null=True, blank=True) + + build_id = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Build ID that was tested, a reimport may update this field."), verbose_name=_("Build ID")) + commit_hash = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Commit hash tested, a reimport may update this field."), verbose_name=_("Commit Hash")) + branch_tag = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Tag or branch that was tested, a reimport may update this field."), verbose_name=_("Branch/Tag")) + api_scan_configuration = models.ForeignKey("dojo.Product_API_Scan_Configuration", null=True, editable=True, blank=True, on_delete=models.CASCADE, verbose_name=_("API Scan Configuration")) + + class Meta: + indexes = [ + models.Index(fields=["engagement", "test_type"]), + ] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.unsaved_metadata: list = [] + + def __str__(self): + if self.title: + return f"{self.title} ({self.test_type})" + return str(self.test_type) + + def get_absolute_url(self): + return reverse("view_test", args=[str(self.id)]) + + def test_type_name(self) -> str: + return self.test_type.name + + def get_breadcrumbs(self): + bc = self.engagement.get_breadcrumbs() + bc += [{"title": str(self), + "url": reverse("view_test", args=(self.id,))}] + return bc + + def copy(self, engagement=None): + from dojo.models import Finding, copy_model_util # noqa: PLC0415 -- lazy import, avoids circular dependency # isort: skip + copy = copy_model_util(self) + # Save the necessary ManyToMany relationships + old_notes = list(self.notes.all()) + old_files = list(self.files.all()) + old_tags = list(self.tags.all()) + old_findings = list(Finding.objects.filter(test=self)) + if engagement: + copy.engagement = engagement + # Save the object before setting any ManyToMany relationships + copy.save() + # Copy the notes + for notes in old_notes: + copy.notes.add(notes.copy()) + # Copy the files + for files in old_files: + copy.files.add(files.copy()) + # Copy the Findings + for finding in old_findings: + finding.copy(test=copy) + # Assign any tags + copy.tags.set(old_tags) + + return copy + + # only used by bulk risk acceptance api + @property + def unaccepted_open_findings(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + from dojo.utils import get_system_setting # noqa: PLC0415 circular import + findings = Finding.objects.filter(risk_accepted=False, active=True, duplicate=False, test=self) + if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True): + findings = findings.filter(verified=True) + + return findings + + def accept_risks(self, accepted_risks): + self.engagement.risk_acceptance.add(*accepted_risks) + + @property + def deduplication_algorithm(self): + deduplicationAlgorithm = settings.DEDUPE_ALGO_LEGACY + + if hasattr(settings, "DEDUPLICATION_ALGORITHM_PER_PARSER"): + if (self.test_type.name in settings.DEDUPLICATION_ALGORITHM_PER_PARSER): + deduplicationLogger.debug(f"using DEDUPLICATION_ALGORITHM_PER_PARSER for test_type.name: {self.test_type.name}") + deduplicationAlgorithm = settings.DEDUPLICATION_ALGORITHM_PER_PARSER[self.test_type.name] + elif (self.scan_type in settings.DEDUPLICATION_ALGORITHM_PER_PARSER): + deduplicationLogger.debug(f"using DEDUPLICATION_ALGORITHM_PER_PARSER for scan_type: {self.scan_type}") + deduplicationAlgorithm = settings.DEDUPLICATION_ALGORITHM_PER_PARSER[self.scan_type] + else: + deduplicationLogger.debug("Section DEDUPLICATION_ALGORITHM_PER_PARSER not found in settings.dist.py") + + deduplicationLogger.debug(f"DEDUPLICATION_ALGORITHM_PER_PARSER is: {deduplicationAlgorithm}") + return deduplicationAlgorithm + + @property + def hash_code_fields(self): + """Retrieve OS HASH_CODE_FIELDS_PER_SCANNER settings. Be aware when calling this to make sure Pro doesn't use these OS seetings""" + hashCodeFields = None + + if hasattr(settings, "HASHCODE_FIELDS_PER_SCANNER"): + if (self.test_type.name in settings.HASHCODE_FIELDS_PER_SCANNER): + deduplicationLogger.debug(f"using HASHCODE_FIELDS_PER_SCANNER for test_type.name: {self.test_type.name}") + hashCodeFields = settings.HASHCODE_FIELDS_PER_SCANNER[self.test_type.name] + elif (self.scan_type in settings.HASHCODE_FIELDS_PER_SCANNER): + deduplicationLogger.debug(f"using HASHCODE_FIELDS_PER_SCANNER for scan_type: {self.scan_type}") + hashCodeFields = settings.HASHCODE_FIELDS_PER_SCANNER[self.scan_type] + else: + deduplicationLogger.warning(f"test_type name {self.test_type.name} and scan_type {self.scan_type} not found in HASHCODE_FIELDS_PER_SCANNER") + else: + deduplicationLogger.debug("Section HASHCODE_FIELDS_PER_SCANNER not found in settings.dist.py") + + hash_code_fields_always = getattr(settings, "HASH_CODE_FIELDS_ALWAYS", []) + deduplicationLogger.debug(f"HASHCODE_FIELDS_PER_SCANNER is: {hashCodeFields} + HASH_CODE_FIELDS_ALWAYS: {hash_code_fields_always}") + + return hashCodeFields + + @property + def hash_code_allows_null_cwe(self): + hashCodeAllowsNullCwe = True + + if hasattr(settings, "HASHCODE_ALLOWS_NULL_CWE"): + if (self.test_type.name in settings.HASHCODE_ALLOWS_NULL_CWE): + deduplicationLogger.debug(f"using HASHCODE_ALLOWS_NULL_CWE for test_type.name: {self.test_type.name}") + hashCodeAllowsNullCwe = settings.HASHCODE_ALLOWS_NULL_CWE[self.test_type.name] + elif (self.scan_type in settings.HASHCODE_ALLOWS_NULL_CWE): + deduplicationLogger.debug(f"using HASHCODE_ALLOWS_NULL_CWE for scan_type: {self.scan_type}") + hashCodeAllowsNullCwe = settings.HASHCODE_ALLOWS_NULL_CWE[self.scan_type] + else: + deduplicationLogger.debug("Section HASHCODE_ALLOWS_NULL_CWE not found in settings.dist.py") + + deduplicationLogger.debug(f"HASHCODE_ALLOWS_NULL_CWE is: {hashCodeAllowsNullCwe}") + return hashCodeAllowsNullCwe + + def delete(self, *args, product_grading_option=True, **kwargs): + logger.debug("%d test delete", self.id) + super().delete(*args, **kwargs) + if product_grading_option: + from dojo.models import Engagement, Product # noqa: PLC0415 -- lazy import, avoids circular dependency + with suppress(Test.DoesNotExist, Engagement.DoesNotExist, Product.DoesNotExist): + # Suppressing a potential issue created from async delete removing + # related objects in a separate task + from dojo.utils import perform_product_grading # noqa: PLC0415 circular import + perform_product_grading(self.engagement.product) + + @property + def statistics(self): + """Queries the database, no prefetching, so could be slow for lists of model instances""" + from dojo.models import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + Finding, + _get_annotations_for_statistics, + _get_statistics_for_queryset, + ) + return _get_statistics_for_queryset(Finding.objects.filter(test=self), _get_annotations_for_statistics) + + +class Test_Import(TimeStampedModel): + + IMPORT_TYPE = "import" + REIMPORT_TYPE = "reimport" + + test = models.ForeignKey("dojo.Test", editable=False, null=False, blank=False, on_delete=models.CASCADE) + findings_affected = models.ManyToManyField("dojo.Finding", through="dojo.Test_Import_Finding_Action") + import_settings = models.JSONField(null=True) + type = models.CharField(max_length=64, null=False, blank=False, default="unknown") + + version = models.CharField(max_length=100, null=True, blank=True) + build_id = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Build ID that was tested, a reimport may update this field."), verbose_name=_("Build ID")) + commit_hash = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Commit hash tested, a reimport may update this field."), verbose_name=_("Commit Hash")) + branch_tag = models.CharField(editable=True, max_length=150, + null=True, blank=True, help_text=_("Tag or branch that was tested, a reimport may update this field."), verbose_name=_("Branch/Tag")) + + def get_queryset(self): + logger.debug("prefetch test_import counts") + super_query = super().get_queryset() + super_query = super_query.annotate(created_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_CREATED_FINDING))) + super_query = super_query.annotate(closed_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_CLOSED_FINDING))) + super_query = super_query.annotate(reactivated_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_REACTIVATED_FINDING))) + return super_query.annotate(untouched_findings_count=Count("findings", filter=Q(test_import_finding_action__action=IMPORT_UNTOUCHED_FINDING))) + + class Meta: + ordering = ("-id",) + indexes = [ + models.Index(fields=["created", "test", "type"]), + ] + + def __str__(self): + return self.created.strftime("%Y-%m-%d %H:%M:%S") + + @property + def statistics(self): + """Queries the database, no prefetching, so could be slow for lists of model instances""" + from dojo.models import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + Finding, + _get_annotations_for_statistics, + _get_statistics_for_queryset, + ) + stats = {} + for action in IMPORT_ACTIONS: + stats[action[1].lower()] = _get_statistics_for_queryset(Finding.objects.filter(test_import_finding_action__test_import=self, test_import_finding_action__action=action[0]), _get_annotations_for_statistics) + return stats + + +class Test_Import_Finding_Action(TimeStampedModel): + test_import = models.ForeignKey("dojo.Test_Import", editable=False, null=False, blank=False, on_delete=models.CASCADE) + finding = models.ForeignKey("dojo.Finding", editable=False, null=False, blank=False, on_delete=models.CASCADE) + action = models.CharField(max_length=100, null=True, blank=True, choices=IMPORT_ACTIONS) + + class Meta: + indexes = [ + models.Index(fields=["finding", "action", "test_import"]), + ] + unique_together = (("test_import", "finding")) + ordering = ("test_import", "action", "finding") + + def __str__(self): + return f"{self.finding.id}: {self.action}" diff --git a/dojo/test/services.py b/dojo/test/services.py new file mode 100644 index 00000000000..fbd5dbf3b59 --- /dev/null +++ b/dojo/test/services.py @@ -0,0 +1,33 @@ +# # tests +import logging + +from django.urls import reverse +from django.utils.translation import gettext as _ + +from dojo.celery_dispatch import dojo_dispatch_task +from dojo.notifications.helper import create_notification +from dojo.utils import calculate_grade + +logger = logging.getLogger(__name__) + + +def copy_test(test, engagement, user): + """ + Copy a test (and its findings) into the given engagement, recalculate the product + grade, and notify. Returns the new test. + + HTTP-free so both the UI view and (eventually) the API can call it. + """ + product = test.engagement.product + test_copy = test.copy(engagement=engagement) + dojo_dispatch_task(calculate_grade, product.id) + create_notification( + event="test_copied", + title=_("Copying of %s") % test.title, + description=f'The test "{test.title}" was copied by {user} to {engagement.name}', + product=product, + url=reverse("view_test", args=(test_copy.id,)), + recipients=[test.engagement.lead], + icon="exclamation-triangle", + ) + return test_copy diff --git a/dojo/test/ui/__init__.py b/dojo/test/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/test/ui/filters.py b/dojo/test/ui/filters.py new file mode 100644 index 00000000000..60bc7960d54 --- /dev/null +++ b/dojo/test/ui/filters.py @@ -0,0 +1,64 @@ +import logging + +from django_filters import BooleanFilter, CharFilter, MultipleChoiceFilter, OrderingFilter + +from dojo.filters import DojoFilter +from dojo.models import IMPORT_ACTIONS, Test_Import, Test_Import_Finding_Action, Test_Type + +logger = logging.getLogger(__name__) + + +class TestImportFilter(DojoFilter): + version = CharFilter(field_name="version", lookup_expr="icontains") + version_exact = CharFilter(field_name="version", lookup_expr="iexact", label="Version Exact") + branch_tag = CharFilter(lookup_expr="icontains", label="Branch/Tag") + build_id = CharFilter(lookup_expr="icontains", label="Build ID") + commit_hash = CharFilter(lookup_expr="icontains", label="Commit hash") + + findings_affected = BooleanFilter(field_name="findings_affected", lookup_expr="isnull", exclude=True, label="Findings affected") + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("date", "date"), + ("version", "version"), + ("branch_tag", "branch_tag"), + ("build_id", "build_id"), + ("commit_hash", "commit_hash"), + + ), + ) + + class Meta: + model = Test_Import + fields = [] + + +class TestImportFindingActionFilter(DojoFilter): + action = MultipleChoiceFilter(choices=IMPORT_ACTIONS) + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("action", "action"), + ), + ) + + class Meta: + model = Test_Import_Finding_Action + fields = [] + + +class TestTypeFilter(DojoFilter): + name = CharFilter(lookup_expr="icontains") + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("name", "name"), + ), + ) + + class Meta: + model = Test_Type + exclude = [] + include = ("name",) diff --git a/dojo/test/ui/forms.py b/dojo/test/ui/forms.py new file mode 100644 index 00000000000..6114817ef00 --- /dev/null +++ b/dojo/test/ui/forms.py @@ -0,0 +1,86 @@ +import logging + +from django import forms + +from dojo.models import Development_Environment, Engagement, Product_API_Scan_Configuration, Test, Test_Type +from dojo.user.queries import get_authorized_users, get_authorized_users_for_product_and_product_type +from dojo.utils import get_product +from dojo.validators import tag_validator + +logger = logging.getLogger(__name__) + + +class TestForm(forms.ModelForm): + title = forms.CharField(max_length=255, required=False) + description = forms.CharField(widget=forms.Textarea(attrs={"rows": "3"}), required=False) + test_type = forms.ModelChoiceField(queryset=Test_Type.objects.all().order_by("name")) + environment = forms.ModelChoiceField( + queryset=Development_Environment.objects.all().order_by("name")) + target_start = forms.DateTimeField(widget=forms.TextInput( + attrs={"class": "datepicker", "autocomplete": "off"})) + target_end = forms.DateTimeField(widget=forms.TextInput( + attrs={"class": "datepicker", "autocomplete": "off"})) + lead = forms.ModelChoiceField( + queryset=None, + required=False, label="Testing Lead") + + def __init__(self, *args, **kwargs): + obj = None + + if "engagement" in kwargs: + obj = kwargs.pop("engagement") + + if "instance" in kwargs: + obj = kwargs.get("instance") + + super().__init__(*args, **kwargs) + + if obj: + product = get_product(obj) + self.fields["lead"].queryset = get_authorized_users_for_product_and_product_type(None, product, "view").filter(is_active=True) + self.fields["api_scan_configuration"].queryset = Product_API_Scan_Configuration.objects.filter(product=product) + else: + self.fields["lead"].queryset = get_authorized_users("view").filter(is_active=True) + + def is_valid(self): + valid = super().is_valid() + + # we're done now if not valid + if not valid: + return valid + if self.cleaned_data["target_start"] > self.cleaned_data["target_end"]: + self.add_error("target_start", "Your target start date exceeds your target end date") + self.add_error("target_end", "Your target start date exceeds your target end date") + return False + return True + + class Meta: + model = Test + fields = ["title", "test_type", "target_start", "target_end", "description", + "environment", "percent_complete", "tags", "lead", "version", "branch_tag", "build_id", "commit_hash", + "api_scan_configuration"] + + def clean_tags(self): + tag_validator(self.cleaned_data.get("tags")) + return self.cleaned_data.get("tags") + + +class DeleteTestForm(forms.ModelForm): + id = forms.IntegerField(required=True, + widget=forms.widgets.HiddenInput()) + + class Meta: + model = Test + fields = ["id"] + + +class CopyTestForm(forms.Form): + engagement = forms.ModelChoiceField( + required=True, + queryset=Engagement.objects.none(), + error_messages={"required": "*"}) + + def __init__(self, *args, **kwargs): + authorized_lists = kwargs.pop("engagements", None) + super().__init__(*args, **kwargs) + self.fields["engagement"].queryset = authorized_lists diff --git a/dojo/test/urls.py b/dojo/test/ui/urls.py similarity index 97% rename from dojo/test/urls.py rename to dojo/test/ui/urls.py index 335cf260b86..403068023cd 100644 --- a/dojo/test/urls.py +++ b/dojo/test/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from dojo.test import views +from dojo.test.ui import views urlpatterns = [ # tests diff --git a/dojo/test/views.py b/dojo/test/ui/views.py similarity index 97% rename from dojo/test/views.py rename to dojo/test/ui/views.py index 4e7f9c54dba..7d56d4e6587 100644 --- a/dojo/test/views.py +++ b/dojo/test/ui/views.py @@ -24,15 +24,12 @@ import dojo.finding.helper as finding_helper from dojo.authorization.authorization import user_has_permission_or_403 -from dojo.celery_dispatch import dojo_dispatch_task from dojo.engagement.queries import get_authorized_engagements -from dojo.filters import FindingFilter, FindingFilterWithoutObjectLookups, TemplateFindingFilter, TestImportFilter +from dojo.filters import FindingFilter, FindingFilterWithoutObjectLookups, TemplateFindingFilter from dojo.finding.queries import prefetch_for_findings from dojo.finding.views import find_available_notetypes from dojo.forms import ( AddFindingForm, - CopyTestForm, - DeleteTestForm, FindingBulkUpdateForm, JIRAFindingForm, JIRAImportScanForm, @@ -63,6 +60,9 @@ ScanTypeProductAnnouncement, ) from dojo.test.queries import get_authorized_tests +from dojo.test.services import copy_test as copy_test_service +from dojo.test.ui.filters import TestImportFilter +from dojo.test.ui.forms import CopyTestForm, DeleteTestForm from dojo.tools.factory import get_choices_sorted, get_scan_types_sorted from dojo.user.queries import get_authorized_users from dojo.utils import ( @@ -72,7 +72,6 @@ add_field_errors_to_response, add_success_message_to_response, async_delete, - calculate_grade, get_cal_event, get_page_items, get_page_items_and_count, @@ -325,21 +324,12 @@ def copy_test(request, tid): form = CopyTestForm(request.POST, engagements=engagement_list) if form.is_valid(): engagement = form.cleaned_data.get("engagement") - product = test.engagement.product - test_copy = test.copy(engagement=engagement) - dojo_dispatch_task(calculate_grade, product.id) + copy_test_service(test, engagement, request.user) messages.add_message( request, messages.SUCCESS, "Test Copied successfully.", extra_tags="alert-success") - create_notification(event="test_copied", # TODO: - if 'copy' functionality will be supported by API as well, 'create_notification' needs to be migrated to place where it will be able to cover actions from both interfaces - title=f"Copying of {test.title}", - description=f'The test "{test.title}" was copied by {request.user} to {engagement.name}', - product=product, - url=request.build_absolute_uri(reverse("view_test", args=(test_copy.id,))), - recipients=[test.engagement.lead], - icon="exclamation-triangle") return redirect_to_return_url_or_else(request, reverse("view_engagement", args=(engagement.id, ))) messages.add_message( request, diff --git a/dojo/test_type/views.py b/dojo/test_type/views.py index 5a25e9ed00a..c025761ba76 100644 --- a/dojo/test_type/views.py +++ b/dojo/test_type/views.py @@ -7,9 +7,9 @@ from django.shortcuts import get_object_or_404, render from django.urls import reverse -from dojo.filters import TestTypeFilter from dojo.forms import Test_TypeForm from dojo.models import Test_Type +from dojo.test.ui.filters import TestTypeFilter from dojo.utils import add_breadcrumb, get_page_items logger = logging.getLogger(__name__) diff --git a/dojo/urls.py b/dojo/urls.py index a31c6e62bd3..2930b601b11 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -45,9 +45,6 @@ SonarqubeIssueTransitionViewSet, SonarqubeIssueViewSet, SystemSettingsViewSet, - TestImportViewSet, - TestsViewSet, - TestTypesViewSet, ToolConfigurationsViewSet, ToolProductSettingsViewSet, ToolTypesViewSet, @@ -86,7 +83,8 @@ from dojo.sla_config.urls import urlpatterns as sla_urls from dojo.survey.urls import urlpatterns as survey_urls from dojo.system_settings.urls import urlpatterns as system_settings_urls -from dojo.test.urls import urlpatterns as test_urls +from dojo.test.api.urls import add_test_urls +from dojo.test.ui.urls import urlpatterns as test_urls from dojo.test_type.urls import urlpatterns as test_type_urls from dojo.tool_config.urls import urlpatterns as tool_config_urls from dojo.tool_product.urls import urlpatterns as tool_product_urls @@ -149,9 +147,7 @@ v2_api.register(r"sonarqube_transitions", SonarqubeIssueTransitionViewSet, basename="sonarqube_issue_transition") v2_api.register(r"system_settings", SystemSettingsViewSet, basename="system_settings") v2_api.register(r"technologies", AppAnalysisViewSet, basename="app_analysis") -v2_api.register(r"tests", TestsViewSet, basename="test") -v2_api.register(r"test_types", TestTypesViewSet, basename="test_type") -v2_api.register(r"test_imports", TestImportViewSet, basename="test_imports") +v2_api = add_test_urls(v2_api) v2_api.register(r"tool_configurations", ToolConfigurationsViewSet, basename="tool_configuration") v2_api.register(r"tool_product_settings", ToolProductSettingsViewSet, basename="tool_product_settings") v2_api.register(r"tool_types", ToolTypesViewSet, basename="tool_type") diff --git a/unittests/test_apply_finding_template.py b/unittests/test_apply_finding_template.py index f2fd228a7c0..51404069ac3 100644 --- a/unittests/test_apply_finding_template.py +++ b/unittests/test_apply_finding_template.py @@ -28,7 +28,7 @@ Test_Type, Vulnerability_Id, ) -from dojo.test import views as test_views +from dojo.test.ui import views as test_views from unittests.dojo_test_case import DojoTestCase, versioned_fixtures diff --git a/unittests/test_copy_model.py b/unittests/test_copy_model.py index f36348753b2..d67246630cc 100644 --- a/unittests/test_copy_model.py +++ b/unittests/test_copy_model.py @@ -1,6 +1,9 @@ +from unittest.mock import patch + from dojo.location.models import Location, LocationFindingReference from dojo.models import Endpoint, Endpoint_Status, Engagement, Finding, Product, Test, User +from dojo.test.services import copy_test from dojo.url.models import URL from .dojo_test_case import DojoTestCase, skip_unless_v2, skip_unless_v3 @@ -276,6 +279,34 @@ def test_duplicate_test_with_tags_and_notes(self): self.assertEqual(test.tags, test_copy.tags) +class TestCopyTestService(DojoTestCase): + + """Phase 2: the copy_test service holds the copy workflow extracted from the UI view.""" + + @patch("dojo.test.services.create_notification") + @patch("dojo.test.services.dojo_dispatch_task") + def test_copy_test_service(self, mock_dispatch, mock_notification): + user, _ = User.objects.get_or_create(username="admin") + product_type = self.create_product_type("svc_pt_test") + product = self.create_product("svc_copy_test_product", prod_type=product_type) + engagement = self.create_engagement("svc_eng_test", product) + test = self.create_test(engagement=engagement, scan_type="NPM Audit Scan", title="test") + _ = Finding.objects.create(test=test, reporter=user) + before_tests = Test.objects.filter(engagement=engagement).count() + before_findings = Finding.objects.filter(test__engagement=engagement).count() + # Run the service (copy into the same engagement) + test_copy = copy_test(test, engagement, user) + # A new test was created under the engagement, with its findings + self.assertEqual(before_tests + 1, Test.objects.filter(engagement=engagement).count()) + self.assertNotEqual(test.id, test_copy.id) + self.assertEqual(engagement, test_copy.engagement) + self.assertEqual(before_findings + 1, Finding.objects.filter(test__engagement=engagement).count()) + # Side effects: grade recalculation dispatched and a notification raised + mock_dispatch.assert_called_once() + mock_notification.assert_called_once() + self.assertEqual(mock_notification.call_args.kwargs["event"], "test_copied") + + class TestCopyEngagementModel(DojoTestCase): def test_duplicate_engagement(self): diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index c6b9d747231..4b16e26d358 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -60,8 +60,6 @@ ProductViewSet, RiskAcceptanceViewSet, SonarqubeIssueViewSet, - TestsViewSet, - TestTypesViewSet, ToolConfigurationsViewSet, ToolProductSettingsViewSet, ToolTypesViewSet, @@ -116,6 +114,7 @@ OrganizationViewSet, ) from dojo.product_type.api.views import ProductTypeViewSet +from dojo.test.api.views import TestsViewSet, TestTypesViewSet from dojo.url.api.views import URLViewSet from dojo.url.models import URL