Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions docs/content/supported_tools/parsers/api/lacework.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
---
title: "Lacework API Import"
toc_hide: true
---
All parsers that use API pull have common basic configuration steps, but with different values. Please, [read these steps](../) first.

## Tool Configuration

In `Tool Configuration`, select `Tool Type` "Lacework" and `Authentication Type` "API Key".
The URL must be in the format of `https://<your_lacework_instance>.lacework.net`
Enter the key ID in the "Username" field and paste your Lacework API secret key in the "API Key" field.
By default, the tool will import both container and host vulnerabilities.

To restrict the import to only containers or only hosts, use the "Extras" field with the following options:

| Extras value | Effect |
|---|---|
| *(empty)* | Import both containers and hosts (default) |
| `include_hosts=false` | Import containers only |
| `include_containers=false` | Import hosts only |
| `include_containers=true,include_hosts=true` | Both (same as empty) |

## Product-Level Configuration

In `Add API Scan Configuration`
- `Service key 1` can optionally be set to filter container vulnerabilities by repository name pattern.
When set, only container repositories matching the pattern (rlike) will be imported.
Leave empty to import all container repositories.

## Import All Repositories (Management Command)

Instead of importing per-product through the UI, you can import all repositories at once and auto-create Products using the management command:

```bash
python manage.py lacework_import_all --tool-config <tool_config_id>
```

This command will:
1. Fetch all container vulnerabilities from Lacework
2. Automatically group them by repository
3. Create a Product for each unique repository (if it doesn't already exist)
4. Create an Engagement and Test for each product
5. Create Findings for each vulnerability

The configuration for `include_containers` and `include_hosts` is automatically read from the "Extras" field of the Tool Configuration.

## Sample Scan Data

Sample Lacework vulnerability data can be examined using the debug command:

```bash
python manage.py lacework_debug_vuln --tool-config <tool_config_id> --type containers
python manage.py lacework_debug_vuln --tool-config <tool_config_id> --type hosts
```

## Field Mapping

Lacework vulnerability fields are mapped to DefectDojo Finding fields as follows:

| Lacework Field | Finding Field | Example |
|---|---|---|
| `vulnId` | `vuln_id_from_tool` | CVE-2025-62727 |
| `severity` (or inferred from `riskScore`) | `severity` | Critical, High, Medium, Low, Info |
| `cveProps.description` + `featureProps.introduced_in` | `description` | Vulnerability description |
| `cveProps.link` + `cveProps.source` | `references` | CVE link and data source |
| `featureKey.name` | `component_name` | starlette, zlib, openssl |
| `featureKey.version` / `version_installed` | `component_version` | 0.47.3 |
| `featureProps.src` | `file_path` | Package path within image |
| `fixInfo.fix_available` | `fix_available` | 1 (true) if fix exists |
| `fixInfo.fixed_version` | `fix_version` | 0.49.1 |
| `cveRiskScore` / `riskScore` | `cvssv3_score` | 9.8 |
| `status` (VULNERABLE/GOOD) | `active` / `verified` | Active only if vulnerable |
| `packageStatus` | tags | pkg:NO_AGENT_AVAILABLE |
| `evalCtx.request_source` | tags | scanner:INLINE_SCANNER |
| `evalCtx.integration_props.NAME` | tags | integration:bitbucket-pipelines |
| `featureProps.feed` | tags | feed:rbs |

## Deduplication

The Lacework API Import uses `hash_code` algorithm for deduplication with the following fields:
- `vuln_id_from_tool` (CVE ID)
- `component_name` (package name)
- `file_path` (namespace/package path)

This means the same CVE found in the same package will be properly deduplicated.

## Multiple Lacework API Configurations

In the import or re-import dialog, you can select which `API Scan Configuration` shall be used. If you do not choose any, DefectDojo will use the `API Scan Configuration` of the Product if there is only one defined or the Lacework `Tool Configuration` if there is only one.
194 changes: 194 additions & 0 deletions dojo/management/commands/lacework_debug_vuln.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""
Management command to dump raw Lacework vulnerability data for debugging.

Usage:
python manage.py lacework_debug_vuln --tool-config <ID>

This will fetch one vulnerability and print its full JSON structure
so you can see what fields are available for mapping.
"""

import json
import logging
from datetime import UTC, datetime, timedelta

from django.core.management.base import BaseCommand, CommandError

from dojo.models import Tool_Configuration
from dojo.tools.api_lacework.api_client import LaceworkAPI

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Dump raw Lacework vulnerability data for debugging field mapping"

def add_arguments(self, parser):
parser.add_argument(
"--tool-config",
type=int,
required=True,
help="ID of the Tool Configuration for Lacework",
)
parser.add_argument(
"--type",
type=str,
default="containers",
choices=["containers", "hosts"],
help="Type of vulnerabilities to dump (default: containers)",
)

def handle(self, *args, **options):
tool_config_id = options["tool_config"]
vuln_type = options["type"]

try:
tool_config = Tool_Configuration.objects.get(id=tool_config_id)
except Tool_Configuration.DoesNotExist:
msg = f"Tool Configuration with id {tool_config_id} not found"
raise CommandError(
msg,
)

self.stdout.write(f"Using Tool Configuration: {tool_config.name}")
self.stdout.write(f" URL: {tool_config.url}")

client = LaceworkAPI(tool_config)

end_time = datetime.now(UTC)
start_time = end_time - timedelta(hours=24)
start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

if vuln_type == "containers":
self.stdout.write("\nFetching container vulnerabilities...")
vulns = client.search_container_vulnerabilities(
start_time=start_time_str,
end_time=end_time_str,
)
else:
self.stdout.write("\nFetching host vulnerabilities...")
vulns = client.search_host_vulnerabilities(
start_time=start_time_str,
end_time=end_time_str,
)

if not vulns:
self.stdout.write(self.style.WARNING("No vulnerabilities found"))
return

self.stdout.write(f"Total vulnerabilities: {len(vulns)}")

# Find first vulnerability that actually has a CVE/vulnId
vuln = None
for v in vulns:
if v.get("vulnId") and v.get("severity"):
vuln = v
break

if not vuln:
self.stdout.write(self.style.WARNING("No vulnerability with CVE data found"))
return

self.stdout.write(f"\nSelected vulnerability: {vuln.get('vulnId')} ({vuln.get('severity')})\n")
self.stdout.write(
self.style.SUCCESS(
f"\n=== Full JSON structure of 1 {vuln_type} vulnerability ===\n",
),
)
self.stdout.write(json.dumps(vuln, indent=2, default=str))

self.stdout.write(
self.style.SUCCESS(
"\n=== TOP-LEVEL FIELDS ===\n",
),
)
for key, value in vuln.items():
if not isinstance(value, (dict, list)):
self.stdout.write(f" {key}: {value}")
else:
self.stdout.write(f" {key}: ({type(value).__name__})")

# Analyze available fields for mapping
self.stdout.write(
self.style.SUCCESS(
"\n=== ANALYSIS ===\n",
),
)
self.stdout.write(f"vulnId (CVE): {vuln.get('vulnId', 'N/A')}")
self.stdout.write(f"severity: {vuln.get('severity', 'N/A')}")
self.stdout.write(f"status: {vuln.get('status', 'N/A')}")
self.stdout.write(f"riskScore: {vuln.get('riskScore', 'N/A')}")
self.stdout.write(f"cveRiskScore: {vuln.get('cveRiskScore', 'N/A')}")
self.stdout.write(f"startTime: {vuln.get('startTime', 'N/A')}")
self.stdout.write(f"endTime: {vuln.get('endTime', 'N/A')}")
self.stdout.write(f"evalGuid: {vuln.get('evalGuid', 'N/A')}")
self.stdout.write(f"imageId: {vuln.get('imageId', 'N/A')}")

# FeatureKey details
fk = vuln.get("featureKey", {})
self.stdout.write(f"\n featureKey.name: {fk.get('name', 'N/A')}")
self.stdout.write(f" featureKey.namespace: {fk.get('namespace', 'N/A')}")
if fk.get("version"):
self.stdout.write(f" featureKey.version: {fk['version']}")
if fk.get("version_installed"):
self.stdout.write(f" featureKey.version_installed: {fk['version_installed']}")
if fk.get("version_format"):
self.stdout.write(f" featureKey.version_format: {fk['version_format']}")
if fk.get("src"):
self.stdout.write(f" featureKey.src: {fk['src']}")
if fk.get("introduced_in"):
self.stdout.write(f" featureKey.introduced_in: {fk['introduced_in']}")
if fk.get("layer"):
self.stdout.write(f" featureKey.layer: {fk['layer']}")
if fk.get("package_active"):
self.stdout.write(f" featureKey.package_active: {fk['package_active']}")
if fk.get("package_path"):
self.stdout.write(f" featureKey.package_path: {fk['package_path']}")

# FixInfo
fi = vuln.get("fixInfo", {})
self.stdout.write(f"\n fixInfo.fix_available: {fi.get('fix_available', 'N/A')}")
self.stdout.write(f" fixInfo.fixed_version: {fi.get('fixed_version', 'N/A')}")

# CveProps
cp = vuln.get("cveProps", {})
self.stdout.write(f"\n cveProps.description: {cp.get('description', 'N/A')[:100]}...")
self.stdout.write(f" cveProps.link: {cp.get('link', 'N/A')}")
self.stdout.write(f" cveProps.source: {cp.get('source', 'N/A')}")

# Metadata
meta = cp.get("metadata", {})
nvd = meta.get("NVD", {})
rbs = meta.get("RBS", {})
self.stdout.write(f"\n metadata.NVD.CVSSv3.Score: {nvd.get('CVSSv3', {}).get('Score', 'N/A')}")
self.stdout.write(f" metadata.NVD.CVSSv2.Score: {nvd.get('CVSSv2', {}).get('Score', 'N/A')}")
self.stdout.write(f" metadata.RBS.CVSSv3.Score: {rbs.get('CVSSv3', {}).get('Score', 'N/A')}")
self.stdout.write(f" metadata.RBS.cwe_id: {rbs.get('cwe_id', 'N/A')}")

# EvalCtx for containers
ec = vuln.get("evalCtx", {})
if ec:
self.stdout.write(f"\n evalCtx.collector_type: {ec.get('collector_type', 'N/A')}")
ii = ec.get("image_info", {})
if ii:
self.stdout.write(f" evalCtx.image_info.repo: {ii.get('repo', 'N/A')}")
self.stdout.write(f" evalCtx.image_info.registry: {ii.get('registry', 'N/A')}")
self.stdout.write(f" evalCtx.image_info.tags: {ii.get('tags', 'N/A')}")
self.stdout.write(f" evalCtx.image_info.digest: {ii.get('digest', 'N/A')}")
self.stdout.write(f" evalCtx.image_info.status: {ii.get('status', 'N/A')}")
self.stdout.write(f" evalCtx.image_info.type: {ii.get('type', 'N/A')}")
self.stdout.write(f" evalCtx.image_info.size: {ii.get('size', 'N/A')}")

# Machine info for hosts
machine_tags = vuln.get("machineTags", {})
if machine_tags:
self.stdout.write(f"\n machineTags.Hostname: {machine_tags.get('Hostname', 'N/A')}")
self.stdout.write(f" machineTags.VmProvider: {machine_tags.get('VmProvider', 'N/A')}")
self.stdout.write(f" machineTags.InstanceId: {machine_tags.get('InstanceId', 'N/A')}")
self.stdout.write(f" machineTags.Region: {machine_tags.get('Region', 'N/A')}")

# Additional fields
self.stdout.write("\n additional top-level keys:")
for key in sorted(vuln.keys()):
self.stdout.write(f" - {key}")
Loading