diff --git a/src/analyzers/base_analyzer.py b/src/analyzers/base_analyzer.py index 3873a0d..4a1d449 100644 --- a/src/analyzers/base_analyzer.py +++ b/src/analyzers/base_analyzer.py @@ -1,8 +1,9 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import Optional, Dict, Any from core.models.analysis_result import AnalysisResult from core.analysis_context import AnalysisContext from core.exceptions import AnalyzerError +from core.metrics import get_metric_metadata import os @@ -90,7 +91,6 @@ def _get_project_folders(self, target_path: str = None) -> list: for item in os.listdir(target_path): item_path = os.path.join(target_path, item) if os.path.isdir(item_path): - # Check if folder contains Python files has_python = any( f.endswith('.py') for f in os.listdir(item_path) @@ -99,25 +99,29 @@ def _get_project_folders(self, target_path: str = None) -> list: if has_python: folders.append(item) - return folders or ['.'] # Current directory if no folders found + return folders or ['.'] @property @abstractmethod def analyzer_id(self) -> str: - """ - Unique identifier for this analyzer. - - Returns: - Analyzer identifier string - """ pass + def _create_result( + self, + score: float, + message_count: Dict[str, Any], + module_count: int, + details: Optional[Dict[str, Any]] = None + ) -> AnalysisResult: + return AnalysisResult( + analyzer_id=self.analyzer_id, + score=score, + message_count=message_count, + module_count=module_count, + metric_metadata=get_metric_metadata(self.analyzer_id), + details=details + ) + @abstractmethod def analyze(self) -> AnalysisResult: - """ - Run the analysis. - - Returns: - AnalysisResult with metrics and messages - """ pass \ No newline at end of file diff --git a/src/analyzers/factory.py b/src/analyzers/factory.py index f8cea24..54a05fa 100644 --- a/src/analyzers/factory.py +++ b/src/analyzers/factory.py @@ -19,6 +19,7 @@ def _get_analyzers(cls) -> Dict[str, Type[BaseAnalyzer]]: from analyzers.radon_mi_analyzer import RadonMIAnalyzer from analyzers.pipeline_analyzer import PipelineAnalyzer from analyzers.fpc_analyzer import FPCAnalyzer + from analyzers.pfp_analyzer import PFPAnalyzer cls._analyzers = { "pylint": PyLintAnalyzer, @@ -26,6 +27,7 @@ def _get_analyzers(cls) -> Dict[str, Type[BaseAnalyzer]]: "radon_mi": RadonMIAnalyzer, "pipeline": PipelineAnalyzer, "fpc": FPCAnalyzer, + "pfp":PFPAnalyzer } return cls._analyzers diff --git a/src/analyzers/fpc_analyzer.py b/src/analyzers/fpc_analyzer.py index 3b94f05..0da20c9 100644 --- a/src/analyzers/fpc_analyzer.py +++ b/src/analyzers/fpc_analyzer.py @@ -9,13 +9,10 @@ class FPCAnalyzer(BaseAnalyzer): - """Analyzes functional pipeline cohesion of ML code.""" def __init__(self, session_id: str, local_path: str, context=None): - """Initialize FPCAnalyzer.""" super().__init__(session_id, local_path, context) - # Load pipeline stages configuration pipeline_stages_json_path = os.path.join( os.path.dirname(os.path.dirname(__file__)), 'config', @@ -30,7 +27,6 @@ def __init__(self, session_id: str, local_path: str, context=None): f"Pipeline stages config not found at {pipeline_stages_json_path}" ) - # Build stage to phase mapping from config self.stage_to_phase = {} for phase, stages in self.config.get('phases', {}).items(): for stage in stages: @@ -38,21 +34,40 @@ def __init__(self, session_id: str, local_path: str, context=None): @property def analyzer_id(self) -> str: - return "FPC" + return "fpc" def analyze(self) -> AnalysisResult: - """Run FPC analysis on all Python files.""" + """ + Run FPC 
analysis on ML pipeline files. + + If pipeline metadata is available, analyzes only files detected as part + of the ML pipeline. Otherwise falls back to analyzing all Python files. + """ results = { 'files': {}, 'summary': { 'total_files': 0, 'high_cohesion': 0, 'medium_cohesion': 0, - 'low_cohesion': 0 + 'low_cohesion': 0, + 'ml_files_only': False, + 'by_pattern': { + 'functions_only': 0, + 'classes_only': 0, + 'mixed': 0 + } } } - python_files = self.context.get_all_python_files() + ml_files = self.context.get_all_ml_files() + + if ml_files: + python_files = ml_files + results['summary']['ml_files_only'] = True + else: + python_files = self.context.get_all_python_files() + results['summary']['ml_files_only'] = False + results['summary']['total_files'] = len(python_files) for py_file in python_files: @@ -63,10 +78,8 @@ def analyze(self) -> AnalysisResult: file_result = self._analyze_file(tree, py_file) results['files'][py_file] = file_result - # Cache for other metrics self.context.set_file_metric(py_file, 'fpc', file_result) - # Update summary cohesion_level = file_result['cohesion_level'] if cohesion_level == 'high': results['summary']['high_cohesion'] += 1 @@ -74,18 +87,22 @@ def analyze(self) -> AnalysisResult: results['summary']['medium_cohesion'] += 1 else: results['summary']['low_cohesion'] += 1 + + # Count pattern type + pattern = file_result.get('pattern', 'functions_only') + if pattern in results['summary']['by_pattern']: + results['summary']['by_pattern'][pattern] += 1 - # Calculate score + # Calculate weighted score based on pattern quality if results['summary']['total_files'] > 0: - score = (results['summary']['high_cohesion'] / - results['summary']['total_files']) * 10 + weighted_score = self._calculate_weighted_score(results) + score = weighted_score else: score = 0 messages = self._generate_messages(results) - return AnalysisResult( - analyzer_id=self.analyzer_id, + return self._create_result( score=round(score, 2), message_count={'messages': messages}, module_count=results['summary']['total_files'], @@ -93,15 +110,27 @@ def analyze(self) -> AnalysisResult: ) def _analyze_file(self, tree: ast.Module, file_path: str) -> Dict: - """Analyze a single file for FPC.""" + """ + Analyze a single file for FPC. + + If pipeline metadata is available, uses pre-detected stages. + Otherwise, performs stage detection. 
+ """ + pattern = self._classify_file_pattern(tree) + functions = self._extract_functions(tree) + file_stages_from_pipeline = self._get_file_stages_from_pipeline(file_path) + function_stages = {} for func_name, func_node in functions.items(): - stages = self._detect_stages(func_node, file_path) + if file_stages_from_pipeline: + stages = file_stages_from_pipeline + else: + stages = self._detect_stages(func_node, file_path) + function_stages[func_name] = stages - # Count unique stages and phases all_stages = set() for stages in function_stages.values(): all_stages.update(stages) @@ -113,18 +142,30 @@ def _analyze_file(self, tree: ast.Module, file_path: str) -> Dict: cohesion_level = self._determine_cohesion_level(unique_stages, unique_phases) - return { + result = { + 'pattern': pattern, 'unique_stages': unique_stages, 'unique_phases': unique_phases, 'stages_detected': list(all_stages), 'phases_detected': list(all_phases), 'cohesion_level': cohesion_level, - 'function_stages': function_stages + 'function_stages': function_stages, + 'source': 'pipeline_metadata' if file_stages_from_pipeline else 'heuristic' } + + if pattern == 'mixed': + num_classes = sum(1 for name in function_stages.keys() if '.' in name) + num_functions = len(function_stages) - num_classes + result['pattern_info'] = { + 'classes': num_classes // 2 if num_classes > 0 else 0, # Approximate class count + 'functions': num_functions, + 'recommendation': 'Consider splitting into separate modules for better maintainability' + } + + return result def _extract_functions(self, tree: ast.Module) -> Dict[str, ast.FunctionDef]: """Extract all functions and methods from AST.""" - functions = {} class FunctionVisitor(ast.NodeVisitor): def __init__(self): @@ -149,6 +190,60 @@ def visit_FunctionDef(self, node): visitor.visit(tree) return visitor.functions + def _classify_file_pattern(self, tree: ast.Module) -> str: + """ + Classify Python file structure pattern. + + Args: + tree: AST of the file + + Returns: + 'functions_only': Only top-level functions (functional style) + 'classes_only': Only classes with methods (OOP style) + 'mixed': Mix of classes and top-level functions (monolithic) + """ + has_classes = False + has_top_level_functions = False + + for node in ast.iter_child_nodes(tree): + if isinstance(node, ast.ClassDef): + has_classes = True + elif isinstance(node, ast.FunctionDef): + has_top_level_functions = True + + if has_classes and not has_top_level_functions: + return 'classes_only' + elif has_top_level_functions and not has_classes: + return 'functions_only' + elif has_classes and has_top_level_functions: + return 'mixed' + else: + return 'functions_only' # Default + + def _get_file_stages_from_pipeline(self, file_path: str) -> Set[str]: + """ + Get stages for a file from pipeline metadata. 
+ + Args: + file_path: Relative path to the file + + Returns: + Set of stage names detected by PipelineAnalyzer, or empty set + """ + pipeline_metadata = self.context.get_pipeline_metadata() + if not pipeline_metadata: + return set() + + detected_stages = pipeline_metadata.get("detected_stages", {}) + file_stages = set() + + for stage_name, file_list in detected_stages.items(): + for file_info in file_list: + if file_info["file"] == file_path: + file_stages.add(stage_name) + + return file_stages + def _detect_stages(self, func_node: ast.FunctionDef, file_path: str) -> Set[str]: """Detect ML pipeline stages in a function.""" detected_stages = set() @@ -177,7 +272,6 @@ def _detect_stages(self, func_node: ast.FunctionDef, file_path: str) -> Set[str] if stage_name in detected_stages: continue - # Check imports for node in ast.walk(func_node): if isinstance(node, ast.Import): for alias in node.names: @@ -208,14 +302,75 @@ def _determine_cohesion_level(self, unique_stages: int, unique_phases: int) -> s else: return 'low' + def _calculate_weighted_score(self, results: Dict) -> float: + """ + Calculate weighted score based on cohesion and code patterns. + + Pattern weights: + - classes_only: 1.0 (best practice - OOP design) + - functions_only: 0.9 (acceptable - functional style) + - mixed: 0.7 (anti-pattern - penalized) + + Cohesion scores: + - high: 10 points + - medium: 6 points + - low: 3 points + """ + pattern_weights = { + 'classes_only': 1.0, + 'functions_only': 0.9, + 'mixed': 0.7 + } + + cohesion_scores = { + 'high': 10, + 'medium': 6, + 'low': 3 + } + + total_weighted_score = 0 + total_weight = 0 + + for file_data in results['files'].values(): + pattern = file_data.get('pattern', 'functions_only') + cohesion = file_data.get('cohesion_level', 'high') + + base_score = cohesion_scores.get(cohesion, 10) + weight = pattern_weights.get(pattern, 1.0) + + total_weighted_score += base_score * weight + total_weight += 10 * weight # Max possible per file + + if total_weight > 0: + return round((total_weighted_score / total_weight) * 10, 2) + return 0 + def _generate_messages(self, results: Dict) -> List[str]: """Generate human-readable messages.""" messages = [] summary = results['summary'] - messages.append( - f"Analyzed {summary['total_files']} files for functional pipeline cohesion" - ) + if summary.get('ml_files_only', False): + messages.append( + f"✓ Analyzed {summary['total_files']} ML pipeline files (using pipeline detection)" + ) + else: + messages.append( + f"Analyzed {summary['total_files']} Python files (no pipeline metadata available)" + ) + + by_pattern = summary.get('by_pattern', {}) + if any(by_pattern.values()): + pattern_info = [] + if by_pattern.get('functions_only', 0) > 0: + pattern_info.append(f"{by_pattern['functions_only']} functional") + if by_pattern.get('classes_only', 0) > 0: + pattern_info.append(f"{by_pattern['classes_only']} OOP") + if by_pattern.get('mixed', 0) > 0: + pattern_info.append(f"{by_pattern['mixed']} mixed") + + if pattern_info: + messages.append(f"Pattern distribution: {', '.join(pattern_info)}") if summary['high_cohesion'] > 0: messages.append( @@ -239,6 +394,31 @@ def _generate_messages(self, results: Dict) -> List[str]: if low_cohesion_files: messages.append("Files needing refactoring:") for fp in low_cohesion_files[:5]: - messages.append(f" - {fp}") + file_data = results['files'][fp] + pattern = file_data.get('pattern', 'unknown') + stages = ', '.join(file_data.get('stages_detected', [])) + messages.append(f" - {fp} ({pattern} pattern, stages: 
{stages})") + + # Anti-pattern detection: mixed files + mixed_files = [ + fp for fp, data in results['files'].items() + if data.get('pattern') == 'mixed' + ] + if mixed_files: + messages.append(f"⚠ Anti-pattern detected: {len(mixed_files)} file(s) use mixed pattern") + messages.append(" Recommendation: Separate classes and functions into distinct modules") + for fp in mixed_files[:3]: + file_data = results['files'][fp] + if 'pattern_info' in file_data: + info = file_data['pattern_info'] + messages.append(f" - {fp}: {info.get('classes', 0)} classes + {info.get('functions', 0)} functions") + + # Best practices recognition + oop_files = [ + fp for fp, data in results['files'].items() + if data.get('pattern') == 'classes_only' and data.get('cohesion_level') == 'high' + ] + if oop_files: + messages.append(f"✓ {len(oop_files)} file(s) follow OOP best practices with high cohesion") return messages diff --git a/src/analyzers/pfp_analyzer.py b/src/analyzers/pfp_analyzer.py new file mode 100644 index 0000000..6c9f5ef --- /dev/null +++ b/src/analyzers/pfp_analyzer.py @@ -0,0 +1,224 @@ +import os +from typing import Dict, List, Set, Any +from collections import defaultdict + +from core.models.analysis_result import AnalysisResult +from analyzers.base_analyzer import BaseAnalyzer +from core.exceptions import AnalyzerError + +class PFPAnalyzer(BaseAnalyzer): + """ + Analyzes Package Functional Purity (PFP). + This metric measures how focused a package is on a specific ML pipeline function. + It depends on the results of the FPCAnalyzer. + """ + + + ETAPAS_MAX = 5 + + @property + def analyzer_id(self) -> str: + return "PFP" + + def analyze(self) -> AnalysisResult: + """ + Runs the PFP analysis for all packages in the project. + + It relies on the FPC analysis results being available in the context. + """ + + if not self._is_fpc_data_available(): + raise AnalyzerError( + "PFP analysis requires FPC analysis to be run first. " + "Please include 'fpc' in the list of analyzers." 
+ ) + + packages = self._discover_packages() + package_results = {} + total_pfp_score = 0 + + if not packages: + return AnalysisResult( + analyzer_id=self.analyzer_id, + score=10.0, + message_count={}, + module_count=0, + details={"message": "No Python packages found to analyze."} + ) + + for pkg_path, modules in packages.items(): + + package_results[pkg_path] = self._analyze_package(modules) + total_pfp_score += package_results[pkg_path]['pfp_score'] + + + average_pfp = total_pfp_score / len(packages) + final_score = round(average_pfp * 10, 2) + + feedback = self._generate_feedback(package_results) + + return AnalysisResult( + analyzer_id=self.analyzer_id, + score=final_score, + message_count=self._generate_summary(package_results), + module_count=len(self.context.get_all_python_files()), + details={ + "packages": package_results, + + } + # details={ + # "packages": package_results, + # "feedback": feedback, + # "summary": { + # "total_packages": len(packages), + # "average_pfp": round(average_pfp, 4), + # "packages_needing_attention": len([p for p in package_results.values() if p['pfp_score'] < 0.6]) + # } + # } + ) + + def _analyze_package(self, modules: List[str]) -> Dict[str, Any]: + """Calculates PFP for a single package.""" + n_total = len(modules) + n_ml = 0 + all_stages: Set[str] = set() + + + for module_path in modules: + fpc_result = self.context.get_file_metric(module_path, 'fpc') + + + + + if fpc_result and fpc_result.get('stages_detected'): + + n_ml += 1 + all_stages.update(fpc_result['stages_detected']) + + + n_etapas = len(all_stages) + cf = 1.0 + if self.ETAPAS_MAX > 1 and n_etapas > 1: + cf = 1 - ((n_etapas - 1) / (self.ETAPAS_MAX - 1)) + + + pfp_score = 0 + if n_total > 0: + pfp_score = (n_ml / n_total) * cf + + return { + "total_modules": n_total, + "ml_modules": n_ml, + "unique_stages_found": n_etapas, + "concentration_factor": round(cf, 4), + "stage_types": sorted(list(all_stages)), + "pfp_score": round(pfp_score, 4), + "purity_level": self._get_purity_level(pfp_score) + } + + def _discover_packages(self) -> Dict[str, List[str]]: + """ + Identifies packages and their contained modules. + A package is a directory containing Python files. + """ + packages: Dict[str, List[str]] = defaultdict(list) + python_files = self.context.get_all_python_files() + + for file_path in python_files: + if file_path.endswith('__init__.py'): + continue + + package_path = os.path.dirname(file_path) or '.' 
+ packages[package_path].append(file_path) + + return dict(packages) + + def _is_fpc_data_available(self) -> bool: + """Checks if any file has FPC data in the context.""" + for py_file in self.context.get_all_python_files(): + if self.context.has_file_metric(py_file, 'fpc'): + return True + return False + + def _get_purity_level(self, score: float) -> str: + """Determines a qualitative purity level from a PFP score.""" + if score > 0.8: + return "High" + if score >= 0.6: + return "Moderate" + if score >= 0.4: + return "Low" + return "Very Low" + + def _generate_summary(self, results: Dict) -> Dict: + summary = {"High": 0, "Moderate": 0, "Low": 0, "Very Low": 0} + for data in results.values(): + level = data['purity_level'] + summary[level] += 1 + return summary + + def _generate_feedback(self, results: Dict) -> List[Dict[str, Any]]: + feedback = [] + + for pkg_path, data in results.items(): + pfp_score = data['pfp_score'] + purity_level = data['purity_level'] + + if pfp_score < 0.6: + issue = { + "package": pkg_path, + "purity_level": purity_level, + "pfp_score": pfp_score, + "total_modules": data['total_modules'], + "ml_modules": data['ml_modules'], + "stages_detected": data['stage_types'], + "recommendations": self._generate_recommendations(data) + } + feedback.append(issue) + + return sorted(feedback, key=lambda x: x['pfp_score']) + + def _generate_recommendations(self, package_data: Dict) -> List[str]: + recommendations = [] + + pfp_score = package_data['pfp_score'] + n_ml = package_data['ml_modules'] + n_total = package_data['total_modules'] + n_stages = package_data['unique_stages_found'] + + if n_ml == 0: + recommendations.append("This package contains no ML-related modules. Consider moving it or documenting its purpose.") + elif n_ml < n_total * 0.5: + recommendations.append(f"Only {n_ml}/{n_total} modules are ML-related. Consider separating non-ML code into another package.") + + if n_stages > 2: + recommendations.append(f"Package spans {n_stages} different pipeline stages. Consider splitting into more focused packages.") + + if n_stages > 1: + stages_list = ", ".join(package_data['stage_types']) + recommendations.append(f"Mixed stages: {stages_list}. Separate by single responsibility.") + + if pfp_score < 0.4: + improvement_needed = ((0.6 - pfp_score) / 0.6 * 100) + recommendations.append( + f" HIGH PRIORITY: PFP score of {pfp_score:.2f} indicates poor cohesion. " + f"IMPACT: Requires {improvement_needed:.0f}% improvement to reach acceptable levels. " + f"ACTION: Immediate refactoring required - start by separating stages into dedicated packages." + ) + elif pfp_score < 0.6: + improvement_needed = ((0.6 - pfp_score) / 0.6 * 100) + recommendations.append( + f" MEDIUM PRIORITY: PFP score of {pfp_score:.2f} is below recommended threshold (0.6). " + f"IMPACT: {improvement_needed:.0f}% improvement needed for good cohesion. " + f"ACTION: Review module distribution and consider consolidating ML logic or removing non-ML modules." + ) + + if n_stages > 0 and n_ml > 0: + avg_modules_per_stage = n_ml / n_stages + if avg_modules_per_stage < 2 and n_stages > 1: + recommendations.append( + f" INSIGHT: Average of {avg_modules_per_stage:.1f} ML module(s) per stage suggests thin distribution. " + f"SUGGESTION: Either combine related stages or ensure each stage has sufficient implementation depth." 
+ ) + + return recommendations \ No newline at end of file diff --git a/src/analyzers/pipeline_analyzer.py b/src/analyzers/pipeline_analyzer.py index 656e638..926e43d 100644 --- a/src/analyzers/pipeline_analyzer.py +++ b/src/analyzers/pipeline_analyzer.py @@ -5,24 +5,14 @@ from collections import defaultdict from core.models.analysis_result import AnalysisResult -from core.models.pipeline_overrides import PipelineOverrides # ✅ CORRECCIÓN #5 +from core.models.pipeline_overrides import PipelineOverrides from analyzers.base_analyzer import BaseAnalyzer class PipelineAnalyzer(BaseAnalyzer): - """Analyzes code structure to detect ML pipeline stages.""" def __init__(self, session_id: str, local_path: str, config_path: Optional[str] = None): - """ - Initialize PipelineAnalyzer. - - Args: - session_id: Unique session identifier - local_path: Path to extracted code - config_path: Optional path to pipeline_stages.json - """ super().__init__(session_id, local_path) - self._analyzer_id = "Pipeline Structure" # Required stages for a valid pipeline self.required_stages = { @@ -49,8 +39,7 @@ def __init__(self, session_id: str, local_path: str, config_path: Optional[str] @property def analyzer_id(self) -> str: - """Unique identifier for this analyzer.""" - return self._analyzer_id + return "pipeline_detection" def apply_overrides( self, @@ -192,8 +181,7 @@ def analyze(self) -> AnalysisResult: all_files = self._get_python_files() - return AnalysisResult( - analyzer_id=self.analyzer_id, + return self._create_result( score=10.0 if is_pipeline else 0.0, message_count={}, module_count=len(all_files), diff --git a/src/analyzers/pylint_analyzer.py b/src/analyzers/pylint_analyzer.py index e482780..c3036cc 100644 --- a/src/analyzers/pylint_analyzer.py +++ b/src/analyzers/pylint_analyzer.py @@ -11,22 +11,20 @@ logger = logging.getLogger(__name__) + class PyLintAnalyzer(BaseAnalyzer): - """PyLint code quality analyzer.""" @property def analyzer_id(self) -> str: - return "PyLint" + return "pylint_score" def analyze(self, code_path: str = None) -> AnalysisResult: - """Analyze code using PyLint.""" target_path = code_path or self.local_path try: json_output = self._run_pylint_analysis(target_path) - return AnalysisResult( - analyzer_id=self.analyzer_id, + return self._create_result( score=json_output["statistics"]["score"], message_count=json_output["statistics"]["messageTypeCount"], module_count=json_output["statistics"]["modulesLinted"], @@ -54,7 +52,6 @@ def _run_pylint_analysis(self, target_path: str) -> Dict[str, Any]: raise with self._change_to_project_dir(): - # After changing to project dir, use current directory folders = self._get_project_folders('.') for folder in folders: @@ -81,18 +78,14 @@ def _run_pylint_analysis(self, target_path: str) -> Dict[str, Any]: try: parsed = json.loads(result.stdout) - # With json2 format, pylint returns a dict with statistics - # No conversion needed if isinstance(parsed, dict) and "statistics" in parsed: logger.info(f"PyLint analysis completed: Score={parsed['statistics']['score']:.2f}, " f"Modules={parsed['statistics']['modulesLinted']}, " f"Messages={len(parsed.get('messages', []))}") return parsed - # Fallback: if still getting list format (shouldn't happen with json2) elif isinstance(parsed, list): logger.warning("PyLint returned list format, expected json2 dict format") - # Count message types message_counts = {"convention": 0, "refactor": 0, "warning": 0, "error": 0, "fatal": 0, "info": 0} modules = set() @@ -102,7 +95,6 @@ def _run_pylint_analysis(self, target_path: 
str) -> Dict[str, Any]: if "module" in msg: modules.add(msg["module"]) - # Calculate score penalties = ( message_counts["fatal"] * 10 + message_counts["error"] * 10 + @@ -112,7 +104,6 @@ def _run_pylint_analysis(self, target_path: str) -> Dict[str, Any]: ) score = max(0.0, 10.0 - penalties / max(len(modules), 1)) - # Create expected format return { "messages": parsed, "statistics": { @@ -143,13 +134,11 @@ def _run_pylint_analysis(self, target_path: str) -> Dict[str, Any]: logger.error(f"ERROR in folder {folder}: {e}", exc_info=True) continue # Try next folder - # If no valid output found raise AnalyzerError("No valid PyLint output generated") def _run_pylint_report(self, target_path: str) -> bytes: """Execute pylint for detailed report.""" with self._change_to_project_dir(): - # After changing to project dir, use current directory folders = self._get_project_folders('.') for folder in folders: diff --git a/src/analyzers/radon_cc_analyzer.py b/src/analyzers/radon_cc_analyzer.py index 5ee2f4a..a870596 100644 --- a/src/analyzers/radon_cc_analyzer.py +++ b/src/analyzers/radon_cc_analyzer.py @@ -6,24 +6,22 @@ from core.models.analysis_result import AnalysisResult from core.exceptions import AnalyzerError + class RadonCCAnalyzer(BaseAnalyzer): - """Radon Cyclomatic Complexity analyzer.""" @property def analyzer_id(self) -> str: - return "Radon - Complexity" + return "radon_cc" def analyze(self, code_path: str = None) -> AnalysisResult: - """Analyze cyclomatic complexity using Radon.""" target_path = code_path or self.local_path try: complexity_score, block_count = self._run_complexity_analysis(target_path) - return AnalysisResult( - analyzer_id=self.analyzer_id, + return self._create_result( score=complexity_score, - message_count={}, # Radon CC doesn't provide message counts + message_count={}, module_count=block_count, details={"complexity_method": "cyclomatic"} ) @@ -42,7 +40,6 @@ def generate_report(self, code_path: str = None) -> bytes: def _run_complexity_analysis(self, target_path: str) -> tuple: """Execute radon cc analysis.""" with self._change_to_project_dir(): - # After changing to project dir, use current directory folders = self._get_project_folders('.') for folder in folders: @@ -56,20 +53,16 @@ def _run_complexity_analysis(self, target_path: str) -> tuple: check=True ) - # Parse output: last two lines contain blocks and complexity lines = result.stdout.strip().split('\n') if len(lines) >= 2: blocks_line = lines[-2] complexity_line = lines[-1] - # Extract block count blocks = int(blocks_line.split()[0]) - # Extract complexity score complexity_str = complexity_line.split('(')[-1].rstrip(')') complexity = float(complexity_str) - # Calculate score (inverse relationship with complexity) score = round(10 / pow(complexity, 0.3), 2) return score, blocks @@ -82,7 +75,6 @@ def _run_complexity_analysis(self, target_path: str) -> tuple: def _run_complexity_report(self, target_path: str) -> bytes: """Generate detailed complexity report.""" with self._change_to_project_dir(): - # After changing to project dir, use current directory folders = self._get_project_folders('.') for folder in folders: diff --git a/src/analyzers/radon_mi_analyzer.py b/src/analyzers/radon_mi_analyzer.py index be7493d..8df741c 100644 --- a/src/analyzers/radon_mi_analyzer.py +++ b/src/analyzers/radon_mi_analyzer.py @@ -7,22 +7,20 @@ from core.models.analysis_result import AnalysisResult from core.exceptions import AnalyzerError + class RadonMIAnalyzer(BaseAnalyzer): - """Radon Maintainability Index analyzer.""" @property 
def analyzer_id(self) -> str: - return "Radon - Maintainability" + return "radon_mi" def analyze(self, code_path: str = None) -> AnalysisResult: - """Analyze maintainability using Radon.""" target_path = code_path or self.local_path try: mi_data = self._run_maintainability_analysis(target_path) - return AnalysisResult( - analyzer_id=self.analyzer_id, + return self._create_result( score=mi_data["average_score"], message_count=mi_data["rank_counts"], module_count=mi_data["module_count"], @@ -43,7 +41,6 @@ def generate_report(self, code_path: str = None) -> bytes: def _run_maintainability_analysis(self, target_path: str) -> Dict[str, Any]: """Execute radon mi analysis.""" with self._change_to_project_dir(): - # After changing to project dir, use current directory folders = self._get_project_folders('.') for folder in folders: @@ -68,7 +65,6 @@ def _run_maintainability_analysis(self, target_path: str) -> Dict[str, Any]: def _run_maintainability_report(self, target_path: str) -> bytes: """Generate detailed maintainability report.""" with self._change_to_project_dir(): - # After changing to project dir, use current directory folders = self._get_project_folders('.') for folder in folders: @@ -107,11 +103,9 @@ def _process_mi_data(self, json_data: Dict[str, Any]) -> Dict[str, Any]: mi_score = float(module_data['mi']) rank = module_data.get('rank', 'C') - # Update counters total_score += mi_score / 10.0 # Normalize to 0-10 scale valid_modules += 1 - # Count ranks if rank == 'A': rank_counts["Very High"] += 1 elif rank == 'B': @@ -119,7 +113,6 @@ def _process_mi_data(self, json_data: Dict[str, Any]) -> Dict[str, Any]: elif rank == 'C': rank_counts["Extremely low"] += 1 - # Store module details module_details[module_path] = { "mi_score": mi_score, "rank": rank diff --git a/src/api/middleware.py b/src/api/middleware.py index c25b1e7..1d9b950 100644 --- a/src/api/middleware.py +++ b/src/api/middleware.py @@ -5,7 +5,6 @@ import config.settings as config def setup_middleware(app: Flask) -> Flask: - """Configure middleware for the Flask app.""" CORS(app, supports_credentials=True, origins=['*']) app.config["CORS_HEADERS"] = ["Content-Type", "X-Requested-With", "X-CSRFToken"] @@ -21,12 +20,4 @@ def log_response(response): logging.info(f"Response: {response.status_code} - {duration:.3f}s") return response - @app.before_request - def validate_request(): - if request.method == 'POST' and request.path.startswith(f'{config.settings.API_PREFIX}'): - has_content = request.data or request.files or request.form - if not has_content and request.path != f'{config.settings.API_PREFIX}/analyzers': - from api.serializers import ResponseSerializer - return ResponseSerializer.error("Request body is required", 400) - return app diff --git a/src/api/routes.py b/src/api/routes.py index 662c5b6..4a32f6d 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -11,11 +11,13 @@ from analyzers.factory import AnalyzerFactory from core.tree_generator import TreeGenerator from core.models.pipeline_overrides import AnalysisRequest -from utils.validation import validate_analysis_request +from core.models.analysis_result import AnalysisResult +from core.metrics import get_metric_metadata +from utils.validation import validate_analysis_request, validate_zip_file +from core.analysis_context import AnalysisContext def create_routes(app: Flask) -> Flask: - """Create and configure API routes.""" @app.route(f'{config.settings.API_PREFIX}/upload-zip', methods=['POST']) @cross_origin() @@ -26,23 +28,10 @@ def upload_zip(): Returns 
session_id, tree_structure, and auto_detected_pipeline. """ try: - # Validate file upload - if 'file' not in request.files: - return ResponseSerializer.error("No file provided", 400) + app_zip, error = validate_zip_file(request) + if error: + return ResponseSerializer.error(error, 400) - file = request.files['file'] - if file.filename == '': - return ResponseSerializer.error("Empty filename", 400) - - if not file.filename.endswith('.zip'): - return ResponseSerializer.error("File must be a ZIP archive", 400) - - # Read ZIP content - app_zip = file.read() - if not app_zip: - return ResponseSerializer.error("Empty ZIP file", 400) - - # Create session session = SessionManager( app_zip=app_zip, base_path=config.settings.SESSION_BASE_PATH @@ -50,17 +39,14 @@ def upload_zip(): session.ensure_setup() - # Generate file tree tree_generator = TreeGenerator(session.local_path) tree_structure = tree_generator.generate() - # Auto-detect pipeline pipeline_analyzer = AnalyzerFactory.create_analyzer( "pipeline", session.session_id, session.local_path ) pipeline_result = pipeline_analyzer.analyze() - # Save session metadata session.save_session( tree_structure=tree_structure, auto_detected_pipeline=pipeline_result.details, @@ -86,7 +72,7 @@ def analyze_session(session_id: str): Expects JSON body: { - "analyzers": ["pylint", "radon_cc", "pipeline"], + "analyzers": ["pylint", "radon_cc", "pipeline",], "pipeline_overrides": { "file_stages": {"path/to/file.py": ["data_collection"]}, "excluded_files": ["tests/", "docs/"] @@ -94,11 +80,9 @@ def analyze_session(session_id: str): } """ try: - # Validate session exists if not SessionStorage.exists(session_id, config.settings.SESSION_BASE_PATH): return ResponseSerializer.error("Session not found or expired", 404) - # Parse and validate request data = request.get_json() if not data: return ResponseSerializer.error("Request body required", 400) @@ -108,21 +92,28 @@ def analyze_session(session_id: str): return ResponseSerializer.error(error_msg, 400) analysis_request = AnalysisRequest.from_dict(data) - # Load existing session + session = SessionManager.load_session( session_id, base_path=config.settings.SESSION_BASE_PATH ) + metadata = session.get_metadata() + pipeline_metadata = metadata.get("auto_detected_pipeline") + + shared_context = AnalysisContext( + session_id, + session.local_path, + pipeline_metadata=pipeline_metadata + ) + results = {} - # Handle pipeline analyzer with overrides if "pipeline" in analysis_request.analyzers: pipeline_analyzer = AnalyzerFactory.create_analyzer( - "pipeline", session_id, session.local_path + "pipeline", session_id, session.local_path, shared_context ) - # Apply overrides if provided if analysis_request.pipeline_overrides: metadata = session.get_metadata() auto_detected = metadata["auto_detected_pipeline"] @@ -132,53 +123,47 @@ def analyze_session(session_id: str): overrides=analysis_request.pipeline_overrides ) - results["pipeline"] = { - "score": 10.0 if modified["is_valid_pipeline"] else 0.0, - "message_count": {}, - "module_count": modified.get("files_analyzed", 0), - "details": modified - } + results["pipeline"] = AnalysisResult( + analyzer_id="pipeline_detection", + score=10.0 if modified["is_valid_pipeline"] else 0.0, + message_count={}, + module_count=modified.get("files_analyzed", 0), + metric_metadata=get_metric_metadata("pipeline_detection"), + details=modified + ) else: - # No overrides, use fresh analysis - result = pipeline_analyzer.analyze() - results["pipeline"] = { - "score": result.score, - "message_count": 
result.message_count, - "module_count": result.module_count, - "details": result.details - } + results["pipeline"] = pipeline_analyzer.analyze() - # Execute other analyzers for analyzer_type in analysis_request.analyzers: if analyzer_type == "pipeline": - continue # Already processed + continue analyzer = AnalyzerFactory.create_analyzer( - analyzer_type, session_id, session.local_path + analyzer_type, session_id, session.local_path, shared_context ) result = analyzer.analyze() - results[analyzer_type] = { - "score": result.score, - "message_count": result.message_count, - "module_count": result.module_count - } + results[analyzer_type] = result - # Save results to session - session.save_analysis_results(results) + serialized_results = { + key: value.to_dict() if isinstance(value, AnalysisResult) else value + for key, value in results.items() + } + + session.save_analysis_results(serialized_results) return ResponseSerializer.success({ "session_id": session_id, "timestamp": datetime.utcnow().isoformat() + "Z", - "results": results + "results": serialized_results }) except SessionError as e: return ResponseSerializer.error(f"Session error: {str(e)}", 400) except Exception as e: + print(e) return ResponseSerializer.error(f"Analysis failed: {str(e)}", 500) - # Legacy endpoint (deprecated) @app.route(f'{config.settings.API_PREFIX}/rate_app', methods=['POST']) @cross_origin() def rate_app(): @@ -193,20 +178,10 @@ def rate_app(): ) try: - # Validate file upload - if 'file' not in request.files: - return ResponseSerializer.error("No file provided", 400) - - file = request.files['file'] - if not file or file.filename == '': - return ResponseSerializer.error("No file selected", 400) - - # Read and validate ZIP - app_zip = file.read() - if not app_zip: - return ResponseSerializer.error("Empty file", 400) + app_zip, error = validate_zip_file(request) + if error: + return ResponseSerializer.error(error, 400) - # Create session with legacy analyzers session = SessionManager( app_zip=app_zip, analyzer_types=['pylint', 'radon_cc', 'radon_mi'], @@ -215,10 +190,8 @@ def rate_app(): session.ensure_setup() - # Run analysis results = session.run_analysis() - # Format legacy response legacy_response = { "session_id": session.session_id, "results": { diff --git a/src/api/serializers.py b/src/api/serializers.py index 084d74d..cceb4b3 100644 --- a/src/api/serializers.py +++ b/src/api/serializers.py @@ -1,22 +1,22 @@ from flask import jsonify from typing import Any, Dict +from core.models.analysis_result import AnalysisResult + class ResponseSerializer: - """Handles API response serialization.""" @staticmethod def success(data: Any, status_code: int = 200): - """Create success response.""" + serialized_data = ResponseSerializer._serialize_data(data) response = jsonify({ "success": True, - "data": data + "data": serialized_data }) response.status_code = status_code return response @staticmethod def error(message: str, status_code: int = 400, details: Dict = None): - """Create error response.""" error_data = {"message": message} if details: error_data["details"] = details @@ -27,3 +27,15 @@ def error(message: str, status_code: int = 400, details: Dict = None): }) response.status_code = status_code return response + + @staticmethod + def _serialize_data(data: Any) -> Any: + if isinstance(data, AnalysisResult): + return data.to_dict() + elif isinstance(data, dict): + return {key: ResponseSerializer._serialize_data(value) + for key, value in data.items()} + elif isinstance(data, (list, tuple)): + return 
[ResponseSerializer._serialize_data(item) for item in data] + else: + return data diff --git a/src/config/pipeline_stages.json b/src/config/pipeline_stages.json index 51c105c..2a8cab0 100644 --- a/src/config/pipeline_stages.json +++ b/src/config/pipeline_stages.json @@ -1,8 +1,7 @@ { "phases": { - "data_engineering": ["data_collection", "data_cleaning"], + "data_engineering": ["data_collection", "data_cleaning", "feature_engineering"], "model_development": [ - "feature_engineering", "model_training", "model_evaluation" ] @@ -10,75 +9,29 @@ "stages": { "data_collection": { "filename_patterns": [ - "collect", - "scrape", - "loader", - "fetch", - "collection", - "ingest", - "extract", - "download", - "import", - "acquisition" + "collect", "scrape", "loader", "fetch", "ingest", "extract", "download" ], "keywords": [ + "pd.read_csv", + "pd.read_excel", + "pd.read_json", + "pd.read_sql", + "pd.read_parquet", "load_dataset", - "load_data", - "get_data", + "load_iris", + "fetch_data", - "download_data", - "scrape", - "crawl", - "api.get", "requests.get", - "requests.post", "BeautifulSoup", - "selenium", - "webdriver", - "read_sql", - "read_database", - "execute(", - "query(", - "connect(", - "cursor", - "MongoClient", - "find(", - "aggregate(", - "S3", - "boto3", - "download_file", - "get_object", - "read_parquet", - "from_records" + "boto3.client" ], "imports": [ - "requests", - "beautifulsoup4", - "bs4", - "selenium", - "scrapy", - "sqlalchemy", - "pymongo", - "psycopg2", - "mysql.connector", - "sqlite3", - "boto3", - "s3fs", - "pyarrow", - "fastparquet", - "sklearn.datasets", - "kaggle" + "requests", "selenium", "scrapy", "boto3", "sklearn.datasets" ] }, "data_cleaning": { "filename_patterns": [ - "clean", - "prep", - "preprocess", - "cleaning", - "sanitize", - "wash", - "process" + "clean", "prep", "preprocess", "sanitize", "join" ], "keywords": [ "dropna", @@ -87,34 +40,14 @@ "isna", "notnull", "drop_duplicates", - "duplicated", - "handle_missing", "impute", - "clean_data", - "clean_text", - "clean_string", - "remove_outliers", - "detect_outliers", - "replace_null", - "replace_nan", - "fill_missing", - "interpolate", - "forward_fill", - "backward_fill", - "standardize_", - "normalize_", - "missing_values", - "handle_nulls", - "strip(", - "lower(", - "upper(", - "remove_whitespace", - "remove_duplicates", - "validate_data" + ".map(", + ".join(", + ".merge(" ], "imports": [] }, - "feature_engineering": { + "feature_engineering": { "filename_patterns": [ "feature", "transform", @@ -125,48 +58,25 @@ "selector" ], "keywords": [ + "fit_transform", + ".transform(", + ".drop(", + "pd.concat", + "get_dummies", + "feature_selection", + "create_features", + "extract_features", + "generate_features", "OneHotEncoder", "LabelEncoder", - "OrdinalEncoder", - "TargetEncoder", - "BinaryEncoder", "StandardScaler", "MinMaxScaler", "RobustScaler", - "MaxAbsScaler", - "Normalizer", - "QuantileTransformer", - "PowerTransformer", - "PolynomialFeatures", - "SplineTransformer", "PCA", - "TruncatedSVD", - "FactorAnalysis", "SelectKBest", - "SelectPercentile", - "RFE", - "RFECV", "VarianceThreshold", - "feature_selection", - "create_features", - "extract_features", - "generate_features", - "engineer_features", - "build_features", - "fit_transform", - "transform(", - "get_dummies", - "factorize", - "cut(", - "qcut(", "TfidfVectorizer", - "CountVectorizer", - "HashingVectorizer", - "Word2Vec", - "text_to_sequence", - "tokenize", - "lemmatize", - "stemming" + "CountVectorizer" ], "imports": [ "sklearn.preprocessing", @@ 
-174,10 +84,7 @@ "sklearn.decomposition", "sklearn.feature_selection", "category_encoders", - "feature_engine", - "gensim", - "nltk", - "spacy" + "feature_engine" ] }, "model_training": { @@ -188,155 +95,61 @@ "training", "trainer", "learn", - "build" + "build", + "optimizer" ], "keywords": [ "model.fit", - "fit(", - "train_model", - "build_model", - "create_model", + "model.train", "train_test_split", "cross_val_score", - "cross_validate", - "KFold", - "StratifiedKFold", "GridSearchCV", "RandomizedSearchCV", - "BayesSearchCV", - "HalvingGridSearchCV", "RandomForestClassifier", "RandomForestRegressor", "XGBClassifier", - "XGBRegressor", "LGBMClassifier", - "LGBMRegressor", - "CatBoostClassifier", - "CatBoostRegressor", - "GradientBoostingClassifier", - "GradientBoostingRegressor", - "AdaBoostClassifier", "LinearRegression", "LogisticRegression", - "Ridge", - "Lasso", - "ElasticNet", - "SGDClassifier", - "SGDRegressor", "SVC", "SVR", "KNeighborsClassifier", - "KNeighborsRegressor", "DecisionTreeClassifier", - "DecisionTreeRegressor", - "MLPClassifier", - "MLPRegressor", - "GaussianNB", - "MultinomialNB", "Sequential", - "Dense(", - "Conv2D", - "LSTM", - "GRU", - "Transformer", - "compile(", "model.compile", - "fit_generator", - "train_on_batch", - "AutoML", - "Pipeline(" + "AdamsOptimizer" ], "imports": [ "sklearn.ensemble", "sklearn.linear_model", "sklearn.svm", - "sklearn.neighbors", - "sklearn.tree", - "sklearn.naive_bayes", - "sklearn.neural_network", - "sklearn.pipeline", "sklearn.model_selection", "tensorflow", - "tensorflow.keras", "keras", "torch", - "torch.nn", "xgboost", "lightgbm", "catboost", - "h2o", - "autokeras", - "auto-sklearn", - "pycaret" + "ultralytics" ] }, "model_evaluation": { "filename_patterns": [ - "eval", - "test", - "assess", - "metric", - "score", - "validation", - "validate", - "measure" + "eval", "test", "assess", "metric", "score", "validate" ], "keywords": [ "accuracy_score", "classification_report", "confusion_matrix", - "precision_score", - "recall_score", - "f1_score", "roc_auc_score", - "roc_curve", - "auc", - "precision_recall_curve", - "average_precision_score", - "log_loss", - "hinge_loss", - "matthews_corrcoef", - "cohen_kappa_score", "mean_squared_error", - "mean_absolute_error", - "mean_absolute_percentage_error", - "root_mean_squared_error", "r2_score", - "explained_variance_score", - "median_absolute_error", - "max_error", - "evaluate(", "model.evaluate", "model.score", - "predict(", - "predict_proba", - "cross_val_score", - "cross_validate", - "learning_curve", - "validation_curve", - "plot_confusion_matrix", - "plot_roc_curve", - "plot_precision_recall_curve", - "ConfusionMatrixDisplay", - "RocCurveDisplay", - "PrecisionRecallDisplay", - "silhouette_score", - "davies_bouldin_score", - "calinski_harabasz_score", - "adjusted_rand_score", - "normalized_mutual_info_score", - "v_measure_score", - "homogeneity_score", - "completeness_score" + "model.predict" ], "imports": [ - "sklearn.metrics", - "sklearn.model_selection", - "tensorflow.keras.metrics", - "torchmetrics", - "mlflow", - "wandb", - "tensorboard" + "sklearn.metrics", "mlflow" ] } } diff --git a/src/config/settings.py b/src/config/settings.py index df7aca0..b44727f 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -4,6 +4,7 @@ class Settings: """Application configuration settings.""" + # Server settings HOST: str = os.getenv('HOST', '0.0.0.0') PORT: int = int(os.getenv('PORT', '5060')) diff --git a/src/core/analysis_context.py b/src/core/analysis_context.py index 
ecb9378..5bb89b1 100644 --- a/src/core/analysis_context.py +++ b/src/core/analysis_context.py @@ -25,18 +25,20 @@ class AnalysisContext: This prevents redundant parsing and computation across analyzers. """ - def __init__(self, session_id: str, local_path: str): + def __init__(self, session_id: str, local_path: str, pipeline_metadata: Optional[Dict[str, Any]] = None): """ Initialize AnalysisContext. Args: session_id: Unique session identifier local_path: Path to extracted code + pipeline_metadata: Optional pipeline detection results from PipelineAnalyzer """ self.session_id = session_id self.local_path = local_path self._file_cache: Dict[str, FileAnalysisCache] = {} self._global_metrics: Dict[str, Any] = {} + self._pipeline_metadata: Optional[Dict[str, Any]] = pipeline_metadata def get_file_ast(self, file_path: str) -> Optional[ast.Module]: """ @@ -184,4 +186,67 @@ def get_all_python_files(self) -> List[str]: def clear_cache(self) -> None: """Clear all cached data.""" self._file_cache.clear() - self._global_metrics.clear() \ No newline at end of file + self._global_metrics.clear() + + def get_pipeline_metadata(self) -> Optional[Dict[str, Any]]: + """ + Get pipeline detection metadata. + + Returns: + Pipeline metadata from PipelineAnalyzer or None if not available + """ + return self._pipeline_metadata + + def get_ml_files_by_stage(self, stage: Optional[str] = None) -> Dict[str, List[str]]: + """ + Get ML-related files grouped by pipeline stage. + + Args: + stage: Optional stage name to filter by. If None, returns all stages. + + Returns: + Dictionary mapping stage names to list of file paths. + If stage is specified, returns dict with single key. + Returns empty dict if no pipeline metadata is available. + """ + if not self._pipeline_metadata: + return {} + + detected_stages = self._pipeline_metadata.get("detected_stages", {}) + + if stage: + if stage in detected_stages: + files = [ + file_info["file"] + for file_info in detected_stages[stage] + ] + return {stage: files} + return {} + + result = {} + for stage_name, file_list in detected_stages.items(): + result[stage_name] = [ + file_info["file"] + for file_info in file_list + ] + return result + + def get_all_ml_files(self) -> List[str]: + """ + Get all files detected as part of the ML pipeline. + + Returns: + List of file paths that are part of the ML pipeline. + Returns empty list if no pipeline metadata is available. + """ + if not self._pipeline_metadata: + return [] + + ml_files = set() + detected_stages = self._pipeline_metadata.get("detected_stages", {}) + + for file_list in detected_stages.values(): + for file_info in file_list: + ml_files.add(file_info["file"]) + + return list(ml_files) diff --git a/src/core/metrics/README.md b/src/core/metrics/README.md new file mode 100644 index 0000000..eb46c47 --- /dev/null +++ b/src/core/metrics/README.md @@ -0,0 +1,77 @@ +# Metric Metadata System + +Sistema modular para documentación de métricas de análisis. 
+ +## Estructura + +``` +src/core/metrics/ +├── __init__.py # Exports +├── metadata.py # MetricMetadata dataclass +└── registry.py # METRICS_REGISTRY +``` + +## Uso en Analizadores + +```python +from core.metrics import get_metric_metadata +from core.models.analysis_result import AnalysisResult + +def analyze(self): + return AnalysisResult( + analyzer_id="My Analyzer", + score=7.5, + message_count={}, + module_count=10, + metric_metadata=get_metric_metadata("metric_id"), + details={} + ) +``` + +## Agregar Nueva Métrica + +Editar `registry.py`: + +```python +METRICS_REGISTRY = { + "my_metric": MetricMetadata( + metric_id="my_metric", + name="My Metric", + description="What it measures", + formula="x + y", + ideal_range={"min": 0, "max": 10}, + interpretation={"low": "Bad", "high": "Good"}, + references=["https://..."], + category="quality", + unit="score" + ), +} +``` + +## Respuesta API + +```json +{ + "success": true, + "data": { + "score": 7.5, + "module_count": 10, + "documentation": { + "name": "Cyclomatic Complexity", + "description": "...", + "formula": "CC = E - N + 2P", + "ideal_range": {"min": 1, "max": 10}, + "interpretation": {...}, + "references": [...] + } + } +} +``` + +## Métricas Disponibles + +- `radon_cc` - Cyclomatic Complexity +- `radon_mi` - Maintainability Index +- `pylint_score` - Code Quality Score +- `fpc` - Functional Pipeline Cohesion +- `pipeline_detection` - ML Pipeline Detection diff --git a/src/core/metrics/__init__.py b/src/core/metrics/__init__.py new file mode 100644 index 0000000..4d7806b --- /dev/null +++ b/src/core/metrics/__init__.py @@ -0,0 +1,4 @@ +from .metadata import MetricMetadata +from .registry import METRICS_REGISTRY, get_metric_metadata + +__all__ = ['MetricMetadata', 'METRICS_REGISTRY', 'get_metric_metadata'] diff --git a/src/core/metrics/metadata.py b/src/core/metrics/metadata.py new file mode 100644 index 0000000..e688203 --- /dev/null +++ b/src/core/metrics/metadata.py @@ -0,0 +1,28 @@ +from dataclasses import dataclass, field +from typing import Dict, List, Any, Optional + + +@dataclass(frozen=True) +class MetricMetadata: + metric_id: str + name: str + description: str + formula: Optional[str] = None + ideal_range: Dict[str, Any] = field(default_factory=dict) + interpretation: Dict[str, str] = field(default_factory=dict) + references: List[str] = field(default_factory=list) + category: str = "general" + unit: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "metric_id": self.metric_id, + "name": self.name, + "description": self.description, + "formula": self.formula, + "ideal_range": self.ideal_range, + "interpretation": self.interpretation, + "references": self.references, + "category": self.category, + "unit": self.unit + } diff --git a/src/core/metrics/registry.py b/src/core/metrics/registry.py new file mode 100644 index 0000000..c9184ee --- /dev/null +++ b/src/core/metrics/registry.py @@ -0,0 +1,130 @@ +from typing import Dict +from .metadata import MetricMetadata + + +METRICS_REGISTRY: Dict[str, MetricMetadata] = { + "radon_cc": MetricMetadata( + metric_id="radon_cc", + name="Cyclomatic Complexity", + description=( + "Measures the cyclomatic complexity of code, representing the number of " + "independent paths through the code. Higher complexity indicates more " + "difficult code to test and maintain." 
+ ), + formula="CC = E - N + 2P (where E=edges, N=nodes, P=connected components)", + ideal_range={"min": 1, "max": 10, "optimal": "1-5", "acceptable": "6-10", "warning": ">10"}, + interpretation={ + "1-5": "Simple code, easy to understand and maintain", + "6-10": "Moderate complexity, acceptable but monitor", + "11-20": "High complexity, consider refactoring", + ">20": "Very high complexity, refactoring recommended" + }, + references=[ + "https://radon.readthedocs.io/en/latest/intro.html", + "https://en.wikipedia.org/wiki/Cyclomatic_complexity" + ], + category="complexity", + unit="score" + ), + + "radon_mi": MetricMetadata( + metric_id="radon_mi", + name="Maintainability Index", + description=( + "Composite metric calculating maintainability based on Halstead Volume, " + "Cyclomatic Complexity, and Lines of Code. Higher values indicate better maintainability." + ), + formula=( + "MI = 171 - 5.2 * ln(Halstead Volume) - 0.23 * (Cyclomatic Complexity) - " + "16.2 * ln(Lines of Code)" + ), + ideal_range={"min": 0, "max": 100, "optimal": ">20", "warning": "<10"}, + interpretation={ + "A (20-100)": "High maintainability - easy to maintain", + "B (10-19)": "Medium maintainability - acceptable but can improve", + "C (0-9)": "Low maintainability - refactoring strongly recommended" + }, + references=[ + "https://radon.readthedocs.io/en/latest/intro.html", + "https://www.verifysoft.com/en_maintainability.html" + ], + category="maintainability", + unit="index" + ), + + "pylint_score": MetricMetadata( + metric_id="pylint_score", + name="PyLint Code Quality Score", + description=( + "Overall code quality score based on static analysis. Evaluates code against " + "PEP 8 style guide, detects errors, enforces coding standards, and finds code smells." + ), + formula=( + "Score = 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)" + ), + ideal_range={"min": -float('inf'), "max": 10.0, "optimal": ">8.0", "acceptable": "7.0-8.0", "warning": "<7.0"}, + interpretation={ + "9.0-10.0": "Excellent - very few issues detected", + "8.0-8.9": "Good - minor improvements possible", + "7.0-7.9": "Acceptable - consider addressing warnings", + "5.0-6.9": "Needs improvement - multiple issues found", + "<5.0": "Poor - significant refactoring needed" + }, + references=[ + "https://pylint.pycqa.org/en/latest/", + "https://peps.python.org/pep-0008/" + ], + category="quality", + unit="score" + ), + + "fpc": MetricMetadata( + metric_id="fpc", + name="Functional Pipeline Cohesion", + description=( + "Measures cohesion of ML pipeline code by analyzing how well functions and classes " + "are organized around specific ML pipeline stages. Higher cohesion indicates better " + "organized and more maintainable ML code." + ), + formula=( + "FPC = (Number of cohesive modules / Total modules) * 10. " + "A module is cohesive when its functions/methods belong to the same pipeline stage or phase." 
+            ),
+            ideal_range={"min": 0, "max": 10, "optimal": ">7.0", "acceptable": "5.0-7.0", "warning": "<5.0"},
+            interpretation={
+                "high (7.0-10.0)": "Well-organized ML pipeline code with clear separation of concerns",
+                "medium (5.0-6.9)": "Code organization is acceptable but could benefit from better structure",
+                "low (0-4.9)": "Poorly organized code, consider restructuring around ML pipeline stages"
+            },
+            references=["https://github.com/MLS-Toobox/mls_code_generator"],
+            category="cohesion",
+            unit="score"
+        ),
+
+        "pipeline_detection": MetricMetadata(
+            metric_id="pipeline_detection",
+            name="ML Pipeline Detection",
+            description=(
+                "Detects and maps ML pipeline stages in the codebase. Identifies which files and "
+                "functions belong to different stages and provides insights into pipeline structure."
+            ),
+            formula=None,
+            ideal_range={},
+            interpretation={
+                "comprehensive": "All major ML pipeline stages detected",
+                "partial": "Some pipeline stages detected, others may be missing",
+                "minimal": "Few or no ML pipeline patterns detected"
+            },
+            references=["https://github.com/MLS-Toobox/mls_code_generator"],
+            category="detection",
+            unit="detection"
+        ),
+}
+
+
+def get_metric_metadata(metric_id: str) -> MetricMetadata:
+    if metric_id not in METRICS_REGISTRY:
+        raise KeyError(
+            f"Metric '{metric_id}' not found. Available: {list(METRICS_REGISTRY.keys())}"
+        )
+    return METRICS_REGISTRY[metric_id]
diff --git a/src/core/models/analysis_result.py b/src/core/models/analysis_result.py
index fb9b179..95fbd33 100644
--- a/src/core/models/analysis_result.py
+++ b/src/core/models/analysis_result.py
@@ -1,20 +1,48 @@
 from dataclasses import dataclass
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, TYPE_CHECKING
 from datetime import datetime
 
+if TYPE_CHECKING:
+    from core.metrics import MetricMetadata
+
+
 @dataclass(frozen=True)
 class AnalysisResult:
-    """Immutable result from code analysis."""
     analyzer_id: str
     score: float
     message_count: Dict[str, Any]
     module_count: int
+    metric_metadata: 'MetricMetadata'
     details: Optional[Dict[str, Any]] = None
     timestamp: datetime = None
 
     def __post_init__(self):
         if self.timestamp is None:
             object.__setattr__(self, 'timestamp', datetime.now())
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "analyzer_id": self.analyzer_id,
+            "score": self.score,
+            "message_count": self._serialize_value(self.message_count),
+            "module_count": self.module_count,
+            "details": self._serialize_value(self.details),
+            "timestamp": self.timestamp.isoformat() if self.timestamp else None,
+            "documentation": self.metric_metadata.to_dict()
+        }
+
+    def _serialize_value(self, value: Any) -> Any:
+        """Convert non-JSON-serializable types to serializable ones."""
+        if isinstance(value, set):
+            return list(value)
+        elif isinstance(value, dict):
+            return {k: self._serialize_value(v) for k, v in value.items()}
+        elif isinstance(value, (list, tuple)):
+            return [self._serialize_value(item) for item in value]
+        elif isinstance(value, datetime):
+            return value.isoformat()
+        else:
+            return value
 
 @dataclass(frozen=True)
 class SessionResult:
diff --git a/src/core/tree_generator.py b/src/core/tree_generator.py
index 9aa2d27..ce3907f 100644
--- a/src/core/tree_generator.py
+++ b/src/core/tree_generator.py
@@ -6,7 +6,6 @@
 class TreeGenerator:
     """Generates hierarchical file tree structure."""
 
-    # Patterns to exclude from tree
     EXCLUDED_PATTERNS = {
         '__pycache__', '.git', '.pyc', '.pyo', '.pyd',
         'node_modules', 'venv', 'env', '.vscode', '.idea',
@@ -53,18 +52,15 @@ def _build_tree(self, path: str, relative_path: str) -> Dict:
 
         if os.path.isfile(path):
             node["size"] = os.path.getsize(path)
 
-            # Validate Python syntax
             if path.endswith('.py'):
                 node["valid_syntax"] = self._is_valid_python(path)
         else:
-            # Directory node
             children = []
 
             try:
                 entries = sorted(os.listdir(path))
 
                 for entry in entries:
-                    # Skip excluded patterns
                     if self._should_exclude(entry):
                         continue
@@ -92,11 +88,9 @@ def _should_exclude(self, name: str) -> bool:
         Returns:
             True if should be excluded
         """
-        # Check exact match
         if name in self.EXCLUDED_PATTERNS:
             return True
 
-        # Check patterns
         for pattern in self.EXCLUDED_PATTERNS:
             if pattern in name:
                 return True
diff --git a/src/server.py b/src/server.py
index 8335342..e78d29d 100644
--- a/src/server.py
+++ b/src/server.py
@@ -16,10 +16,8 @@ def create_app() -> Flask:
     """Create and configure Flask application."""
     app = Flask(__name__)
 
-    # Setup middleware
     app = setup_middleware(app)
 
-    # Setup routes
     app = create_routes(app)
 
     start_scheduler(
@@ -41,11 +39,12 @@
         }
     )
 
    if settings.DEBUG:
        app.run(
            host=settings.HOST,
            port=settings.PORT,
-            debug=True
+            debug=True,
+            use_reloader=True
        )
    else:
        serve(app, host=settings.HOST, port=settings.PORT)
\ No newline at end of file
diff --git a/src/session/cleanup_scheduler.py b/src/session/cleanup_scheduler.py
index c17ddd7..d5acea0 100644
--- a/src/session/cleanup_scheduler.py
+++ b/src/session/cleanup_scheduler.py
@@ -61,7 +61,6 @@ def _run(self) -> None:
             except Exception as e:
                 logger.error(f"Cleanup failed: {str(e)}", exc_info=True)
 
-            # Sleep in small intervals to allow quick shutdown
             sleep_time = self.interval_minutes * 60
             elapsed = 0
             while elapsed < sleep_time and self.running:
@@ -82,11 +81,9 @@ def _cleanup_expired_sessions(self) -> None:
             if not os.path.isdir(session_dir):
                 continue
 
-            # Check if session is expired
             metadata = SessionStorage.load_metadata(session_id, self.base_path)
 
             if metadata is None:
-                # Session expired or invalid, remove it
                 try:
                     shutil.rmtree(session_dir)
                     removed_count += 1
@@ -114,7 +111,6 @@ def run_once(self) -> None:
         self._cleanup_expired_sessions()
 
 
-# Global scheduler instance
 _scheduler: Optional[CleanupScheduler] = None
diff --git a/src/session/cleanup_service.py b/src/session/cleanup_service.py
index 03c1386..6abedfb 100644
--- a/src/session/cleanup_service.py
+++ b/src/session/cleanup_service.py
@@ -36,12 +36,10 @@ def cleanup_old_sessions():
                 if os.path.isdir(item) and self._looks_like_session_id(item):
                     item_path = os.path.join('.', item)
 
-                    # Check if session is old
                     if (current_time - os.path.getctime(item_path)
                             > settings.MAX_SESSION_LIFETIME):
                         self.cleanup_session(item_path)
 
-            # Schedule next cleanup
             timer = threading.Timer(settings.CLEANUP_INTERVAL, cleanup_old_sessions)
             timer.daemon = True
             timer.start()
diff --git a/src/session/file_handler.py b/src/session/file_handler.py
index 89a24ee..324d56d 100644
--- a/src/session/file_handler.py
+++ b/src/session/file_handler.py
@@ -18,23 +18,18 @@ def __init__(self, base_path: str):
     def create_session_workspace(self, session_id: str, app_zip: bytes) -> str:
         """Create workspace and extract ZIP file."""
-        # Use base_path to create session directory
         workspace_path = os.path.join(self.base_path, session_id)
         workspace_path = os.path.abspath(workspace_path)
 
         try:
-            # Create session directory
             os.makedirs(workspace_path, exist_ok=True)
 
-            # Write ZIP file
             zip_path = os.path.join(workspace_path, 'temp.zip')
             with open(zip_path, 'wb') as f:
                 f.write(app_zip)
 
-            # Extract ZIP
             shutil.unpack_archive(zip_path, workspace_path)
 
-            # Remove ZIP file
             os.remove(zip_path)
 
             return workspace_path
diff --git a/src/session/session_manager.py b/src/session/session_manager.py
index 2079d5f..9a30872 100644
--- a/src/session/session_manager.py
+++ b/src/session/session_manager.py
@@ -50,7 +50,6 @@ def __init__(
 
         self.file_handler = FileHandler(self.base_path)
 
-        # Initialize shared analysis context
         self.analysis_context: Optional[AnalysisContext] = None
 
     def _load_metadata(self) -> None:
@@ -114,14 +113,12 @@ def run_analysis(self) -> Dict[str, AnalysisResult]:
         if not self.local_path:
             self._setup_session()
 
-        # Initialize shared context once
         self.analysis_context = AnalysisContext(self.session_id, self.local_path)
 
         results = {}
 
         for analyzer_type in self.analyzer_types:
             try:
-                # Inject shared context into each analyzer
                 analyzer = AnalyzerFactory.create_analyzer(
                     analyzer_type,
                     self.session_id,
@@ -164,7 +161,6 @@ def save_session(
             ttl_minutes=ttl_minutes
         )
 
-        # Update internal metadata cache
         self.metadata = SessionStorage.load_metadata(self.session_id, self.base_path)
 
     def save_analysis_results(self, results: Dict) -> None:
@@ -180,7 +176,6 @@ def save_analysis_results(self, results: Dict) -> None:
             results
         )
 
-        # Update internal cache
         self.metadata = SessionStorage.load_metadata(self.session_id, self.base_path)
 
     def get_metadata(self) -> Dict[str, Any]:
@@ -208,6 +203,5 @@ def cleanup(self) -> None:
 
         SessionStorage.delete_metadata(self.session_id, self.base_path)
 
-        # Clear context cache
         if self.analysis_context:
             self.analysis_context.clear_cache()
\ No newline at end of file
diff --git a/src/utils/validation.py b/src/utils/validation.py
index 3fa7e61..f83a167 100644
--- a/src/utils/validation.py
+++ b/src/utils/validation.py
@@ -11,7 +11,6 @@ def validate_analysis_request(data: Dict) -> Tuple[bool, str]:
     Returns:
         Tuple of (is_valid, error_message)
     """
-    # Check required fields
     if 'analyzers' not in data:
         return False, "Missing required field: analyzers"
 
@@ -21,13 +20,11 @@ def validate_analysis_request(data: Dict) -> Tuple[bool, str]:
     if len(data['analyzers']) == 0:
         return False, "At least one analyzer must be specified"
 
-    # Validate analyzer types
-    valid_analyzers = {'pylint', 'radon_cc', 'radon_mi', 'pipeline', 'fpc'}
+    valid_analyzers = {'pylint', 'radon_cc', 'radon_mi', 'pipeline', 'fpc', 'pfp'}
     for analyzer in data['analyzers']:
         if analyzer not in valid_analyzers:
             return False, f"Invalid analyzer type: {analyzer}. Valid types: {valid_analyzers}"
 
-    # Validate overrides if present
     if 'pipeline_overrides' in data:
         is_valid, error = validate_pipeline_overrides(data['pipeline_overrides'])
         if not is_valid:
@@ -49,14 +46,12 @@ def validate_pipeline_overrides(overrides: Dict) -> Tuple[bool, str]:
     if not isinstance(overrides, dict):
         return False, "pipeline_overrides must be an object"
 
-    # Validate file_stages if present
    if 'file_stages' in overrides:
        file_stages = overrides['file_stages']
 
        if not isinstance(file_stages, dict):
            return False, "file_stages must be an object"
 
-        # Valid stage names (includes optional stages)
        valid_stages = {
            "data_collection",
            "data_cleaning",  # Optional
@@ -73,7 +68,6 @@ def validate_pipeline_overrides(overrides: Dict) -> Tuple[bool, str]:
             if stage not in valid_stages:
                 return False, f"Invalid stage '{stage}'. Valid stages: {valid_stages}"
 
-    # Validate excluded_files if present
     if 'excluded_files' in overrides:
         excluded = overrides['excluded_files']
 
@@ -84,4 +78,27 @@ def validate_pipeline_overrides(overrides: Dict) -> Tuple[bool, str]:
         if not isinstance(item, str):
             return False, "excluded_files items must be strings"
 
-    return True, ""
\ No newline at end of file
+    return True, ""
+
+def validate_zip_file(request):
+    """
+    Validate ZIP file in request.
+
+    Returns:
+        tuple: (zip_content, error_message) - error_message is None on success
+    """
+    if 'file' not in request.files:
+        return None, "No file provided"
+
+    file = request.files['file']
+    if not file or file.filename == '':
+        return None, "No file selected"
+
+    if not file.filename.lower().endswith('.zip'):
+        return None, "File must be a ZIP archive"
+
+    app_zip = file.read()
+    if not app_zip:
+        return None, "Empty ZIP file"
+
+    return app_zip, None
\ No newline at end of file
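
Reviewer note: a minimal usage sketch, not part of the diff, showing how the new metadata plumbing fits together. `get_metric_metadata` and `AnalysisResult.to_dict` come from the hunks above; the concrete field values are made up for illustration, and `MetricMetadata.to_dict()` is assumed from the earlier hunk of src/core/metrics.py not shown here.

    # Hypothetical example: build a result the way BaseAnalyzer._create_result does,
    # then serialize it for an API response.
    from core.metrics import get_metric_metadata
    from core.models.analysis_result import AnalysisResult

    result = AnalysisResult(
        analyzer_id="fpc",
        score=7.5,
        message_count={"messages": []},
        module_count=12,
        metric_metadata=get_metric_metadata("fpc"),
        details={"summary": {"high_cohesion": 9, "ml_files_only": True}},
    )

    payload = result.to_dict()
    # The registry metadata rides along under "documentation", so API consumers
    # get score semantics (ideal_range, interpretation) next to the raw number.
    print(payload["documentation"]["ideal_range"])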