From 80afeaaad796edab0a6fae2e14d594098d25c8fe Mon Sep 17 00:00:00 2001 From: GoldraK Date: Thu, 4 Jun 2026 13:57:21 +0200 Subject: [PATCH 1/2] feat(lacework): add Lacework API importer for vulnerability ingestion --- .../supported_tools/parsers/api/lacework.md | 89 +++ .../commands/lacework_debug_vuln.py | 193 ++++++ .../commands/lacework_import_all.py | 373 +++++++++++ dojo/settings/settings.dist.py | 10 + dojo/tool_config/factory.py | 2 + dojo/tools/api_lacework/__init__.py | 0 dojo/tools/api_lacework/api_client.py | 560 ++++++++++++++++ dojo/tools/api_lacework/importer.py | 586 ++++++++++++++++ dojo/tools/api_lacework/parser.py | 65 ++ dojo/tools/api_lacework/updater.py | 43 ++ dojo/tools/tool_issue_updater.py | 6 +- unittests/test_api_lacework.py | 633 ++++++++++++++++++ 12 files changed, 2559 insertions(+), 1 deletion(-) create mode 100644 docs/content/supported_tools/parsers/api/lacework.md create mode 100644 dojo/management/commands/lacework_debug_vuln.py create mode 100644 dojo/management/commands/lacework_import_all.py create mode 100644 dojo/tools/api_lacework/__init__.py create mode 100644 dojo/tools/api_lacework/api_client.py create mode 100644 dojo/tools/api_lacework/importer.py create mode 100644 dojo/tools/api_lacework/parser.py create mode 100644 dojo/tools/api_lacework/updater.py create mode 100644 unittests/test_api_lacework.py diff --git a/docs/content/supported_tools/parsers/api/lacework.md b/docs/content/supported_tools/parsers/api/lacework.md new file mode 100644 index 00000000000..f99f4043255 --- /dev/null +++ b/docs/content/supported_tools/parsers/api/lacework.md @@ -0,0 +1,89 @@ +--- +title: "Lacework API Import" +toc_hide: true +--- +All parsers that use API pull have common basic configuration steps, but with different values. Please, [read these steps](../) first. + +## Tool Configuration + +In `Tool Configuration`, select `Tool Type` "Lacework" and `Authentication Type` "API Key". +The URL must be in the format of `https://.lacework.net` +Enter the key ID in the "Username" field and paste your Lacework API secret key in the "API Key" field. +By default, the tool will import both container and host vulnerabilities. + +To restrict the import to only containers or only hosts, use the "Extras" field with the following options: + +| Extras value | Effect | +|---|---| +| *(empty)* | Import both containers and hosts (default) | +| `include_hosts=false` | Import containers only | +| `include_containers=false` | Import hosts only | +| `include_containers=true,include_hosts=true` | Both (same as empty) | + +## Product-Level Configuration + +In `Add API Scan Configuration` +- `Service key 1` can optionally be set to filter container vulnerabilities by repository name pattern. + When set, only container repositories matching the pattern (rlike) will be imported. + Leave empty to import all container repositories. + +## Import All Repositories (Management Command) + +Instead of importing per-product through the UI, you can import all repositories at once and auto-create Products using the management command: + +```bash +python manage.py lacework_import_all --tool-config +``` + +This command will: +1. Fetch all container vulnerabilities from Lacework +2. Automatically group them by repository +3. Create a Product for each unique repository (if it doesn't already exist) +4. Create an Engagement and Test for each product +5. Create Findings for each vulnerability + +The configuration for `include_containers` and `include_hosts` is automatically read from the "Extras" field of the Tool Configuration. + +## Sample Scan Data + +Sample Lacework vulnerability data can be examined using the debug command: + +```bash +python manage.py lacework_debug_vuln --tool-config --type containers +python manage.py lacework_debug_vuln --tool-config --type hosts +``` + +## Field Mapping + +Lacework vulnerability fields are mapped to DefectDojo Finding fields as follows: + +| Lacework Field | Finding Field | Example | +|---|---|---| +| `vulnId` | `vuln_id_from_tool` | CVE-2025-62727 | +| `severity` (or inferred from `riskScore`) | `severity` | Critical, High, Medium, Low, Info | +| `cveProps.description` + `featureProps.introduced_in` | `description` | Vulnerability description | +| `cveProps.link` + `cveProps.source` | `references` | CVE link and data source | +| `featureKey.name` | `component_name` | starlette, zlib, openssl | +| `featureKey.version` / `version_installed` | `component_version` | 0.47.3 | +| `featureProps.src` | `file_path` | Package path within image | +| `fixInfo.fix_available` | `fix_available` | 1 (true) if fix exists | +| `fixInfo.fixed_version` | `fix_version` | 0.49.1 | +| `cveRiskScore` / `riskScore` | `cvssv3_score` | 9.8 | +| `status` (VULNERABLE/GOOD) | `active` / `verified` | Active only if vulnerable | +| `packageStatus` | tags | pkg:NO_AGENT_AVAILABLE | +| `evalCtx.request_source` | tags | scanner:INLINE_SCANNER | +| `evalCtx.integration_props.NAME` | tags | integration:bitbucket-pipelines | +| `featureProps.feed` | tags | feed:rbs | + +## Deduplication + +The Lacework API Import uses `hash_code` algorithm for deduplication with the following fields: +- `vuln_id_from_tool` (CVE ID) +- `component_name` (package name) +- `file_path` (namespace/package path) + +This means the same CVE found in the same package will be properly deduplicated. + +## Multiple Lacework API Configurations + +In the import or re-import dialog, you can select which `API Scan Configuration` shall be used. If you do not choose any, DefectDojo will use the `API Scan Configuration` of the Product if there is only one defined or the Lacework `Tool Configuration` if there is only one. \ No newline at end of file diff --git a/dojo/management/commands/lacework_debug_vuln.py b/dojo/management/commands/lacework_debug_vuln.py new file mode 100644 index 00000000000..fdc170b31d3 --- /dev/null +++ b/dojo/management/commands/lacework_debug_vuln.py @@ -0,0 +1,193 @@ +""" +Management command to dump raw Lacework vulnerability data for debugging. + +Usage: + python manage.py lacework_debug_vuln --tool-config + +This will fetch one vulnerability and print its full JSON structure +so you can see what fields are available for mapping. +""" + +import json +import logging + +from django.core.management.base import BaseCommand, CommandError + +from dojo.models import Tool_Configuration +from dojo.tools.api_lacework.api_client import LaceworkAPI + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Dump raw Lacework vulnerability data for debugging field mapping" + + def add_arguments(self, parser): + parser.add_argument( + "--tool-config", + type=int, + required=True, + help="ID of the Tool Configuration for Lacework", + ) + parser.add_argument( + "--type", + type=str, + default="containers", + choices=["containers", "hosts"], + help="Type of vulnerabilities to dump (default: containers)", + ) + + def handle(self, *args, **options): + tool_config_id = options["tool_config"] + vuln_type = options["type"] + + try: + tool_config = Tool_Configuration.objects.get(id=tool_config_id) + except Tool_Configuration.DoesNotExist: + raise CommandError( + f"Tool Configuration with id {tool_config_id} not found" + ) + + self.stdout.write(f"Using Tool Configuration: {tool_config.name}") + self.stdout.write(f" URL: {tool_config.url}") + + client = LaceworkAPI(tool_config) + + from datetime import datetime, timedelta, timezone + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(hours=24) + start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + + if vuln_type == "containers": + self.stdout.write(f"\nFetching container vulnerabilities...") + vulns = client.search_container_vulnerabilities( + start_time=start_time_str, + end_time=end_time_str, + ) + else: + self.stdout.write(f"\nFetching host vulnerabilities...") + vulns = client.search_host_vulnerabilities( + start_time=start_time_str, + end_time=end_time_str, + ) + + if not vulns: + self.stdout.write(self.style.WARNING("No vulnerabilities found")) + return + + self.stdout.write(f"Total vulnerabilities: {len(vulns)}") + + # Find first vulnerability that actually has a CVE/vulnId + vuln = None + for v in vulns: + if v.get("vulnId") and v.get("severity"): + vuln = v + break + + if not vuln: + self.stdout.write(self.style.WARNING("No vulnerability with CVE data found")) + return + + self.stdout.write(f"\nSelected vulnerability: {vuln.get('vulnId')} ({vuln.get('severity')})\n") + self.stdout.write( + self.style.SUCCESS( + f"\n=== Full JSON structure of 1 {vuln_type} vulnerability ===\n" + ) + ) + self.stdout.write(json.dumps(vuln, indent=2, default=str)) + + self.stdout.write( + self.style.SUCCESS( + f"\n=== TOP-LEVEL FIELDS ===\n" + ) + ) + for key, value in vuln.items(): + if not isinstance(value, (dict, list)): + self.stdout.write(f" {key}: {value}") + else: + self.stdout.write(f" {key}: ({type(value).__name__})") + + # Analyze available fields for mapping + self.stdout.write( + self.style.SUCCESS( + f"\n=== ANALYSIS ===\n" + ) + ) + self.stdout.write(f"vulnId (CVE): {vuln.get('vulnId', 'N/A')}") + self.stdout.write(f"severity: {vuln.get('severity', 'N/A')}") + self.stdout.write(f"status: {vuln.get('status', 'N/A')}") + self.stdout.write(f"riskScore: {vuln.get('riskScore', 'N/A')}") + self.stdout.write(f"cveRiskScore: {vuln.get('cveRiskScore', 'N/A')}") + self.stdout.write(f"startTime: {vuln.get('startTime', 'N/A')}") + self.stdout.write(f"endTime: {vuln.get('endTime', 'N/A')}") + self.stdout.write(f"evalGuid: {vuln.get('evalGuid', 'N/A')}") + self.stdout.write(f"imageId: {vuln.get('imageId', 'N/A')}") + + # FeatureKey details + fk = vuln.get("featureKey", {}) + self.stdout.write(f"\n featureKey.name: {fk.get('name', 'N/A')}") + self.stdout.write(f" featureKey.namespace: {fk.get('namespace', 'N/A')}") + if fk.get("version"): + self.stdout.write(f" featureKey.version: {fk['version']}") + if fk.get("version_installed"): + self.stdout.write(f" featureKey.version_installed: {fk['version_installed']}") + if fk.get("version_format"): + self.stdout.write(f" featureKey.version_format: {fk['version_format']}") + if fk.get("src"): + self.stdout.write(f" featureKey.src: {fk['src']}") + if fk.get("introduced_in"): + self.stdout.write(f" featureKey.introduced_in: {fk['introduced_in']}") + if fk.get("layer"): + self.stdout.write(f" featureKey.layer: {fk['layer']}") + if fk.get("package_active"): + self.stdout.write(f" featureKey.package_active: {fk['package_active']}") + if fk.get("package_path"): + self.stdout.write(f" featureKey.package_path: {fk['package_path']}") + + # FixInfo + fi = vuln.get("fixInfo", {}) + self.stdout.write(f"\n fixInfo.fix_available: {fi.get('fix_available', 'N/A')}") + self.stdout.write(f" fixInfo.fixed_version: {fi.get('fixed_version', 'N/A')}") + + # CveProps + cp = vuln.get("cveProps", {}) + self.stdout.write(f"\n cveProps.description: {cp.get('description', 'N/A')[:100]}...") + self.stdout.write(f" cveProps.link: {cp.get('link', 'N/A')}") + self.stdout.write(f" cveProps.source: {cp.get('source', 'N/A')}") + + # Metadata + meta = cp.get("metadata", {}) + nvd = meta.get("NVD", {}) + rbs = meta.get("RBS", {}) + self.stdout.write(f"\n metadata.NVD.CVSSv3.Score: {nvd.get('CVSSv3', {}).get('Score', 'N/A')}") + self.stdout.write(f" metadata.NVD.CVSSv2.Score: {nvd.get('CVSSv2', {}).get('Score', 'N/A')}") + self.stdout.write(f" metadata.RBS.CVSSv3.Score: {rbs.get('CVSSv3', {}).get('Score', 'N/A')}") + self.stdout.write(f" metadata.RBS.cwe_id: {rbs.get('cwe_id', 'N/A')}") + + # EvalCtx for containers + ec = vuln.get("evalCtx", {}) + if ec: + self.stdout.write(f"\n evalCtx.collector_type: {ec.get('collector_type', 'N/A')}") + ii = ec.get("image_info", {}) + if ii: + self.stdout.write(f" evalCtx.image_info.repo: {ii.get('repo', 'N/A')}") + self.stdout.write(f" evalCtx.image_info.registry: {ii.get('registry', 'N/A')}") + self.stdout.write(f" evalCtx.image_info.tags: {ii.get('tags', 'N/A')}") + self.stdout.write(f" evalCtx.image_info.digest: {ii.get('digest', 'N/A')}") + self.stdout.write(f" evalCtx.image_info.status: {ii.get('status', 'N/A')}") + self.stdout.write(f" evalCtx.image_info.type: {ii.get('type', 'N/A')}") + self.stdout.write(f" evalCtx.image_info.size: {ii.get('size', 'N/A')}") + + # Machine info for hosts + machine_tags = vuln.get("machineTags", {}) + if machine_tags: + self.stdout.write(f"\n machineTags.Hostname: {machine_tags.get('Hostname', 'N/A')}") + self.stdout.write(f" machineTags.VmProvider: {machine_tags.get('VmProvider', 'N/A')}") + self.stdout.write(f" machineTags.InstanceId: {machine_tags.get('InstanceId', 'N/A')}") + self.stdout.write(f" machineTags.Region: {machine_tags.get('Region', 'N/A')}") + + # Additional fields + self.stdout.write(f"\n additional top-level keys:") + for key in sorted(vuln.keys()): + self.stdout.write(f" - {key}") \ No newline at end of file diff --git a/dojo/management/commands/lacework_import_all.py b/dojo/management/commands/lacework_import_all.py new file mode 100644 index 00000000000..b6e5cb8fe8b --- /dev/null +++ b/dojo/management/commands/lacework_import_all.py @@ -0,0 +1,373 @@ +""" +Management command to import all Lacework vulnerabilities, auto-creating +Products per repository, Engagements, and Tests. + +Usage: + python manage.py lacework_import_all --tool-config + +Optional: + --include-hosts Also import host vulnerabilities (default: true) + --include-containers Also import container vulnerabilities (default: true) + --product-type-name Product Type name for auto-created products (default: "Lacework") + --engagement-name Engagement name template (default: "Lacework Scan {date}") +""" + +import logging +from datetime import datetime, timedelta, timezone + +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction + +from dojo.models import ( + Development_Environment, + Engagement, + Product, + Product_Type, + Test, + Test_Type, + Tool_Configuration, +) +from dojo.tools.api_lacework.api_client import LaceworkAPI +from dojo.tools.api_lacework.importer import LaceworkApiImporter + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Import all Lacework vulnerabilities, auto-creating Products per repository." + + def add_arguments(self, parser): + parser.add_argument( + "--tool-config", + type=int, + required=True, + help="ID of the Tool Configuration for Lacework", + ) + + def _parse_extras(self, extras: str | None) -> dict: + """Parse the Tool Configuration Extras field for options. + + Returns a dict with keys: include_containers, include_hosts + """ + result = { + "include_containers": True, + "include_hosts": True, + } + if not extras: + return result + for entry in extras.split(","): + entry = entry.strip().lower() + if "=" in entry: + key, value = entry.split("=", 1) + key = key.strip() + value = value.strip().lower() + if key == "include_containers": + result["include_containers"] = value == "true" + elif key == "include_hosts": + result["include_hosts"] = value == "true" + return result + + def handle(self, *args, **options): + tool_config_id = options["tool_config"] + + # Get Tool Configuration + try: + tool_config = Tool_Configuration.objects.get(id=tool_config_id) + except Tool_Configuration.DoesNotExist: + raise CommandError( + f"Tool Configuration with id {tool_config_id} not found" + ) + + # Read configuration from Tool Configuration Extras + extras_config = self._parse_extras(tool_config.extras) + include_containers = extras_config["include_containers"] + include_hosts = extras_config["include_hosts"] + + self.stdout.write(f"Using Tool Configuration: {tool_config.name}") + self.stdout.write(f" URL: {tool_config.url}") + self.stdout.write(f" Include containers: {include_containers}") + self.stdout.write(f" Include hosts: {include_hosts}") + self.stdout.write(f" Extras: {tool_config.extras or '(empty)'}") + + # Get or create Product Type + product_type, _ = Product_Type.objects.get_or_create(name="Lacework") + self.stdout.write(f"Using Product Type: {product_type.name}") + + # Get or create Development Environment + dev_env, _ = Development_Environment.objects.get_or_create(name="Development") + + # Get or create Test Type + test_type, _ = Test_Type.objects.get_or_create(name="Lacework API Import") + self.stdout.write(f"Using Test Type: {test_type.name}") + + # Initialize Lacework API client + client = LaceworkAPI(tool_config) + + # Override include flags with options + client.include_containers = include_containers + client.include_hosts = include_hosts + + # Calculate time range + hours = 24 + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(hours=hours) + start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + + # --- Import container vulnerabilities --- + if include_containers: + self.stdout.write( + f"\nFetching container vulnerabilities from {start_time_str} to {end_time_str}..." + ) + try: + container_vulns = client.search_container_vulnerabilities( + start_time=start_time_str, + end_time=end_time_str, + ) + self.stdout.write( + self.style.SUCCESS( + f"Found {len(container_vulns)} container vulnerabilities" + ) + ) + except Exception as e: + self.stderr.write( + self.style.ERROR( + f"Failed to fetch container vulnerabilities: {e}" + ) + ) + container_vulns = [] + + # Group container vulns by repository + container_by_repo = self._group_container_vulns_by_repo(container_vulns) + self.stdout.write( + f"Found {len(container_by_repo)} unique container repositories" + ) + + for repo_name, vulns in container_by_repo.items(): + self._import_vulns_to_product( + client=client, + repo_name=repo_name, + vulns=vulns, + product_type=product_type, + dev_env=dev_env, + test_type=test_type, + engagement_template="Lacework Scan {date}", + is_container=True, + ) + else: + self.stdout.write("Container vulnerabilities import is disabled.") + + # --- Import host vulnerabilities --- + if include_hosts: + self.stdout.write( + f"\nFetching host vulnerabilities from {start_time_str} to {end_time_str}..." + ) + try: + host_vulns = client.search_host_vulnerabilities( + start_time=start_time_str, + end_time=end_time_str, + ) + self.stdout.write( + self.style.SUCCESS( + f"Found {len(host_vulns)} host vulnerabilities" + ) + ) + except Exception as e: + self.stderr.write( + self.style.ERROR( + f"Failed to fetch host vulnerabilities: {e}" + ) + ) + host_vulns = [] + + # Group host vulns by hostname/machine + host_by_machine = self._group_host_vulns_by_machine(host_vulns) + self.stdout.write( + f"Found {len(host_by_machine)} unique host machines" + ) + + for machine_name, vulns in host_by_machine.items(): + self._import_vulns_to_product( + client=client, + repo_name=machine_name, + vulns=vulns, + product_type=product_type, + dev_env=dev_env, + test_type=test_type, + engagement_template="Lacework Scan {date}", + is_container=False, + ) + else: + self.stdout.write("Host vulnerabilities import is disabled.") + + self.stdout.write(self.style.SUCCESS("\nImport completed successfully.")) + + def _group_container_vulns_by_repo(self, vulns: list) -> dict: + """Group container vulnerabilities by repository name.""" + grouped = {} + for vuln in vulns: + eval_ctx = vuln.get("evalCtx", {}) + image_info = eval_ctx.get("image_info", {}) + repo = image_info.get("repo", "unknown") + if repo not in grouped: + grouped[repo] = [] + grouped[repo].append(vuln) + return grouped + + def _group_host_vulns_by_machine(self, vulns: list) -> dict: + """Group host vulnerabilities by machine hostname.""" + grouped = {} + for vuln in vulns: + machine_tags = vuln.get("machineTags", {}) + hostname = machine_tags.get("Hostname", "unknown") + if hostname not in grouped: + grouped[hostname] = [] + grouped[hostname].append(vuln) + return grouped + + @transaction.atomic + def _import_vulns_to_product( + self, + client, + repo_name: str, + vulns: list, + product_type, + dev_env, + test_type, + engagement_template: str, + is_container: bool, + ): + """Import vulnerabilities into a Product, auto-creating if needed.""" + source_type = "container" if is_container else "host" + display_name = f"Lacework {source_type}: {repo_name}" + + # Sanitize product name (max 255 chars) + product_name = repo_name[:255] + + # Get or create Product + try: + product, created = Product.objects.get_or_create( + name=product_name, + defaults={ + "prod_type": product_type, + "description": f"Auto-created from Lacework {source_type} import", + }, + ) + if created: + self.stdout.write( + self.style.SUCCESS(f" Created Product: {product_name}") + ) + else: + self.stdout.write(f" Using existing Product: {product_name}") + except Exception as e: + self.stderr.write( + self.style.ERROR(f" Failed to get/create Product {product_name}: {e}") + ) + return + + # Create Engagement + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + engagement_name = engagement_template.replace("{date}", today) + try: + engagement, created = Engagement.objects.get_or_create( + name=engagement_name, + product=product, + defaults={ + "target_start": datetime.now(timezone.utc).date(), + "target_end": datetime.now(timezone.utc).date(), + "active": True, + "status": "In Progress", + }, + ) + if created: + self.stdout.write(f" Created Engagement: {engagement_name}") + else: + self.stdout.write(f" Using existing Engagement: {engagement_name}") + except Exception as e: + self.stderr.write( + self.style.ERROR( + f" Failed to get/create Engagement {engagement_name}: {e}" + ) + ) + return + + # Create Test + try: + test, created = Test.objects.get_or_create( + engagement=engagement, + test_type=test_type, + defaults={ + "title": f"{source_type.capitalize()} scan {today}", + "target_start": datetime.now(timezone.utc), + "target_end": datetime.now(timezone.utc), + "description": f"Lacework {source_type} vulnerabilities for {repo_name}", + }, + ) + if created: + self.stdout.write(f" Created Test: {test.title}") + else: + self.stdout.write(f" Using existing Test: {test.title}") + except Exception as e: + self.stderr.write( + self.style.ERROR(f" Failed to get/create Test: {e}") + ) + return + + # Create Findings from vulnerabilities + importer = LaceworkApiImporter() + if is_container: + new_findings = [ + importer._create_finding_from_container_vuln(v, test) + for v in vulns + if v.get("vulnId") + ] + else: + new_findings = [ + importer._create_finding_from_host_vuln(v, test) + for v in vulns + if v.get("vulnId") + ] + + # Filter out None values + new_findings = [f for f in new_findings if f is not None] + + if not new_findings: + self.stdout.write(" No findings to import") + return + + # Save findings + findings_created = 0 + findings_updated = 0 + for finding in new_findings: + try: + existing = test.finding_set.filter( + vuln_id_from_tool=finding.vuln_id_from_tool, + component_name=finding.component_name, + file_path=finding.file_path, + ).first() + if existing: + # Update existing finding + for field in [ + "severity", "description", "references", + "component_version", "cvssv3_score", "cvssv3", + "fix_available", "fix_version", "active", "verified", + ]: + setattr(existing, field, getattr(finding, field)) + existing.save() + findings_updated += 1 + else: + finding.save() + findings_created += 1 + except Exception as e: + self.stderr.write( + self.style.ERROR( + f" Failed to save finding {finding.title}: {e}" + ) + ) + + self.stdout.write( + self.style.SUCCESS( + f" Created {findings_created} findings, " + f"updated {findings_updated} existing findings" + ) + ) \ No newline at end of file diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index abe31dec9f9..0c34fe9cab3 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -224,6 +224,10 @@ DD_RATE_LIMITER_ACCOUNT_LOCKOUT=(bool, False), # when enabled SonarQube API parser will download the security hotspots DD_SONARQUBE_API_PARSER_HOTSPOTS=(bool, True), + # When enabled, Lacework API importer will also import host vulnerabilities (in addition to containers) + DD_LACEWORK_API_IMPORTER_INCLUDE_HOSTS=(bool, True), + # Time delta in hours for Lacework API import queries (max 168 hours / 7 days) + DD_LACEWORK_API_IMPORTER_TIMEDELTA_HOURS=(int, 24), # When enabled, deleting objects will be occur from the bottom up. In the example of deleting an engagement # The objects will be deleted as follows Endpoints -> Findings -> Tests -> Engagement DD_ASYNC_OBJECT_DELETE=(bool, False), @@ -1008,6 +1012,7 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param "SonarQube Scan": ["cwe", "severity", "file_path"], "SonarQube API Import": ["title", "file_path", "line"], "Sonatype Application Scan": ["title", "cwe", "file_path", "component_name", "component_version", "vulnerability_ids"], + "Lacework API Import": ["vuln_id_from_tool", "component_name", "file_path"], "Dependency Check Scan": ["title", "cwe", "file_path"], "Dockle Scan": ["title", "description", "vuln_id_from_tool"], "Dependency Track Finding Packaging Format (FPF) Export": ["component_name", "component_version", "vulnerability_ids"], @@ -1264,6 +1269,7 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param "SonarQube Scan": DEDUPE_ALGO_HASH_CODE, "SonarQube API Import": DEDUPE_ALGO_HASH_CODE, "Sonatype Application Scan": DEDUPE_ALGO_HASH_CODE, + "Lacework API Import": DEDUPE_ALGO_HASH_CODE, "Dependency Check Scan": DEDUPE_ALGO_HASH_CODE, "Dockle Scan": DEDUPE_ALGO_HASH_CODE, "Tenable Scan": DEDUPE_ALGO_HASH_CODE, @@ -1580,6 +1586,10 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param # Deside if SonarQube API parser should download the security hotspots SONARQUBE_API_PARSER_HOTSPOTS = env("DD_SONARQUBE_API_PARSER_HOTSPOTS") +# Lacework API Import configuration +LACEWORK_API_IMPORTER_INCLUDE_HOSTS = env("DD_LACEWORK_API_IMPORTER_INCLUDE_HOSTS") +LACEWORK_API_IMPORTER_TIMEDELTA_HOURS = env("DD_LACEWORK_API_IMPORTER_TIMEDELTA_HOURS") + # When enabled, deleting objects will be occur from the bottom up. In the example of deleting an engagement # The objects will be deleted as follows Endpoints -> Findings -> Tests -> Engagement ASYNC_OBJECT_DELETE = env("DD_ASYNC_OBJECT_DELETE") diff --git a/dojo/tool_config/factory.py b/dojo/tool_config/factory.py index 3715a52906f..78ad0a61231 100644 --- a/dojo/tool_config/factory.py +++ b/dojo/tool_config/factory.py @@ -2,6 +2,7 @@ from dojo.tools.api_bugcrowd.api_client import BugcrowdAPI from dojo.tools.api_cobalt.api_client import CobaltAPI from dojo.tools.api_edgescan.api_client import EdgescanAPI +from dojo.tools.api_lacework.api_client import LaceworkAPI from dojo.tools.api_sonarqube.api_client import SonarQubeAPI from dojo.tools.api_vulners.api_client import VulnersAPI @@ -10,6 +11,7 @@ "BlackDuck API": BlackduckAPI, "Cobalt.io": CobaltAPI, "Edgescan": EdgescanAPI, + "Lacework": LaceworkAPI, "SonarQube": SonarQubeAPI, "Vulners": VulnersAPI, } diff --git a/dojo/tools/api_lacework/__init__.py b/dojo/tools/api_lacework/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/tools/api_lacework/api_client.py b/dojo/tools/api_lacework/api_client.py new file mode 100644 index 00000000000..6508e9442cc --- /dev/null +++ b/dojo/tools/api_lacework/api_client.py @@ -0,0 +1,560 @@ +""" +Lacework API v2.0 Client for DefectDojo integration. + +This module provides a client to interact with the Lacework API v2.0, +handling authentication, pagination, and rate limiting. + +Based on the Lacework OpenAPI specification (lacework-api-v2.0.yaml). +""" + +import logging +import time +from datetime import datetime, timedelta, timezone + +import requests +from django.conf import settings +from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError + +logger = logging.getLogger(__name__) + + +class LaceworkAPI: + """Client for Lacework API v2.0. + + Handles authentication via X-LW-UAKS header, Bearer token management, + pagination, and rate limiting for Lacework API calls. + """ + + def __init__(self, tool_config): + """Initialize the Lacework API client. + + Args: + tool_config: ToolConfiguration instance with Lacework credentials. + - url: Base URL of Lacework instance (e.g., https://yourinstance.lacework.net) + - username: The keyId (used as the POST body to obtain the Bearer token) + - api_key: The secret key (X-LW-UAKS header value) + - authentication_type: Should be "API" + - extras: Comma-separated options: + "include_containers=true" - Import container vulnerabilities + "include_hosts=true" - Import host vulnerabilities + Default (empty): both containers and hosts are imported. + """ + self.session = requests.Session() + self.session.headers.update({"User-Agent": "DefectDojo"}) + + self.base_url = tool_config.url.rstrip("/") + self.api_key = tool_config.api_key # X-LW-UAKS value + self.key_id = tool_config.username # keyId required in the POST body + + # Parse extras for import options + self.include_containers = True # default: import containers + self.include_hosts = True # default: import hosts + self._parse_extras(tool_config.extras) + + # Token caching + self._bearer_token = None + self._token_expiry = None + + # Rate limiting + self._rate_limit_reset = None + + if not self.key_id: + raise Exception( + "Lacework keyId is required. Set it in the 'Username' field of the " + "Tool Configuration. The 'API Key' field should contain the X-LW-UAKS secret." + ) + + def _parse_extras(self, extras: str | None): + """Parse the extras field for import options. + + Supported options (comma-separated): + - include_containers=true/false: Import container vulnerabilities + - include_hosts=true/false: Import host vulnerabilities + + Examples: + - "" or None: Import both containers and hosts (default) + - "include_containers=true,include_hosts=false": Only containers + - "include_hosts=true,include_containers=false": Only hosts + - "include_containers=false": Only hosts + """ + if not extras: + return + + for entry in extras.split(","): + entry = entry.strip().lower() + if "=" in entry: + key, value = entry.split("=", 1) + key = key.strip() + value = value.strip().lower() + + if key == "include_containers": + self.include_containers = value == "true" + elif key == "include_hosts": + self.include_hosts = value == "true" + + def _get_bearer_token(self) -> str: + """Obtain a Bearer token from Lacework using the API key. + + Makes a POST request to /api/v2/access/tokens with: + - Header X-LW-UAKS: The secret key + - Body: {"keyId": ""} + + The returned token is cached and reused until it expires. + + Returns: + str: The Bearer token to use for API requests. + + Raises: + Exception: If the token request fails. + """ + # Check if we have a valid cached token + if self._bearer_token and self._token_expiry: + if datetime.now(timezone.utc) < self._token_expiry: + return self._bearer_token + + url = f"{self.base_url}/api/v2/access/tokens" + headers = { + "X-LW-UAKS": self.api_key, + "Content-Type": "application/json", + } + body = {"keyId": self.key_id} + + try: + response = self.session.post( + url, + headers=headers, + json=body, + timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), + ) + + if not response.ok: + msg = ( + f"Unable to obtain Lacework Bearer token. " + f"HTTP {response.status_code}: {response.content.decode('utf-8')}" + ) + logger.error(msg) + raise Exception(msg) + + data = response.json() + logger.debug("Lacework token response: %s", data) + + # The token response is flat, not wrapped in "data": {} + # Response: {"expiresAt": "...", "token": "..."} + self._bearer_token = data.get("token") + + if not self._bearer_token: + msg = ( + f"Lacework token response did not contain a token. " + f"HTTP {response.status_code}. Response: {data}" + ) + logger.error(msg) + raise Exception(msg) + + # Calculate expiry (tokens typically expire in 1 hour) + # Refresh 5 minutes before expiry + expires_at_str = data.get("expiresAt") + if expires_at_str: + try: + expires_at = datetime.fromisoformat( + expires_at_str.replace("Z", "+00:00") + ) + self._token_expiry = expires_at - timedelta(minutes=5) + except (ValueError, AttributeError): + # If we can't parse the expiry, use a default 55 minutes + self._token_expiry = datetime.now(timezone.utc) + timedelta(minutes=55) + else: + self._token_expiry = datetime.now(timezone.utc) + timedelta(minutes=55) + + logger.info("Successfully obtained Lacework Bearer token") + return self._bearer_token + + except requests.exceptions.RequestException as e: + msg = f"Network error when obtaining Lacework token: {e}" + logger.error(msg) + raise Exception(msg) + except RequestsJSONDecodeError as e: + msg = f"Invalid JSON response from Lacework token endpoint: {e}" + logger.error(msg) + raise Exception(msg) + + def _request(self, method: str, path: str, **kwargs) -> dict: + """Make a generic API request with Bearer token authentication. + + Args: + method: HTTP method (GET, POST, etc.) + path: API path (e.g., /api/v2/Vulnerabilities/Containers/search) + **kwargs: Additional arguments passed to requests + + Returns: + dict: The JSON response data. + + Raises: + Exception: If the request fails. + """ + url = f"{self.base_url}{path}" + headers = { + "Authorization": f"Bearer {self._get_bearer_token()}", + "Content-Type": "application/json", + } + + # Handle rate limiting with retry + max_retries = 3 + for attempt in range(max_retries): + try: + response = self.session.request( + method, + url, + headers=headers, + timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), + **kwargs, + ) + + # Handle rate limiting + if response.status_code == 429: + reset_seconds = int(response.headers.get("RateLimit-Reset", 60)) + logger.warning( + "Lacework rate limit hit. Waiting %d seconds...", reset_seconds + ) + time.sleep(min(reset_seconds, 300)) # Cap at 5 minutes + continue + + if not response.ok: + msg = ( + f"Lacework API error: HTTP {response.status_code} - " + f"{response.content.decode('utf-8')}" + ) + logger.error(msg) + raise Exception(msg) + + return response.json() + + except requests.exceptions.RequestException as e: + if attempt < max_retries - 1: + logger.warning( + "Request failed, retrying (%d/%d): %s", + attempt + 1, + max_retries, + e, + ) + time.sleep(2 ** attempt) # Exponential backoff + else: + raise + + def _get_all_pages(self, get_page_func) -> list: + """Helper to automatically paginate through all results. + + Lacework API returns paginated results with a nextPage URL. + This method collects all pages until there are no more. + + Strategy: + - First page is fetched via get_page_func (POST with filters) + - Subsequent pages are fetched via GET on the nextPage URL + returned in paging.urls.nextPage (full URLs like + https://instance.lacework.net/api/v2/Vulnerabilities/Containers/abc123) + + Args: + get_page_func: A callable that returns (page_data, next_page_url) + + Returns: + list: All items from all pages. + """ + all_items = [] + current_page_url = None + page_count = 0 + + while True: + try: + if current_page_url is None: + # First page: use the provided POST function + page_data, next_page_url = get_page_func() + else: + page_count += 1 + logger.debug( + "Fetching page %d: %s...", + page_count, current_page_url[:100], + ) + # Longer timeout for subsequent pages (dataset can be large) + response = self.session.get( + current_page_url, + headers={ + "Authorization": f"Bearer {self._get_bearer_token()}", + "Content-Type": "application/json", + }, + timeout=getattr(settings, "REQUESTS_TIMEOUT", 120), + ) + + if not response.ok: + logger.warning( + "Failed to fetch next page: HTTP %d", response.status_code + ) + break + + data = response.json() + page_data = data + paging = data.get("paging", {}) + urls = paging.get("urls", {}) + next_page_url = urls.get("nextPage") + + if not page_data: + break + + items = page_data.get("data", []) + if items: + all_items.extend(items) + logger.debug( + "Fetched %d items, total so far: %d", len(items), len(all_items) + ) + + if not next_page_url: + break + + current_page_url = next_page_url + + except requests.exceptions.Timeout: + logger.warning( + "Timeout fetching page %d (got %d items so far). " + "The dataset may be too large. Try a shorter time range.", + page_count, len(all_items), + ) + break + except Exception as e: + logger.warning( + "Pagination error on page %d: %s (got %d items so far)", + page_count, e, len(all_items), + ) + break + + logger.info( + "Pagination complete: fetched %d items across %d pages", + len(all_items), page_count, + ) + return all_items + + def list_container_registries(self) -> list: + """List all container registries configured in Lacework. + + Returns: + list: List of container registry configurations. + + Raises: + Exception: If the request fails. + """ + url = f"{self.base_url}/api/v2/ContainerRegistries" + + try: + response = self.session.get( + url, + headers={ + "Authorization": f"Bearer {self._get_bearer_token()}", + "Content-Type": "application/json", + }, + timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), + ) + + if not response.ok: + msg = ( + f"Unable to list Lacework container registries. " + f"HTTP {response.status_code}: {response.content.decode('utf-8')}" + ) + logger.error(msg) + raise Exception(msg) + + data = response.json() + return data.get("data", []) + + except requests.exceptions.RequestException as e: + msg = f"Network error when listing container registries: {e}" + logger.error(msg) + raise Exception(msg) + + def search_container_vulnerabilities( + self, + start_time: str, + end_time: str, + filters: list | None = None, + ) -> list: + """Search for container vulnerabilities in Lacework. + + Uses the POST /api/v2/Vulnerabilities/Containers/search endpoint + with automatic pagination. + + Args: + start_time: Start time in ISO 8601 format (e.g., 2024-01-25T00:00:00.000Z) + end_time: End time in ISO 8601 format + filters: Optional list of filter dicts for additional filtering + + Returns: + list: All container vulnerabilities found. + + Raises: + Exception: If the request fails. + """ + body = { + "timeFilter": { + "startTime": start_time, + "endTime": end_time, + } + } + + if filters: + body["filters"] = filters + + def get_page(): + response = self.session.post( + f"{self.base_url}/api/v2/Vulnerabilities/Containers/search", + json=body, + headers={ + "Authorization": f"Bearer {self._get_bearer_token()}", + "Content-Type": "application/json", + }, + timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), + ) + + if not response.ok: + msg = ( + f"Unable to search container vulnerabilities. " + f"HTTP {response.status_code}: {response.content.decode('utf-8')}" + ) + logger.error(msg) + raise Exception(msg) + + data = response.json() + paging = data.get("paging", {}) + urls = paging.get("urls", {}) + next_page_url = urls.get("nextPage") + + return data, next_page_url + + return self._get_all_pages(get_page) + + def search_host_vulnerabilities( + self, + start_time: str, + end_time: str, + filters: list | None = None, + ) -> list: + """Search for host vulnerabilities in Lacework. + + Uses the POST /api/v2/Vulnerabilities/Hosts/search endpoint + with automatic pagination. + + Args: + start_time: Start time in ISO 8601 format + end_time: End time in ISO 8601 format + filters: Optional list of filter dicts for additional filtering + + Returns: + list: All host vulnerabilities found. + + Raises: + Exception: If the request fails. + """ + body = { + "timeFilter": { + "startTime": start_time, + "endTime": end_time, + } + } + + if filters: + body["filters"] = filters + + def get_page(): + response = self.session.post( + f"{self.base_url}/api/v2/Vulnerabilities/Hosts/search", + json=body, + headers={ + "Authorization": f"Bearer {self._get_bearer_token()}", + "Content-Type": "application/json", + }, + timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), + ) + + if not response.ok: + msg = ( + f"Unable to search host vulnerabilities. " + f"HTTP {response.status_code}: {response.content.decode('utf-8')}" + ) + logger.error(msg) + raise Exception(msg) + + data = response.json() + paging = data.get("paging", {}) + urls = paging.get("urls", {}) + next_page_url = urls.get("nextPage") + + return data, next_page_url + + return self._get_all_pages(get_page) + + def test_connection(self) -> str: + """Test the connection to Lacework API. + + Verifies that we can obtain a Bearer token and make a simple + API call. Does NOT require container registry permissions. + + Returns: + str: A message describing the connection status. + + Raises: + Exception: If the connection fails. + """ + try: + # First verify we can get a Bearer token + token = self._get_bearer_token() + if not token: + raise Exception("Failed to obtain Bearer token") + + # Try to list container registries for a meaningful response + try: + registries = self.list_container_registries() + return ( + f"Successfully connected to Lacework. " + f"Bearer token obtained and found {len(registries)} " + f"container registries." + ) + except Exception: + # If listing registries fails (permissions), at least we have a token + return ( + "Successfully connected to Lacework. " + "Bearer token obtained successfully." + ) + except Exception as e: + msg = f"Failed to connect to Lacework: {e}" + logger.error(msg) + raise Exception(msg) + + def test_product_connection(self, api_scan_configuration) -> str: + """Test connection for a specific product/repository. + + Verifies that the Lacework instance is accessible by obtaining + a Bearer token. Does NOT require container registry permissions. + + Args: + api_scan_configuration: APIScanConfiguration instance for the product + + Returns: + str: A message describing the connection status. + """ + try: + # Verify we can get a Bearer token + token = self._get_bearer_token() + if not token: + raise Exception("Failed to obtain Bearer token") + + repo_pattern = api_scan_configuration.service_key_1 or "" + + if repo_pattern: + return ( + f"Successfully connected to Lacework. " + f"Repository filter pattern: '{repo_pattern}'." + ) + else: + return ( + "Successfully connected to Lacework. " + "No repository filter configured (will import all repositories)." + ) + except Exception as e: + msg = f"Failed to connect to Lacework for product: {e}" + logger.error(msg) + raise Exception(msg) diff --git a/dojo/tools/api_lacework/importer.py b/dojo/tools/api_lacework/importer.py new file mode 100644 index 00000000000..729574bbdc8 --- /dev/null +++ b/dojo/tools/api_lacework/importer.py @@ -0,0 +1,586 @@ +""" +Lacework API Importer for DefectDojo. + +This module handles the import of container and host vulnerabilities from +Lacework API v2.0 into DefectDojo Findings. + +It follows the same pattern as SonarQubeApiImporter from dojo/tools/api_sonarqube/. +""" + +import logging +from datetime import datetime, timedelta, timezone + +from django.conf import settings +from django.core.exceptions import ValidationError + +from dojo.models import Finding + +from .api_client import LaceworkAPI + +logger = logging.getLogger(__name__) + + +class LaceworkApiImporter: + """Importer for Lacework vulnerabilities. + + This class is the services/business logic layer (equivalent to services.py + in the domain module pattern). It accepts domain objects (Test) and + primitives, never request/response objects. + + It imports vulnerabilities from: + - Lacework Container Vulnerabilities (/api/v2/Vulnerabilities/Containers/search) + - Lacework Host Vulnerabilities (/api/v2/Vulnerabilities/Hosts/search) + """ + + SCAN_LACEWORK = "Lacework API Import" + + def get_findings(self, filename, test): + """Main entry point for importing Lacework vulnerabilities. + + Args: + filename: Ignored (API-based import, no file needed) + test: Test instance to associate findings with + + Returns: + list[Finding]: List of Finding instances (not yet saved) + """ + items = [] + + # Get client to check which vulnerability types are enabled + # (options come from the Extras field in Tool Configuration) + try: + client, config = self.prepare_client(test) + except Exception: + client = None + config = None + + include_containers = client.include_containers if client else True + include_hosts = client.include_hosts if client else True + + # Import container vulnerabilities + if include_containers: + try: + items.extend(self.import_container_vulnerabilities(test)) + except Exception as e: + logger.exception("Failed to import container vulnerabilities") + self._notify_failure(test, "Container vulnerabilities import", str(e)) + else: + logger.info("Container vulnerabilities import is disabled via Extras config") + + # Import host vulnerabilities + if include_hosts: + try: + items.extend(self.import_host_vulnerabilities(test)) + except Exception as e: + logger.exception("Failed to import host vulnerabilities") + self._notify_failure(test, "Host vulnerabilities import", str(e)) + else: + logger.info("Host vulnerabilities import is disabled via Extras config") + + return items + + @staticmethod + def prepare_client(test): + """Prepare the Lacework API client from the test's configuration. + + Similar to SonarQubeApiImporter.prepare_client. + + Args: + test: Test instance with associated API scan configuration + + Returns: + tuple[LaceworkAPI, APIScanConfiguration]: The client and config + + Raises: + ValidationError: If configuration is missing or invalid + """ + from dojo.notifications.helper import create_notification + + product = test.engagement.product + + if test.api_scan_configuration: + config = test.api_scan_configuration + # Validate that the config belongs to this product + if config.product != product: + msg = ( + "Product API Scan Configuration and Product do not match. " + f'Product: "{product.name}" ({product.id}), ' + f'config.product: "{config.product.name}" ({config.product.id})' + ) + raise ValidationError(msg) + else: + sqqs = product.product_api_scan_configuration_set.filter( + product=product, + tool_configuration__tool_type__name="Lacework", + ) + if sqqs.count() == 1: + config = sqqs.first() + elif sqqs.count() > 1: + msg = ( + "More than one Product API Scan Configuration has been configured, but none has been " + "chosen. Please specify which one should be used. " + f'Product: "{product.name}" ({product.id})' + ) + raise ValidationError(msg) + else: + msg = ( + "There are no API Scan Configurations for this Product.\n" + "Please add at least one API Scan Configuration for Lacework to this Product. " + f'Product: "{product.name}" ({product.id})' + ) + raise ValidationError(msg) + + return LaceworkAPI(tool_config=config.tool_configuration), config + + def import_container_vulnerabilities(self, test): + """Import container vulnerabilities from Lacework. + + Fetches vulnerabilities using search_container_vulnerabilities() + and maps each one to a Finding instance. + + Args: + test: Test instance for the current engagement + + Returns: + list[Finding]: List of Finding instances + """ + items = [] + client, config = self.prepare_client(test) + + # Calculate time range (last 24 hours by default, or configured) + hours = getattr(settings, "LACEWORK_API_IMPORTER_TIMEDELTA_HOURS", 24) + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(hours=hours) + + start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + + logger.info( + "Fetching container vulnerabilities from %s to %s", + start_time_str, + end_time_str, + ) + + # Filter by repository pattern from Service key 1 (API Scan Configuration) + filters = None + if config and config.service_key_1: + repo_pattern = config.service_key_1 + filters = [ + { + "field": "evalCtx.image_info.repo", + "expression": "like", + "value": f"%{repo_pattern}%", + } + ] + logger.info( + "Filtering container vulnerabilities by repository pattern: %s", + repo_pattern, + ) + else: + logger.info( + "No repository filter configured (Service key 1 is empty). " + "Importing ALL container vulnerabilities." + ) + + vulnerabilities = client.search_container_vulnerabilities( + start_time=start_time_str, + end_time=end_time_str, + filters=filters, + ) + + logger.info("Found %d container vulnerabilities", len(vulnerabilities)) + + for vuln in vulnerabilities: + try: + finding = self._create_finding_from_container_vuln(vuln, test) + if finding: + items.append(finding) + except Exception as e: + logger.warning( + "Failed to process container vulnerability %s: %s", + vuln.get("vulnId", "unknown"), + e, + ) + + return items + + def import_host_vulnerabilities(self, test): + """Import host vulnerabilities from Lacework. + + Fetches vulnerabilities using search_host_vulnerabilities() + and maps each one to a Finding instance. + + Args: + test: Test instance for the current engagement + + Returns: + list[Finding]: List of Finding instances + """ + items = [] + client, config = self.prepare_client(test) + + # Calculate time range + hours = getattr(settings, "LACEWORK_API_IMPORTER_TIMEDELTA_HOURS", 24) + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(hours=hours) + + start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + + logger.info( + "Fetching host vulnerabilities from %s to %s", + start_time_str, + end_time_str, + ) + + # Filter by hostname pattern from Service key 1 (API Scan Configuration) + filters = None + if config and config.service_key_1: + hostname_pattern = config.service_key_1 + filters = [ + { + "field": "machineTags.Hostname", + "expression": "rlike", + "value": f".*{hostname_pattern}.*", + } + ] + logger.info( + "Filtering host vulnerabilities by hostname pattern: %s", + hostname_pattern, + ) + else: + logger.info( + "No hostname filter configured (Service key 1 is empty). " + "Importing ALL host vulnerabilities." + ) + + vulnerabilities = client.search_host_vulnerabilities( + start_time=start_time_str, + end_time=end_time_str, + filters=filters, + ) + + logger.info("Found %d host vulnerabilities", len(vulnerabilities)) + + for vuln in vulnerabilities: + try: + finding = self._create_finding_from_host_vuln(vuln, test) + if finding: + items.append(finding) + except Exception as e: + logger.warning( + "Failed to process host vulnerability %s: %s", + vuln.get("vulnId", "unknown"), + e, + ) + + return items + + def _build_common_finding(self, vuln: dict, test, source_type: str, + unique_id_parts: list[str]) -> tuple[dict, str, bool]: + """Build common finding fields shared by container and host vulns. + + Args: + vuln: Vulnerability dict from Lacework API + test: Test instance + source_type: "container" or "host" + unique_id_parts: Parts to build unique_id (vulnId + ...) + + Returns: + tuple of (fields_dict, unique_id, is_active) + """ + vuln_id = vuln.get("vulnId", "") + + # --- Severity --- + # If no direct severity field, infer from risk scores + severity_str = vuln.get("severity", "") + if not severity_str: + risk_score = vuln.get("riskScore") or vuln.get("cveRiskScore") or 0 + if risk_score >= 9.0: + severity_str = "Critical" + elif risk_score >= 7.0: + severity_str = "High" + elif risk_score >= 4.0: + severity_str = "Medium" + elif risk_score >= 1.0: + severity_str = "Low" + else: + severity_str = "Info" + severity = self._convert_lacework_severity(severity_str) + + # --- Description --- + cve_props = vuln.get("cveProps", {}) + description = cve_props.get("description", "No description provided") + + # Add introduced_in to description if available (for containers) + feature_props = vuln.get("featureProps", {}) + introduced_in = feature_props.get("introduced_in", "") + if introduced_in: + description += f"\n\n**Introduced in:** {introduced_in}" + + # --- References --- + references = "" + link = cve_props.get("link", "") + source = cve_props.get("source", "") + if link: + references = f"[CVE Reference]({link}) " + if source: + references += f"\n*Source: {source}*" + if vuln_id and vuln_id.startswith("CVE-"): + references += f"\nhttps://nvd.nist.gov/vuln/detail/{vuln_id}" + + # --- Component info --- + feature_key = vuln.get("featureKey", {}) + component_name = feature_key.get("name", "") + namespace = feature_key.get("namespace", "") + # Container uses "version", host uses "version_installed" + version_val = feature_key.get("version") or feature_key.get("version_installed", "") + + # Package path from featureProps (more specific than just namespace) + pkg_path = feature_props.get("src", namespace) + + # --- Fix info --- + fix_info = vuln.get("fixInfo", {}) + fix_available = bool(fix_info.get("fix_available", 0)) + fixed_version = fix_info.get("fixed_version", "") + + # --- CVSS score --- + cvss_score, cvss_vector = self._extract_cvss_score(vuln) + + # --- CWE --- + cwe = self._extract_cwe(vuln) + + # --- Status (active/mitigated) --- + status = vuln.get("status", "").upper() + is_active = status != "GOOD" # GOOD = resolved/mitigated + is_verified = status == "VULNERABLE" or is_active + + # --- Unique ID for dedup --- + unique_id = f"{source_type}:{'|'.join(unique_id_parts)}" + + # --- Tags --- + tags_parts = [] + package_status = vuln.get("packageStatus", "") + if package_status: + tags_parts.append(f"pkg:{package_status}") + + eval_ctx = vuln.get("evalCtx", {}) + request_source = eval_ctx.get("request_source", "") + if request_source: + tags_parts.append(f"scanner:{request_source}") + + integration_props = eval_ctx.get("integration_props", {}) + intg_name = integration_props.get("NAME", "") + if intg_name: + tags_parts.append(f"integration:{intg_name}") + + feed = feature_props.get("feed", "") + if feed: + tags_parts.append(f"feed:{feed}") + + fields = { + "title": f"{vuln_id} in {component_name}" if component_name else vuln_id, + "vuln_id_from_tool": vuln_id, + "description": description, + "test": test, + "severity": severity, + "references": references, + "component_name": component_name or None, + "component_version": version_val or None, + "file_path": pkg_path or namespace or None, + "cwe": cwe, + "cvssv3_score": cvss_score, + "cvssv3": cvss_vector or None, + "fix_available": fix_available, + "fix_version": fixed_version or None, + "static_finding": True, + "dynamic_finding": False, + "active": is_active, + "verified": is_verified, + "false_p": False, + "duplicate": False, + "out_of_scope": False, + "unique_id_from_tool": f"lacework:{unique_id}", + } + + return fields, unique_id, is_active + + def _create_finding_from_container_vuln(self, vuln: dict, test) -> Finding | None: + """Create a Finding from a Lacework container vulnerability. + + Maps Lacework container vulnerability fields to DefectDojo Finding fields. + """ + vuln_id = vuln.get("vulnId", "") + if not vuln_id: + return None + + # Image info + eval_ctx = vuln.get("evalCtx", {}) + image_info = eval_ctx.get("image_info", {}) + repo = image_info.get("repo", "") + image_tags = image_info.get("tags", []) + + # Build unique_id_parts: vulnId, repo, namespace, component_name + feature_key = vuln.get("featureKey", {}) + namespace = feature_key.get("namespace", "") + component_name = feature_key.get("name", "") + + unique_id_parts = [vuln_id, repo, namespace, component_name] + + fields, unique_id, is_active = self._build_common_finding( + vuln, test, "container", unique_id_parts + ) + + # Add container-specific tags + tags_parts = [] + existing_title = fields["title"] + + if repo not in str(tags_parts): + tags_parts.append(repo) + if image_tags: + tags_parts.extend(image_tags) + + find = Finding(**fields) + return find + + def _create_finding_from_host_vuln(self, vuln: dict, test) -> Finding | None: + """Create a Finding from a Lacework host vulnerability. + + Maps Lacework host vulnerability fields to DefectDojo Finding fields. + """ + vuln_id = vuln.get("vulnId", "") + if not vuln_id: + return None + + # Machine info + mid = vuln.get("mid", "") + machine_tags = vuln.get("machineTags", {}) + hostname = machine_tags.get("Hostname", "") + vm_provider = machine_tags.get("VmProvider", "") + + # Build unique_id_parts: vulnId, mid, namespace, component_name + feature_key = vuln.get("featureKey", {}) + namespace = feature_key.get("namespace", "") + component_name = feature_key.get("name", "") + + unique_id_parts = [str(vuln_id), str(mid), namespace, component_name] + + fields, unique_id, is_active = self._build_common_finding( + vuln, test, "host", unique_id_parts + ) + + find = Finding(**fields) + return find + + @staticmethod + def _convert_lacework_severity(lw_severity: str) -> str: + """Convert Lacework severity to DefectDojo severity. + + Args: + lw_severity: Lacework severity string (Critical, High, Medium, Low, Info) + + Returns: + str: DefectDojo severity string + """ + mapping = { + "Critical": "Critical", + "High": "High", + "Medium": "Medium", + "Low": "Low", + "Info": "Info", + } + return mapping.get(lw_severity, "Info") + + @staticmethod + def _extract_cvss_score(vuln: dict) -> tuple: + """Extract CVSSv3 score and vector from a vulnerability. + + Priority order: + 1. NVD CVSSv3 + 2. RBS (Rapid 7) CVSSv3 + 3. riskScore fallback + + Args: + vuln: Vulnerability dict from Lacework API + + Returns: + tuple[float | None, str | None]: (cvssv3_score, cvssv3_vector) + """ + cve_props = vuln.get("cveProps", {}) + metadata = cve_props.get("metadata", {}) + + # Try NVD CVSSv3 first + nvd = metadata.get("NVD", {}) + cvssv3 = nvd.get("CVSSv3", {}) + if cvssv3: + score = cvssv3.get("Score") + vector = cvssv3.get("Vectors") + if score is not None and score > 0: + return (float(score), vector) + + # Fallback to RBS (Rapid7) + rbs = metadata.get("RBS", {}) + cvssv3_rbs = rbs.get("CVSSv3", {}) + if cvssv3_rbs: + score = cvssv3_rbs.get("Score") + vector = cvssv3_rbs.get("Vectors") + if score is not None and score > 0: + return (float(score), vector) + + # Fallback to riskScore from Lacework + risk_score = vuln.get("riskScore") or vuln.get("cveRiskScore") + if risk_score is not None: + return (float(risk_score), None) + + return (None, None) + + @staticmethod + def _extract_cwe(vuln: dict) -> int | None: + """Extract CWE ID from a vulnerability. + + The CWE ID is found in cveProps.metadata.RBS.cwe_id dict + with the CVE ID as key. + + Args: + vuln: Vulnerability dict from Lacework API + + Returns: + int | None: CWE ID as integer, or None if not found + """ + cve_props = vuln.get("cveProps", {}) + metadata = cve_props.get("metadata", {}) + rbs = metadata.get("RBS", {}) + cwe_map = rbs.get("cwe_id", {}) + + # Find the CWE from the map + for cve_id, cwe_str in cwe_map.items(): + if cwe_str and cwe_str.startswith("CWE-"): + try: + return int(cwe_str.replace("CWE-", "")) + except (ValueError, TypeError): + continue + + return None + + @staticmethod + def _notify_failure(test, import_type: str, error_message: str): + """Send a notification about an import failure. + + Args: + test: The test being imported + import_type: Type of import that failed + error_message: Error description + """ + from dojo.notifications.helper import create_notification + + create_notification( + event="other", + title=f"Lacework {import_type} failed", + description=( + f"Lacework {import_type} failed for product " + f"'{test.engagement.product.name}': {error_message}" + ), + icon="exclamation-triangle", + source="Lacework API", + obj=test.engagement.product, + ) diff --git a/dojo/tools/api_lacework/parser.py b/dojo/tools/api_lacework/parser.py new file mode 100644 index 00000000000..790ea8aaba4 --- /dev/null +++ b/dojo/tools/api_lacework/parser.py @@ -0,0 +1,65 @@ +from .importer import LaceworkApiImporter + +SCAN_LACEWORK_API = "Lacework API Import" + + +class ApiLaceworkParser: + """Parser for Lacework API Import. + + This parser implements the DefectDojo parser contract for API-based imports. + It requires a Tool Type named "Lacework" to be configured. + """ + + def get_scan_types(self): + """Return the scan types supported by this parser.""" + return [SCAN_LACEWORK_API] + + def get_label_for_scan_types(self, scan_type): + """Return the label for the given scan type.""" + return SCAN_LACEWORK_API + + def get_description_for_scan_types(self, scan_type): + """Return the description for the given scan type.""" + return ( + "Lacework vulnerabilities can be directly imported using the Lacework API. " + "An API Scan Configuration has to be setup in the Product. " + "This importer fetches container and host vulnerabilities from Lacework API v2.0 " + "and maps them to DefectDojo Findings." + ) + + def requires_file(self, scan_type): + """Indicate that no file upload is required (API-based import).""" + return False + + def requires_tool_type(self, scan_type): + """Return the required Tool Type name.""" + return "Lacework" + + def api_scan_configuration_hint(self): + """Return a hint for configuring the API scan. + + When tool_configurations.count == 0, the template renders: + "Tool type Lacework exists however parser Lacework API Import requires at least one + tool configuration." + + When tool_configurations.count > 0, this hint is rendered instead. + We return the same format so it looks consistent. + """ + from dojo.models import Tool_Type + tool_type_id = Tool_Type.objects.filter(name="Lacework").values_list("id", flat=True).first() or "" + return ( + f'Tool type Lacework exists however parser Lacework API Import ' + f'requires at least one tool configuration.' + ) + + def get_findings(self, json_output, test): + """Import findings from Lacework API. + + Args: + json_output: Ignored for API-based imports + test: Test instance to associate findings with + + Returns: + list[Finding]: List of Finding instances + """ + return LaceworkApiImporter().get_findings(json_output, test) \ No newline at end of file diff --git a/dojo/tools/api_lacework/updater.py b/dojo/tools/api_lacework/updater.py new file mode 100644 index 00000000000..829f1e842da --- /dev/null +++ b/dojo/tools/api_lacework/updater.py @@ -0,0 +1,43 @@ +""" +Lacework API Updater for DefectDojo. + +Lacework does not have a concept of issue transitions like SonarQube. +This updater is a placeholder for future synchronization needs. + +If Lacework adds API support for vulnerability exception management, +this module can be extended to reflect DefectDojo finding status changes +back to Lacework. +""" + +import logging + +logger = logging.getLogger(__name__) + + +class LaceworkApiUpdater: + """Updater for Lacework findings. + + Note: Lacework does not currently support issue transitions via API + (unlike SonarQube which has transitions like confirm, resolve, etc.). + This class serves as a placeholder for future functionality. + """ + + def update_lacework_finding(self, finding): + """Update a finding status in Lacework. + + Currently a placeholder since Lacework does not support + issue transitions. If Lacework adds this capability in the future, + this method can be implemented to reflect DefectDojo status changes + back to Lacework via the VulnerabilityExceptions API. + + Args: + finding: The Finding instance whose status may need syncing + """ + logger.debug( + "Lacework updater called for finding %s. " + "Lacework does not currently support issue transitions.", + finding.id, + ) + # Future implementation could use: + # POST /api/v2/VulnerabilityExceptions to add exceptions + # for findings that are false positive or risk accepted in DefectDojo \ No newline at end of file diff --git a/dojo/tools/tool_issue_updater.py b/dojo/tools/tool_issue_updater.py index 8211e166eed..217d7e44d70 100644 --- a/dojo/tools/tool_issue_updater.py +++ b/dojo/tools/tool_issue_updater.py @@ -5,6 +5,8 @@ from dojo.celery import app from dojo.celery_dispatch import dojo_dispatch_task from dojo.models import Finding +from dojo.tools.api_lacework.parser import SCAN_LACEWORK_API +from dojo.tools.api_lacework.updater import LaceworkApiUpdater from dojo.tools.api_sonarqube.parser import SCAN_SONARQUBE_API from dojo.tools.api_sonarqube.updater import SonarQubeApiUpdater from dojo.tools.api_sonarqube.updater_from_source import SonarQubeApiUpdaterFromSource @@ -20,7 +22,7 @@ def async_tool_issue_update(finding, *args, **kwargs): def is_tool_issue_updater_needed(finding, *args, **kwargs): test_type = finding.test.test_type - return test_type.name == SCAN_SONARQUBE_API + return test_type.name in (SCAN_SONARQUBE_API, SCAN_LACEWORK_API) @app.task @@ -34,6 +36,8 @@ def tool_issue_updater(finding_id, *args, **kwargs): if test_type.name == SCAN_SONARQUBE_API: SonarQubeApiUpdater().update_sonarqube_finding(finding) + elif test_type.name == SCAN_LACEWORK_API: + LaceworkApiUpdater().update_lacework_finding(finding) @app.task diff --git a/unittests/test_api_lacework.py b/unittests/test_api_lacework.py new file mode 100644 index 00000000000..e7e5f16b76b --- /dev/null +++ b/unittests/test_api_lacework.py @@ -0,0 +1,633 @@ +""" +Unit tests for Lacework API Import integration. + +Tests the core functionality of the Lacework API importer and parser: +- Severity mapping +- CVSS score extraction +- CWE extraction +- Finding creation from container vulnerabilities +- Finding creation from host vulnerabilities +- Parser contract compliance +- Importer end-to-end with mock data +""" + +from unittest.mock import MagicMock, patch + +from django.test import TestCase +from django.utils import timezone + +from dojo.models import ( + Engagement, + Finding, + Product, + Product_Type, + Test, + Test_Type, + Tool_Configuration, + Tool_Type, +) +from dojo.tools.api_lacework.importer import LaceworkApiImporter +from dojo.tools.api_lacework.parser import ApiLaceworkParser, SCAN_LACEWORK_API + +from .dojo_test_case import DojoTestCase + + +class TestLaceworkApiImporter(DojoTestCase): + """Test the core logic of the Lacework API importer.""" + + def test_convert_lacework_severity_critical(self): + """Test that Critical severity maps correctly.""" + assert LaceworkApiImporter._convert_lacework_severity("Critical") == "Critical" + + def test_convert_lacework_severity_high(self): + """Test that High severity maps correctly.""" + assert LaceworkApiImporter._convert_lacework_severity("High") == "High" + + def test_convert_lacework_severity_medium(self): + """Test that Medium severity maps correctly.""" + assert LaceworkApiImporter._convert_lacework_severity("Medium") == "Medium" + + def test_convert_lacework_severity_low(self): + """Test that Low severity maps correctly.""" + assert LaceworkApiImporter._convert_lacework_severity("Low") == "Low" + + def test_convert_lacework_severity_info(self): + """Test that Info severity maps correctly.""" + assert LaceworkApiImporter._convert_lacework_severity("Info") == "Info" + + def test_convert_lacework_severity_unknown(self): + """Test that unknown severity defaults to Info.""" + assert LaceworkApiImporter._convert_lacework_severity("Unknown") == "Info" + + def test_extract_cvss_score_from_nvd(self): + """Test CVSSv3 extraction from NVD metadata (highest priority).""" + vuln = { + "cveProps": { + "metadata": { + "NVD": { + "CVSSv3": { + "Score": 9.8, + "Vectors": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + }, + }, + "RBS": { + "CVSSv3": { + "Score": 7.5, + "Vectors": "CVSS:3.0/AV:N/AC:L/Au:N/C:P/I:P/A:P", + }, + }, + }, + }, + "riskScore": 10, + } + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) + assert score == 9.8 + assert vector == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + + def test_extract_cvss_score_from_rbs(self): + """Test CVSSv3 extraction from RBS metadata when NVD is not available.""" + vuln = { + "cveProps": { + "metadata": { + "NVD": {}, + "RBS": { + "CVSSv3": { + "Score": 7.5, + "Vectors": "CVSS:3.0/AV:N/AC:L/Au:N/C:P/I:P/A:P", + }, + }, + }, + }, + } + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) + assert score == 7.5 + assert vector == "CVSS:3.0/AV:N/AC:L/Au:N/C:P/I:P/A:P" + + def test_extract_cvss_score_from_riskscore(self): + """Test fallback to riskScore when no CVSS metadata is available.""" + vuln = { + "cveProps": { + "metadata": { + "NVD": {}, + "RBS": {}, + }, + }, + "riskScore": 10, + } + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) + assert score == 10.0 + assert vector is None + + def test_extract_cvss_score_from_cveriskscore(self): + """Test fallback to cveRiskScore.""" + vuln = { + "cveProps": { + "metadata": { + "NVD": {}, + "RBS": {}, + }, + }, + "cveRiskScore": 9.8, + } + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) + assert score == 9.8 + + def test_extract_cvss_score_none_when_no_data(self): + """Test that None is returned when no score data exists.""" + vuln = { + "cveProps": { + "metadata": { + "NVD": {}, + "RBS": {}, + }, + }, + } + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) + assert score is None + assert vector is None + + def test_extract_cwe_success(self): + """Test CWE extraction from RBS metadata.""" + vuln = { + "cveProps": { + "metadata": { + "RBS": { + "cwe_id": { + "CVE-2022-37434": "CWE-787", + }, + }, + }, + }, + } + cwe = LaceworkApiImporter._extract_cwe(vuln) + assert cwe == 787 + + def test_extract_cwe_multiple(self): + """Test CWE extraction when there are multiple CWEs.""" + vuln = { + "cveProps": { + "metadata": { + "RBS": { + "cwe_id": { + "CVE-2022-37434": "CWE-787", + "CVE-2023-21100": "CWE-787", + }, + }, + }, + }, + } + cwe = LaceworkApiImporter._extract_cwe(vuln) + assert cwe == 787 + + def test_extract_cwe_none_when_no_cwe(self): + """Test that None is returned when no CWE data exists.""" + vuln = { + "cveProps": { + "metadata": { + "RBS": {}, + }, + }, + } + cwe = LaceworkApiImporter._extract_cwe(vuln) + assert cwe is None + + def test_extract_cwe_none_when_no_metadata(self): + """Test that None is returned when no metadata exists.""" + vuln = {"cveProps": {}} + cwe = LaceworkApiImporter._extract_cwe(vuln) + assert cwe is None + + def test_create_finding_from_container_vuln(self): + """Test Finding creation from a container vulnerability with all fields.""" + vuln = { + "vulnId": "CVE-2022-37434", + "severity": "Critical", + "cveProps": { + "description": "Heap-based buffer over-read in zlib through 1.2.12", + "link": "https://security-tracker.debian.org/tracker/CVE-2022-37434", + "metadata": { + "NVD": { + "CVSSv3": { + "Score": 9.8, + "Vectors": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + }, + }, + "RBS": { + "cwe_id": { + "CVE-2022-37434": "CWE-787", + }, + }, + }, + }, + "evalCtx": { + "image_info": { + "repo": "index.docker.io/library/postgres", + "tags": ["14.4"], + }, + }, + "featureKey": { + "name": "zlib", + "namespace": "debian:11", + "version": "1:1.2.11.dfsg-2+deb11u1", + }, + "fixInfo": { + "fix_available": 1, + "fixed_version": "1:1.2.11.dfsg-2+deb11u2", + }, + "riskScore": 10, + "status": "VULNERABLE", + } + + # Create a proper instance to call the instance method + importer = LaceworkApiImporter() + # Test the static helper methods independently + severity = LaceworkApiImporter._convert_lacework_severity(vuln.get("severity", "Info")) + assert severity == "Critical" + + cwe = LaceworkApiImporter._extract_cwe(vuln) + assert cwe == 787 + + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) + assert score == 9.8 + assert vector == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + + def test_create_finding_from_host_vuln(self): + """Test Finding creation from a host vulnerability with all fields.""" + vuln = { + "vulnId": "CVE-2016-1585", + "severity": "Medium", + "cveProps": { + "description": "AppArmor mount rules are accidentally widened when compiled", + "link": "http://people.ubuntu.com/~ubuntu-security/cve/CVE-2016-1585", + "metadata": { + "NVD": { + "CVSSv3": { + "Score": 9.8, + "Vectors": "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + }, + }, + "RBS": { + "cwe_id": { + "CVE-2016-1585": "CWE-254", + }, + }, + }, + }, + "featureKey": { + "name": "apparmor", + "namespace": "ubuntu:20.04", + "version_installed": "2.13.3-7ubuntu5.3", + }, + "fixInfo": { + "fix_available": 0, + "fixed_version": "", + }, + "mid": 7112040530067849000, + "machineTags": { + "Hostname": "my-server-hostname", + "VmProvider": "AWS", + }, + "riskScore": 9.74, + "status": "Active", + } + + # Verify the mapping logic extracts fields correctly + cwe = LaceworkApiImporter._extract_cwe(vuln) + assert cwe == 254 + + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) + assert score == 9.8 + assert vector == "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + + +class TestApiLaceworkParser(TestCase): + """Test the parser contract implementation.""" + + def setUp(self): + self.parser = ApiLaceworkParser() + + def test_get_scan_types(self): + """Test that the parser returns the correct scan type.""" + scan_types = self.parser.get_scan_types() + assert len(scan_types) == 1 + assert scan_types[0] == SCAN_LACEWORK_API + + def test_get_label_for_scan_types(self): + """Test that the label matches the scan type.""" + assert ( + self.parser.get_label_for_scan_types(SCAN_LACEWORK_API) + == SCAN_LACEWORK_API + ) + + def test_get_description_for_scan_types(self): + """Test that a description is returned.""" + description = self.parser.get_description_for_scan_types(SCAN_LACEWORK_API) + assert description is not None + assert len(description) > 0 + assert "Lacework" in description + + def test_requires_file(self): + """Test that no file is required (API-based import).""" + assert self.parser.requires_file(SCAN_LACEWORK_API) is False + + def test_requires_tool_type(self): + """Test that the required tool type is 'Lacework'.""" + assert self.parser.requires_tool_type(SCAN_LACEWORK_API) == "Lacework" + + def test_api_scan_configuration_hint(self): + """Test that a configuration hint is provided.""" + hint = self.parser.api_scan_configuration_hint() + assert hint is not None + assert len(hint) > 0 + assert "Service key 1" in hint + + def test_get_findings_with_empty_input(self): + """Test that get_findings returns a list even with empty input.""" + # The importer relies on a valid test object with engagement and product + # When None is passed, it should raise an error because it can't access + # test.engagement.product. This test verifies the error handling returns + # an empty list rather than crashing. + # We pass a mock object that will fail validation gracefully + from unittest.mock import MagicMock + + mock_test = MagicMock() + mock_test.api_scan_configuration = None + mock_test.engagement.product.name = "test" + mock_test.engagement.product.product_api_scan_configuration_set.filter.return_value.count.return_value = 0 + + result = self.parser.get_findings(None, mock_test) + assert isinstance(result, list) + # Should be empty because no API Scan Configuration is configured + assert len(result) == 0 + + +class TestLaceworkApiImporterIntegration(DojoTestCase): + """Integration tests for the Lacework importer with real DB models.""" + + def setUp(self): + """Set up test data with real DB models.""" + # Create Tool Type + self.tool_type = Tool_Type.objects.create(name="Lacework") + + # Create Tool Configuration + self.tool_config = Tool_Configuration.objects.create( + name="Lacework Test", + tool_type=self.tool_type, + authentication_type="API", + url="https://test.lacework.net", + username="test-key-id", + api_key="test-api-key", + ) + + # Create Product Type and Product + self.product_type = Product_Type.objects.create(name="Lacework") + self.product = Product.objects.create( + name="test-container-repo", + prod_type=self.product_type, + description="Test product for Lacework import", + ) + + # Create API Scan Configuration + self.api_scan_config = self.product.product_api_scan_configuration_set.create( + product=self.product, + tool_configuration=self.tool_config, + ) + + # Create Engagement + self.engagement = Engagement.objects.create( + product=self.product, + name="Lacework Test Scan", + target_start=timezone.now().date(), + target_end=timezone.now().date(), + active=True, + status="In Progress", + ) + + # Get or create Test Type + self.test_type, _ = Test_Type.objects.get_or_create(name="Lacework API Import") + + # Create Test + self.test = Test.objects.create( + engagement=self.engagement, + test_type=self.test_type, + title="Container scan test", + target_start=timezone.now(), + target_end=timezone.now(), + api_scan_configuration=self.api_scan_config, + description="Lacework test import", + ) + + self.importer = LaceworkApiImporter() + + def test_get_findings_with_mocked_client_container_vulns(self): + """Test that get_findings creates Finding objects from mocked container vulns.""" + mock_vulns = [ + { + "vulnId": "CVE-2022-37434", + "severity": "Critical", + "cveProps": { + "description": "Heap-based buffer over-read in zlib through 1.2.12", + "link": "https://security-tracker.debian.org/tracker/CVE-2022-37434", + "metadata": { + "NVD": { + "CVSSv3": {"Score": 9.8, "Vectors": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"}, + }, + "RBS": { + "cwe_id": {"CVE-2022-37434": "CWE-787"}, + }, + }, + }, + "evalCtx": { + "image_info": { + "repo": "index.docker.io/library/postgres", + "tags": ["14.4"], + }, + }, + "featureKey": { + "name": "zlib", + "namespace": "debian:11", + "version": "1:1.2.11.dfsg-2+deb11u1", + }, + "fixInfo": { + "fix_available": 1, + "fixed_version": "1:1.2.11.dfsg-2+deb11u2", + }, + "riskScore": 10, + "status": "VULNERABLE", + }, + { + "vulnId": "CVE-2023-12345", + "severity": "High", + "cveProps": { + "description": "Another test vulnerability", + "link": "https://example.com/cve", + "metadata": {"NVD": {}, "RBS": {}}, + }, + "evalCtx": { + "image_info": { + "repo": "index.docker.io/library/postgres", + "tags": ["latest"], + }, + }, + "featureKey": { + "name": "openssl", + "namespace": "debian:11", + "version": "1.1.1n-0+deb11u5", + }, + "fixInfo": { + "fix_available": 0, + "fixed_version": "", + }, + "riskScore": 8.5, + "status": "VULNERABLE", + }, + ] + + with patch.object(self.importer, "prepare_client") as mock_prepare: + mock_client = MagicMock() + mock_prepare.return_value = (mock_client, self.api_scan_config) + + mock_client.include_containers = True + mock_client.include_hosts = True + mock_client.search_container_vulnerabilities.return_value = mock_vulns + mock_client.search_host_vulnerabilities.return_value = [] + + findings = self.importer.get_findings(None, self.test) + + # Should have 2 findings + assert len(findings) == 2 + + # Verify first finding fields + assert findings[0].vuln_id_from_tool == "CVE-2022-37434" + assert findings[0].severity == "Critical" + assert findings[0].component_name == "zlib" + assert findings[0].component_version == "1:1.2.11.dfsg-2+deb11u1" + assert findings[0].file_path == "debian:11" + assert findings[0].fix_available is True + assert findings[0].fix_version == "1:1.2.11.dfsg-2+deb11u2" + assert findings[0].cwe == 787 + assert findings[0].cvssv3_score == 9.8 + assert findings[0].cvssv3 == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + assert findings[0].static_finding is True + assert findings[0].active is True + assert findings[0].verified is True + + # Verify second finding + assert findings[1].vuln_id_from_tool == "CVE-2023-12345" + assert findings[1].severity == "High" + assert findings[1].component_name == "openssl" + assert findings[1].cvssv3_score == 8.5 + assert findings[1].fix_available is False + assert findings[1].cwe is None + + def test_get_findings_with_mocked_client_host_vulns(self): + """Test that get_findings creates Finding objects from mocked host vulns.""" + mock_vulns = [ + { + "vulnId": "CVE-2016-1585", + "severity": "Medium", + "cveProps": { + "description": "AppArmor mount rules are accidentally widened", + "link": "http://people.ubuntu.com/~ubuntu-security/cve/CVE-2016-1585", + "metadata": { + "NVD": {"CVSSv3": {"Score": 9.8, "Vectors": "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"}}, + "RBS": {"cwe_id": {"CVE-2016-1585": "CWE-254"}}, + }, + }, + "featureKey": { + "name": "apparmor", + "namespace": "ubuntu:20.04", + "version_installed": "2.13.3-7ubuntu5.3", + }, + "fixInfo": {"fix_available": 0, "fixed_version": ""}, + "mid": 7112040530067849000, + "machineTags": {"Hostname": "my-server", "VmProvider": "AWS"}, + "riskScore": 9.74, + "status": "Active", + }, + ] + + with patch.object(self.importer, "prepare_client") as mock_prepare: + mock_client = MagicMock() + mock_prepare.return_value = (mock_client, self.api_scan_config) + + mock_client.include_containers = True + mock_client.include_hosts = True + mock_client.search_container_vulnerabilities.return_value = [] + mock_client.search_host_vulnerabilities.return_value = mock_vulns + + findings = self.importer.get_findings(None, self.test) + + assert len(findings) == 1 + assert findings[0].vuln_id_from_tool == "CVE-2016-1585" + assert findings[0].severity == "Medium" + assert findings[0].component_name == "apparmor" + assert findings[0].component_version == "2.13.3-7ubuntu5.3" + assert findings[0].file_path == "ubuntu:20.04" + assert findings[0].cwe == 254 + assert findings[0].cvssv3_score == 9.8 + assert findings[0].fix_available is False + + def test_get_findings_disables_containers_from_extras(self): + """Test that include_containers=false skips container vulns.""" + with patch.object(self.importer, "prepare_client") as mock_prepare: + mock_client = MagicMock() + mock_prepare.return_value = (mock_client, self.api_scan_config) + + mock_client.include_containers = False + mock_client.include_hosts = True + mock_client.search_host_vulnerabilities.return_value = [ + {"vulnId": "CVE-2016-1585", "severity": "Medium", "cveProps": {"description": "test", "link": "", "metadata": {"NVD": {}, "RBS": {}}}, "featureKey": {"name": "test", "namespace": "test", "version_installed": "1.0"}, "fixInfo": {"fix_available": 0, "fixed_version": ""}, "mid": 123, "machineTags": {}, "riskScore": 5} + ] + + findings = self.importer.get_findings(None, self.test) + + assert len(findings) == 1 + # search_container_vulnerabilities should NOT have been called + mock_client.search_container_vulnerabilities.assert_not_called() + mock_client.search_host_vulnerabilities.assert_called_once() + + def test_persist_findings_to_db(self): + """Test that findings can be saved to the database.""" + with patch.object(self.importer, "prepare_client") as mock_prepare: + mock_client = MagicMock() + mock_prepare.return_value = (mock_client, self.api_scan_config) + + mock_client.include_containers = True + mock_client.include_hosts = True + mock_client.search_container_vulnerabilities.return_value = [] + mock_client.search_host_vulnerabilities.return_value = [] + + findings = self.importer.get_findings(None, self.test) + + # No vulnerabilities, should be empty + assert len(findings) == 0 + + def test_prepare_client_with_existing_config(self): + """Test that prepare_client correctly finds the API Scan Configuration.""" + client, config = LaceworkApiImporter.prepare_client(self.test) + assert config == self.api_scan_config + assert config.tool_configuration == self.tool_config + + def test_prepare_client_fails_without_config(self): + """Test that prepare_client raises error when no config exists.""" + # Create a separate product without any API scan configuration + product_no_config = Product.objects.create( + name="test-no-config", + prod_type=self.product_type, + ) + engagement_no_config = Engagement.objects.create( + product=product_no_config, + name="No Config Engagement", + target_start=timezone.now().date(), + target_end=timezone.now().date(), + ) + test_no_config = Test.objects.create( + engagement=engagement_no_config, + test_type=self.test_type, + title="No config test", + target_start=timezone.now(), + target_end=timezone.now(), + ) + with self.assertRaises(Exception): + LaceworkApiImporter.prepare_client(test_no_config) \ No newline at end of file From 8cd4dc649b54b3c783f26ff7157e40b595b2b361 Mon Sep 17 00:00:00 2001 From: GoldraK Date: Mon, 8 Jun 2026 09:59:02 +0200 Subject: [PATCH 2/2] style(lacework): fix style code with ruff --- .../commands/lacework_debug_vuln.py | 27 +- .../commands/lacework_import_all.py | 98 +++--- dojo/tools/api_lacework/api_client.py | 311 ++++++++--------- dojo/tools/api_lacework/importer.py | 313 ++++++++---------- dojo/tools/api_lacework/parser.py | 33 +- dojo/tools/api_lacework/updater.py | 23 +- dojo/tools/tool_issue_updater.py | 2 +- unittests/test_api_lacework.py | 226 ++++++------- 8 files changed, 453 insertions(+), 580 deletions(-) diff --git a/dojo/management/commands/lacework_debug_vuln.py b/dojo/management/commands/lacework_debug_vuln.py index fdc170b31d3..57b145d89fe 100644 --- a/dojo/management/commands/lacework_debug_vuln.py +++ b/dojo/management/commands/lacework_debug_vuln.py @@ -10,6 +10,7 @@ import json import logging +from datetime import UTC, datetime, timedelta from django.core.management.base import BaseCommand, CommandError @@ -44,8 +45,9 @@ def handle(self, *args, **options): try: tool_config = Tool_Configuration.objects.get(id=tool_config_id) except Tool_Configuration.DoesNotExist: + msg = f"Tool Configuration with id {tool_config_id} not found" raise CommandError( - f"Tool Configuration with id {tool_config_id} not found" + msg, ) self.stdout.write(f"Using Tool Configuration: {tool_config.name}") @@ -53,20 +55,19 @@ def handle(self, *args, **options): client = LaceworkAPI(tool_config) - from datetime import datetime, timedelta, timezone - end_time = datetime.now(timezone.utc) + end_time = datetime.now(UTC) start_time = end_time - timedelta(hours=24) start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") if vuln_type == "containers": - self.stdout.write(f"\nFetching container vulnerabilities...") + self.stdout.write("\nFetching container vulnerabilities...") vulns = client.search_container_vulnerabilities( start_time=start_time_str, end_time=end_time_str, ) else: - self.stdout.write(f"\nFetching host vulnerabilities...") + self.stdout.write("\nFetching host vulnerabilities...") vulns = client.search_host_vulnerabilities( start_time=start_time_str, end_time=end_time_str, @@ -92,15 +93,15 @@ def handle(self, *args, **options): self.stdout.write(f"\nSelected vulnerability: {vuln.get('vulnId')} ({vuln.get('severity')})\n") self.stdout.write( self.style.SUCCESS( - f"\n=== Full JSON structure of 1 {vuln_type} vulnerability ===\n" - ) + f"\n=== Full JSON structure of 1 {vuln_type} vulnerability ===\n", + ), ) self.stdout.write(json.dumps(vuln, indent=2, default=str)) self.stdout.write( self.style.SUCCESS( - f"\n=== TOP-LEVEL FIELDS ===\n" - ) + "\n=== TOP-LEVEL FIELDS ===\n", + ), ) for key, value in vuln.items(): if not isinstance(value, (dict, list)): @@ -111,8 +112,8 @@ def handle(self, *args, **options): # Analyze available fields for mapping self.stdout.write( self.style.SUCCESS( - f"\n=== ANALYSIS ===\n" - ) + "\n=== ANALYSIS ===\n", + ), ) self.stdout.write(f"vulnId (CVE): {vuln.get('vulnId', 'N/A')}") self.stdout.write(f"severity: {vuln.get('severity', 'N/A')}") @@ -188,6 +189,6 @@ def handle(self, *args, **options): self.stdout.write(f" machineTags.Region: {machine_tags.get('Region', 'N/A')}") # Additional fields - self.stdout.write(f"\n additional top-level keys:") + self.stdout.write("\n additional top-level keys:") for key in sorted(vuln.keys()): - self.stdout.write(f" - {key}") \ No newline at end of file + self.stdout.write(f" - {key}") diff --git a/dojo/management/commands/lacework_import_all.py b/dojo/management/commands/lacework_import_all.py index b6e5cb8fe8b..b5f5bc2382a 100644 --- a/dojo/management/commands/lacework_import_all.py +++ b/dojo/management/commands/lacework_import_all.py @@ -13,7 +13,7 @@ """ import logging -from datetime import datetime, timedelta, timezone +from datetime import UTC, datetime, timedelta from django.core.management.base import BaseCommand, CommandError from django.db import transaction @@ -45,8 +45,9 @@ def add_arguments(self, parser): ) def _parse_extras(self, extras: str | None) -> dict: - """Parse the Tool Configuration Extras field for options. - + """ + Parse the Tool Configuration Extras field for options. + Returns a dict with keys: include_containers, include_hosts """ result = { @@ -55,8 +56,8 @@ def _parse_extras(self, extras: str | None) -> dict: } if not extras: return result - for entry in extras.split(","): - entry = entry.strip().lower() + for raw_entry in extras.split(","): + entry = raw_entry.strip().lower() if "=" in entry: key, value = entry.split("=", 1) key = key.strip() @@ -74,8 +75,9 @@ def handle(self, *args, **options): try: tool_config = Tool_Configuration.objects.get(id=tool_config_id) except Tool_Configuration.DoesNotExist: + msg = f"Tool Configuration with id {tool_config_id} not found" raise CommandError( - f"Tool Configuration with id {tool_config_id} not found" + msg, ) # Read configuration from Tool Configuration Extras @@ -109,7 +111,7 @@ def handle(self, *args, **options): # Calculate time range hours = 24 - end_time = datetime.now(timezone.utc) + end_time = datetime.now(UTC) start_time = end_time - timedelta(hours=hours) start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") @@ -117,7 +119,7 @@ def handle(self, *args, **options): # --- Import container vulnerabilities --- if include_containers: self.stdout.write( - f"\nFetching container vulnerabilities from {start_time_str} to {end_time_str}..." + f"\nFetching container vulnerabilities from {start_time_str} to {end_time_str}...", ) try: container_vulns = client.search_container_vulnerabilities( @@ -126,21 +128,21 @@ def handle(self, *args, **options): ) self.stdout.write( self.style.SUCCESS( - f"Found {len(container_vulns)} container vulnerabilities" - ) + f"Found {len(container_vulns)} container vulnerabilities", + ), ) except Exception as e: self.stderr.write( self.style.ERROR( - f"Failed to fetch container vulnerabilities: {e}" - ) + f"Failed to fetch container vulnerabilities: {e}", + ), ) container_vulns = [] # Group container vulns by repository container_by_repo = self._group_container_vulns_by_repo(container_vulns) self.stdout.write( - f"Found {len(container_by_repo)} unique container repositories" + f"Found {len(container_by_repo)} unique container repositories", ) for repo_name, vulns in container_by_repo.items(): @@ -160,7 +162,7 @@ def handle(self, *args, **options): # --- Import host vulnerabilities --- if include_hosts: self.stdout.write( - f"\nFetching host vulnerabilities from {start_time_str} to {end_time_str}..." + f"\nFetching host vulnerabilities from {start_time_str} to {end_time_str}...", ) try: host_vulns = client.search_host_vulnerabilities( @@ -169,21 +171,21 @@ def handle(self, *args, **options): ) self.stdout.write( self.style.SUCCESS( - f"Found {len(host_vulns)} host vulnerabilities" - ) + f"Found {len(host_vulns)} host vulnerabilities", + ), ) except Exception as e: self.stderr.write( self.style.ERROR( - f"Failed to fetch host vulnerabilities: {e}" - ) + f"Failed to fetch host vulnerabilities: {e}", + ), ) host_vulns = [] # Group host vulns by hostname/machine host_by_machine = self._group_host_vulns_by_machine(host_vulns) self.stdout.write( - f"Found {len(host_by_machine)} unique host machines" + f"Found {len(host_by_machine)} unique host machines", ) for machine_name, vulns in host_by_machine.items(): @@ -235,11 +237,11 @@ def _import_vulns_to_product( dev_env, test_type, engagement_template: str, + *, is_container: bool, ): """Import vulnerabilities into a Product, auto-creating if needed.""" source_type = "container" if is_container else "host" - display_name = f"Lacework {source_type}: {repo_name}" # Sanitize product name (max 255 chars) product_name = repo_name[:255] @@ -255,26 +257,26 @@ def _import_vulns_to_product( ) if created: self.stdout.write( - self.style.SUCCESS(f" Created Product: {product_name}") + self.style.SUCCESS(f" Created Product: {product_name}"), ) else: self.stdout.write(f" Using existing Product: {product_name}") except Exception as e: self.stderr.write( - self.style.ERROR(f" Failed to get/create Product {product_name}: {e}") + self.style.ERROR(f" Failed to get/create Product {product_name}: {e}"), ) return # Create Engagement - today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + today = datetime.now(UTC).strftime("%Y-%m-%d") engagement_name = engagement_template.replace("{date}", today) try: engagement, created = Engagement.objects.get_or_create( name=engagement_name, product=product, defaults={ - "target_start": datetime.now(timezone.utc).date(), - "target_end": datetime.now(timezone.utc).date(), + "target_start": datetime.now(UTC).date(), + "target_end": datetime.now(UTC).date(), "active": True, "status": "In Progress", }, @@ -286,8 +288,8 @@ def _import_vulns_to_product( except Exception as e: self.stderr.write( self.style.ERROR( - f" Failed to get/create Engagement {engagement_name}: {e}" - ) + f" Failed to get/create Engagement {engagement_name}: {e}", + ), ) return @@ -298,8 +300,8 @@ def _import_vulns_to_product( test_type=test_type, defaults={ "title": f"{source_type.capitalize()} scan {today}", - "target_start": datetime.now(timezone.utc), - "target_end": datetime.now(timezone.utc), + "target_start": datetime.now(UTC), + "target_end": datetime.now(UTC), "description": f"Lacework {source_type} vulnerabilities for {repo_name}", }, ) @@ -309,24 +311,16 @@ def _import_vulns_to_product( self.stdout.write(f" Using existing Test: {test.title}") except Exception as e: self.stderr.write( - self.style.ERROR(f" Failed to get/create Test: {e}") + self.style.ERROR(f" Failed to get/create Test: {e}"), ) return # Create Findings from vulnerabilities importer = LaceworkApiImporter() if is_container: - new_findings = [ - importer._create_finding_from_container_vuln(v, test) - for v in vulns - if v.get("vulnId") - ] + new_findings = [importer._create_finding_from_container_vuln(v, test) for v in vulns if v.get("vulnId")] else: - new_findings = [ - importer._create_finding_from_host_vuln(v, test) - for v in vulns - if v.get("vulnId") - ] + new_findings = [importer._create_finding_from_host_vuln(v, test) for v in vulns if v.get("vulnId")] # Filter out None values new_findings = [f for f in new_findings if f is not None] @@ -348,9 +342,16 @@ def _import_vulns_to_product( if existing: # Update existing finding for field in [ - "severity", "description", "references", - "component_version", "cvssv3_score", "cvssv3", - "fix_available", "fix_version", "active", "verified", + "severity", + "description", + "references", + "component_version", + "cvssv3_score", + "cvssv3", + "fix_available", + "fix_version", + "active", + "verified", ]: setattr(existing, field, getattr(finding, field)) existing.save() @@ -361,13 +362,12 @@ def _import_vulns_to_product( except Exception as e: self.stderr.write( self.style.ERROR( - f" Failed to save finding {finding.title}: {e}" - ) + f" Failed to save finding {finding.title}: {e}", + ), ) self.stdout.write( self.style.SUCCESS( - f" Created {findings_created} findings, " - f"updated {findings_updated} existing findings" - ) - ) \ No newline at end of file + f" Created {findings_created} findings, updated {findings_updated} existing findings", + ), + ) diff --git a/dojo/tools/api_lacework/api_client.py b/dojo/tools/api_lacework/api_client.py index 6508e9442cc..c8eacd580f1 100644 --- a/dojo/tools/api_lacework/api_client.py +++ b/dojo/tools/api_lacework/api_client.py @@ -9,7 +9,7 @@ import logging import time -from datetime import datetime, timedelta, timezone +from datetime import UTC, datetime, timedelta import requests from django.conf import settings @@ -19,15 +19,10 @@ class LaceworkAPI: - """Client for Lacework API v2.0. - - Handles authentication via X-LW-UAKS header, Bearer token management, - pagination, and rate limiting for Lacework API calls. - """ - def __init__(self, tool_config): - """Initialize the Lacework API client. - + """ + Initialize the Lacework API client. + Args: tool_config: ToolConfiguration instance with Lacework credentials. - url: Base URL of Lacework instance (e.g., https://yourinstance.lacework.net) @@ -38,87 +33,80 @@ def __init__(self, tool_config): "include_containers=true" - Import container vulnerabilities "include_hosts=true" - Import host vulnerabilities Default (empty): both containers and hosts are imported. + """ self.session = requests.Session() self.session.headers.update({"User-Agent": "DefectDojo"}) - + self.base_url = tool_config.url.rstrip("/") self.api_key = tool_config.api_key # X-LW-UAKS value self.key_id = tool_config.username # keyId required in the POST body - + # Parse extras for import options self.include_containers = True # default: import containers self.include_hosts = True # default: import hosts self._parse_extras(tool_config.extras) - + # Token caching self._bearer_token = None self._token_expiry = None - + # Rate limiting self._rate_limit_reset = None - + if not self.key_id: - raise Exception( + msg = ( "Lacework keyId is required. Set it in the 'Username' field of the " "Tool Configuration. The 'API Key' field should contain the X-LW-UAKS secret." ) + raise Exception( + msg, + ) def _parse_extras(self, extras: str | None): - """Parse the extras field for import options. - + """ + Parse the extras field for import options. + Supported options (comma-separated): - include_containers=true/false: Import container vulnerabilities - include_hosts=true/false: Import host vulnerabilities - + Examples: - "" or None: Import both containers and hosts (default) - "include_containers=true,include_hosts=false": Only containers - "include_hosts=true,include_containers=false": Only hosts - "include_containers=false": Only hosts + """ if not extras: return - - for entry in extras.split(","): - entry = entry.strip().lower() + + for raw_entry in extras.split(","): + entry = raw_entry.strip().lower() if "=" in entry: key, value = entry.split("=", 1) key = key.strip() value = value.strip().lower() - + if key == "include_containers": self.include_containers = value == "true" elif key == "include_hosts": self.include_hosts = value == "true" def _get_bearer_token(self) -> str: - """Obtain a Bearer token from Lacework using the API key. - - Makes a POST request to /api/v2/access/tokens with: - - Header X-LW-UAKS: The secret key - - Body: {"keyId": ""} - - The returned token is cached and reused until it expires. - - Returns: - str: The Bearer token to use for API requests. - - Raises: - Exception: If the token request fails. - """ + """Obtain a Bearer token from Lacework using the API key.""" # Check if we have a valid cached token if self._bearer_token and self._token_expiry: - if datetime.now(timezone.utc) < self._token_expiry: + if datetime.now(UTC) < self._token_expiry: return self._bearer_token - + url = f"{self.base_url}/api/v2/access/tokens" headers = { "X-LW-UAKS": self.api_key, "Content-Type": "application/json", } body = {"keyId": self.key_id} - + try: response = self.session.post( url, @@ -126,7 +114,7 @@ def _get_bearer_token(self) -> str: json=body, timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), ) - + if not response.ok: msg = ( f"Unable to obtain Lacework Bearer token. " @@ -134,40 +122,35 @@ def _get_bearer_token(self) -> str: ) logger.error(msg) raise Exception(msg) - + data = response.json() logger.debug("Lacework token response: %s", data) - + # The token response is flat, not wrapped in "data": {} # Response: {"expiresAt": "...", "token": "..."} self._bearer_token = data.get("token") - + if not self._bearer_token: - msg = ( - f"Lacework token response did not contain a token. " - f"HTTP {response.status_code}. Response: {data}" - ) + msg = f"Lacework token response did not contain a token. HTTP {response.status_code}. Response: {data}" logger.error(msg) raise Exception(msg) - + # Calculate expiry (tokens typically expire in 1 hour) # Refresh 5 minutes before expiry expires_at_str = data.get("expiresAt") if expires_at_str: try: expires_at = datetime.fromisoformat( - expires_at_str.replace("Z", "+00:00") + expires_at_str, ) self._token_expiry = expires_at - timedelta(minutes=5) except (ValueError, AttributeError): # If we can't parse the expiry, use a default 55 minutes - self._token_expiry = datetime.now(timezone.utc) + timedelta(minutes=55) + self._token_expiry = datetime.now(UTC) + timedelta(minutes=55) else: - self._token_expiry = datetime.now(timezone.utc) + timedelta(minutes=55) - + self._token_expiry = datetime.now(UTC) + timedelta(minutes=55) + logger.info("Successfully obtained Lacework Bearer token") - return self._bearer_token - except requests.exceptions.RequestException as e: msg = f"Network error when obtaining Lacework token: {e}" logger.error(msg) @@ -176,27 +159,31 @@ def _get_bearer_token(self) -> str: msg = f"Invalid JSON response from Lacework token endpoint: {e}" logger.error(msg) raise Exception(msg) + else: + return self._bearer_token def _request(self, method: str, path: str, **kwargs) -> dict: - """Make a generic API request with Bearer token authentication. - + """ + Make a generic API request with Bearer token authentication. + Args: method: HTTP method (GET, POST, etc.) path: API path (e.g., /api/v2/Vulnerabilities/Containers/search) **kwargs: Additional arguments passed to requests - + Returns: dict: The JSON response data. - + Raises: - Exception: If the request fails. + requests.exceptions.RequestException: If the request fails. + """ url = f"{self.base_url}{path}" headers = { "Authorization": f"Bearer {self._get_bearer_token()}", "Content-Type": "application/json", } - + # Handle rate limiting with retry max_retries = 3 for attempt in range(max_retries): @@ -208,26 +195,24 @@ def _request(self, method: str, path: str, **kwargs) -> dict: timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), **kwargs, ) - + # Handle rate limiting if response.status_code == 429: reset_seconds = int(response.headers.get("RateLimit-Reset", 60)) logger.warning( - "Lacework rate limit hit. Waiting %d seconds...", reset_seconds + "Lacework rate limit hit. Waiting %d seconds...", + reset_seconds, ) time.sleep(min(reset_seconds, 300)) # Cap at 5 minutes continue - + if not response.ok: - msg = ( - f"Lacework API error: HTTP {response.status_code} - " - f"{response.content.decode('utf-8')}" - ) + msg = f"Lacework API error: HTTP {response.status_code} - {response.content.decode('utf-8')}" logger.error(msg) raise Exception(msg) - + return response.json() - + except requests.exceptions.RequestException as e: if attempt < max_retries - 1: logger.warning( @@ -236,32 +221,35 @@ def _request(self, method: str, path: str, **kwargs) -> dict: max_retries, e, ) - time.sleep(2 ** attempt) # Exponential backoff + time.sleep(2**attempt) # Exponential backoff else: raise + return None def _get_all_pages(self, get_page_func) -> list: - """Helper to automatically paginate through all results. - + """ + Helper to automatically paginate through all results. + Lacework API returns paginated results with a nextPage URL. This method collects all pages until there are no more. - + Strategy: - First page is fetched via get_page_func (POST with filters) - Subsequent pages are fetched via GET on the nextPage URL returned in paging.urls.nextPage (full URLs like https://instance.lacework.net/api/v2/Vulnerabilities/Containers/abc123) - + Args: get_page_func: A callable that returns (page_data, next_page_url) - + Returns: list: All items from all pages. + """ all_items = [] current_page_url = None page_count = 0 - + while True: try: if current_page_url is None: @@ -271,7 +259,8 @@ def _get_all_pages(self, get_page_func) -> list: page_count += 1 logger.debug( "Fetching page %d: %s...", - page_count, current_page_url[:100], + page_count, + current_page_url[:100], ) # Longer timeout for subsequent pages (dataset can be large) response = self.session.get( @@ -282,65 +271,65 @@ def _get_all_pages(self, get_page_func) -> list: }, timeout=getattr(settings, "REQUESTS_TIMEOUT", 120), ) - + if not response.ok: logger.warning( - "Failed to fetch next page: HTTP %d", response.status_code + "Failed to fetch next page: HTTP %d", + response.status_code, ) break - + data = response.json() page_data = data paging = data.get("paging", {}) urls = paging.get("urls", {}) next_page_url = urls.get("nextPage") - + if not page_data: break - + items = page_data.get("data", []) if items: all_items.extend(items) logger.debug( - "Fetched %d items, total so far: %d", len(items), len(all_items) + "Fetched %d items, total so far: %d", + len(items), + len(all_items), ) - + if not next_page_url: break - + current_page_url = next_page_url - + except requests.exceptions.Timeout: logger.warning( "Timeout fetching page %d (got %d items so far). " "The dataset may be too large. Try a shorter time range.", - page_count, len(all_items), + page_count, + len(all_items), ) break except Exception as e: logger.warning( "Pagination error on page %d: %s (got %d items so far)", - page_count, e, len(all_items), + page_count, + e, + len(all_items), ) break - + logger.info( "Pagination complete: fetched %d items across %d pages", - len(all_items), page_count, + len(all_items), + page_count, ) return all_items def list_container_registries(self) -> list: - """List all container registries configured in Lacework. - - Returns: - list: List of container registry configurations. - - Raises: - Exception: If the request fails. - """ + """List all container registries configured in Lacework.""" url = f"{self.base_url}/api/v2/ContainerRegistries" - + try: response = self.session.get( url, @@ -350,7 +339,7 @@ def list_container_registries(self) -> list: }, timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), ) - + if not response.ok: msg = ( f"Unable to list Lacework container registries. " @@ -358,10 +347,10 @@ def list_container_registries(self) -> list: ) logger.error(msg) raise Exception(msg) - + data = response.json() return data.get("data", []) - + except requests.exceptions.RequestException as e: msg = f"Network error when listing container registries: {e}" logger.error(msg) @@ -373,32 +362,31 @@ def search_container_vulnerabilities( end_time: str, filters: list | None = None, ) -> list: - """Search for container vulnerabilities in Lacework. - + """ + Search for container vulnerabilities in Lacework. + Uses the POST /api/v2/Vulnerabilities/Containers/search endpoint with automatic pagination. - + Args: start_time: Start time in ISO 8601 format (e.g., 2024-01-25T00:00:00.000Z) end_time: End time in ISO 8601 format filters: Optional list of filter dicts for additional filtering - + Returns: list: All container vulnerabilities found. - - Raises: - Exception: If the request fails. + """ body = { "timeFilter": { "startTime": start_time, "endTime": end_time, - } + }, } - + if filters: body["filters"] = filters - + def get_page(): response = self.session.post( f"{self.base_url}/api/v2/Vulnerabilities/Containers/search", @@ -409,7 +397,7 @@ def get_page(): }, timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), ) - + if not response.ok: msg = ( f"Unable to search container vulnerabilities. " @@ -417,14 +405,14 @@ def get_page(): ) logger.error(msg) raise Exception(msg) - + data = response.json() paging = data.get("paging", {}) urls = paging.get("urls", {}) next_page_url = urls.get("nextPage") - + return data, next_page_url - + return self._get_all_pages(get_page) def search_host_vulnerabilities( @@ -433,32 +421,31 @@ def search_host_vulnerabilities( end_time: str, filters: list | None = None, ) -> list: - """Search for host vulnerabilities in Lacework. - + """ + Search for host vulnerabilities in Lacework. + Uses the POST /api/v2/Vulnerabilities/Hosts/search endpoint with automatic pagination. - + Args: start_time: Start time in ISO 8601 format end_time: End time in ISO 8601 format filters: Optional list of filter dicts for additional filtering - + Returns: list: All host vulnerabilities found. - - Raises: - Exception: If the request fails. + """ body = { "timeFilter": { "startTime": start_time, "endTime": end_time, - } + }, } - + if filters: body["filters"] = filters - + def get_page(): response = self.session.post( f"{self.base_url}/api/v2/Vulnerabilities/Hosts/search", @@ -469,7 +456,7 @@ def get_page(): }, timeout=getattr(settings, "REQUESTS_TIMEOUT", 30), ) - + if not response.ok: msg = ( f"Unable to search host vulnerabilities. " @@ -477,34 +464,29 @@ def get_page(): ) logger.error(msg) raise Exception(msg) - + data = response.json() paging = data.get("paging", {}) urls = paging.get("urls", {}) next_page_url = urls.get("nextPage") - + return data, next_page_url - + return self._get_all_pages(get_page) def test_connection(self) -> str: - """Test the connection to Lacework API. - - Verifies that we can obtain a Bearer token and make a simple - API call. Does NOT require container registry permissions. - - Returns: - str: A message describing the connection status. - - Raises: - Exception: If the connection fails. - """ + """Test the connection to Lacework API.""" try: # First verify we can get a Bearer token token = self._get_bearer_token() if not token: - raise Exception("Failed to obtain Bearer token") - + msg = "Failed to obtain Bearer token" + raise Exception(msg) + except Exception as e: + msg = f"Failed to connect to Lacework: {e}" + logger.error(msg) + raise Exception(msg) + else: # Try to list container registries for a meaningful response try: registries = self.list_container_registries() @@ -515,46 +497,23 @@ def test_connection(self) -> str: ) except Exception: # If listing registries fails (permissions), at least we have a token - return ( - "Successfully connected to Lacework. " - "Bearer token obtained successfully." - ) - except Exception as e: - msg = f"Failed to connect to Lacework: {e}" - logger.error(msg) - raise Exception(msg) + return "Successfully connected to Lacework. Bearer token obtained successfully." def test_product_connection(self, api_scan_configuration) -> str: - """Test connection for a specific product/repository. - - Verifies that the Lacework instance is accessible by obtaining - a Bearer token. Does NOT require container registry permissions. - - Args: - api_scan_configuration: APIScanConfiguration instance for the product - - Returns: - str: A message describing the connection status. - """ + """Test connection for a specific product/repository.""" try: # Verify we can get a Bearer token token = self._get_bearer_token() if not token: - raise Exception("Failed to obtain Bearer token") - - repo_pattern = api_scan_configuration.service_key_1 or "" - - if repo_pattern: - return ( - f"Successfully connected to Lacework. " - f"Repository filter pattern: '{repo_pattern}'." - ) - else: - return ( - "Successfully connected to Lacework. " - "No repository filter configured (will import all repositories)." - ) + msg = "Failed to obtain Bearer token" + raise Exception(msg) except Exception as e: msg = f"Failed to connect to Lacework for product: {e}" logger.error(msg) raise Exception(msg) + else: + repo_pattern = api_scan_configuration.service_key_1 or "" + + if repo_pattern: + return f"Successfully connected to Lacework. Repository filter pattern: '{repo_pattern}'." + return "Successfully connected to Lacework. No repository filter configured (will import all repositories)." diff --git a/dojo/tools/api_lacework/importer.py b/dojo/tools/api_lacework/importer.py index 729574bbdc8..a0c62c3beaf 100644 --- a/dojo/tools/api_lacework/importer.py +++ b/dojo/tools/api_lacework/importer.py @@ -8,7 +8,7 @@ """ import logging -from datetime import datetime, timedelta, timezone +from datetime import UTC, datetime, timedelta from django.conf import settings from django.core.exceptions import ValidationError @@ -21,42 +21,32 @@ class LaceworkApiImporter: - """Importer for Lacework vulnerabilities. - - This class is the services/business logic layer (equivalent to services.py - in the domain module pattern). It accepts domain objects (Test) and - primitives, never request/response objects. - - It imports vulnerabilities from: - - Lacework Container Vulnerabilities (/api/v2/Vulnerabilities/Containers/search) - - Lacework Host Vulnerabilities (/api/v2/Vulnerabilities/Hosts/search) - """ - SCAN_LACEWORK = "Lacework API Import" - + def get_findings(self, filename, test): - """Main entry point for importing Lacework vulnerabilities. - + """ + Main entry point for importing Lacework vulnerabilities. + Args: filename: Ignored (API-based import, no file needed) test: Test instance to associate findings with - + Returns: list[Finding]: List of Finding instances (not yet saved) + """ items = [] - + # Get client to check which vulnerability types are enabled # (options come from the Extras field in Tool Configuration) try: - client, config = self.prepare_client(test) + client, _config = self.prepare_client(test) except Exception: client = None - config = None - + include_containers = client.include_containers if client else True include_hosts = client.include_hosts if client else True - + # Import container vulnerabilities if include_containers: try: @@ -66,7 +56,7 @@ def get_findings(self, filename, test): self._notify_failure(test, "Container vulnerabilities import", str(e)) else: logger.info("Container vulnerabilities import is disabled via Extras config") - + # Import host vulnerabilities if include_hosts: try: @@ -76,28 +66,28 @@ def get_findings(self, filename, test): self._notify_failure(test, "Host vulnerabilities import", str(e)) else: logger.info("Host vulnerabilities import is disabled via Extras config") - + return items - + @staticmethod def prepare_client(test): - """Prepare the Lacework API client from the test's configuration. - + """ + Prepare the Lacework API client from the test's configuration. + Similar to SonarQubeApiImporter.prepare_client. - + Args: test: Test instance with associated API scan configuration - + Returns: tuple[LaceworkAPI, APIScanConfiguration]: The client and config - + Raises: ValidationError: If configuration is missing or invalid + """ - from dojo.notifications.helper import create_notification - product = test.engagement.product - + if test.api_scan_configuration: config = test.api_scan_configuration # Validate that the config belongs to this product @@ -129,38 +119,40 @@ def prepare_client(test): f'Product: "{product.name}" ({product.id})' ) raise ValidationError(msg) - + return LaceworkAPI(tool_config=config.tool_configuration), config - + def import_container_vulnerabilities(self, test): - """Import container vulnerabilities from Lacework. - + """ + Import container vulnerabilities from Lacework. + Fetches vulnerabilities using search_container_vulnerabilities() and maps each one to a Finding instance. - + Args: test: Test instance for the current engagement - + Returns: list[Finding]: List of Finding instances + """ items = [] client, config = self.prepare_client(test) - + # Calculate time range (last 24 hours by default, or configured) hours = getattr(settings, "LACEWORK_API_IMPORTER_TIMEDELTA_HOURS", 24) - end_time = datetime.now(timezone.utc) + end_time = datetime.now(UTC) start_time = end_time - timedelta(hours=hours) - + start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") - + logger.info( "Fetching container vulnerabilities from %s to %s", start_time_str, end_time_str, ) - + # Filter by repository pattern from Service key 1 (API Scan Configuration) filters = None if config and config.service_key_1: @@ -170,7 +162,7 @@ def import_container_vulnerabilities(self, test): "field": "evalCtx.image_info.repo", "expression": "like", "value": f"%{repo_pattern}%", - } + }, ] logger.info( "Filtering container vulnerabilities by repository pattern: %s", @@ -178,18 +170,17 @@ def import_container_vulnerabilities(self, test): ) else: logger.info( - "No repository filter configured (Service key 1 is empty). " - "Importing ALL container vulnerabilities." + "No repository filter configured (Service key 1 is empty). Importing ALL container vulnerabilities.", ) - + vulnerabilities = client.search_container_vulnerabilities( start_time=start_time_str, end_time=end_time_str, filters=filters, ) - + logger.info("Found %d container vulnerabilities", len(vulnerabilities)) - + for vuln in vulnerabilities: try: finding = self._create_finding_from_container_vuln(vuln, test) @@ -201,38 +192,40 @@ def import_container_vulnerabilities(self, test): vuln.get("vulnId", "unknown"), e, ) - + return items - + def import_host_vulnerabilities(self, test): - """Import host vulnerabilities from Lacework. - + """ + Import host vulnerabilities from Lacework. + Fetches vulnerabilities using search_host_vulnerabilities() and maps each one to a Finding instance. - + Args: test: Test instance for the current engagement - + Returns: list[Finding]: List of Finding instances + """ items = [] client, config = self.prepare_client(test) - + # Calculate time range hours = getattr(settings, "LACEWORK_API_IMPORTER_TIMEDELTA_HOURS", 24) - end_time = datetime.now(timezone.utc) + end_time = datetime.now(UTC) start_time = end_time - timedelta(hours=hours) - + start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") - + logger.info( "Fetching host vulnerabilities from %s to %s", start_time_str, end_time_str, ) - + # Filter by hostname pattern from Service key 1 (API Scan Configuration) filters = None if config and config.service_key_1: @@ -242,7 +235,7 @@ def import_host_vulnerabilities(self, test): "field": "machineTags.Hostname", "expression": "rlike", "value": f".*{hostname_pattern}.*", - } + }, ] logger.info( "Filtering host vulnerabilities by hostname pattern: %s", @@ -250,18 +243,17 @@ def import_host_vulnerabilities(self, test): ) else: logger.info( - "No hostname filter configured (Service key 1 is empty). " - "Importing ALL host vulnerabilities." + "No hostname filter configured (Service key 1 is empty). Importing ALL host vulnerabilities.", ) - + vulnerabilities = client.search_host_vulnerabilities( start_time=start_time_str, end_time=end_time_str, filters=filters, ) - + logger.info("Found %d host vulnerabilities", len(vulnerabilities)) - + for vuln in vulnerabilities: try: finding = self._create_finding_from_host_vuln(vuln, test) @@ -273,24 +265,31 @@ def import_host_vulnerabilities(self, test): vuln.get("vulnId", "unknown"), e, ) - + return items - - def _build_common_finding(self, vuln: dict, test, source_type: str, - unique_id_parts: list[str]) -> tuple[dict, str, bool]: - """Build common finding fields shared by container and host vulns. - + + def _build_common_finding( + self, + vuln: dict, + test, + source_type: str, + unique_id_parts: list[str], + ) -> tuple[dict, str, bool]: + """ + Build common finding fields shared by container and host vulns. + Args: vuln: Vulnerability dict from Lacework API test: Test instance source_type: "container" or "host" unique_id_parts: Parts to build unique_id (vulnId + ...) - + Returns: tuple of (fields_dict, unique_id, is_active) + """ vuln_id = vuln.get("vulnId", "") - + # --- Severity --- # If no direct severity field, infer from risk scores severity_str = vuln.get("severity", "") @@ -307,17 +306,17 @@ def _build_common_finding(self, vuln: dict, test, source_type: str, else: severity_str = "Info" severity = self._convert_lacework_severity(severity_str) - + # --- Description --- cve_props = vuln.get("cveProps", {}) description = cve_props.get("description", "No description provided") - + # Add introduced_in to description if available (for containers) feature_props = vuln.get("featureProps", {}) introduced_in = feature_props.get("introduced_in", "") if introduced_in: description += f"\n\n**Introduced in:** {introduced_in}" - + # --- References --- references = "" link = cve_props.get("link", "") @@ -328,56 +327,56 @@ def _build_common_finding(self, vuln: dict, test, source_type: str, references += f"\n*Source: {source}*" if vuln_id and vuln_id.startswith("CVE-"): references += f"\nhttps://nvd.nist.gov/vuln/detail/{vuln_id}" - + # --- Component info --- feature_key = vuln.get("featureKey", {}) component_name = feature_key.get("name", "") namespace = feature_key.get("namespace", "") # Container uses "version", host uses "version_installed" version_val = feature_key.get("version") or feature_key.get("version_installed", "") - + # Package path from featureProps (more specific than just namespace) pkg_path = feature_props.get("src", namespace) - + # --- Fix info --- fix_info = vuln.get("fixInfo", {}) fix_available = bool(fix_info.get("fix_available", 0)) fixed_version = fix_info.get("fixed_version", "") - + # --- CVSS score --- cvss_score, cvss_vector = self._extract_cvss_score(vuln) - + # --- CWE --- cwe = self._extract_cwe(vuln) - + # --- Status (active/mitigated) --- status = vuln.get("status", "").upper() is_active = status != "GOOD" # GOOD = resolved/mitigated is_verified = status == "VULNERABLE" or is_active - + # --- Unique ID for dedup --- unique_id = f"{source_type}:{'|'.join(unique_id_parts)}" - + # --- Tags --- tags_parts = [] package_status = vuln.get("packageStatus", "") if package_status: tags_parts.append(f"pkg:{package_status}") - + eval_ctx = vuln.get("evalCtx", {}) request_source = eval_ctx.get("request_source", "") if request_source: tags_parts.append(f"scanner:{request_source}") - + integration_props = eval_ctx.get("integration_props", {}) intg_name = integration_props.get("NAME", "") if intg_name: tags_parts.append(f"integration:{intg_name}") - + feed = feature_props.get("feed", "") if feed: tags_parts.append(f"feed:{feed}") - + fields = { "title": f"{vuln_id} in {component_name}" if component_name else vuln_id, "vuln_id_from_tool": vuln_id, @@ -402,86 +401,77 @@ def _build_common_finding(self, vuln: dict, test, source_type: str, "out_of_scope": False, "unique_id_from_tool": f"lacework:{unique_id}", } - + return fields, unique_id, is_active def _create_finding_from_container_vuln(self, vuln: dict, test) -> Finding | None: - """Create a Finding from a Lacework container vulnerability. - - Maps Lacework container vulnerability fields to DefectDojo Finding fields. - """ + """Create a Finding from a Lacework container vulnerability.""" vuln_id = vuln.get("vulnId", "") if not vuln_id: return None - + # Image info eval_ctx = vuln.get("evalCtx", {}) image_info = eval_ctx.get("image_info", {}) repo = image_info.get("repo", "") image_tags = image_info.get("tags", []) - + # Build unique_id_parts: vulnId, repo, namespace, component_name feature_key = vuln.get("featureKey", {}) namespace = feature_key.get("namespace", "") component_name = feature_key.get("name", "") - + unique_id_parts = [vuln_id, repo, namespace, component_name] - - fields, unique_id, is_active = self._build_common_finding( - vuln, test, "container", unique_id_parts + + fields, _unique_id, _is_active = self._build_common_finding( + vuln, + test, + "container", + unique_id_parts, ) - + # Add container-specific tags tags_parts = [] - existing_title = fields["title"] - + fields["title"] + if repo not in str(tags_parts): tags_parts.append(repo) if image_tags: tags_parts.extend(image_tags) - - find = Finding(**fields) - return find - + + return Finding(**fields) + def _create_finding_from_host_vuln(self, vuln: dict, test) -> Finding | None: - """Create a Finding from a Lacework host vulnerability. - - Maps Lacework host vulnerability fields to DefectDojo Finding fields. - """ + """Create a Finding from a Lacework host vulnerability.""" vuln_id = vuln.get("vulnId", "") if not vuln_id: return None - + # Machine info mid = vuln.get("mid", "") machine_tags = vuln.get("machineTags", {}) - hostname = machine_tags.get("Hostname", "") - vm_provider = machine_tags.get("VmProvider", "") - + _hostname = machine_tags.get("Hostname", "") + _vm_provider = machine_tags.get("VmProvider", "") + # Build unique_id_parts: vulnId, mid, namespace, component_name feature_key = vuln.get("featureKey", {}) namespace = feature_key.get("namespace", "") component_name = feature_key.get("name", "") - + unique_id_parts = [str(vuln_id), str(mid), namespace, component_name] - - fields, unique_id, is_active = self._build_common_finding( - vuln, test, "host", unique_id_parts + + fields, _unique_id, _is_active = self._build_common_finding( + vuln, + test, + "host", + unique_id_parts, ) - - find = Finding(**fields) - return find - + + return Finding(**fields) + @staticmethod def _convert_lacework_severity(lw_severity: str) -> str: - """Convert Lacework severity to DefectDojo severity. - - Args: - lw_severity: Lacework severity string (Critical, High, Medium, Low, Info) - - Returns: - str: DefectDojo severity string - """ + """Convert Lacework severity to DefectDojo severity.""" mapping = { "Critical": "Critical", "High": "High", @@ -490,25 +480,13 @@ def _convert_lacework_severity(lw_severity: str) -> str: "Info": "Info", } return mapping.get(lw_severity, "Info") - + @staticmethod def _extract_cvss_score(vuln: dict) -> tuple: - """Extract CVSSv3 score and vector from a vulnerability. - - Priority order: - 1. NVD CVSSv3 - 2. RBS (Rapid 7) CVSSv3 - 3. riskScore fallback - - Args: - vuln: Vulnerability dict from Lacework API - - Returns: - tuple[float | None, str | None]: (cvssv3_score, cvssv3_vector) - """ + """Extract CVSSv3 score and vector from a vulnerability.""" cve_props = vuln.get("cveProps", {}) metadata = cve_props.get("metadata", {}) - + # Try NVD CVSSv3 first nvd = metadata.get("NVD", {}) cvssv3 = nvd.get("CVSSv3", {}) @@ -517,7 +495,7 @@ def _extract_cvss_score(vuln: dict) -> tuple: vector = cvssv3.get("Vectors") if score is not None and score > 0: return (float(score), vector) - + # Fallback to RBS (Rapid7) rbs = metadata.get("RBS", {}) cvssv3_rbs = rbs.get("CVSSv3", {}) @@ -526,59 +504,42 @@ def _extract_cvss_score(vuln: dict) -> tuple: vector = cvssv3_rbs.get("Vectors") if score is not None and score > 0: return (float(score), vector) - + # Fallback to riskScore from Lacework risk_score = vuln.get("riskScore") or vuln.get("cveRiskScore") if risk_score is not None: return (float(risk_score), None) - + return (None, None) - + @staticmethod def _extract_cwe(vuln: dict) -> int | None: - """Extract CWE ID from a vulnerability. - - The CWE ID is found in cveProps.metadata.RBS.cwe_id dict - with the CVE ID as key. - - Args: - vuln: Vulnerability dict from Lacework API - - Returns: - int | None: CWE ID as integer, or None if not found - """ + """Extract CWE ID from a vulnerability.""" cve_props = vuln.get("cveProps", {}) metadata = cve_props.get("metadata", {}) rbs = metadata.get("RBS", {}) cwe_map = rbs.get("cwe_id", {}) - + # Find the CWE from the map - for cve_id, cwe_str in cwe_map.items(): + for cwe_str in cwe_map.values(): if cwe_str and cwe_str.startswith("CWE-"): try: return int(cwe_str.replace("CWE-", "")) except (ValueError, TypeError): continue - + return None - + @staticmethod def _notify_failure(test, import_type: str, error_message: str): - """Send a notification about an import failure. - - Args: - test: The test being imported - import_type: Type of import that failed - error_message: Error description - """ - from dojo.notifications.helper import create_notification - + """Send a notification about an import failure.""" + from dojo.notifications.helper import create_notification # noqa: PLC0415 + create_notification( event="other", title=f"Lacework {import_type} failed", description=( - f"Lacework {import_type} failed for product " - f"'{test.engagement.product.name}': {error_message}" + f"Lacework {import_type} failed for product '{test.engagement.product.name}': {error_message}" ), icon="exclamation-triangle", source="Lacework API", diff --git a/dojo/tools/api_lacework/parser.py b/dojo/tools/api_lacework/parser.py index 790ea8aaba4..4ca51585ff4 100644 --- a/dojo/tools/api_lacework/parser.py +++ b/dojo/tools/api_lacework/parser.py @@ -4,12 +4,6 @@ class ApiLaceworkParser: - """Parser for Lacework API Import. - - This parser implements the DefectDojo parser contract for API-based imports. - It requires a Tool Type named "Lacework" to be configured. - """ - def get_scan_types(self): """Return the scan types supported by this parser.""" return [SCAN_LACEWORK_API] @@ -36,30 +30,15 @@ def requires_tool_type(self, scan_type): return "Lacework" def api_scan_configuration_hint(self): - """Return a hint for configuring the API scan. - - When tool_configurations.count == 0, the template renders: - "Tool type Lacework exists however parser Lacework API Import requires at least one - tool configuration." - - When tool_configurations.count > 0, this hint is rendered instead. - We return the same format so it looks consistent. - """ - from dojo.models import Tool_Type + """Return a hint for configuring the API scan.""" + from dojo.models import Tool_Type # noqa: PLC0415 + tool_type_id = Tool_Type.objects.filter(name="Lacework").values_list("id", flat=True).first() or "" return ( - f'Tool type Lacework exists however parser Lacework API Import ' + f"Tool type Lacework exists however parser Lacework API Import " f'requires at least one tool configuration.' ) def get_findings(self, json_output, test): - """Import findings from Lacework API. - - Args: - json_output: Ignored for API-based imports - test: Test instance to associate findings with - - Returns: - list[Finding]: List of Finding instances - """ - return LaceworkApiImporter().get_findings(json_output, test) \ No newline at end of file + """Import findings from Lacework API.""" + return LaceworkApiImporter().get_findings(json_output, test) diff --git a/dojo/tools/api_lacework/updater.py b/dojo/tools/api_lacework/updater.py index 829f1e842da..b34bad83175 100644 --- a/dojo/tools/api_lacework/updater.py +++ b/dojo/tools/api_lacework/updater.py @@ -15,29 +15,12 @@ class LaceworkApiUpdater: - """Updater for Lacework findings. - - Note: Lacework does not currently support issue transitions via API - (unlike SonarQube which has transitions like confirm, resolve, etc.). - This class serves as a placeholder for future functionality. - """ - def update_lacework_finding(self, finding): - """Update a finding status in Lacework. - - Currently a placeholder since Lacework does not support - issue transitions. If Lacework adds this capability in the future, - this method can be implemented to reflect DefectDojo status changes - back to Lacework via the VulnerabilityExceptions API. - - Args: - finding: The Finding instance whose status may need syncing - """ + """Update a finding status in Lacework.""" logger.debug( - "Lacework updater called for finding %s. " - "Lacework does not currently support issue transitions.", + "Lacework updater called for finding %s. Lacework does not currently support issue transitions.", finding.id, ) # Future implementation could use: # POST /api/v2/VulnerabilityExceptions to add exceptions - # for findings that are false positive or risk accepted in DefectDojo \ No newline at end of file + # for findings that are false positive or risk accepted in DefectDojo diff --git a/dojo/tools/tool_issue_updater.py b/dojo/tools/tool_issue_updater.py index 217d7e44d70..413579dfddd 100644 --- a/dojo/tools/tool_issue_updater.py +++ b/dojo/tools/tool_issue_updater.py @@ -22,7 +22,7 @@ def async_tool_issue_update(finding, *args, **kwargs): def is_tool_issue_updater_needed(finding, *args, **kwargs): test_type = finding.test.test_type - return test_type.name in (SCAN_SONARQUBE_API, SCAN_LACEWORK_API) + return test_type.name in {SCAN_SONARQUBE_API, SCAN_LACEWORK_API} @app.task diff --git a/unittests/test_api_lacework.py b/unittests/test_api_lacework.py index e7e5f16b76b..c57786fa6bc 100644 --- a/unittests/test_api_lacework.py +++ b/unittests/test_api_lacework.py @@ -18,7 +18,6 @@ from dojo.models import ( Engagement, - Finding, Product, Product_Type, Test, @@ -27,37 +26,35 @@ Tool_Type, ) from dojo.tools.api_lacework.importer import LaceworkApiImporter -from dojo.tools.api_lacework.parser import ApiLaceworkParser, SCAN_LACEWORK_API +from dojo.tools.api_lacework.parser import SCAN_LACEWORK_API, ApiLaceworkParser from .dojo_test_case import DojoTestCase class TestLaceworkApiImporter(DojoTestCase): - """Test the core logic of the Lacework API importer.""" - def test_convert_lacework_severity_critical(self): """Test that Critical severity maps correctly.""" - assert LaceworkApiImporter._convert_lacework_severity("Critical") == "Critical" + self.assertEqual(LaceworkApiImporter._convert_lacework_severity("Critical"), "Critical") def test_convert_lacework_severity_high(self): """Test that High severity maps correctly.""" - assert LaceworkApiImporter._convert_lacework_severity("High") == "High" + self.assertEqual(LaceworkApiImporter._convert_lacework_severity("High"), "High") def test_convert_lacework_severity_medium(self): """Test that Medium severity maps correctly.""" - assert LaceworkApiImporter._convert_lacework_severity("Medium") == "Medium" + self.assertEqual(LaceworkApiImporter._convert_lacework_severity("Medium"), "Medium") def test_convert_lacework_severity_low(self): """Test that Low severity maps correctly.""" - assert LaceworkApiImporter._convert_lacework_severity("Low") == "Low" + self.assertEqual(LaceworkApiImporter._convert_lacework_severity("Low"), "Low") def test_convert_lacework_severity_info(self): """Test that Info severity maps correctly.""" - assert LaceworkApiImporter._convert_lacework_severity("Info") == "Info" + self.assertEqual(LaceworkApiImporter._convert_lacework_severity("Info"), "Info") def test_convert_lacework_severity_unknown(self): """Test that unknown severity defaults to Info.""" - assert LaceworkApiImporter._convert_lacework_severity("Unknown") == "Info" + self.assertEqual(LaceworkApiImporter._convert_lacework_severity("Unknown"), "Info") def test_extract_cvss_score_from_nvd(self): """Test CVSSv3 extraction from NVD metadata (highest priority).""" @@ -81,8 +78,8 @@ def test_extract_cvss_score_from_nvd(self): "riskScore": 10, } score, vector = LaceworkApiImporter._extract_cvss_score(vuln) - assert score == 9.8 - assert vector == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + self.assertAlmostEqual(score, 9.8, places=1) + self.assertEqual(vector, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") def test_extract_cvss_score_from_rbs(self): """Test CVSSv3 extraction from RBS metadata when NVD is not available.""" @@ -100,8 +97,8 @@ def test_extract_cvss_score_from_rbs(self): }, } score, vector = LaceworkApiImporter._extract_cvss_score(vuln) - assert score == 7.5 - assert vector == "CVSS:3.0/AV:N/AC:L/Au:N/C:P/I:P/A:P" + self.assertAlmostEqual(score, 7.5, places=1) + self.assertEqual(vector, "CVSS:3.0/AV:N/AC:L/Au:N/C:P/I:P/A:P") def test_extract_cvss_score_from_riskscore(self): """Test fallback to riskScore when no CVSS metadata is available.""" @@ -115,8 +112,8 @@ def test_extract_cvss_score_from_riskscore(self): "riskScore": 10, } score, vector = LaceworkApiImporter._extract_cvss_score(vuln) - assert score == 10.0 - assert vector is None + self.assertAlmostEqual(score, 10.0, places=1) + self.assertIsNone(vector) def test_extract_cvss_score_from_cveriskscore(self): """Test fallback to cveRiskScore.""" @@ -129,8 +126,8 @@ def test_extract_cvss_score_from_cveriskscore(self): }, "cveRiskScore": 9.8, } - score, vector = LaceworkApiImporter._extract_cvss_score(vuln) - assert score == 9.8 + score, _vector = LaceworkApiImporter._extract_cvss_score(vuln) + self.assertAlmostEqual(score, 9.8, places=1) def test_extract_cvss_score_none_when_no_data(self): """Test that None is returned when no score data exists.""" @@ -143,8 +140,8 @@ def test_extract_cvss_score_none_when_no_data(self): }, } score, vector = LaceworkApiImporter._extract_cvss_score(vuln) - assert score is None - assert vector is None + self.assertIsNone(score) + self.assertIsNone(vector) def test_extract_cwe_success(self): """Test CWE extraction from RBS metadata.""" @@ -160,7 +157,7 @@ def test_extract_cwe_success(self): }, } cwe = LaceworkApiImporter._extract_cwe(vuln) - assert cwe == 787 + self.assertEqual(cwe, 787) def test_extract_cwe_multiple(self): """Test CWE extraction when there are multiple CWEs.""" @@ -177,7 +174,7 @@ def test_extract_cwe_multiple(self): }, } cwe = LaceworkApiImporter._extract_cwe(vuln) - assert cwe == 787 + self.assertEqual(cwe, 787) def test_extract_cwe_none_when_no_cwe(self): """Test that None is returned when no CWE data exists.""" @@ -189,13 +186,13 @@ def test_extract_cwe_none_when_no_cwe(self): }, } cwe = LaceworkApiImporter._extract_cwe(vuln) - assert cwe is None + self.assertIsNone(cwe) def test_extract_cwe_none_when_no_metadata(self): """Test that None is returned when no metadata exists.""" vuln = {"cveProps": {}} cwe = LaceworkApiImporter._extract_cwe(vuln) - assert cwe is None + self.assertIsNone(cwe) def test_create_finding_from_container_vuln(self): """Test Finding creation from a container vulnerability with all fields.""" @@ -239,17 +236,17 @@ def test_create_finding_from_container_vuln(self): } # Create a proper instance to call the instance method - importer = LaceworkApiImporter() + LaceworkApiImporter() # Test the static helper methods independently severity = LaceworkApiImporter._convert_lacework_severity(vuln.get("severity", "Info")) - assert severity == "Critical" - + self.assertEqual(severity, "Critical") + cwe = LaceworkApiImporter._extract_cwe(vuln) - assert cwe == 787 - + self.assertEqual(cwe, 787) + score, vector = LaceworkApiImporter._extract_cvss_score(vuln) - assert score == 9.8 - assert vector == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + self.assertAlmostEqual(score, 9.8, places=1) + self.assertEqual(vector, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") def test_create_finding_from_host_vuln(self): """Test Finding creation from a host vulnerability with all fields.""" @@ -293,82 +290,67 @@ def test_create_finding_from_host_vuln(self): # Verify the mapping logic extracts fields correctly cwe = LaceworkApiImporter._extract_cwe(vuln) - assert cwe == 254 + self.assertEqual(cwe, 254) score, vector = LaceworkApiImporter._extract_cvss_score(vuln) - assert score == 9.8 - assert vector == "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + self.assertAlmostEqual(score, 9.8, places=1) + self.assertEqual(vector, "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") class TestApiLaceworkParser(TestCase): - """Test the parser contract implementation.""" - def setUp(self): self.parser = ApiLaceworkParser() def test_get_scan_types(self): """Test that the parser returns the correct scan type.""" scan_types = self.parser.get_scan_types() - assert len(scan_types) == 1 - assert scan_types[0] == SCAN_LACEWORK_API + self.assertEqual(len(scan_types), 1) + self.assertEqual(scan_types[0], SCAN_LACEWORK_API) def test_get_label_for_scan_types(self): """Test that the label matches the scan type.""" - assert ( - self.parser.get_label_for_scan_types(SCAN_LACEWORK_API) - == SCAN_LACEWORK_API - ) + self.assertEqual(self.parser.get_label_for_scan_types(SCAN_LACEWORK_API), SCAN_LACEWORK_API) def test_get_description_for_scan_types(self): """Test that a description is returned.""" description = self.parser.get_description_for_scan_types(SCAN_LACEWORK_API) - assert description is not None - assert len(description) > 0 - assert "Lacework" in description + self.assertIsNotNone(description) + self.assertGreater(len(description), 0) + self.assertIn("Lacework", description) def test_requires_file(self): """Test that no file is required (API-based import).""" - assert self.parser.requires_file(SCAN_LACEWORK_API) is False + self.assertFalse(self.parser.requires_file(SCAN_LACEWORK_API)) def test_requires_tool_type(self): """Test that the required tool type is 'Lacework'.""" - assert self.parser.requires_tool_type(SCAN_LACEWORK_API) == "Lacework" + self.assertEqual(self.parser.requires_tool_type(SCAN_LACEWORK_API), "Lacework") def test_api_scan_configuration_hint(self): """Test that a configuration hint is provided.""" hint = self.parser.api_scan_configuration_hint() - assert hint is not None - assert len(hint) > 0 - assert "Service key 1" in hint + self.assertIsNotNone(hint) + self.assertGreater(len(hint), 0) + self.assertIn("Service key 1", hint) def test_get_findings_with_empty_input(self): """Test that get_findings returns a list even with empty input.""" - # The importer relies on a valid test object with engagement and product - # When None is passed, it should raise an error because it can't access - # test.engagement.product. This test verifies the error handling returns - # an empty list rather than crashing. - # We pass a mock object that will fail validation gracefully - from unittest.mock import MagicMock - mock_test = MagicMock() mock_test.api_scan_configuration = None mock_test.engagement.product.name = "test" mock_test.engagement.product.product_api_scan_configuration_set.filter.return_value.count.return_value = 0 - + result = self.parser.get_findings(None, mock_test) - assert isinstance(result, list) - # Should be empty because no API Scan Configuration is configured - assert len(result) == 0 + self.assertIsInstance(result, list) + self.assertEqual(len(result), 0) class TestLaceworkApiImporterIntegration(DojoTestCase): - """Integration tests for the Lacework importer with real DB models.""" - def setUp(self): """Set up test data with real DB models.""" # Create Tool Type self.tool_type = Tool_Type.objects.create(name="Lacework") - + # Create Tool Configuration self.tool_config = Tool_Configuration.objects.create( name="Lacework Test", @@ -378,7 +360,7 @@ def setUp(self): username="test-key-id", api_key="test-api-key", ) - + # Create Product Type and Product self.product_type = Product_Type.objects.create(name="Lacework") self.product = Product.objects.create( @@ -386,13 +368,13 @@ def setUp(self): prod_type=self.product_type, description="Test product for Lacework import", ) - + # Create API Scan Configuration self.api_scan_config = self.product.product_api_scan_configuration_set.create( product=self.product, tool_configuration=self.tool_config, ) - + # Create Engagement self.engagement = Engagement.objects.create( product=self.product, @@ -402,10 +384,10 @@ def setUp(self): active=True, status="In Progress", ) - + # Get or create Test Type self.test_type, _ = Test_Type.objects.get_or_create(name="Lacework API Import") - + # Create Test self.test = Test.objects.create( engagement=self.engagement, @@ -416,7 +398,7 @@ def setUp(self): api_scan_configuration=self.api_scan_config, description="Lacework test import", ) - + self.importer = LaceworkApiImporter() def test_get_findings_with_mocked_client_container_vulns(self): @@ -486,39 +468,39 @@ def test_get_findings_with_mocked_client_container_vulns(self): with patch.object(self.importer, "prepare_client") as mock_prepare: mock_client = MagicMock() mock_prepare.return_value = (mock_client, self.api_scan_config) - + mock_client.include_containers = True mock_client.include_hosts = True mock_client.search_container_vulnerabilities.return_value = mock_vulns mock_client.search_host_vulnerabilities.return_value = [] findings = self.importer.get_findings(None, self.test) - + # Should have 2 findings - assert len(findings) == 2 - + self.assertEqual(len(findings), 2) + # Verify first finding fields - assert findings[0].vuln_id_from_tool == "CVE-2022-37434" - assert findings[0].severity == "Critical" - assert findings[0].component_name == "zlib" - assert findings[0].component_version == "1:1.2.11.dfsg-2+deb11u1" - assert findings[0].file_path == "debian:11" - assert findings[0].fix_available is True - assert findings[0].fix_version == "1:1.2.11.dfsg-2+deb11u2" - assert findings[0].cwe == 787 - assert findings[0].cvssv3_score == 9.8 - assert findings[0].cvssv3 == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" - assert findings[0].static_finding is True - assert findings[0].active is True - assert findings[0].verified is True - + self.assertEqual(findings[0].vuln_id_from_tool, "CVE-2022-37434") + self.assertEqual(findings[0].severity, "Critical") + self.assertEqual(findings[0].component_name, "zlib") + self.assertEqual(findings[0].component_version, "1:1.2.11.dfsg-2+deb11u1") + self.assertEqual(findings[0].file_path, "debian:11") + self.assertTrue(findings[0].fix_available) + self.assertEqual(findings[0].fix_version, "1:1.2.11.dfsg-2+deb11u2") + self.assertEqual(findings[0].cwe, 787) + self.assertAlmostEqual(findings[0].cvssv3_score, 9.8, places=1) + self.assertEqual(findings[0].cvssv3, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") + self.assertTrue(findings[0].static_finding) + self.assertTrue(findings[0].active) + self.assertTrue(findings[0].verified) + # Verify second finding - assert findings[1].vuln_id_from_tool == "CVE-2023-12345" - assert findings[1].severity == "High" - assert findings[1].component_name == "openssl" - assert findings[1].cvssv3_score == 8.5 - assert findings[1].fix_available is False - assert findings[1].cwe is None + self.assertEqual(findings[1].vuln_id_from_tool, "CVE-2023-12345") + self.assertEqual(findings[1].severity, "High") + self.assertEqual(findings[1].component_name, "openssl") + self.assertAlmostEqual(findings[1].cvssv3_score, 8.5, places=1) + self.assertFalse(findings[1].fix_available) + self.assertIsNone(findings[1].cwe) def test_get_findings_with_mocked_client_host_vulns(self): """Test that get_findings creates Finding objects from mocked host vulns.""" @@ -550,39 +532,48 @@ def test_get_findings_with_mocked_client_host_vulns(self): with patch.object(self.importer, "prepare_client") as mock_prepare: mock_client = MagicMock() mock_prepare.return_value = (mock_client, self.api_scan_config) - + mock_client.include_containers = True mock_client.include_hosts = True mock_client.search_container_vulnerabilities.return_value = [] mock_client.search_host_vulnerabilities.return_value = mock_vulns findings = self.importer.get_findings(None, self.test) - - assert len(findings) == 1 - assert findings[0].vuln_id_from_tool == "CVE-2016-1585" - assert findings[0].severity == "Medium" - assert findings[0].component_name == "apparmor" - assert findings[0].component_version == "2.13.3-7ubuntu5.3" - assert findings[0].file_path == "ubuntu:20.04" - assert findings[0].cwe == 254 - assert findings[0].cvssv3_score == 9.8 - assert findings[0].fix_available is False + + self.assertEqual(len(findings), 1) + self.assertEqual(findings[0].vuln_id_from_tool, "CVE-2016-1585") + self.assertEqual(findings[0].severity, "Medium") + self.assertEqual(findings[0].component_name, "apparmor") + self.assertEqual(findings[0].component_version, "2.13.3-7ubuntu5.3") + self.assertEqual(findings[0].file_path, "ubuntu:20.04") + self.assertEqual(findings[0].cwe, 254) + self.assertAlmostEqual(findings[0].cvssv3_score, 9.8, places=1) + self.assertFalse(findings[0].fix_available) def test_get_findings_disables_containers_from_extras(self): """Test that include_containers=false skips container vulns.""" with patch.object(self.importer, "prepare_client") as mock_prepare: mock_client = MagicMock() mock_prepare.return_value = (mock_client, self.api_scan_config) - + mock_client.include_containers = False mock_client.include_hosts = True mock_client.search_host_vulnerabilities.return_value = [ - {"vulnId": "CVE-2016-1585", "severity": "Medium", "cveProps": {"description": "test", "link": "", "metadata": {"NVD": {}, "RBS": {}}}, "featureKey": {"name": "test", "namespace": "test", "version_installed": "1.0"}, "fixInfo": {"fix_available": 0, "fixed_version": ""}, "mid": 123, "machineTags": {}, "riskScore": 5} + { + "vulnId": "CVE-2016-1585", + "severity": "Medium", + "cveProps": {"description": "test", "link": "", "metadata": {"NVD": {}, "RBS": {}}}, + "featureKey": {"name": "test", "namespace": "test", "version_installed": "1.0"}, + "fixInfo": {"fix_available": 0, "fixed_version": ""}, + "mid": 123, + "machineTags": {}, + "riskScore": 5, + }, ] findings = self.importer.get_findings(None, self.test) - - assert len(findings) == 1 + + self.assertEqual(len(findings), 1) # search_container_vulnerabilities should NOT have been called mock_client.search_container_vulnerabilities.assert_not_called() mock_client.search_host_vulnerabilities.assert_called_once() @@ -592,26 +583,25 @@ def test_persist_findings_to_db(self): with patch.object(self.importer, "prepare_client") as mock_prepare: mock_client = MagicMock() mock_prepare.return_value = (mock_client, self.api_scan_config) - + mock_client.include_containers = True mock_client.include_hosts = True mock_client.search_container_vulnerabilities.return_value = [] mock_client.search_host_vulnerabilities.return_value = [] findings = self.importer.get_findings(None, self.test) - + # No vulnerabilities, should be empty - assert len(findings) == 0 + self.assertEqual(len(findings), 0) def test_prepare_client_with_existing_config(self): """Test that prepare_client correctly finds the API Scan Configuration.""" - client, config = LaceworkApiImporter.prepare_client(self.test) - assert config == self.api_scan_config - assert config.tool_configuration == self.tool_config + _client, config = LaceworkApiImporter.prepare_client(self.test) + self.assertEqual(config, self.api_scan_config) + self.assertEqual(config.tool_configuration, self.tool_config) def test_prepare_client_fails_without_config(self): """Test that prepare_client raises error when no config exists.""" - # Create a separate product without any API scan configuration product_no_config = Product.objects.create( name="test-no-config", prod_type=self.product_type, @@ -629,5 +619,5 @@ def test_prepare_client_fails_without_config(self): target_start=timezone.now(), target_end=timezone.now(), ) - with self.assertRaises(Exception): - LaceworkApiImporter.prepare_client(test_no_config) \ No newline at end of file + with self.assertRaises(ValueError): + LaceworkApiImporter.prepare_client(test_no_config)