Commit 19f7044

perf(importers): batch endpoint creation and status updates during import/reimport (#14489)
* perf(importers): batch endpoint creation and status updates during import/reimport

  Replace per-finding endpoint_get_or_create() calls with a stateful EndpointManager that accumulates endpoints and statuses across findings and flushes them in bulk at batch boundaries. Reduces ~6200 DB queries to ~3-5 for a 1000-finding scan with 5 endpoints per finding (200 unique).

  - EndpointManager now takes a `product` param and holds internal accumulators
  - `record_endpoint()` deduplicates by normalized key within a batch
  - `record_status_for_create()` / `record_statuses_to_mitigate()` / `record_statuses_to_reactivate()` accumulate operations
  - `persist()` flushes all pending creates and bulk_updates in one shot
  - `update_endpoint_status()` accumulates mitigate/reactivate lists instead of dispatching per-finding Celery tasks
  - Removed old `chunk_endpoints_and_*`, `add_endpoints_to_unsaved_finding`, `mitigate_endpoint_status`, `reactivate_endpoint_status` methods
  - Added unit tests for `_make_endpoint_unique_tuple` normalization
  - Updated performance test fixture to match new stateful manager interface

* fix endpoint manager initialization

* fix(importers): restore tag inheritance and endpoint_manager init for direct callers

  - bulk_create bypasses Django post_save signals, so manually call inherit_instance_tags() for each newly created Endpoint to preserve product tag inheritance behavior
  - Initialize endpoint_manager in create_test() so callers that invoke create_test() + process_findings() directly (without going through process_scan()) don't hit a NoneType error

* refactor(importers): explicit test param for endpoint manager, rename and improve get_or_create_endpoints

  - Add `test` parameter to `_create_endpoint_manager()` so callers pass the test explicitly instead of relying on `self.test` being set
  - Raise `ValueError` with a clear message when `test is None`
  - Initialize endpoint_manager in `DefaultImporter.process_scan()` after `parse_findings()` (which calls `create_test()` for fresh imports), covering both the sync path and the async Celery task path
  - Rename `_fetch_and_create_endpoints()` → `get_or_create_endpoints()`
  - Change return type to `tuple[dict, list]`: returns `(endpoints_by_key, created)` so callers know exactly which endpoints were newly inserted
  - Rename local `key_to_endpoint` → `endpoints_by_key` for clarity

* ruff

* fix create endpoint manager

* fix counts

* refactor(importers): initialize EndpointManager eagerly in __init__

  Instead of lazily initializing endpoint_manager in process_scan() or create_test(), initialize it immediately in __init__ using the product from the required engagement/test parameter. This eliminates the NoneType AttributeError when callers invoke create_test() + process_findings() directly without going through process_scan().

  - DefaultImporter: uses self.engagement.product (engagement is required)
  - DefaultReImporter: uses self.test.engagement.product (test is required)
  - Remove _create_endpoint_manager() factory methods and lazy init sites
  - Remove self.endpoint_manager = None from BaseImporter.__init__

* add test case

* ruff

* remove mock

* fix: replace status_finding_non_special prefetch with Python filtering

  Drop the to_attr Prefetch for status_finding_non_special and instead prefetch all endpoint statuses, filtering non-special ones in Python via EndpointManager.get_non_special_endpoint_statuses(). This avoids an AttributeError when a finding created during the same reimport batch (via add_new_finding_to_candidates) is matched by a subsequent finding: such findings were never loaded through the prefetch queryset and lacked the to_attr attribute.

  See: #14569
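The accumulate-then-flush pattern described in the commit message can be sketched as follows. This is a minimal, framework-free illustration: the method names `record_endpoint`, `record_status_for_create`, `_make_endpoint_unique_tuple`, and `persist` mirror the commit, but the dict-based endpoints, the normalization rules, and the `persist()` return value are assumptions for the example (the real manager issues Django `bulk_create`/`bulk_update` calls).

```python
class EndpointManager:
    """Accumulates endpoints and statuses, flushing in bulk at batch boundaries."""

    def __init__(self, product):
        self.product = product
        self.pending_endpoints = {}  # normalized key -> endpoint data
        self.pending_statuses = []   # (finding_id, endpoint key) pairs

    @staticmethod
    def _make_endpoint_unique_tuple(endpoint):
        # Normalize so equivalent endpoints collapse to one key within a batch
        return (
            (endpoint.get("protocol") or "").lower(),
            (endpoint.get("host") or "").lower(),
            endpoint.get("port"),
            (endpoint.get("path") or "").strip("/"),
        )

    def record_endpoint(self, endpoint):
        key = self._make_endpoint_unique_tuple(endpoint)
        self.pending_endpoints.setdefault(key, endpoint)  # dedupe within the batch
        return key

    def record_status_for_create(self, finding_id, key):
        self.pending_statuses.append((finding_id, key))

    def persist(self):
        # Flush everything at once; the real importer does one bulk_create for
        # endpoints and one for statuses instead of one query per finding.
        endpoints = list(self.pending_endpoints.values())
        statuses = list(self.pending_statuses)
        self.pending_endpoints.clear()
        self.pending_statuses.clear()
        return endpoints, statuses
```

The key design point is that N findings sharing the same endpoint contribute one pending create and N pending statuses, so the query count scales with unique endpoints rather than with findings.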
1 parent 0a72930 commit 19f7044

File tree

9 files changed: +716 −180 lines changed


dojo/finding/deduplication.py

Lines changed: 8 additions & 7 deletions
@@ -336,17 +336,18 @@ def build_candidate_scope_queryset(test, mode="deduplication", service=None):
     # Base prefetches for both modes
     prefetch_list = ["endpoints", "vulnerability_id_set", "found_by"]
 
-    # Additional prefetches for reimport mode: fetch only non-special endpoint statuses with their
-    # endpoint joined in, so endpoint_manager can read status_finding_non_special directly without
-    # any extra DB queries
+    # Prefetch all endpoint statuses with their endpoint for reimport mode.
+    # The non-special filtering (excluding false_positive, out_of_scope, risk_accepted)
+    # is done in Python by EndpointManager.get_non_special_endpoint_statuses().
+    # We avoid using to_attr here because findings created during the same reimport
+    # batch (via add_new_finding_to_candidates) are never loaded through this queryset
+    # and would lack the to_attr, causing an AttributeError.
+    # See: https://github.com/DefectDojo/django-DefectDojo/pull/14569
     if mode == "reimport":
         prefetch_list.append(
             Prefetch(
                 "status_finding",
-                queryset=Endpoint_Status.objects.exclude(
-                    Q(false_positive=True) | Q(out_of_scope=True) | Q(risk_accepted=True),
-                ).select_related("endpoint"),
-                to_attr="status_finding_non_special",
+                queryset=Endpoint_Status.objects.select_related("endpoint"),
             ),
         )
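The Python-side filtering that replaces the queryset-level `exclude()` can be illustrated like this. It is a hedged sketch: the real `Endpoint_Status` is a Django model and the real helper reads the prefetched `status_finding` relation; here a dataclass stands in so the filtering logic is runnable on its own.

```python
from dataclasses import dataclass


@dataclass
class EndpointStatus:
    """Simplified stand-in for the Endpoint_Status model."""
    endpoint: str
    false_positive: bool = False
    out_of_scope: bool = False
    risk_accepted: bool = False


def get_non_special_endpoint_statuses(statuses):
    # Filter "special" statuses in Python instead of in the queryset, so
    # findings that were never loaded through the prefetch (e.g. ones created
    # mid-reimport) behave exactly like prefetched ones.
    return [
        s for s in statuses
        if not (s.false_positive or s.out_of_scope or s.risk_accepted)
    ]
```

The trade-off is fetching a few extra rows per finding in exchange for one code path that works whether or not the finding came through the prefetch queryset.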

dojo/importers/base_importer.py

Lines changed: 11 additions & 16 deletions
@@ -14,7 +14,6 @@
 import dojo.finding.helper as finding_helper
 import dojo.risk_acceptance.helper as ra_helper
 from dojo.celery_dispatch import dojo_dispatch_task
-from dojo.importers.endpoint_manager import EndpointManager
 from dojo.importers.location_manager import LocationManager, UnsavedLocation
 from dojo.importers.options import ImporterOptions
 from dojo.jira_link.helper import is_keep_in_sync_with_jira
@@ -83,9 +82,6 @@ def __init__(
         ImporterOptions.__init__(self, *args, **kwargs)
         if settings.V3_FEATURE_LOCATIONS:
             self.location_manager = LocationManager()
-        else:
-            # TODO: Delete this after the move to Locations
-            self.endpoint_manager = EndpointManager()
 
     def check_child_implementation_exception(self):
         """
@@ -825,12 +821,17 @@ def process_endpoints(
             msg = "BaseImporter#process_endpoints() method is deprecated when V3_FEATURE_LOCATIONS is enabled"
             raise NotImplementedError(msg)
 
-        # Save the unsaved endpoints
-        self.endpoint_manager.chunk_endpoints_and_disperse(finding, finding.unsaved_endpoints)
-        # Check for any that were added in the form
+        # Clean and record unsaved endpoints from the report
+        self.endpoint_manager.clean_unsaved_endpoints(finding.unsaved_endpoints)
+        for endpoint in finding.unsaved_endpoints:
+            key = self.endpoint_manager.record_endpoint(endpoint)
+            self.endpoint_manager.record_status_for_create(finding, key)
+        # Record any endpoints added from the form
         if len(endpoints_to_add) > 0:
             logger.debug("endpoints_to_add: %s", endpoints_to_add)
-            self.endpoint_manager.chunk_endpoints_and_disperse(finding, endpoints_to_add)
+            for endpoint in endpoints_to_add:
+                key = self.endpoint_manager.record_endpoint(endpoint)
+                self.endpoint_manager.record_status_for_create(finding, key)
 
     def sanitize_vulnerability_ids(self, finding) -> None:
         """Remove undisired vulnerability id values"""
@@ -934,14 +935,8 @@ def mitigate_finding(
             )
         else:
             # TODO: Delete this after the move to Locations
-            # Mitigate the endpoint statuses
-            dojo_dispatch_task(
-                EndpointManager.mitigate_endpoint_status,
-                finding.status_finding.all(),
-                self.user,
-                kwuser=self.user,
-                sync=True,
-            )
+            # Accumulate endpoint statuses for bulk mitigate in persist()
+            self.endpoint_manager.record_statuses_to_mitigate(finding.status_finding.all())
         # to avoid pushing a finding group multiple times, we push those outside of the loop
         if finding_groups_enabled and finding.finding_group:
             # don't try to dedupe findings that we are closing
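The switch in `mitigate_finding` from one Celery dispatch per finding to accumulation can be sketched as below. This is illustrative only: the real code records Django model instances and flushes them with a `bulk_update`, while here plain dicts stand in and `persist()` mutates them in place; the `StatusAccumulator` name is invented for the example.

```python
class StatusAccumulator:
    """Collects endpoint statuses to mitigate; flushes them all at once."""

    def __init__(self):
        self.to_mitigate = []

    def record_statuses_to_mitigate(self, statuses):
        # Called once per closed finding; no task dispatch happens here
        self.to_mitigate.extend(statuses)

    def persist(self, user, now):
        # One bulk operation instead of one Celery task per finding
        for status in self.to_mitigate:
            status["mitigated"] = True
            status["mitigated_by"] = user
            status["mitigated_time"] = now
        flushed, self.to_mitigate = self.to_mitigate, []
        return flushed
```

Deferring the writes this way is what lets the importer mitigate statuses for a whole batch of closed findings with a constant number of queries.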

dojo/importers/default_importer.py

Lines changed: 11 additions & 1 deletion
@@ -9,6 +9,7 @@
 from dojo.celery_dispatch import dojo_dispatch_task
 from dojo.finding import helper as finding_helper
 from dojo.importers.base_importer import BaseImporter, Parser
+from dojo.importers.endpoint_manager import EndpointManager
 from dojo.importers.options import ImporterOptions
 from dojo.jira_link.helper import is_keep_in_sync_with_jira
 from dojo.models import (
@@ -56,6 +57,8 @@ def __init__(self, *args, **kwargs):
             import_type=Test_Import.IMPORT_TYPE,
             **kwargs,
         )
+        if not settings.V3_FEATURE_LOCATIONS:
+            self.endpoint_manager = EndpointManager(self.engagement.product)
 
     def create_test(
         self,
@@ -109,6 +112,8 @@ def process_scan(
         parser = self.get_parser()
         # Get the findings from the parser based on what methods the parser supplies
         # This could either mean traditional file parsing, or API pull parsing
+        # Note: for fresh imports, parse_findings() calls create_test() internally,
+        # so self.test is guaranteed to be set after this call.
         parsed_findings = self.parse_findings(scan, parser) or []
         new_findings = self.process_findings(parsed_findings, **kwargs)
         # Close any old findings in the processed list if the the user specified for that
@@ -259,8 +264,10 @@ def process_findings(
             logger.debug("process_findings: computed push_to_jira=%s", push_to_jira)
             batch_finding_ids.append(finding.id)
 
-            # If batch is full or we're at the end, dispatch one batched task
+            # If batch is full or we're at the end, persist endpoints and dispatch
             if len(batch_finding_ids) >= batch_max_size or is_final_finding:
+                if not settings.V3_FEATURE_LOCATIONS:
+                    self.endpoint_manager.persist(user=self.user)
                 finding_ids_batch = list(batch_finding_ids)
                 batch_finding_ids.clear()
                 logger.debug("process_findings: dispatching batch with push_to_jira=%s (batch_size=%d, is_final=%s)",
@@ -378,6 +385,9 @@ def close_old_findings(
                 finding_groups_enabled=self.findings_groups_enabled,
                 product_grading_option=False,
             )
+        # Persist any accumulated endpoint status mitigations
+        if not settings.V3_FEATURE_LOCATIONS:
+            self.endpoint_manager.persist(user=self.user)
         # push finding groups to jira since we only only want to push whole groups
         # We dont check if the finding jira sync is applicable quite yet until we can get in the loop
         # but this is a way to at least make it that far

dojo/importers/default_reimporter.py

Lines changed: 18 additions & 5 deletions
@@ -14,6 +14,7 @@
     find_candidates_for_reimport_legacy,
 )
 from dojo.importers.base_importer import BaseImporter, Parser
+from dojo.importers.endpoint_manager import EndpointManager
 from dojo.importers.options import ImporterOptions
 from dojo.jira_link.helper import is_keep_in_sync_with_jira
 from dojo.location.status import FindingLocationStatus
@@ -76,6 +77,8 @@ def __init__(self, *args, **kwargs):
             import_type=Test_Import.REIMPORT_TYPE,
             **kwargs,
         )
+        if not settings.V3_FEATURE_LOCATIONS:
+            self.endpoint_manager = EndpointManager(self.test.engagement.product)
 
     def process_scan(
         self,
@@ -430,6 +433,8 @@ def process_findings(
             # - Deduplication batches: optimize bulk operations (larger batches = fewer queries)
             # They don't need to be aligned since they optimize different operations.
             if len(batch_finding_ids) >= dedupe_batch_max_size or is_final:
+                if not settings.V3_FEATURE_LOCATIONS:
+                    self.endpoint_manager.persist(user=self.user)
                 finding_ids_batch = list(batch_finding_ids)
                 batch_finding_ids.clear()
                 dojo_dispatch_task(
@@ -497,6 +502,9 @@ def close_old_findings(
                 product_grading_option=False,
             )
             mitigated_findings.append(finding)
+        # Persist any accumulated endpoint status mitigations
+        if not settings.V3_FEATURE_LOCATIONS:
+            self.endpoint_manager.persist(user=self.user)
         # push finding groups to jira since we only only want to push whole groups
         # We dont check if the finding jira sync is applicable quite yet until we can get in the loop
         # but this is a way to at least make it that far
@@ -763,9 +771,10 @@ def process_matched_mitigated_finding(
             self.location_manager.chunk_locations_and_reactivate(mitigated_locations)
         else:
             # TODO: Delete this after the move to Locations
-            # Reactivate mitigated endpoints that are not false positives, out of scope, or risk accepted
-            # status_finding_non_special is prefetched by build_candidate_scope_queryset
-            self.endpoint_manager.chunk_endpoints_and_reactivate(existing_finding.status_finding_non_special)
+            # Accumulate endpoint statuses for bulk reactivation in persist()
+            self.endpoint_manager.record_statuses_to_reactivate(
+                self.endpoint_manager.get_non_special_endpoint_statuses(existing_finding),
+            )
         existing_finding.notes.add(note)
         self.reactivated_items.append(existing_finding)
         # The new finding is active while the existing on is mitigated. The existing finding needs to
@@ -932,9 +941,13 @@ def finding_post_processing(
             self.location_manager.chunk_locations_and_disperse(finding, self.endpoints_to_add)
         else:
             # TODO: Delete this after the move to Locations
-            self.endpoint_manager.chunk_endpoints_and_disperse(finding, finding_from_report.unsaved_endpoints)
+            for endpoint in finding_from_report.unsaved_endpoints:
+                key = self.endpoint_manager.record_endpoint(endpoint)
+                self.endpoint_manager.record_status_for_create(finding, key)
             if len(self.endpoints_to_add) > 0:
-                self.endpoint_manager.chunk_endpoints_and_disperse(finding, self.endpoints_to_add)
+                for endpoint in self.endpoints_to_add:
+                    key = self.endpoint_manager.record_endpoint(endpoint)
+                    self.endpoint_manager.record_status_for_create(finding, key)
         # Parsers shouldn't use the tags field, and use unsaved_tags instead.
         # Merge any tags set by parser into unsaved_tags
         tags_from_parser = finding_from_report.tags if isinstance(finding_from_report.tags, list) else []
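The commit message also describes `get_or_create_endpoints()` returning `(endpoints_by_key, created)`. Its shape can be illustrated with the sketch below; it is an assumed simplification in which "existing" endpoints come from an in-memory dict rather than a database queryset, and the endpoint values are opaque placeholders.

```python
def get_or_create_endpoints(pending, existing):
    """Return (endpoints_by_key, created).

    endpoints_by_key maps every normalized key to its endpoint, existing or new;
    created is the subset that was newly inserted, so callers know exactly
    which endpoints need post-create handling (e.g. tag inheritance, since
    bulk_create bypasses post_save signals).
    """
    endpoints_by_key = dict(existing)
    created = []
    for key, endpoint in pending.items():
        if key not in endpoints_by_key:
            endpoints_by_key[key] = endpoint
            created.append(endpoint)  # the real code bulk_creates these in one query
    return endpoints_by_key, created
```

Returning the `created` list explicitly is what allows the importer to run tag inheritance only on genuinely new endpoints instead of re-processing every endpoint in the batch.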
