|
| 1 | +from collections import defaultdict |
| 2 | +from typing import TYPE_CHECKING, Dict, List |
| 3 | + |
| 4 | +import click |
| 5 | + |
| 6 | +from cycode.cli.consts import LICENSE_COMPLIANCE_POLICY_ID, PACKAGE_VULNERABILITY_POLICY_ID |
| 7 | +from cycode.cli.models import Detection |
| 8 | +from cycode.cli.printers.tables.table import Table |
| 9 | +from cycode.cli.printers.tables.table_models import ColumnInfoBuilder, ColumnWidths |
| 10 | +from cycode.cli.printers.tables.table_printer_base import TablePrinterBase |
| 11 | +from cycode.cli.utils.string_utils import shortcut_dependency_paths |
| 12 | + |
| 13 | +if TYPE_CHECKING: |
| 14 | + from cycode.cli.models import LocalScanResult |
| 15 | + |
| 16 | + |
| 17 | +column_builder = ColumnInfoBuilder() |
| 18 | + |
| 19 | +# Building must have strict order. Represents the order of the columns in the table (from left to right) |
| 20 | +SEVERITY_COLUMN = column_builder.build(name='Severity') |
| 21 | +REPOSITORY_COLUMN = column_builder.build(name='Repository') |
| 22 | + |
| 23 | +FILE_PATH_COLUMN = column_builder.build(name='File Path') |
| 24 | +ECOSYSTEM_COLUMN = column_builder.build(name='Ecosystem') |
| 25 | +DEPENDENCY_NAME_COLUMN = column_builder.build(name='Dependency Name') |
| 26 | +DIRECT_DEPENDENCY_COLUMN = column_builder.build(name='Direct Dependency') |
| 27 | +DEVELOPMENT_DEPENDENCY_COLUMN = column_builder.build(name='Development Dependency') |
| 28 | +DEPENDENCY_PATHS_COLUMN = column_builder.build(name='Dependency Paths') |
| 29 | + |
| 30 | +CVE_COLUMNS = column_builder.build(name='CVE') |
| 31 | +UPGRADE_COLUMN = column_builder.build(name='Upgrade') |
| 32 | +LICENSE_COLUMN = column_builder.build(name='License') |
| 33 | + |
| 34 | +COLUMN_WIDTHS_CONFIG: ColumnWidths = { |
| 35 | + REPOSITORY_COLUMN: 2, |
| 36 | + FILE_PATH_COLUMN: 3, |
| 37 | + CVE_COLUMNS: 5, |
| 38 | + UPGRADE_COLUMN: 3, |
| 39 | + LICENSE_COLUMN: 2, |
| 40 | +} |
| 41 | + |
| 42 | + |
| 43 | +class ScaTablePrinter(TablePrinterBase): |
| 44 | + def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: |
| 45 | + detections_per_detection_type_id = self._extract_detections_per_policy_id(local_scan_results) |
| 46 | + for policy_id, detections in detections_per_detection_type_id.items(): |
| 47 | + table = self._get_table(policy_id) |
| 48 | + table.set_cols_width(COLUMN_WIDTHS_CONFIG) |
| 49 | + |
| 50 | + for detection in detections: |
| 51 | + self._enrich_table_with_values(table, detection) |
| 52 | + |
| 53 | + self._print_summary_issues(len(detections), self._get_title(policy_id)) |
| 54 | + click.echo(table.get_table().draw()) |
| 55 | + |
| 56 | + self._print_report_urls(local_scan_results) |
| 57 | + |
| 58 | + @staticmethod |
| 59 | + def _get_title(policy_id: str) -> str: |
| 60 | + if policy_id == PACKAGE_VULNERABILITY_POLICY_ID: |
| 61 | + return 'Dependencies Vulnerabilities' |
| 62 | + if policy_id == LICENSE_COMPLIANCE_POLICY_ID: |
| 63 | + return 'License Compliance' |
| 64 | + |
| 65 | + return 'Unknown' |
| 66 | + |
| 67 | + def _get_table(self, policy_id: str) -> Table: |
| 68 | + table = Table() |
| 69 | + |
| 70 | + if policy_id == PACKAGE_VULNERABILITY_POLICY_ID: |
| 71 | + table.add(SEVERITY_COLUMN) |
| 72 | + table.add(CVE_COLUMNS) |
| 73 | + table.add(UPGRADE_COLUMN) |
| 74 | + elif policy_id == LICENSE_COMPLIANCE_POLICY_ID: |
| 75 | + table.add(LICENSE_COLUMN) |
| 76 | + |
| 77 | + if self._is_git_repository(): |
| 78 | + table.add(REPOSITORY_COLUMN) |
| 79 | + |
| 80 | + table.add(FILE_PATH_COLUMN) |
| 81 | + table.add(ECOSYSTEM_COLUMN) |
| 82 | + table.add(DEPENDENCY_NAME_COLUMN) |
| 83 | + table.add(DIRECT_DEPENDENCY_COLUMN) |
| 84 | + table.add(DEVELOPMENT_DEPENDENCY_COLUMN) |
| 85 | + table.add(DEPENDENCY_PATHS_COLUMN) |
| 86 | + |
| 87 | + return table |
| 88 | + |
| 89 | + @staticmethod |
| 90 | + def _enrich_table_with_values(table: Table, detection: Detection) -> None: |
| 91 | + detection_details = detection.detection_details |
| 92 | + |
| 93 | + table.set(REPOSITORY_COLUMN, detection_details.get('repository_name')) |
| 94 | + |
| 95 | + table.set(FILE_PATH_COLUMN, detection_details.get('file_name')) |
| 96 | + table.set(ECOSYSTEM_COLUMN, detection_details.get('ecosystem')) |
| 97 | + table.set(DEPENDENCY_NAME_COLUMN, detection_details.get('package_name')) |
| 98 | + table.set(DIRECT_DEPENDENCY_COLUMN, detection_details.get('is_direct_dependency_str')) |
| 99 | + table.set(DEVELOPMENT_DEPENDENCY_COLUMN, detection_details.get('is_dev_dependency_str')) |
| 100 | + |
| 101 | + dependency_paths = 'N/A' |
| 102 | + dependency_paths_raw = detection_details.get('dependency_paths') |
| 103 | + if dependency_paths_raw: |
| 104 | + dependency_paths = shortcut_dependency_paths(dependency_paths_raw) |
| 105 | + table.set(DEPENDENCY_PATHS_COLUMN, dependency_paths) |
| 106 | + |
| 107 | + upgrade = '' |
| 108 | + alert = detection_details.get('alert') |
| 109 | + if alert and alert.get('first_patched_version'): |
| 110 | + upgrade = f'{alert.get("vulnerable_requirements")} -> {alert.get("first_patched_version")}' |
| 111 | + table.set(UPGRADE_COLUMN, upgrade) |
| 112 | + |
| 113 | + table.set(SEVERITY_COLUMN, detection_details.get('advisory_severity')) |
| 114 | + table.set(CVE_COLUMNS, detection_details.get('vulnerability_id')) |
| 115 | + table.set(LICENSE_COLUMN, detection_details.get('license')) |
| 116 | + |
| 117 | + @staticmethod |
| 118 | + def _print_report_urls(local_scan_results: List['LocalScanResult']) -> None: |
| 119 | + click.echo('Report URLs:') |
| 120 | + for local_scan_result in local_scan_results: |
| 121 | + report_url = local_scan_result.report_url if local_scan_result.report_url else 'N/A' |
| 122 | + click.echo(f'- {report_url}') |
| 123 | + |
| 124 | + @staticmethod |
| 125 | + def _print_summary_issues(detections_count: int, title: str) -> None: |
| 126 | + click.echo(f'⛔ Found {detections_count} issues of type: {click.style(title, bold=True)}') |
| 127 | + |
| 128 | + @staticmethod |
| 129 | + def _extract_detections_per_policy_id( |
| 130 | + local_scan_results: List['LocalScanResult'], |
| 131 | + ) -> Dict[str, List[Detection]]: |
| 132 | + detections_to_policy_id = defaultdict(list) |
| 133 | + |
| 134 | + for local_scan_result in local_scan_results: |
| 135 | + for document_detection in local_scan_result.document_detections: |
| 136 | + for detection in document_detection.detections: |
| 137 | + detections_to_policy_id[detection.detection_type_id].append(detection) |
| 138 | + |
| 139 | + # sort dict by keys (policy id) to make persist output order |
| 140 | + return dict(sorted(detections_to_policy_id.items(), reverse=True)) |
0 commit comments