diff --git a/backend/Makefile b/backend/Makefile
index e2e88f2415..a4292c1830 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -108,6 +108,8 @@ shell-db:
 sync-data: \
 	update-data \
+	owasp-update-project-health-metrics \
+	owasp-update-project-health-scores \
 	enrich-data \
 	index-data
@@ -133,6 +135,7 @@ update-data: \
 	github-update-related-organizations \
 	github-update-users \
 	owasp-aggregate-projects \
+	owasp-sync-official-project-levels \
 	owasp-update-events \
 	owasp-sync-posts \
 	owasp-update-sponsors \
diff --git a/backend/apps/owasp/Makefile b/backend/apps/owasp/Makefile
index 4febcd2572..24b495c87c 100644
--- a/backend/apps/owasp/Makefile
+++ b/backend/apps/owasp/Makefile
@@ -33,6 +33,10 @@ owasp-update-project-health-metrics:
 	@echo "Updating OWASP project health metrics"
 	@CMD="python manage.py owasp_update_project_health_metrics" $(MAKE) exec-backend-command
 
+owasp-sync-official-project-levels:
+	@echo "Syncing official OWASP project levels"
+	@CMD="python manage.py owasp_update_project_health_metrics --sync-official-levels-only" $(MAKE) exec-backend-command
+
 owasp-update-project-health-requirements:
 	@echo "Updating OWASP project health requirements"
 	@CMD="python manage.py owasp_update_project_health_requirements" $(MAKE) exec-backend-command
diff --git a/backend/apps/owasp/management/commands/owasp_detect_project_level_compliance.py b/backend/apps/owasp/management/commands/owasp_detect_project_level_compliance.py
new file mode 100644
index 0000000000..21e7be718a
--- /dev/null
+++ b/backend/apps/owasp/management/commands/owasp_detect_project_level_compliance.py
@@ -0,0 +1,112 @@
+"""A command to detect and report project level compliance status."""
+
+import logging
+
+from django.core.management.base import BaseCommand
+
+from apps.owasp.models.project import Project
+
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+    """Command to detect and report project level compliance status.
+
+    This is a reporting command only - it does not sync or update any data.
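+    For a quick look at the current status, run it directly (the --verbose
+    flag is defined in add_arguments below):
+
+        python manage.py owasp_detect_project_level_compliance --verbose
+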
+    For data synchronization, use the main data pipeline: make sync-data
+
+    Architecture:
+    - Part 1: Official level syncing happens during 'make update-data'
+    - Part 2: Health scoring with compliance penalties happens during 'make sync-data'
+    - This command: Reporting and analysis only
+    """
+
+    help = "Detect and report projects with non-compliant level assignments"
+
+    def add_arguments(self, parser):
+        """Add command line arguments."""
+        parser.add_argument(
+            "--verbose",
+            action="store_true",
+            help="Enable verbose output showing all projects",
+        )
+
+    def handle(self, *args, **options):
+        """Execute compliance detection and reporting."""
+        verbose = options["verbose"]
+
+        self.stdout.write("Analyzing project level compliance status...")
+
+        # Get all active projects
+        active_projects = Project.objects.filter(is_active=True).select_related()
+
+        compliant_projects = []
+        non_compliant_projects = []
+
+        for project in active_projects:
+            if project.is_level_compliant:
+                compliant_projects.append(project)
+                if verbose:
+                    self.stdout.write(f"✓ {project.name}: {project.level} (matches official)")
+            else:
+                non_compliant_projects.append(project)
+                self.stdout.write(
+                    self.style.WARNING(
+                        f"✗ {project.name}: Local={project.level}, "
+                        f"Official={project.project_level_official}"
+                    )
+                )
+
+        # Summary statistics
+        total_projects = len(active_projects)
+        compliant_count = len(compliant_projects)
+        non_compliant_count = len(non_compliant_projects)
+        compliance_rate = (compliant_count / total_projects * 100) if total_projects else 0.0
+
+        self.stdout.write("\n" + "=" * 60)
+        self.stdout.write("PROJECT LEVEL COMPLIANCE SUMMARY")
+        self.stdout.write("=" * 60)
+        self.stdout.write(f"Total active projects: {total_projects}")
+        self.stdout.write(f"Compliant projects: {compliant_count}")
+        self.stdout.write(f"Non-compliant projects: {non_compliant_count}")
+        self.stdout.write(f"Compliance rate: {compliance_rate:.1f}%")
+
+        if non_compliant_count > 0:
+            warning_msg = f"WARNING: Found {non_compliant_count} non-compliant projects"
+            self.stdout.write(f"\n{self.style.WARNING(warning_msg)}")
+            penalty_msg = (
+                "These projects will receive score penalties in the next health score update."
+            )
+            self.stdout.write(penalty_msg)
+        else:
+            self.stdout.write(f"\n{self.style.SUCCESS('✓ All projects are level compliant!')}")
+
+        # Log summary for monitoring
+        logger.info(
+            "Project level compliance analysis completed",
+            extra={
+                "total_projects": total_projects,
+                "compliant_projects": compliant_count,
+                "non_compliant_projects": non_compliant_count,
+                "compliance_rate": f"{compliance_rate:.1f}%",
+            },
+        )
+
+        # Check if official levels are populated
+        from apps.owasp.models.enums.project import ProjectLevel
+
+        default_level = ProjectLevel.OTHER
+        projects_without_official_level = sum(
+            1 for project in active_projects if project.project_level_official == default_level
+        )
+
+        if projects_without_official_level > 0:
+            info_msg = (
+                f"INFO: {projects_without_official_level} projects have default official levels"
+            )
+            self.stdout.write(f"\n{self.style.NOTICE(info_msg)}")
+            sync_msg = (
+                "Run 'make update-data' to sync official levels, "
+                "then 'make sync-data' for scoring."
+            )
+            self.stdout.write(sync_msg)
diff --git a/backend/apps/owasp/management/commands/owasp_update_project_health_metrics.py b/backend/apps/owasp/management/commands/owasp_update_project_health_metrics.py
index 71e51970e0..b85c0891ff 100644
--- a/backend/apps/owasp/management/commands/owasp_update_project_health_metrics.py
+++ b/backend/apps/owasp/management/commands/owasp_update_project_health_metrics.py
@@ -1,15 +1,164 @@
 """A command to update OWASP project health metrics."""
 
+import logging
+
+import requests
 from django.core.management.base import BaseCommand
+from requests.exceptions import RequestException
 
 from apps.owasp.models.project import Project
 from apps.owasp.models.project_health_metrics import ProjectHealthMetrics
 
+logger = logging.getLogger(__name__)
+
+OWASP_PROJECT_LEVELS_URL = (
+    "https://raw.githubusercontent.com/OWASP/owasp.github.io/main/_data/project_levels.json"
+)
+
 
 class Command(BaseCommand):
     help = "Update OWASP project health metrics."
 
+    def add_arguments(self, parser):
+        """Add command line arguments."""
+        parser.add_argument(
+            "--skip-official-levels",
+            action="store_true",
+            help="Skip fetching official project levels from OWASP GitHub repository",
+        )
+        parser.add_argument(
+            "--sync-official-levels-only",
+            action="store_true",
+            help="Only sync official project levels, skip health metrics updates",
+        )
+        parser.add_argument(
+            "--timeout",
+            type=int,
+            default=30,
+            help="HTTP timeout for fetching project levels (default: 30 seconds)",
+        )
+
+    def fetch_official_project_levels(self, timeout: int = 30) -> dict[str, str] | None:
+        """Fetch project levels from OWASP GitHub repository.
+
+        Args:
+            timeout: HTTP request timeout in seconds
+
+        Returns:
+            Dict mapping project names to their official levels, or None if fetch fails
+
+        """
+        try:
+            response = requests.get(
+                OWASP_PROJECT_LEVELS_URL,
+                timeout=timeout,
+                headers={"Accept": "application/json"},
+            )
+            response.raise_for_status()
+            data = response.json()
+            if not isinstance(data, list):
+                logger.error(
+                    "Invalid project levels data format",
+                    extra={"expected": "list", "got": type(data).__name__},
+                )
+                return None
+
+            # Convert the list to a dict mapping project names to their levels
+            project_levels = {}
+            for entry in data:
+                if not isinstance(entry, dict):
+                    continue
+                project_name = entry.get("name")
+                level = entry.get("level")
+                if (
+                    isinstance(project_name, str)
+                    and isinstance(level, (str, int, float))
+                    and project_name.strip()
+                ):
+                    project_levels[project_name.strip()] = str(level)
+
+        except (RequestException, ValueError) as e:
+            logger.exception(
+                "Failed to fetch project levels",
+                extra={"url": OWASP_PROJECT_LEVELS_URL, "error": str(e)},
+            )
+            return None
+        else:
+            return project_levels
+
+    def update_official_levels(self, official_levels: dict[str, str]) -> int:
+        """Update official levels for projects.
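+
+        Matching is deliberately forgiving: project names and levels are
+        compared after stripping whitespace and lowercasing, and numeric
+        levels ("2", "3", "3.5", "4") are mapped onto the named tiers.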
+
+        Args:
+            official_levels: Dict mapping project names to their official levels
+
+        Returns:
+            Number of projects updated
+
+        """
+        updated_count = 0
+        projects_to_update = []
+
+        # Normalize official levels by stripping whitespace and normalizing case
+        normalized_official_levels = {
+            k.strip().lower(): v.strip().lower() for k, v in official_levels.items()
+        }
+
+        for project in Project.objects.filter(is_active=True):
+            normalized_project_name = project.name.strip().lower()
+            if normalized_project_name in normalized_official_levels:
+                official_level = normalized_official_levels[normalized_project_name]
+                # Map string levels to enum values
+                level_mapping = {
+                    "incubator": "incubator",
+                    "lab": "lab",
+                    "production": "production",
+                    "flagship": "flagship",
+                    "2": "incubator",
+                    "3": "lab",
+                    "3.5": "production",
+                    "4": "flagship",
+                }
+                mapped_level = level_mapping.get(official_level, "other")
+
+                if project.project_level_official != mapped_level:
+                    project.project_level_official = mapped_level
+                    projects_to_update.append(project)
+                    updated_count += 1
+
+        if projects_to_update:
+            Project.bulk_save(projects_to_update, fields=["project_level_official"])
+            self.stdout.write(f"Updated official levels for {updated_count} projects")
+        else:
+            self.stdout.write("No official level updates needed")
+
+        return updated_count
+
     def handle(self, *args, **options):
+        skip_official_levels = options["skip_official_levels"]
+        sync_official_levels_only = options["sync_official_levels_only"]
+        timeout = options["timeout"]
+
+        # Part 1: Sync official project levels during project sync
+        if not skip_official_levels:
+            self.stdout.write("Fetching official project levels from OWASP GitHub repository...")
+            official_levels = self.fetch_official_project_levels(timeout=timeout)
+            if official_levels:
+                success_msg = (
+                    f"Successfully fetched {len(official_levels)} official project levels"
+                )
+                self.stdout.write(success_msg)
+                self.update_official_levels(official_levels)
+            else:
+                warning_msg = "Failed to fetch official project levels, continuing without updates"
+                self.stdout.write(self.style.WARNING(warning_msg))
+
+        # If only syncing official levels, stop here (Part 1 only)
+        if sync_official_levels_only:
+            self.stdout.write(self.style.SUCCESS("Official level sync completed."))
+            return
+
+        # Part 2: Update project health metrics (only if not sync-only mode)
         metric_project_field_mapping = {
             "contributors_count": "contributors_count",
             "created_at": "created_at",
diff --git a/backend/apps/owasp/management/commands/owasp_update_project_health_scores.py b/backend/apps/owasp/management/commands/owasp_update_project_health_scores.py
index 44f11a5a41..7f4ccfc7c8 100644
--- a/backend/apps/owasp/management/commands/owasp_update_project_health_scores.py
+++ b/backend/apps/owasp/management/commands/owasp_update_project_health_scores.py
@@ -9,7 +9,8 @@ class Command(BaseCommand):
 
     help = "Update OWASP project health scores."
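+
+    # Scoring sketch: a metric earns each field's weight when it meets the
+    # per-level requirement (forward fields reward higher values, backward
+    # fields reward lower ones); a compliance penalty may then reduce the total.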
-    def handle(self, *args, **options):
+    def _get_field_weights(self):
+        """Return the field weights for scoring calculations."""
         forward_fields = {
             "age_days": 6.0,
             "contributors_count": 6.0,
@@ -31,11 +32,91 @@ def handle(self, *args, **options):
             "unanswered_issues_count": 6.0,
             "unassigned_issues_count": 6.0,
         }
+        return forward_fields, backward_fields
+
+    def _calculate_base_score(self, metric, requirements, forward_fields, backward_fields):
+        """Calculate base score before applying any penalties."""
+        score = 0.0
+
+        # Forward fields (higher values are better)
+        for field, weight in forward_fields.items():
+            if int(getattr(metric, field)) >= int(getattr(requirements, field)):
+                score += weight
+
+        # Backward fields (lower values are better)
+        for field, weight in backward_fields.items():
+            if int(getattr(metric, field)) <= int(getattr(requirements, field)):
+                score += weight
+
+        return score
+
+    def _apply_compliance_penalty(self, score, metric, requirements):
+        """Apply compliance penalty if project is not level compliant."""
+        if not metric.project.is_level_compliant:
+            penalty_percentage = float(getattr(requirements, "compliance_penalty_weight", 0.0))
+            # Clamp to [0, 100]
+            penalty_percentage = max(0.0, min(100.0, penalty_percentage))
+            penalty_amount = score * (penalty_percentage / 100.0)
+            final_score = max(0.0, score - penalty_amount)
+
+            self._log_penalty_applied(
+                metric.project.name,
+                penalty_percentage,
+                penalty_amount,
+                final_score,
+                metric.project.level,
+                metric.project.project_level_official,
+            )
+            return final_score, True
+
+        return score, False
+
+    def _log_penalty_applied(
+        self,
+        project_name,
+        penalty_percentage,
+        penalty_amount,
+        final_score,
+        local_level,
+        official_level,
+    ):
+        """Log penalty application details."""
+        self.stdout.write(
+            self.style.WARNING(
+                f"Applied {penalty_percentage}% compliance penalty to "
+                f"{project_name} (penalty: {penalty_amount:.2f}, "
+                f"final score: {final_score:.2f}) [Local: {local_level}, "
+                f"Official: {official_level}]"
+            )
+        )
+
+    def _log_compliance_summary(self, penalties_applied, total_projects_scored):
+        """Log final compliance summary."""
+        if penalties_applied > 0:
+            compliance_rate = (
+                (total_projects_scored - penalties_applied) / total_projects_scored * 100
+                if total_projects_scored
+                else 0
+            )
+            self.stdout.write(
+                self.style.NOTICE(
+                    f"Compliance Summary: {penalties_applied}/{total_projects_scored} projects "
+                    f"received penalties ({compliance_rate:.1f}% compliant)"
+                )
+            )
+
+    def handle(self, *args, **options):
+        forward_fields, backward_fields = self._get_field_weights()
         project_health_metrics = []
         project_health_requirements = {
             phr.level: phr for phr in ProjectHealthRequirements.objects.all()
         }
+
+        # Compliance tracking
+        penalties_applied = 0
+        total_projects_scored = 0
+
         for metric in ProjectHealthMetrics.objects.filter(
             score__isnull=True,
         ).select_related(
@@ -46,18 +127,30 @@ def handle(self, *args, **options):
             self.stdout.write(
                 self.style.NOTICE(f"Updating score for project: {metric.project.name}")
             )
 
-            requirements = project_health_requirements[metric.project.level]
+            requirements = project_health_requirements.get(metric.project.level)
+            if requirements is None:
+                self.stdout.write(
+                    self.style.WARNING(
+                        f"Missing ProjectHealthRequirements for level '{metric.project.level}' — "
+                        f"skipping scoring for {metric.project.name}"
+                    )
+                )
+                continue
 
-            score = 0.0
-            for field, weight in forward_fields.items():
-                if int(getattr(metric, field)) >= int(getattr(requirements, field)):
-                    score += weight
+            total_projects_scored += 1
 
-            for field, weight in backward_fields.items():
-                if int(getattr(metric, field)) <= int(getattr(requirements, field)):
-                    score += weight
+            # Calculate base score
+            score = self._calculate_base_score(
+                metric, requirements, forward_fields, backward_fields
+            )
 
-            metric.score = score
+            # Apply compliance penalty if needed
+            score, penalty_applied = self._apply_compliance_penalty(score, metric, requirements)
+            if penalty_applied:
+                penalties_applied += 1
+
+            # Ensure score stays within bounds (0-100)
+            metric.score = max(0.0, min(100.0, score))
             project_health_metrics.append(metric)
 
         ProjectHealthMetrics.bulk_save(
@@ -66,4 +159,7 @@ def handle(self, *args, **options):
                 "score",
             ],
         )
+
+        # Summary with compliance impact
         self.stdout.write(self.style.SUCCESS("Updated project health scores successfully."))
+        self._log_compliance_summary(penalties_applied, total_projects_scored)
diff --git a/backend/apps/owasp/migrations/0047_add_is_level_compliant_field.py b/backend/apps/owasp/migrations/0047_add_is_level_compliant_field.py
new file mode 100644
index 0000000000..198e3c3f31
--- /dev/null
+++ b/backend/apps/owasp/migrations/0047_add_is_level_compliant_field.py
@@ -0,0 +1,21 @@
+# Generated by Django 5.2.5 on 2025-08-12 21:00
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("owasp", "0046_merge_0045_badge_0045_project_audience"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="projecthealthmetrics",
+            name="is_level_compliant",
+            field=models.BooleanField(
+                default=True,
+                help_text="Whether the project's local level matches the official OWASP level",
+                verbose_name="Is project level compliant",
+            ),
+        ),
+    ]
diff --git a/backend/apps/owasp/migrations/0048_add_compliance_penalty_weight.py b/backend/apps/owasp/migrations/0048_add_compliance_penalty_weight.py
new file mode 100644
index 0000000000..6ddd51b25f
--- /dev/null
+++ b/backend/apps/owasp/migrations/0048_add_compliance_penalty_weight.py
@@ -0,0 +1,29 @@
+# Generated by Django 5.2.5 on 2025-08-14 15:17
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("owasp", "0047_add_is_level_compliant_field"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="projecthealthrequirements",
+            name="compliance_penalty_weight",
+            field=models.FloatField(
+                default=10.0,
+                help_text="Percentage penalty applied to non-compliant projects (0-100)",
+                verbose_name="Compliance penalty weight (%)",
+            ),
+        ),
+        migrations.AddConstraint(
+            model_name="projecthealthrequirements",
+            constraint=models.CheckConstraint(
+                name="owasp_compliance_penalty_weight_0_100",
+                condition=models.Q(compliance_penalty_weight__gte=0.0)
+                & models.Q(compliance_penalty_weight__lte=100.0),
+            ),
+        ),
+    ]
diff --git a/backend/apps/owasp/migrations/0049_remove_projecthealthrequirements_owasp_compliance_penalty_weight_0_100_and_more.py b/backend/apps/owasp/migrations/0049_remove_projecthealthrequirements_owasp_compliance_penalty_weight_0_100_and_more.py
new file mode 100644
index 0000000000..7da585a3ce
--- /dev/null
+++ b/backend/apps/owasp/migrations/0049_remove_projecthealthrequirements_owasp_compliance_penalty_weight_0_100_and_more.py
@@ -0,0 +1,30 @@
+# Generated by Django 5.2.5 on 2025-08-18 12:29
+
+import django.core.validators
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("owasp", "0048_add_compliance_penalty_weight"),
+    ]
+
+    operations = [
+        migrations.RemoveConstraint(
+            model_name="projecthealthrequirements",
+            name="owasp_compliance_penalty_weight_0_100",
+        ),
+        migrations.AlterField(
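+            # Field re-declared with explicit app-level range validators (0-100).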
+            model_name="projecthealthrequirements",
+            name="compliance_penalty_weight",
+            field=models.FloatField(
+                default=10.0,
+                help_text="Percentage penalty applied to non-compliant projects (0-100)",
+                validators=[
+                    django.core.validators.MinValueValidator(0.0),
+                    django.core.validators.MaxValueValidator(100.0),
+                ],
+                verbose_name="Compliance penalty weight (%)",
+            ),
+        ),
+    ]
diff --git a/backend/apps/owasp/models/project.py b/backend/apps/owasp/models/project.py
index 4a67afce6b..48db67d6d7 100644
--- a/backend/apps/owasp/models/project.py
+++ b/backend/apps/owasp/models/project.py
@@ -60,6 +60,13 @@ class Meta:
         default=ProjectLevel.OTHER,
     )
     level_raw = models.CharField(verbose_name="Level raw", max_length=50, default="")
+    project_level_official = models.CharField(
+        verbose_name="Official Level",
+        max_length=20,
+        choices=ProjectLevel.choices,
+        default=ProjectLevel.OTHER,
+        help_text="Official project level from OWASP GitHub repository",
+    )
 
     type = models.CharField(
         verbose_name="Type",
@@ -151,6 +158,11 @@ def is_leader_requirements_compliant(self) -> bool:
         # Have multiple Project Leaders who are not all employed by the same company.
         return self.leaders_count > 1
 
+    @property
+    def is_level_compliant(self) -> bool:
+        """Indicate whether project level matches the official OWASP level."""
+        return self.level == self.project_level_official
+
     @property
     def is_tool_type(self) -> bool:
         """Indicate whether project has TOOL type."""
diff --git a/backend/apps/owasp/models/project_health_requirements.py b/backend/apps/owasp/models/project_health_requirements.py
index 0feaa7b1bd..8f99346aad 100644
--- a/backend/apps/owasp/models/project_health_requirements.py
+++ b/backend/apps/owasp/models/project_health_requirements.py
@@ -1,5 +1,6 @@
 """Project health requirements model."""
 
+from django.core.validators import MaxValueValidator, MinValueValidator
 from django.db import models
 
 from apps.common.models import TimestampedModel
@@ -57,6 +58,12 @@ class Meta:
     unassigned_issues_count = models.PositiveIntegerField(
         verbose_name="Unassigned issues", default=0
     )
+    compliance_penalty_weight = models.FloatField(
+        verbose_name="Compliance penalty weight (%)",
+        default=10.0,
+        help_text="Percentage penalty applied to non-compliant projects (0-100)",
+        validators=[MinValueValidator(0.0), MaxValueValidator(100.0)],
+    )
 
     def __str__(self) -> str:
         """Project health requirements human readable representation."""
diff --git a/backend/tests/apps/owasp/management/commands/owasp_detect_project_level_compliance_test.py b/backend/tests/apps/owasp/management/commands/owasp_detect_project_level_compliance_test.py
new file mode 100644
index 0000000000..c51b37b21e
--- /dev/null
+++ b/backend/tests/apps/owasp/management/commands/owasp_detect_project_level_compliance_test.py
@@ -0,0 +1,258 @@
+"""Tests for owasp_detect_project_level_compliance command."""
+
+from io import StringIO
+from unittest.mock import MagicMock, patch
+
+import pytest
+from django.core.management import call_command
+from django.db.models.base import ModelState
+
+from apps.owasp.management.commands.owasp_detect_project_level_compliance import Command
+from apps.owasp.models.project import Project
+
+# Test constants
+OWASP_ZAP_NAME = "OWASP ZAP"
+OWASP_TEST_PROJECT_NAME = "OWASP Test Project"
+OWASP_TOP_TEN_NAME = "OWASP Top 10"
+PROJECT_FILTER_PATCH = "apps.owasp.models.project.Project.objects.filter"
+STDOUT_PATCH = "sys.stdout"
+FLAGSHIP_LEVEL = "flagship"
+PRODUCTION_LEVEL = "production"
+LAB_LEVEL = "lab"
+OTHER_LEVEL = "other"
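+# Expected stdout fragments, kept in sync with the command's output strings: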
+COMPLIANCE_SUMMARY_HEADER = "PROJECT LEVEL COMPLIANCE SUMMARY" +TOTAL_PROJECTS_PREFIX = "Total active projects:" +COMPLIANT_PROJECTS_PREFIX = "Compliant projects:" +NON_COMPLIANT_PROJECTS_PREFIX = "Non-compliant projects:" +COMPLIANCE_RATE_PREFIX = "Compliance rate:" +ALL_COMPLIANT_MESSAGE = "✓ All projects are level compliant!" +WARNING_PREFIX = "WARNING: Found" +INFO_PREFIX = "INFO:" +SUCCESS_CHECK = "✓" +ERROR_CHECK = "✗" + + +class TestDetectProjectLevelComplianceCommand: + """Test cases for the project level compliance detection command.""" + + @pytest.fixture(autouse=True) + def _setup(self): + """Set up test environment.""" + self.stdout = StringIO() + self.command = Command() + + def test_handle_all_compliant_projects(self): + """Test command output when all projects are compliant.""" + # Create mock compliant projects + project1 = MagicMock(spec=Project) + project1._state = ModelState() + project1.name = OWASP_ZAP_NAME + project1.level = FLAGSHIP_LEVEL + project1.project_level_official = FLAGSHIP_LEVEL + project1.is_level_compliant = True + + project2 = MagicMock(spec=Project) + project2._state = ModelState() + project2.name = OWASP_TOP_TEN_NAME + project2.level = FLAGSHIP_LEVEL + project2.project_level_official = FLAGSHIP_LEVEL + project2.is_level_compliant = True + + project3 = MagicMock(spec=Project) + project3._state = ModelState() + project3.name = OWASP_TEST_PROJECT_NAME + project3.level = PRODUCTION_LEVEL + project3.project_level_official = PRODUCTION_LEVEL + project3.is_level_compliant = True + + projects = [project1, project2, project3] + + with patch(PROJECT_FILTER_PATCH) as mock_filter, patch(STDOUT_PATCH, new=self.stdout): + mock_filter.return_value.select_related.return_value = projects + + call_command("owasp_detect_project_level_compliance") + + output = self.stdout.getvalue() + + # Verify summary output + assert COMPLIANCE_SUMMARY_HEADER in output + assert f"{TOTAL_PROJECTS_PREFIX} 3" in output + assert f"{COMPLIANT_PROJECTS_PREFIX} 3" in output + assert f"{NON_COMPLIANT_PROJECTS_PREFIX} 0" in output + assert f"{COMPLIANCE_RATE_PREFIX} 100.0%" in output + assert ALL_COMPLIANT_MESSAGE in output + + def test_handle_mixed_compliance_projects(self): + """Test command output with both compliant and non-compliant projects.""" + # Create mixed compliance projects + project1 = MagicMock(spec=Project) + project1._state = ModelState() + project1.name = OWASP_ZAP_NAME + project1.level = FLAGSHIP_LEVEL + project1.project_level_official = FLAGSHIP_LEVEL + project1.is_level_compliant = True + + project2 = MagicMock(spec=Project) + project2._state = ModelState() + project2.name = OWASP_TEST_PROJECT_NAME + project2.level = LAB_LEVEL + project2.project_level_official = PRODUCTION_LEVEL + project2.is_level_compliant = False + + project3 = MagicMock(spec=Project) + project3._state = ModelState() + project3.name = OWASP_TOP_TEN_NAME + project3.level = PRODUCTION_LEVEL + project3.project_level_official = FLAGSHIP_LEVEL + project3.is_level_compliant = False + + projects = [project1, project2, project3] + + with patch(PROJECT_FILTER_PATCH) as mock_filter, patch(STDOUT_PATCH, new=self.stdout): + mock_filter.return_value.select_related.return_value = projects + + call_command("owasp_detect_project_level_compliance") + + output = self.stdout.getvalue() + + # Verify summary output + assert f"{TOTAL_PROJECTS_PREFIX} 3" in output + assert f"{COMPLIANT_PROJECTS_PREFIX} 1" in output + assert f"{NON_COMPLIANT_PROJECTS_PREFIX} 2" in output + assert f"{COMPLIANCE_RATE_PREFIX} 33.3%" in output + 
assert f"{WARNING_PREFIX} 2 non-compliant projects" in output + + # Verify non-compliant projects are listed + error_msg1 = ( + f"{ERROR_CHECK} {OWASP_TEST_PROJECT_NAME}: " + f"Local={LAB_LEVEL}, Official={PRODUCTION_LEVEL}" + ) + assert error_msg1 in output + error_msg2 = ( + f"{ERROR_CHECK} {OWASP_TOP_TEN_NAME}: " + f"Local={PRODUCTION_LEVEL}, Official={FLAGSHIP_LEVEL}" + ) + assert error_msg2 in output + + def test_handle_verbose_output(self): + """Test command with verbose flag shows all projects.""" + project1 = MagicMock(spec=Project) + project1._state = ModelState() + project1.name = OWASP_ZAP_NAME + project1.level = FLAGSHIP_LEVEL + project1.project_level_official = FLAGSHIP_LEVEL + project1.is_level_compliant = True + + project2 = MagicMock(spec=Project) + project2._state = ModelState() + project2.name = OWASP_TEST_PROJECT_NAME + project2.level = LAB_LEVEL + project2.project_level_official = PRODUCTION_LEVEL + project2.is_level_compliant = False + + projects = [project1, project2] + + with patch(PROJECT_FILTER_PATCH) as mock_filter, patch(STDOUT_PATCH, new=self.stdout): + mock_filter.return_value.select_related.return_value = projects + + call_command("owasp_detect_project_level_compliance", "--verbose") + + output = self.stdout.getvalue() + + # Verify both compliant and non-compliant projects are shown + success_msg = f"{SUCCESS_CHECK} {OWASP_ZAP_NAME}: {FLAGSHIP_LEVEL} (matches official)" + assert success_msg in output + error_msg = ( + f"{ERROR_CHECK} {OWASP_TEST_PROJECT_NAME}: " + f"Local={LAB_LEVEL}, Official={PRODUCTION_LEVEL}" + ) + assert error_msg in output + + def test_handle_no_projects(self): + """Test command output when no active projects exist.""" + with patch(PROJECT_FILTER_PATCH) as mock_filter, patch(STDOUT_PATCH, new=self.stdout): + mock_filter.return_value.select_related.return_value = [] + + call_command("owasp_detect_project_level_compliance") + + output = self.stdout.getvalue() + + # Verify summary for empty project list + assert f"{TOTAL_PROJECTS_PREFIX} 0" in output + assert f"{COMPLIANT_PROJECTS_PREFIX} 0" in output + assert f"{NON_COMPLIANT_PROJECTS_PREFIX} 0" in output + assert ALL_COMPLIANT_MESSAGE in output + + def test_handle_projects_without_official_levels(self): + """Test command detects projects with default official levels.""" + project1 = MagicMock(spec=Project) + project1._state = ModelState() + project1.name = OWASP_ZAP_NAME + project1.level = FLAGSHIP_LEVEL + project1.project_level_official = FLAGSHIP_LEVEL + project1.is_level_compliant = True + + project2 = MagicMock(spec=Project) + project2._state = ModelState() + project2.name = OWASP_TEST_PROJECT_NAME + project2.level = LAB_LEVEL + project2.project_level_official = OTHER_LEVEL # Default official level + project2.is_level_compliant = True + + projects = [project1, project2] + + with patch(PROJECT_FILTER_PATCH) as mock_filter, patch(STDOUT_PATCH, new=self.stdout): + # Mock the filter for projects without official levels + mock_filter.return_value.select_related.return_value = projects + mock_filter.return_value.filter.return_value.count.return_value = 1 + + call_command("owasp_detect_project_level_compliance") + + output = self.stdout.getvalue() + + # Verify info message about default official levels + assert f"{INFO_PREFIX} 1 projects have default official levels" in output + assert ( + "Run 'make update-data' to sync official levels, " + "then 'make sync-data' for scoring." 
in output + ) + + def test_compliance_rate_calculation(self): + """Test compliance rate calculation with various scenarios.""" + test_cases = [ + ([], 0, 0, 0.0), # No projects + ([True], 1, 0, 100.0), # All compliant + ([False], 0, 1, 0.0), # All non-compliant + ([True, False, True], 2, 1, 66.7), # Mixed + ] + + for ( + compliance_statuses, + expected_compliant, + expected_non_compliant, + expected_rate, + ) in test_cases: + projects = [] + for i, is_compliant in enumerate(compliance_statuses): + project = MagicMock(spec=Project) + project._state = ModelState() + project.name = f"Project {i}" + project.level = LAB_LEVEL + project.project_level_official = LAB_LEVEL if is_compliant else FLAGSHIP_LEVEL + project.is_level_compliant = is_compliant + projects.append(project) + + with ( + patch("apps.owasp.models.project.Project.objects.filter") as mock_filter, + patch("sys.stdout", new=StringIO()) as mock_stdout, + ): + mock_filter.return_value.select_related.return_value = projects + mock_filter.return_value.filter.return_value.count.return_value = 0 + + call_command("owasp_detect_project_level_compliance") + + output = mock_stdout.getvalue() + + assert f"{COMPLIANT_PROJECTS_PREFIX} {expected_compliant}" in output + assert f"{NON_COMPLIANT_PROJECTS_PREFIX} {expected_non_compliant}" in output + assert f"{COMPLIANCE_RATE_PREFIX} {expected_rate:.1f}%" in output diff --git a/backend/tests/apps/owasp/management/commands/owasp_update_project_health_metrics_test.py b/backend/tests/apps/owasp/management/commands/owasp_update_project_health_metrics_test.py index 8fad2b6073..d8571bba4b 100644 --- a/backend/tests/apps/owasp/management/commands/owasp_update_project_health_metrics_test.py +++ b/backend/tests/apps/owasp/management/commands/owasp_update_project_health_metrics_test.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch import pytest +import requests from django.core.management import call_command from django.db.models.base import ModelState @@ -9,6 +10,35 @@ from apps.owasp.models.project import Project from apps.owasp.models.project_health_metrics import ProjectHealthMetrics +# Test constants +TEST_PROJECT_NAME = "Test Project" +OWASP_ZAP_NAME = "OWASP ZAP" +OWASP_TOP_TEN_NAME = "OWASP Top 10" +OWASP_TEST_PROJECT_NAME = "OWASP Test Project" +VALID_PROJECT_NAME = "Valid Project" +ANOTHER_VALID_NAME = "Another Valid" +VALID_WITH_NUMBER_NAME = "Valid with number" +PROJECT_FILTER_PATCH = "apps.owasp.models.project.Project.objects.filter" +PROJECT_BULK_SAVE_PATCH = "apps.owasp.models.project.Project.bulk_save" +METRICS_BULK_SAVE_PATCH = "apps.owasp.models.project_health_metrics.ProjectHealthMetrics.bulk_save" +STDOUT_PATCH = "sys.stdout" +FLAGSHIP_LEVEL = "flagship" +PRODUCTION_LEVEL = "production" +LAB_LEVEL = "lab" +INCUBATOR_LEVEL = "incubator" +OTHER_LEVEL = "other" +OWASP_LEVELS_URL = ( + "https://raw.githubusercontent.com/OWASP/owasp.github.io/main/_data/project_levels.json" +) +FETCHING_OFFICIAL_LEVELS_MSG = "Fetching official project levels" +SUCCESSFULLY_FETCHED_MSG = "Successfully fetched" +UPDATED_OFFICIAL_LEVELS_MSG = "Updated official levels for" +EVALUATING_METRICS_MSG = "Evaluating metrics for project:" +NETWORK_ERROR_MSG = "Network error" +INVALID_JSON_ERROR_MSG = "Invalid JSON" +INVALID_FORMAT_ERROR = "Invalid format" +TIMEOUT_30_SECONDS = 30 + class TestUpdateProjectHealthMetricsCommand: @pytest.fixture(autouse=True) @@ -17,10 +47,8 @@ def _setup(self): self.stdout = StringIO() self.command = Command() with ( - patch("apps.owasp.models.project.Project.objects.filter") as 
projects_patch, - patch( - "apps.owasp.models.project_health_metrics.ProjectHealthMetrics.bulk_save" - ) as bulk_save_patch, + patch(PROJECT_FILTER_PATCH) as projects_patch, + patch(METRICS_BULK_SAVE_PATCH) as bulk_save_patch, ): self.mock_projects = projects_patch self.mock_bulk_save = bulk_save_patch @@ -29,7 +57,7 @@ def _setup(self): def test_handle_successful_update(self): """Test successful metrics update.""" test_data = { - "name": "Test Project", + "name": TEST_PROJECT_NAME, "contributors_count": 10, "created_at": "2023-01-01", "forks_count": 2, @@ -61,7 +89,7 @@ def test_handle_successful_update(self): mock_project.leaders_count = 2 # Execute command - with patch("sys.stdout", new=self.stdout): + with patch(STDOUT_PATCH, new=self.stdout): call_command("owasp_update_project_health_metrics") self.mock_bulk_save.assert_called_once() @@ -72,4 +100,296 @@ def test_handle_successful_update(self): assert metrics.project == mock_project # Verify command output - assert "Evaluating metrics for project: Test Project" in self.stdout.getvalue() + assert f"{EVALUATING_METRICS_MSG} {TEST_PROJECT_NAME}" in self.stdout.getvalue() + + @patch("requests.get") + def test_fetch_official_project_levels_success(self, mock_get): + """Test successful fetching of official project levels.""" + # Mock successful API response + mock_response = MagicMock() + mock_response.json.return_value = [ + {"name": OWASP_ZAP_NAME, "level": FLAGSHIP_LEVEL}, + {"name": OWASP_TOP_TEN_NAME, "level": FLAGSHIP_LEVEL}, + {"name": OWASP_TEST_PROJECT_NAME, "level": PRODUCTION_LEVEL}, + {"name": TEST_PROJECT_NAME, "level": LAB_LEVEL}, + ] + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + result = self.command.fetch_official_project_levels(timeout=TIMEOUT_30_SECONDS) + + assert result is not None + assert len(result) == 4 + assert result[OWASP_ZAP_NAME] == FLAGSHIP_LEVEL + assert result[OWASP_TOP_TEN_NAME] == FLAGSHIP_LEVEL + assert result[OWASP_TEST_PROJECT_NAME] == PRODUCTION_LEVEL + assert result[TEST_PROJECT_NAME] == LAB_LEVEL + + # Verify API call + mock_get.assert_called_once_with( + OWASP_LEVELS_URL, timeout=TIMEOUT_30_SECONDS, headers={"Accept": "application/json"} + ) + + @patch("requests.get") + def test_fetch_official_project_levels_http_error(self, mock_get): + """Test handling of HTTP errors when fetching official levels.""" + mock_get.side_effect = requests.exceptions.RequestException(NETWORK_ERROR_MSG) + + result = self.command.fetch_official_project_levels(timeout=TIMEOUT_30_SECONDS) + + assert result is None + + @patch("requests.get") + def test_fetch_official_project_levels_invalid_json(self, mock_get): + """Test handling of invalid JSON response.""" + mock_response = MagicMock() + mock_response.json.side_effect = ValueError(INVALID_JSON_ERROR_MSG) + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + result = self.command.fetch_official_project_levels(timeout=TIMEOUT_30_SECONDS) + + assert result is None + + @patch("requests.get") + def test_fetch_official_project_levels_invalid_format(self, mock_get): + """Test handling of invalid data format (not a list).""" + mock_response = MagicMock() + mock_response.json.return_value = {"error": INVALID_FORMAT_ERROR} + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + result = self.command.fetch_official_project_levels(timeout=TIMEOUT_30_SECONDS) + + assert result is None + + @patch("requests.get") + def 
test_fetch_official_project_levels_filters_invalid_entries(self, mock_get): + """Test that invalid entries are filtered out.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + {"name": VALID_PROJECT_NAME, "level": FLAGSHIP_LEVEL}, + {"name": "", "level": LAB_LEVEL}, # Empty name should be filtered + {"level": PRODUCTION_LEVEL}, # Missing name should be filtered + {"name": ANOTHER_VALID_NAME, "level": INCUBATOR_LEVEL}, + {"name": VALID_WITH_NUMBER_NAME, "level": 3}, # Number level should work + {"name": "Invalid level"}, # Missing level should be filtered + ] + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + result = self.command.fetch_official_project_levels(timeout=TIMEOUT_30_SECONDS) + + assert result is not None + assert len(result) == 3 + assert result[VALID_PROJECT_NAME] == FLAGSHIP_LEVEL + assert result[ANOTHER_VALID_NAME] == INCUBATOR_LEVEL + assert result[VALID_WITH_NUMBER_NAME] == "3" + + def test_update_official_levels_success(self): + """Test successful update of official levels.""" + # Create mock projects + project1 = MagicMock(spec=Project) + project1.name = OWASP_ZAP_NAME + project1.project_level_official = LAB_LEVEL # Different from official + project1._state = ModelState() + + project2 = MagicMock(spec=Project) + project2.name = OWASP_TOP_TEN_NAME + project2.project_level_official = FLAGSHIP_LEVEL # Same as official + project2._state = ModelState() + + official_levels = { + OWASP_ZAP_NAME: FLAGSHIP_LEVEL, + OWASP_TOP_TEN_NAME: FLAGSHIP_LEVEL, + } + + with ( + patch(PROJECT_FILTER_PATCH) as mock_filter, + patch(PROJECT_BULK_SAVE_PATCH) as mock_bulk_save, + ): + mock_filter.return_value = [project1, project2] + + updated_count = self.command.update_official_levels(official_levels) + + # Only project1 should be updated (different level) + assert updated_count == 1 + assert project1.project_level_official == FLAGSHIP_LEVEL + assert project2.project_level_official == FLAGSHIP_LEVEL # Unchanged + + # Verify bulk_save was called with only the updated project + mock_bulk_save.assert_called_once() + saved_projects = mock_bulk_save.call_args[0][0] + assert len(saved_projects) == 1 + assert saved_projects[0] == project1 + + def test_update_official_levels_level_mapping(self): + """Test that level mapping works correctly.""" + project = MagicMock(spec=Project) + project.name = TEST_PROJECT_NAME + project.project_level_official = OTHER_LEVEL + project._state = ModelState() + + test_cases = [ + ("2", INCUBATOR_LEVEL), + ("3", LAB_LEVEL), + ("3.5", PRODUCTION_LEVEL), + ("4", FLAGSHIP_LEVEL), + (INCUBATOR_LEVEL, INCUBATOR_LEVEL), + (LAB_LEVEL, LAB_LEVEL), + (PRODUCTION_LEVEL, PRODUCTION_LEVEL), + (FLAGSHIP_LEVEL, FLAGSHIP_LEVEL), + ("unknown", OTHER_LEVEL), + ] + + for official_level, expected_mapped in test_cases: + with ( + patch(PROJECT_FILTER_PATCH) as mock_filter, + patch(PROJECT_BULK_SAVE_PATCH) as mock_bulk_save, + ): + mock_filter.return_value = [project] + project.project_level_official = OTHER_LEVEL # Reset + + official_levels = {TEST_PROJECT_NAME: official_level} + updated_count = self.command.update_official_levels(official_levels) + + assert project.project_level_official == expected_mapped + if expected_mapped != OTHER_LEVEL: # Only count as update if level changed + assert updated_count == 1 + mock_bulk_save.assert_called_once() + else: + assert updated_count == 0 + + @patch("requests.get") + def test_handle_with_official_levels_integration(self, mock_get): + """Test complete integration with official 
levels fetching.""" + # Mock API response + mock_response = MagicMock() + mock_response.json.return_value = [ + {"name": TEST_PROJECT_NAME, "level": FLAGSHIP_LEVEL}, + ] + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + # Mock project + mock_project = MagicMock(spec=Project) + mock_project._state = ModelState() + mock_project.name = TEST_PROJECT_NAME + mock_project.project_level_official = LAB_LEVEL # Different from official + for field in [ + "contributors_count", + "created_at", + "forks_count", + "is_funding_requirements_compliant", + "is_leader_requirements_compliant", + "pushed_at", + "released_at", + "open_issues_count", + "open_pull_requests_count", + "owasp_page_last_updated_at", + "pull_request_last_created_at", + "recent_releases_count", + "stars_count", + "issues_count", + "pull_requests_count", + "releases_count", + "unanswered_issues_count", + "unassigned_issues_count", + ]: + setattr(mock_project, field, 0) + + with ( + patch(PROJECT_FILTER_PATCH) as mock_projects, + patch(PROJECT_BULK_SAVE_PATCH), + patch(METRICS_BULK_SAVE_PATCH), + patch(STDOUT_PATCH, new=self.stdout), + ): + mock_projects.return_value = [mock_project] + + call_command("owasp_update_project_health_metrics") + + # Verify official levels were fetched and updated + assert FETCHING_OFFICIAL_LEVELS_MSG in self.stdout.getvalue() + success_msg = f"{SUCCESSFULLY_FETCHED_MSG} 1 official project levels" + assert success_msg in self.stdout.getvalue() + assert f"{UPDATED_OFFICIAL_LEVELS_MSG} 1 projects" in self.stdout.getvalue() + + # Verify project was updated with official level + assert mock_project.project_level_official == FLAGSHIP_LEVEL + + def test_handle_sync_official_levels_only(self): + """Test command with --sync-official-levels-only flag.""" + # Create mock project + mock_project = MagicMock(spec=Project) + mock_project.name = TEST_PROJECT_NAME + mock_project.project_level_official = LAB_LEVEL # Different from official + mock_project._state = ModelState() + + with ( + patch(PROJECT_FILTER_PATCH) as mock_filter, + patch(PROJECT_BULK_SAVE_PATCH) as mock_bulk_save, + patch("requests.get") as mock_get, + patch(STDOUT_PATCH, new=self.stdout), + ): + # Mock API response + mock_response = MagicMock() + mock_response.json.return_value = [ + {"name": TEST_PROJECT_NAME, "level": FLAGSHIP_LEVEL} + ] + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + mock_filter.return_value = [mock_project] + + call_command("owasp_update_project_health_metrics", "--sync-official-levels-only") + + # Verify official levels were synced + output = self.stdout.getvalue() + assert "Official level sync completed." 
in output + # Health metrics should be skipped + assert "Evaluating metrics for project" not in output + + # Verify project was updated + assert mock_project.project_level_official == FLAGSHIP_LEVEL + mock_bulk_save.assert_called_once() + + def test_handle_skip_official_levels(self): + """Test command with --skip-official-levels flag.""" + mock_project = MagicMock(spec=Project) + mock_project._state = ModelState() + mock_project.name = TEST_PROJECT_NAME + for field in [ + "contributors_count", + "created_at", + "forks_count", + "is_funding_requirements_compliant", + "is_leader_requirements_compliant", + "pushed_at", + "released_at", + "open_issues_count", + "open_pull_requests_count", + "owasp_page_last_updated_at", + "pull_request_last_created_at", + "recent_releases_count", + "stars_count", + "issues_count", + "pull_requests_count", + "releases_count", + "unanswered_issues_count", + "unassigned_issues_count", + ]: + setattr(mock_project, field, 0) + + with ( + patch(PROJECT_FILTER_PATCH) as mock_projects, + patch(METRICS_BULK_SAVE_PATCH), + patch(STDOUT_PATCH, new=self.stdout), + ): + mock_projects.return_value = [mock_project] + + call_command("owasp_update_project_health_metrics", "--skip-official-levels") + + # Verify official levels fetching was skipped + output = self.stdout.getvalue() + assert FETCHING_OFFICIAL_LEVELS_MSG not in output + assert f"{EVALUATING_METRICS_MSG} {TEST_PROJECT_NAME}" in output diff --git a/backend/tests/apps/owasp/management/commands/owasp_update_project_health_scores_test.py b/backend/tests/apps/owasp/management/commands/owasp_update_project_health_scores_test.py index de7862a6dd..053c1d7f2b 100644 --- a/backend/tests/apps/owasp/management/commands/owasp_update_project_health_scores_test.py +++ b/backend/tests/apps/owasp/management/commands/owasp_update_project_health_scores_test.py @@ -1,5 +1,5 @@ from io import StringIO -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, PropertyMock, patch import pytest from django.core.management import call_command @@ -8,7 +8,28 @@ from apps.owasp.models.project_health_metrics import ProjectHealthMetrics from apps.owasp.models.project_health_requirements import ProjectHealthRequirements +# Test constants +TEST_PROJECT_NAME = "Test Project" +NON_COMPLIANT_PROJECT_NAME = "Non-Compliant Project" +COMPLIANT_PROJECT_NAME = "Compliant Project" +STDOUT_PATCH = "sys.stdout" +# Long constants broken down for line length +METRICS_FILTER_PATCH = ( + "apps.owasp.models.project_health_metrics.ProjectHealthMetrics.objects.filter" +) +REQUIREMENTS_ALL_PATCH = ( + "apps.owasp.models.project_health_requirements.ProjectHealthRequirements.objects.all" +) +METRICS_BULK_SAVE_PATCH = "apps.owasp.models.project_health_metrics.ProjectHealthMetrics.bulk_save" EXPECTED_SCORE = 34.0 +LAB_LEVEL = "lab" +FLAGSHIP_LEVEL = "flagship" +PRODUCTION_LEVEL = "production" +PENALTY_TWENTY_PERCENT = 20.0 +PENALTY_ZERO_PERCENT = 0.0 +PENALTY_HUNDRED_PERCENT = 100.0 +FULL_SCORE_THRESHOLD = 90.0 +FLOAT_PRECISION = 0.01 class TestUpdateProjectHealthMetricsScoreCommand: @@ -18,15 +39,9 @@ def _setup(self): self.stdout = StringIO() self.command = Command() with ( - patch( - "apps.owasp.models.project_health_metrics.ProjectHealthMetrics.objects.filter" - ) as metrics_patch, - patch( - "apps.owasp.models.project_health_requirements.ProjectHealthRequirements.objects.all" - ) as requirements_patch, - patch( - "apps.owasp.models.project_health_metrics.ProjectHealthMetrics.bulk_save" - ) as bulk_save_patch, + patch(METRICS_FILTER_PATCH) 
as metrics_patch, + patch(REQUIREMENTS_ALL_PATCH) as requirements_patch, + patch(METRICS_BULK_SAVE_PATCH) as bulk_save_patch, ): self.mock_metrics = metrics_patch self.mock_requirements = requirements_patch @@ -60,14 +75,15 @@ def test_handle_successful_update(self): setattr(mock_metric, field, metric_weight) setattr(mock_requirements, field, requirement_weight) mock_metric.project.level = "test_level" - mock_metric.project.name = "Test Project" + mock_metric.project.name = TEST_PROJECT_NAME + type(mock_metric.project).is_level_compliant = PropertyMock(return_value=True) mock_metric.is_funding_requirements_compliant = True mock_metric.is_leader_requirements_compliant = True self.mock_metrics.return_value.select_related.return_value = [mock_metric] self.mock_requirements.return_value = [mock_requirements] mock_requirements.level = "test_level" # Execute command - with patch("sys.stdout", new=self.stdout): + with patch(STDOUT_PATCH, new=self.stdout): call_command("owasp_update_project_health_scores") self.mock_requirements.assert_called_once() @@ -79,6 +95,393 @@ def test_handle_successful_update(self): "score", ], ) - assert mock_metric.score == EXPECTED_SCORE + assert ( + abs(mock_metric.score - EXPECTED_SCORE) < FLOAT_PRECISION + ) # Use approximate comparison for float + assert "Updated project health scores successfully." in self.stdout.getvalue() + assert f"Updating score for project: {TEST_PROJECT_NAME}" in self.stdout.getvalue() + + def test_handle_with_compliance_penalty(self): + """Test score calculation with compliance penalty applied.""" + # Create mock metrics with test data that matches actual scoring fields + mock_metric = MagicMock(spec=ProjectHealthMetrics) + mock_requirements = MagicMock(spec=ProjectHealthRequirements) + + # Set up forward fields (higher is better) - all should meet requirements + mock_metric.age_days = 10 + mock_requirements.age_days = 5 + mock_metric.contributors_count = 10 + mock_requirements.contributors_count = 5 + mock_metric.forks_count = 10 + mock_requirements.forks_count = 5 + mock_metric.is_funding_requirements_compliant = True + mock_requirements.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + mock_requirements.is_leader_requirements_compliant = True + mock_metric.open_pull_requests_count = 10 + mock_requirements.open_pull_requests_count = 5 + mock_metric.recent_releases_count = 10 + mock_requirements.recent_releases_count = 5 + mock_metric.stars_count = 10 + mock_requirements.stars_count = 5 + mock_metric.total_pull_requests_count = 10 + mock_requirements.total_pull_requests_count = 5 + mock_metric.total_releases_count = 10 + mock_requirements.total_releases_count = 5 + + # Set up backward fields (lower is better) - all should meet requirements + mock_metric.last_commit_days = 1 + mock_requirements.last_commit_days = 5 + mock_metric.last_pull_request_days = 1 + mock_requirements.last_pull_request_days = 5 + mock_metric.last_release_days = 1 + mock_requirements.last_release_days = 5 + mock_metric.open_issues_count = 1 + mock_requirements.open_issues_count = 5 + mock_metric.owasp_page_last_update_days = 1 + mock_requirements.owasp_page_last_update_days = 5 + mock_metric.unanswered_issues_count = 1 + mock_requirements.unanswered_issues_count = 5 + mock_metric.unassigned_issues_count = 1 + mock_requirements.unassigned_issues_count = 5 + + # Set up project with non-compliant level + mock_metric.project.level = LAB_LEVEL + mock_metric.project.project_level_official = FLAGSHIP_LEVEL + 
mock_metric.project.name = NON_COMPLIANT_PROJECT_NAME + type(mock_metric.project).is_level_compliant = PropertyMock(return_value=False) + + # Set compliance requirements + mock_metric.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + + # Set penalty weight + mock_requirements.compliance_penalty_weight = PENALTY_TWENTY_PERCENT # 20% penalty + + self.mock_metrics.return_value.select_related.return_value = [mock_metric] + self.mock_requirements.return_value = [mock_requirements] + mock_requirements.level = LAB_LEVEL + + # Execute command + with patch(STDOUT_PATCH, new=self.stdout): + call_command("owasp_update_project_health_scores") + + # Calculate expected score + # 8 forward fields * 6.0 + 2 compliance fields * 5.0 + 7 backward fields * 6.0 = 100 + # Final score: 100.0 - 20.0 = 80.0 + + # Verify that penalty was applied + assert abs(mock_metric.score - 80.0) < FLOAT_PRECISION # Should be 80 after 20% penalty + + # Verify penalty was logged + output = self.stdout.getvalue() + assert ( + f"Applied {PENALTY_TWENTY_PERCENT}% compliance penalty to {NON_COMPLIANT_PROJECT_NAME}" + in output + ) + assert "penalty:" in output + assert "final score:" in output + assert f"[Local: {LAB_LEVEL}, Official: {FLAGSHIP_LEVEL}]" in output + + def test_handle_without_compliance_penalty(self): + """Test score calculation without compliance penalty for compliant project.""" + # Create mock metrics with test data that matches actual scoring fields + mock_metric = MagicMock(spec=ProjectHealthMetrics) + mock_requirements = MagicMock(spec=ProjectHealthRequirements) + + # Set up forward fields (higher is better) - all should meet requirements + mock_metric.age_days = 10 + mock_requirements.age_days = 5 + mock_metric.contributors_count = 10 + mock_requirements.contributors_count = 5 + mock_metric.forks_count = 10 + mock_requirements.forks_count = 5 + mock_metric.is_funding_requirements_compliant = True + mock_requirements.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + mock_requirements.is_leader_requirements_compliant = True + mock_metric.open_pull_requests_count = 10 + mock_requirements.open_pull_requests_count = 5 + mock_metric.recent_releases_count = 10 + mock_requirements.recent_releases_count = 5 + mock_metric.stars_count = 10 + mock_requirements.stars_count = 5 + mock_metric.total_pull_requests_count = 10 + mock_requirements.total_pull_requests_count = 5 + mock_metric.total_releases_count = 10 + mock_requirements.total_releases_count = 5 + + # Set up backward fields (lower is better) - all should meet requirements + mock_metric.last_commit_days = 1 + mock_requirements.last_commit_days = 5 + mock_metric.last_pull_request_days = 1 + mock_requirements.last_pull_request_days = 5 + mock_metric.last_release_days = 1 + mock_requirements.last_release_days = 5 + mock_metric.open_issues_count = 1 + mock_requirements.open_issues_count = 5 + mock_metric.owasp_page_last_update_days = 1 + mock_requirements.owasp_page_last_update_days = 5 + mock_metric.unanswered_issues_count = 1 + mock_requirements.unanswered_issues_count = 5 + mock_metric.unassigned_issues_count = 1 + mock_requirements.unassigned_issues_count = 5 + + # Set up project with compliant level + mock_metric.project.level = FLAGSHIP_LEVEL + mock_metric.project.project_level_official = FLAGSHIP_LEVEL + mock_metric.project.name = COMPLIANT_PROJECT_NAME + type(mock_metric.project).is_level_compliant = PropertyMock(return_value=True) + + 
mock_metric.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + + # Set penalty weight (should not be applied) + mock_requirements.compliance_penalty_weight = PENALTY_TWENTY_PERCENT + + self.mock_metrics.return_value.select_related.return_value = [mock_metric] + self.mock_requirements.return_value = [mock_requirements] + mock_requirements.level = FLAGSHIP_LEVEL + + # Execute command + with patch(STDOUT_PATCH, new=self.stdout): + call_command("owasp_update_project_health_scores") + + # Verify no penalty was applied for compliant project + # Expected: total possible 100 points without penalty + # (48 + 10 + 42 = 100 points total) + assert abs(mock_metric.score - 100.0) < FLOAT_PRECISION # Should be maximum score + + # Verify no penalty was logged + output = self.stdout.getvalue() + assert "compliance penalty" not in output + + def test_handle_zero_penalty_weight(self): + """Test score calculation with zero penalty weight.""" + mock_metric = MagicMock(spec=ProjectHealthMetrics) + mock_requirements = MagicMock(spec=ProjectHealthRequirements) + + # Set up basic scoring fields using explicit values (all meet requirements) + mock_metric.age_days = 5 + mock_requirements.age_days = 5 + mock_metric.contributors_count = 5 + mock_requirements.contributors_count = 5 + mock_metric.forks_count = 5 + mock_requirements.forks_count = 5 + mock_metric.is_funding_requirements_compliant = True + mock_requirements.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + mock_requirements.is_leader_requirements_compliant = True + mock_metric.open_pull_requests_count = 5 + mock_requirements.open_pull_requests_count = 5 + mock_metric.recent_releases_count = 5 + mock_requirements.recent_releases_count = 5 + mock_metric.stars_count = 5 + mock_requirements.stars_count = 5 + mock_metric.total_pull_requests_count = 5 + mock_requirements.total_pull_requests_count = 5 + mock_metric.total_releases_count = 5 + mock_requirements.total_releases_count = 5 + mock_metric.last_commit_days = 5 + mock_requirements.last_commit_days = 5 + mock_metric.last_pull_request_days = 5 + mock_requirements.last_pull_request_days = 5 + mock_metric.last_release_days = 5 + mock_requirements.last_release_days = 5 + mock_metric.open_issues_count = 5 + mock_requirements.open_issues_count = 5 + mock_metric.owasp_page_last_update_days = 5 + mock_requirements.owasp_page_last_update_days = 5 + mock_metric.unanswered_issues_count = 5 + mock_requirements.unanswered_issues_count = 5 + mock_metric.unassigned_issues_count = 5 + mock_requirements.unassigned_issues_count = 5 + + # Set up non-compliant project + mock_metric.project.level = LAB_LEVEL + mock_metric.project.project_level_official = FLAGSHIP_LEVEL + mock_metric.project.name = TEST_PROJECT_NAME + type(mock_metric.project).is_level_compliant = PropertyMock(return_value=False) + + mock_metric.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + + # Set zero penalty weight + mock_requirements.compliance_penalty_weight = PENALTY_ZERO_PERCENT + + self.mock_metrics.return_value.select_related.return_value = [mock_metric] + self.mock_requirements.return_value = [mock_requirements] + mock_requirements.level = LAB_LEVEL + + # Execute command + with patch(STDOUT_PATCH, new=self.stdout): + call_command("owasp_update_project_health_scores") + + # Score should be unchanged (no penalty applied) + # Expected: total possible 100 points without penalty + assert abs(mock_metric.score - 
100.0) < FLOAT_PRECISION # Should be maximum score + + # Verify penalty was applied but with 0% (should be logged) + output = self.stdout.getvalue() + assert f"Applied {PENALTY_ZERO_PERCENT}% compliance penalty" in output + + def test_handle_maximum_penalty_weight(self): + """Test score calculation with maximum penalty weight (100%).""" + mock_metric = MagicMock(spec=ProjectHealthMetrics) + mock_requirements = MagicMock(spec=ProjectHealthRequirements) + + # Set up basic scoring fields using explicit values (all meet requirements) + mock_metric.age_days = 10 + mock_requirements.age_days = 5 + mock_metric.contributors_count = 10 + mock_requirements.contributors_count = 5 + mock_metric.forks_count = 10 + mock_requirements.forks_count = 5 + mock_metric.is_funding_requirements_compliant = True + mock_requirements.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + mock_requirements.is_leader_requirements_compliant = True + mock_metric.open_pull_requests_count = 10 + mock_requirements.open_pull_requests_count = 5 + mock_metric.recent_releases_count = 10 + mock_requirements.recent_releases_count = 5 + mock_metric.stars_count = 10 + mock_requirements.stars_count = 5 + mock_metric.total_pull_requests_count = 10 + mock_requirements.total_pull_requests_count = 5 + mock_metric.total_releases_count = 10 + mock_requirements.total_releases_count = 5 + mock_metric.last_commit_days = 1 + mock_requirements.last_commit_days = 5 + mock_metric.last_pull_request_days = 1 + mock_requirements.last_pull_request_days = 5 + mock_metric.last_release_days = 1 + mock_requirements.last_release_days = 5 + mock_metric.open_issues_count = 1 + mock_requirements.open_issues_count = 5 + mock_metric.owasp_page_last_update_days = 1 + mock_requirements.owasp_page_last_update_days = 5 + mock_metric.unanswered_issues_count = 1 + mock_requirements.unanswered_issues_count = 5 + mock_metric.unassigned_issues_count = 1 + mock_requirements.unassigned_issues_count = 5 + + # Set up non-compliant project + mock_metric.project.level = LAB_LEVEL + mock_metric.project.project_level_official = FLAGSHIP_LEVEL + mock_metric.project.name = TEST_PROJECT_NAME + type(mock_metric.project).is_level_compliant = PropertyMock(return_value=False) + + mock_metric.is_funding_requirements_compliant = True + mock_metric.is_leader_requirements_compliant = True + + # Set maximum penalty weight + mock_requirements.compliance_penalty_weight = PENALTY_HUNDRED_PERCENT + + self.mock_metrics.return_value.select_related.return_value = [mock_metric] + self.mock_requirements.return_value = [mock_requirements] + mock_requirements.level = LAB_LEVEL + + # Execute command + with patch(STDOUT_PATCH, new=self.stdout): + call_command("owasp_update_project_health_scores") + + # Score should be 0 (100% penalty) + assert ( + abs(mock_metric.score - PENALTY_ZERO_PERCENT) < FLOAT_PRECISION + ) # Use approximate comparison for float + + def test_handle_penalty_weight_clamping(self): + """Test that penalty weight is properly clamped to [0, 100] range.""" + mock_metric = MagicMock(spec=ProjectHealthMetrics) + mock_requirements = MagicMock(spec=ProjectHealthRequirements) + + # Set up basic scoring fields for partial score (some fields meet requirements) + mock_metric.age_days = 5 + mock_requirements.age_days = 10 # Does not meet requirement + mock_metric.contributors_count = 10 + mock_requirements.contributors_count = 5 # Meets requirement + mock_metric.forks_count = 5 + mock_requirements.forks_count = 10 # Does not meet requirement + 
+
+    def test_handle_maximum_penalty_weight(self):
+        """Test score calculation with the maximum penalty weight (100%)."""
+        mock_metric = MagicMock(spec=ProjectHealthMetrics)
+        mock_requirements = MagicMock(spec=ProjectHealthRequirements)
+
+        # Set up basic scoring fields using explicit values (all meet requirements)
+        mock_metric.age_days = 10
+        mock_requirements.age_days = 5
+        mock_metric.contributors_count = 10
+        mock_requirements.contributors_count = 5
+        mock_metric.forks_count = 10
+        mock_requirements.forks_count = 5
+        mock_metric.is_funding_requirements_compliant = True
+        mock_requirements.is_funding_requirements_compliant = True
+        mock_metric.is_leader_requirements_compliant = True
+        mock_requirements.is_leader_requirements_compliant = True
+        mock_metric.open_pull_requests_count = 10
+        mock_requirements.open_pull_requests_count = 5
+        mock_metric.recent_releases_count = 10
+        mock_requirements.recent_releases_count = 5
+        mock_metric.stars_count = 10
+        mock_requirements.stars_count = 5
+        mock_metric.total_pull_requests_count = 10
+        mock_requirements.total_pull_requests_count = 5
+        mock_metric.total_releases_count = 10
+        mock_requirements.total_releases_count = 5
+        mock_metric.last_commit_days = 1
+        mock_requirements.last_commit_days = 5
+        mock_metric.last_pull_request_days = 1
+        mock_requirements.last_pull_request_days = 5
+        mock_metric.last_release_days = 1
+        mock_requirements.last_release_days = 5
+        mock_metric.open_issues_count = 1
+        mock_requirements.open_issues_count = 5
+        mock_metric.owasp_page_last_update_days = 1
+        mock_requirements.owasp_page_last_update_days = 5
+        mock_metric.unanswered_issues_count = 1
+        mock_requirements.unanswered_issues_count = 5
+        mock_metric.unassigned_issues_count = 1
+        mock_requirements.unassigned_issues_count = 5
+
+        # Set up a non-compliant project
+        mock_metric.project.level = LAB_LEVEL
+        mock_metric.project.project_level_official = FLAGSHIP_LEVEL
+        mock_metric.project.name = TEST_PROJECT_NAME
+        type(mock_metric.project).is_level_compliant = PropertyMock(return_value=False)
+
+        mock_metric.is_funding_requirements_compliant = True
+        mock_metric.is_leader_requirements_compliant = True
+
+        # Set the maximum penalty weight
+        mock_requirements.compliance_penalty_weight = PENALTY_HUNDRED_PERCENT
+
+        self.mock_metrics.return_value.select_related.return_value = [mock_metric]
+        self.mock_requirements.return_value = [mock_requirements]
+        mock_requirements.level = LAB_LEVEL
+
+        # Execute command
+        with patch(STDOUT_PATCH, new=self.stdout):
+            call_command("owasp_update_project_health_scores")
+
+        # Score should be 0: a 100% penalty wipes out the entire base score.
+        assert abs(mock_metric.score - 0.0) < FLOAT_PRECISION  # Approximate float comparison
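+
+    # NOTE: the clamping behaviour exercised below assumes the command bounds the
+    # configured weight before applying it, along the lines of (a sketch, not the
+    # verbatim implementation):
+    #
+    #     weight = max(0.0, min(100.0, requirements.compliance_penalty_weight))
+    #
+    # so -10.0 becomes 0 (no penalty) and 150.0 becomes 100 (score zeroed).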
+
+    def test_handle_penalty_weight_clamping(self):
+        """Test that the penalty weight is properly clamped to the [0, 100] range."""
+        mock_metric = MagicMock(spec=ProjectHealthMetrics)
+        mock_requirements = MagicMock(spec=ProjectHealthRequirements)
+
+        # Set up basic scoring fields for a partial score (only some fields meet requirements)
+        mock_metric.age_days = 5
+        mock_requirements.age_days = 10  # Does not meet requirement
+        mock_metric.contributors_count = 10
+        mock_requirements.contributors_count = 5  # Meets requirement
+        mock_metric.forks_count = 5
+        mock_requirements.forks_count = 10  # Does not meet requirement
+        mock_metric.is_funding_requirements_compliant = True
+        mock_requirements.is_funding_requirements_compliant = True  # Meets requirement
+        mock_metric.is_leader_requirements_compliant = True
+        mock_requirements.is_leader_requirements_compliant = True  # Meets requirement
+        mock_metric.open_pull_requests_count = 10
+        mock_requirements.open_pull_requests_count = 5  # Meets requirement
+        mock_metric.recent_releases_count = 5
+        mock_requirements.recent_releases_count = 10  # Does not meet requirement
+        mock_metric.stars_count = 10
+        mock_requirements.stars_count = 5  # Meets requirement
+        mock_metric.total_pull_requests_count = 10
+        mock_requirements.total_pull_requests_count = 5  # Meets requirement
+        mock_metric.total_releases_count = 5
+        mock_requirements.total_releases_count = 10  # Does not meet requirement
+        mock_metric.last_commit_days = 10
+        mock_requirements.last_commit_days = 5  # Does not meet requirement
+        mock_metric.last_pull_request_days = 3
+        mock_requirements.last_pull_request_days = 5  # Meets requirement
+        mock_metric.last_release_days = 10
+        mock_requirements.last_release_days = 5  # Does not meet requirement
+        mock_metric.open_issues_count = 3
+        mock_requirements.open_issues_count = 5  # Meets requirement
+        mock_metric.owasp_page_last_update_days = 3
+        mock_requirements.owasp_page_last_update_days = 5  # Meets requirement
+        mock_metric.unanswered_issues_count = 3
+        mock_requirements.unanswered_issues_count = 5  # Meets requirement
+        mock_metric.unassigned_issues_count = 10
+        mock_requirements.unassigned_issues_count = 5  # Does not meet requirement
+
+        # Expected base score with this mix of met and unmet requirements
+        base_score = 58.0
+
+        # Set up a non-compliant project
+        mock_metric.project.level = LAB_LEVEL
+        mock_metric.project.project_level_official = FLAGSHIP_LEVEL
+        mock_metric.project.name = TEST_PROJECT_NAME
+        type(mock_metric.project).is_level_compliant = PropertyMock(return_value=False)
+        mock_metric.is_funding_requirements_compliant = True
+        mock_metric.is_leader_requirements_compliant = True
+
+        # Test cases for penalty weight clamping
+        test_cases = [
+            (-10.0, PENALTY_ZERO_PERCENT, base_score),  # Negative values are clamped to 0
+            (150.0, PENALTY_HUNDRED_PERCENT, 0.0),  # Values over 100 are clamped to 100
+            (50.0, 50.0, base_score * 0.5),  # In-range values pass through unchanged
+        ]
+
+        for input_penalty, expected_penalty, expected_score in test_cases:
+            mock_requirements.compliance_penalty_weight = input_penalty
+
+            self.mock_metrics.return_value.select_related.return_value = [mock_metric]
+            self.mock_requirements.return_value = [mock_requirements]
+            mock_requirements.level = LAB_LEVEL
+
+            # Reset stdout for each case
+            self.stdout = StringIO()
+
+            # Execute command
+            with patch(STDOUT_PATCH, new=self.stdout):
+                call_command("owasp_update_project_health_scores")
+
+            # Verify the penalty was clamped correctly
+            output = self.stdout.getvalue()
+            assert f"Applied {expected_penalty}% compliance penalty" in output
+
+            # Verify the final score is correct
+            assert abs(mock_metric.score - expected_score) < FLOAT_PRECISION
+
+    def test_handle_no_projects_to_update(self):
+        """Test the command when no projects need score updates."""
+        # Mock an empty queryset (no projects with null scores)
+        self.mock_metrics.return_value.select_related.return_value = []
+        self.mock_requirements.return_value = []
+
+        # Execute command
+        with patch(STDOUT_PATCH, new=self.stdout):
+            call_command("owasp_update_project_health_scores")
+
+        # Verify bulk save ran on an empty list and the success message was shown
         self.mock_bulk_save.assert_called_once_with([], fields=["score"])
         assert "Updated project health scores successfully." in self.stdout.getvalue()
-        assert "Updating score for project: Test Project" in self.stdout.getvalue()
"""Create mock health requirements.""" + requirements = MagicMock(spec=ProjectHealthRequirements) + requirements.level = level + requirements.compliance_penalty_weight = penalty_weight + + # Set default requirement values + for field in [ + "age_days", + "contributors_count", + "forks_count", + "last_release_days", + "last_commit_days", + "open_issues_count", + "open_pull_requests_count", + "owasp_page_last_update_days", + "last_pull_request_days", + "recent_releases_count", + "stars_count", + "total_pull_requests_count", + "total_releases_count", + "unanswered_issues_count", + "unassigned_issues_count", + ]: + setattr(requirements, field, 5) + + return requirements + + @patch("requests.get") + def test_complete_compliance_workflow_with_penalties(self, mock_get): + """Test the complete workflow: fetch levels -> update projects -> calculate scores.""" + # Step 1: Mock API response with official levels + mock_response = MagicMock() + mock_response.json.return_value = [ + {"name": "OWASP ZAP", "level": "flagship"}, + {"name": "OWASP WebGoat", "level": "production"}, + {"name": "OWASP Top 10", "level": "flagship"}, + ] + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + # Step 2: Create mock projects with different compliance statuses + compliant_project = self.create_mock_project( + "OWASP ZAP", "flagship", "lab" + ) # Will be updated to flagship + non_compliant_project = self.create_mock_project( + "OWASP WebGoat", "lab", "other" + ) # Will be updated to production + missing_project = self.create_mock_project( + "OWASP Missing", "lab", "lab" + ) # Not in official data + + projects = [compliant_project, non_compliant_project, missing_project] + + # Step 3: Create corresponding health metrics + compliant_metric = self.create_mock_metric(compliant_project) + non_compliant_metric = self.create_mock_metric(non_compliant_project) + missing_metric = self.create_mock_metric(missing_project) + + metrics = [compliant_metric, non_compliant_metric, missing_metric] + + # Step 4: Create mock requirements with penalty weights + flagship_requirements = self.create_mock_requirements("flagship", 15.0) + lab_requirements = self.create_mock_requirements("lab", 20.0) + production_requirements = self.create_mock_requirements("production", 25.0) + + requirements = [flagship_requirements, lab_requirements, production_requirements] + + # Step 5: Execute health metrics command (includes official level fetching) + with ( + patch(PROJECT_FILTER_PATCH) as mock_projects, + patch(PROJECT_BULK_SAVE_PATCH), + patch(METRICS_BULK_SAVE_PATCH), + patch(STDOUT_PATCH, new=self.stdout), + ): + mock_projects.return_value = projects + + call_command("owasp_update_project_health_metrics") + + # Verify official levels were updated + assert compliant_project.project_level_official == "flagship" + assert non_compliant_project.project_level_official == "production" + assert ( + missing_project.project_level_official == "lab" + ) # Unchanged (not in official data) + + output = self.stdout.getvalue() + assert "Successfully fetched 3 official project levels" in output + assert "Updated official levels for 2 projects" in output + + # Step 6: Set up compliance status based on updated official levels + compliant_project.is_level_compliant = True # flagship == flagship + non_compliant_project.is_level_compliant = False # lab != production + missing_project.is_level_compliant = True # lab == lab + + # Step 7: Execute health scores command + self.stdout = StringIO() # Reset stdout + + with ( + 
+
+    @patch("requests.get")
+    def test_complete_compliance_workflow_with_penalties(self, mock_get):
+        """Test the complete workflow: fetch levels -> update projects -> calculate scores."""
+        # Step 1: Mock the API response with official levels
+        mock_response = MagicMock()
+        mock_response.json.return_value = [
+            {"name": "OWASP ZAP", "level": "flagship"},
+            {"name": "OWASP WebGoat", "level": "production"},
+            {"name": "OWASP Top 10", "level": "flagship"},
+        ]
+        mock_response.raise_for_status.return_value = None
+        mock_get.return_value = mock_response
+
+        # Step 2: Create mock projects with different compliance statuses
+        compliant_project = self.create_mock_project(
+            "OWASP ZAP", "flagship", "lab"
+        )  # Official level will be updated to flagship
+        non_compliant_project = self.create_mock_project(
+            "OWASP WebGoat", "lab", "other"
+        )  # Official level will be updated to production
+        missing_project = self.create_mock_project(
+            "OWASP Missing", "lab", "lab"
+        )  # Not in the official data
+
+        projects = [compliant_project, non_compliant_project, missing_project]
+
+        # Step 3: Create corresponding health metrics
+        compliant_metric = self.create_mock_metric(compliant_project)
+        non_compliant_metric = self.create_mock_metric(non_compliant_project)
+        missing_metric = self.create_mock_metric(missing_project)
+
+        metrics = [compliant_metric, non_compliant_metric, missing_metric]
+
+        # Step 4: Create mock requirements with penalty weights
+        flagship_requirements = self.create_mock_requirements("flagship", 15.0)
+        lab_requirements = self.create_mock_requirements("lab", 20.0)
+        production_requirements = self.create_mock_requirements("production", 25.0)
+
+        requirements = [flagship_requirements, lab_requirements, production_requirements]
+
+        # Step 5: Execute the health metrics command (includes official level fetching)
+        with (
+            patch(PROJECT_FILTER_PATCH) as mock_projects,
+            patch(PROJECT_BULK_SAVE_PATCH),
+            patch(METRICS_BULK_SAVE_PATCH),
+            patch(STDOUT_PATCH, new=self.stdout),
+        ):
+            mock_projects.return_value = projects
+
+            call_command("owasp_update_project_health_metrics")
+
+            # Verify the official levels were updated
+            assert compliant_project.project_level_official == "flagship"
+            assert non_compliant_project.project_level_official == "production"
+            assert (
+                missing_project.project_level_official == "lab"
+            )  # Unchanged (not in the official data)
+
+            output = self.stdout.getvalue()
+            assert "Successfully fetched 3 official project levels" in output
+            assert "Updated official levels for 2 projects" in output
+
+        # Step 6: Set the compliance status based on the updated official levels
+        compliant_project.is_level_compliant = True  # flagship == flagship
+        non_compliant_project.is_level_compliant = False  # lab != production
+        missing_project.is_level_compliant = True  # lab == lab
+
+        # Step 7: Execute the health scores command
+        self.stdout = StringIO()  # Reset stdout
+
+        with (
+            patch(METRICS_FILTER_PATCH) as mock_metrics_filter,
+            patch(REQUIREMENTS_ALL_PATCH) as mock_requirements,
+            patch(METRICS_BULK_SAVE_PATCH),
+            patch(STDOUT_PATCH, new=self.stdout),
+        ):
+            mock_metrics_filter.return_value.select_related.return_value = metrics
+            mock_requirements.return_value = requirements
+
+            call_command("owasp_update_project_health_scores")
+
+        # Verify scores were calculated and penalties applied appropriately.
+        # The base score for every project here is 90.0 (all fields meet requirements).
+
+        # Compliant project: no penalty, so the full base score
+        assert compliant_metric.score >= 90.0
+
+        # Non-compliant project: a lower score due to the penalty
+        assert non_compliant_metric.score < compliant_metric.score
+
+        # Missing project: treated as compliant (no penalty)
+        assert missing_metric.score >= 90.0
+
+        output = self.stdout.getvalue()
+        assert "compliance penalty to OWASP WebGoat" in output
+        assert "penalty:" in output
+        assert "final score:" in output
+        assert "[Local: lab, Official: production]" in output
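+
+    # NOTE: the mappings asserted below assume the command normalises the raw
+    # level values from owasp.org roughly as follows (a sketch; the real parsing
+    # lives in owasp_update_project_health_metrics, and the "3" and "4" entries
+    # are extrapolated assumptions):
+    #
+    #     LEVEL_MAP = {"2": "incubator", "3": "lab", "3.5": "production", "4": "flagship"}
+    #     # known level names like "flagship" pass through; anything else -> "other"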
+
+    @patch("requests.get")
+    def test_compliance_detection_with_various_level_mappings(self, mock_get):
+        """Test compliance detection with different level formats from the API."""
+        # Mock an API response with various level formats
+        mock_response = MagicMock()
+        mock_response.json.return_value = [
+            {"name": "Project A", "level": "2"},  # Numeric -> incubator
+            {"name": "Project B", "level": "3.5"},  # Decimal -> production
+            {"name": "Project C", "level": "flagship"},  # String -> flagship
+            {"name": "Project D", "level": "unknown"},  # Unknown -> other
+        ]
+        mock_response.raise_for_status.return_value = None
+        mock_get.return_value = mock_response
+
+        # Create projects with different local levels
+        project_a = self.create_mock_project("Project A", "lab", "other")
+        project_b = self.create_mock_project("Project B", "lab", "other")
+        project_c = self.create_mock_project("Project C", "production", "other")
+        project_d = self.create_mock_project("Project D", "flagship", "other")
+
+        projects = [project_a, project_b, project_c, project_d]
+
+        with (
+            patch(PROJECT_FILTER_PATCH) as mock_projects,
+            patch(PROJECT_BULK_SAVE_PATCH),
+            patch(METRICS_BULK_SAVE_PATCH),
+            patch(STDOUT_PATCH, new=self.stdout),
+        ):
+            mock_projects.return_value = projects
+
+            call_command("owasp_update_project_health_metrics")
+
+        # Verify the level mappings
+        assert project_a.project_level_official == "incubator"  # 2 -> incubator
+        assert project_b.project_level_official == "production"  # 3.5 -> production
+        assert project_c.project_level_official == "flagship"  # flagship -> flagship
+        assert project_d.project_level_official == "other"  # unknown -> other
+
+        # Verify that every project's local level now differs from its official level,
+        # i.e. all four projects are non-compliant after the sync
+        assert project_a.level != project_a.project_level_official  # lab != incubator
+        assert project_b.level != project_b.project_level_official  # lab != production
+        assert project_c.level != project_c.project_level_official  # production != flagship
+        assert project_d.level != project_d.project_level_official  # flagship != other
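+
+    # NOTE: the graceful degradation exercised below presumably wraps the fetch
+    # in a guard along these lines (a sketch; OFFICIAL_LEVELS_URL and the timeout
+    # are assumed names, not the command's verbatim code):
+    #
+    #     try:
+    #         response = requests.get(OFFICIAL_LEVELS_URL, timeout=30)
+    #         response.raise_for_status()
+    #         official_levels = response.json()
+    #     except requests.exceptions.RequestException:
+    #         self.stdout.write(
+    #             "Failed to fetch official project levels, continuing without updates"
+    #         )
+    #         official_levels = None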
+
+    @patch("requests.get")
+    def test_api_failure_handling(self, mock_get):
+        """Test handling of API failures during official level fetching."""
+        # Mock an API failure
+        mock_get.side_effect = requests.exceptions.RequestException("Network error")
+
+        project = self.create_mock_project("Test Project", "lab", "lab")
+
+        with (
+            patch(PROJECT_FILTER_PATCH) as mock_projects,
+            patch(METRICS_BULK_SAVE_PATCH),
+            patch(STDOUT_PATCH, new=self.stdout),
+        ):
+            mock_projects.return_value = [project]
+
+            call_command("owasp_update_project_health_metrics")
+
+        # Verify the API failure is handled gracefully
+        output = self.stdout.getvalue()
+        assert "Failed to fetch official project levels, continuing without updates" in output
+        assert "Evaluating metrics for project: Test Project" in output
+
+        # The project's official level should remain unchanged
+        assert project.project_level_official == "lab"
+
+    def test_skip_official_levels_flag(self):
+        """Test that the --skip-official-levels flag works correctly."""
+        project = self.create_mock_project("Test Project", "lab", "flagship")
+
+        with (
+            patch(PROJECT_FILTER_PATCH) as mock_projects,
+            patch(METRICS_BULK_SAVE_PATCH),
+            patch(STDOUT_PATCH, new=self.stdout),
+        ):
+            mock_projects.return_value = [project]
+
+            call_command("owasp_update_project_health_metrics", "--skip-official-levels")
+
+        # Verify that fetching official levels was skipped
+        output = self.stdout.getvalue()
+        assert "Fetching official project levels" not in output
+        assert "Evaluating metrics for project: Test Project" in output
+
+        # The project's official level should remain unchanged
+        assert project.project_level_official == "flagship"
+
+    def test_logging_and_detection_accuracy(self):
+        """Test that level mismatches are properly detected and logged."""
+        # Create projects covering various compliance scenarios
+        scenarios = [
+            ("Compliant Flagship", "flagship", "flagship", True),
+            ("Non-compliant Lab", "lab", "flagship", False),
+            ("Non-compliant Production", "production", "incubator", False),
+            ("Compliant Other", "other", "other", True),
+        ]
+
+        projects = []
+        metrics = []
+
+        for name, local_level, official_level, expected_compliance in scenarios:
+            project = self.create_mock_project(name, local_level, official_level)
+            project.is_level_compliant = expected_compliance
+            metric = self.create_mock_metric(project)
+
+            projects.append(project)
+            metrics.append(metric)
+
+        # Create requirements for each level
+        requirements = [
+            self.create_mock_requirements("flagship", 10.0),
+            self.create_mock_requirements("lab", 15.0),
+            self.create_mock_requirements("production", 20.0),
+            self.create_mock_requirements("incubator", 25.0),
+            self.create_mock_requirements("other", 5.0),
+        ]
+
+        with (
+            patch(METRICS_FILTER_PATCH) as mock_metrics_filter,
+            patch(REQUIREMENTS_ALL_PATCH) as mock_requirements,
+            patch(METRICS_BULK_SAVE_PATCH),
+            patch(STDOUT_PATCH, new=self.stdout),
+        ):
+            mock_metrics_filter.return_value.select_related.return_value = metrics
+            mock_requirements.return_value = requirements
+
+            call_command("owasp_update_project_health_scores")
+
+        output = self.stdout.getvalue()
+
+        # Verify that compliant projects have no penalties logged
+        assert "compliance penalty to Compliant Flagship" not in output
+        assert "compliance penalty to Compliant Other" not in output
+
+        # Verify that non-compliant projects have penalties logged with the correct levels
+        assert "Applied 15.0% compliance penalty to Non-compliant Lab" in output
+        assert "[Local: lab, Official: flagship]" in output
+        assert "Applied 20.0% compliance penalty to Non-compliant Production" in output
+        assert "[Local: production, Official: incubator]" in output
+
+    def test_edge_cases_and_data_validation(self):
+        """Test edge cases in data validation and processing."""
+        # A project with edge case data
+        edge_case_project = self.create_mock_project("Edge Case", "lab", "flagship")
+        edge_case_metric = self.create_mock_metric(edge_case_project)
+        edge_case_project.is_level_compliant = False
+
+        # An extreme penalty weight that should be clamped to 100
+        extreme_requirements = self.create_mock_requirements("lab", 999.0)
+
+        with (
+            patch(METRICS_FILTER_PATCH) as mock_metrics_filter,
+            patch(REQUIREMENTS_ALL_PATCH) as mock_requirements,
+            patch(METRICS_BULK_SAVE_PATCH),
+            patch(STDOUT_PATCH, new=self.stdout),
+        ):
+            mock_metrics_filter.return_value.select_related.return_value = [edge_case_metric]
+            mock_requirements.return_value = [extreme_requirements]
+
+            call_command("owasp_update_project_health_scores")
+
+        # Verify the penalty was clamped to 100% and the score is 0
+        assert abs(edge_case_metric.score - 0.0) < 0.01  # Approximate float comparison
+
+        output = self.stdout.getvalue()
+        assert "Applied 100.0% compliance penalty" in output
diff --git a/backend/tests/apps/owasp/models/project_test.py b/backend/tests/apps/owasp/models/project_test.py
index 3179866da8..31805085c9 100644
--- a/backend/tests/apps/owasp/models/project_test.py
+++ b/backend/tests/apps/owasp/models/project_test.py
@@ -131,3 +131,24 @@ def test_from_github(self):
         assert project.level == ProjectLevel.LAB
         assert project.type == ProjectType.TOOL
         assert project.updated_at == owasp_repository.updated_at
+
+    @pytest.mark.parametrize(
+        ("local_level", "official_level", "expected_result"),
+        [
+            (ProjectLevel.LAB, ProjectLevel.LAB, True),
+            (ProjectLevel.FLAGSHIP, ProjectLevel.FLAGSHIP, True),
+            (ProjectLevel.LAB, ProjectLevel.FLAGSHIP, False),
+            (ProjectLevel.FLAGSHIP, ProjectLevel.LAB, False),
+            (ProjectLevel.INCUBATOR, ProjectLevel.PRODUCTION, False),
+            (ProjectLevel.OTHER, ProjectLevel.OTHER, True),
+        ],
+    )
+    def test_is_level_compliant(self, local_level, official_level, expected_result):
+        """Test project level compliance detection."""
+        project = Project(level=local_level, project_level_official=official_level)
+        assert project.is_level_compliant == expected_result
+
+    def test_is_level_compliant_default_values(self):
+        """Test project level compliance with default values."""
+        project = Project()  # Both levels default to ProjectLevel.OTHER
+        assert project.is_level_compliant is True
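+
+    # The property under test presumably reduces to a direct comparison
+    # (a sketch; see apps/owasp/models/project.py for the actual definition):
+    #
+    #     @property
+    #     def is_level_compliant(self) -> bool:
+    #         return self.level == self.project_level_official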