Skip to content

Conversation

javier223222
Copy link

@javier223222 javier223222 commented Oct 14, 2025

Implementación del Analizador PFP (Package Functional Purity)

Descripción

Este Pull Request implementa el analizador PFP (Package Functional Purity), una métrica que evalúa la pureza funcional y cohesión de paquetes en proyectos de Machine Learning. El analizador mide qué tan enfocado está un paquete en una función específica del pipeline de ML, proporcionando scores cuantitativos y clasificaciones cualitativas.

Problema o Contexto

Como parte de las funcionalidades a implementar planteadas que en este caso las metricas de cohesion,especificamente la metrica fpc especifica para clases/modulos y pfp para paquetes.Mi trabajo consintio en implementar la segunda. utilizando de base el codigo que mi compañero ya habia hecho para calcular el fpc ya que pfp necesita los resultados de esta para hacer los calculos pertinenetes.

Cambios Realizados

1. Nuevo Módulo: pfp_analyzer.py

Se creó el módulo src/analyzers/pfp_analyzer.py que implementa la clase PFPAnalyzer:

class PFPAnalyzer(BaseAnalyzer):
    """
    Analyzes Package Functional Purity (PFP).
    Formula: PFP = (n_ml / n_total) * CF
    """

Componentes principales:

  • Cálculo de ratio de módulos ML por paquete
  • Factor de concentración basado en etapas únicas detectadas
  • Clasificación cualitativa (High, Moderate, Low, Very Low)
  • Validación de dependencia con FPC analyzer

2. Integración en AnalyzerFactory

Actualización de src/analyzers/factory.py:

from analyzers.pfp_analyzer import PFPAnalyzer

cls._analyzers = {
    "pylint": PyLintAnalyzer,
    "radon_cc": RadonCCAnalyzer,
    "radon_mi": RadonMIAnalyzer,
    "pipeline": PipelineAnalyzer,
    "fpc": FPCAnalyzer,
    "pfp": PFPAnalyzer
}

3. Actualización de Validadores

Modificación en src/utils/validation.py:

valid_analyzers = {'pylint', 'radon_cc', 'radon_mi', 'pipeline', 'fpc', 'pfp'}

4. Implementación de AnalysisContext

Lo que hice fue crear una instancia del AnalisisContext al inicio de la peticion
Pasar esa misma instancia a cada uno de los analizadores que se creen.De esta forma todos compartiran la misma inofrmacion
Así, cuando FPCAnalyzer guarda sus resultados, lo hace en este contexto compartido, y cuando PFPAnalyzer los busque, los encontrará en el mismo lugar.

# src/core/analysis_context.py
from flask import Flask, request
from flask_cors import cross_origin
from datetime import datetime
import warnings

from session.session_manager import SessionManager
from session.session_storage import SessionStorage
from api.serializers import ResponseSerializer
from core.exceptions import SessionError
import config.settings as config
from analyzers.factory import AnalyzerFactory
from core.tree_generator import TreeGenerator
from core.models.pipeline_overrides import AnalysisRequest
from utils.validation import validate_analysis_request
from core.analysis_context import AnalysisContext # <--- 1. IMPORTAR AnalysisContext


def create_routes(app: Flask) -> Flask:
    """Create and configure API routes."""

    @app.route(f'{config.settings.API_PREFIX}/upload-zip', methods=['POST'])
    @cross_origin()
    def upload_zip():
        """
        Upload ZIP and get auto-detected pipeline.
        
        Returns session_id, tree_structure, and auto_detected_pipeline.
        """
        try:
            # Validate file upload
            if 'file' not in request.files:
                return ResponseSerializer.error("No file provided", 400)
            
            file = request.files['file']
            if file.filename == '':
                return ResponseSerializer.error("Empty filename", 400)
            
            if not file.filename.endswith('.zip'):
                return ResponseSerializer.error("File must be a ZIP archive", 400)
            
            # Read ZIP content
            app_zip = file.read()
            if not app_zip:
                return ResponseSerializer.error("Empty ZIP file", 400)
            
            # Create session
            session = SessionManager(
                app_zip=app_zip,
                base_path=config.settings.SESSION_BASE_PATH
            )
            
            session.ensure_setup()
            
            # Generate file tree
            tree_generator = TreeGenerator(session.local_path)
            tree_structure = tree_generator.generate()
            
            # Auto-detect pipeline
            pipeline_analyzer = AnalyzerFactory.create_analyzer(
                "pipeline", session.session_id, session.local_path
            )
            pipeline_result = pipeline_analyzer.analyze()
            
            # Save session metadata
            session.save_session(
                tree_structure=tree_structure,
                auto_detected_pipeline=pipeline_result.details,
                ttl_minutes=config.settings.SESSION_TTL_MINUTES
            )
            
            return ResponseSerializer.success({
                "session_id": session.session_id,
                "tree_structure": tree_structure,
                "auto_detected_pipeline": pipeline_result.details
            })
            
        except SessionError as e:
            return ResponseSerializer.error(f"Session error: {str(e)}", 400)
        except Exception as e:
            return ResponseSerializer.error(f"Upload failed: {str(e)}", 500)

    @app.route(f'{config.settings.API_PREFIX}/analyze/<session_id>', methods=['POST'])
    @cross_origin()
    def analyze_session(session_id: str):
        """
        Analyze code for existing session with optional overrides.
        """
        try:
            # Validate session exists
            if not SessionStorage.exists(session_id, config.settings.SESSION_BASE_PATH):
                return ResponseSerializer.error("Session not found or expired", 404)
            
            # Parse and validate request
            data = request.get_json()
            if not data:
                return ResponseSerializer.error("Request body required", 400)
            
            is_valid, error_msg = validate_analysis_request(data)
            if not is_valid:
                return ResponseSerializer.error(error_msg, 400)
            
            analysis_request = AnalysisRequest.from_dict(data)
            # Load existing session
            session = SessionManager.load_session(
                session_id, 
                base_path=config.settings.SESSION_BASE_PATH
            )
            
            results = {}
            
            # --- INICIO DE LA CORRECCIÓN ---

            # 2. Crear UN SOLO contexto compartido para esta petición de análisis
            shared_context = AnalysisContext(session.session_id, session.local_path)
            
            # Handle pipeline analyzer with overrides
            if "pipeline" in analysis_request.analyzers:
                pipeline_analyzer = AnalyzerFactory.create_analyzer(
                    "pipeline", session_id, session.local_path, context=shared_context
                )
                
                if analysis_request.pipeline_overrides:
                    metadata = session.get_metadata()
                    auto_detected = metadata["auto_detected_pipeline"]
                    
                    modified = pipeline_analyzer.apply_overrides(
                        auto_detected=auto_detected,
                        overrides=analysis_request.pipeline_overrides
                    )
                    
                    results["pipeline"] = {
                        "score": 10.0 if modified["is_valid_pipeline"] else 0.0,
                        "message_count": {},
                        "module_count": modified.get("files_analyzed", 0),
                        "details": modified
                    }
                else:
                    result = pipeline_analyzer.analyze()
                    results["pipeline"] = {
                        "score": result.score,
                        "message_count": result.message_count,
                        "module_count": result.module_count,
                        "details": result.details
                    }
            
            
            for analyzer_type in analysis_request.analyzers:
                if analyzer_type == "pipeline":
                    continue  
                analyzer = AnalyzerFactory.create_analyzer(
                    analyzer_type, session_id, session.local_path, context=shared_context
                )
                result = analyzer.analyze()
                
                results[analyzer.analyzer_id] = {
                    "score": result.score,
                    "message_count": result.message_count,
                    "module_count": result.module_count,
                    "details": result.details
                }
            
            
            
           
            session.save_analysis_results(results)
            
            return ResponseSerializer.success({
                "session_id": session_id,
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "results": results
            })
            
        except SessionError as e:
            return ResponseSerializer.error(f"Session error: {str(e)}", 400)
        except Exception as e:
            if config.settings.DEBUG:
                app.logger.error("Analysis failed with exception:", exc_info=True)
            return ResponseSerializer.error(f"Analysis failed: {str(e)}", 500)

   
    @app.route(f'{config.settings.API_PREFIX}/rate_app', methods=['POST'])
    @cross_origin()
    def rate_app():
        """
        Legacy endpoint for backward compatibility.
        
        @deprecated Use /api/upload-zip + /api/analyze instead
        """
        warnings.warn(
            "rate_app endpoint is deprecated. Use /upload-zip + /analyze instead",
            DeprecationWarning
        )
        
        try:
            # Validate file upload
            if 'file' not in request.files:
                return ResponseSerializer.error("No file provided", 400)
            
            file = request.files['file']
            if not file or file.filename == '':
                return ResponseSerializer.error("No file selected", 400)
            
            # Read and validate ZIP
            app_zip = file.read()
            if not app_zip:
                return ResponseSerializer.error("Empty file", 400)
            
            # Create session with legacy analyzers
            session = SessionManager(
                app_zip=app_zip,
                analyzer_types=['pylint', 'radon_cc', 'radon_mi'],
                base_path=config.settings.SESSION_BASE_PATH
            )
            
            session.ensure_setup()
            
            # Run analysis
            results = session.run_analysis()
            
            # Format legacy response
            legacy_response = {
                "session_id": session.session_id,
                "results": {
                    analyzer_type: {
                        "score": result.score,
                        "message_count": result.message_count,
                        "module_count": result.module_count
                    }
                    for analyzer_type, result in results.items()
                }
            }
            
            return ResponseSerializer.success(legacy_response)
            
        except Exception as e:
            return ResponseSerializer.error(f"Analysis failed: {str(e)}", 500)

    return app

Uso en PFP Analyzer:

# El PFP utiliza el contexto para acceder a resultados de FPC
fpc_result = self.context.get_file_metric(module_path, 'fpc')
if fpc_result and fpc_result.get('stages_detected'):
    n_ml += 1
    all_stages.update(fpc_result['stages_detected'])

5. Actualización de pipeline_stages.json

La principal razon por la que decidi actualizar el json es que alguna palabras clave eran demasiadas genericas y coincidian con nombres de metodos comunes lo que confundia al analizador y daba resultados de baja cohesion en paquetes cuando en realidad deberian haber tenido alta cohesion.
Palabras como "execute(", "fit(", "transform(", "predict(" son nombres de métodos muy comunes. casi todas las clases heredan de Task y tienen un método execute(), lo que provocaba que casi cualquier archivo sea clasificado incorrectamente como data_collection.
Algunas palabras eran demasiadas cortas o comunes en programación general y no son exclusivas de una etapa de ML (ej. "score").

{
  "stages": {
    "data_collection": {
      "keywords": [
        "pd.read_csv",
        "pd.read_excel",
        "pd.read_json",
        "pd.read_sql",
        "pd.read_parquet",
        "load_dataset",
        "load_iris",
        "fetch_data",
        "requests.get",
        "BeautifulSoup",
        "boto3.client"
      ],
      "imports": ["requests", "selenium", "scrapy", "boto3", "sklearn.datasets"]
    },
    "feature_engineering": {
      "keywords": [
        "fit_transform",
        ".transform(",
        "OneHotEncoder",
        "LabelEncoder",
        "StandardScaler",
        "MinMaxScaler",
        "PCA",
        "SelectKBest",
        "TfidfVectorizer"
      ]
    },
    "model_training": {
      "keywords": [
        ".fit(",
        ".train(",
        "model.compile",
        "GridSearchCV",
        "cross_val_score",
        "train_test_split"
      ]
    }
  }
}

Justificación de los Cambios

A continuación se detallan los cambios realizados en los criterios de clasificación de módulos y la justificación detrás de cada uno. El objetivo principal es aumentar la precisión y reducir los falsos positivos.

1. data_collection

  • Eliminado: execute()

    Motivo: Esta era la causa principal de los errores. Casi todos los módulos Task tienen un método execute, lo que hacía que la herramienta clasificara incorrectamente módulos de limpieza, entrenamiento, etc., como si fueran de recolección de datos.

  • Añadido: pd.read_csv, pd.read_json, pd.read_excel

    Motivo: Son mucho más específicos y una señal inequívoca de que se están cargando datos en un DataFrame de Pandas, una de las operaciones más comunes en la recolección de datos.

2. feature_engineering

  • Eliminado: transform()

    Motivo: Similar a execute, es un nombre de método demasiado genérico y propenso a errores de clasificación.

  • Añadido: model.fit_transform()

    Motivo: Al hacerlo más específico (requiere que sea una llamada a un método de un objeto model), reducimos drásticamente los falsos positivos, asegurando que capturamos una transformación de datos en el contexto de un modelo o pipeline.

3. model_training

  • Eliminado: fit()

    Motivo: Mismo problema de generalidad que en los casos anteriores.

  • Añadido: model.fit()

    Motivo: De nuevo, al especificar que debe ser un método de un objeto model, aumentamos la precisión. Esto nos permite identificar de manera fiable el entrenamiento de un modelo.

4. model_evaluation

  • Eliminado: evaluate(), score(), predict()

    Motivo: Son demasiado genéricos. Por ejemplo, la clase Model puede tener un método score, pero también otros métodos específicos como scoreR2 que se usan en la evaluación. La simple presencia de score no garantiza que sea una evaluación.

  • Añadido: model.evaluate(), model.score(), model.predict()

    Motivo: Al exigir el prefijo model., nos aseguramos de que estamos analizando la acción de evaluar o usar un modelo entrenado, no cualquier variable o función que casualmente se llame "score" o "predict".

6. Algoritmo de Cálculo Implementado

def _analyze_package(self, modules: List[str]) -> Dict[str, Any]:
    n_total = len(modules)
    n_ml = 0
    all_stages = set()
    
    for module_path in modules:
        # Utiliza el contexto para acceder a resultados de FPC
        fpc_result = self.context.get_file_metric(module_path, 'fpc')
        if fpc_result and fpc_result.get('stages_detected'):
            n_ml += 1
            all_stages.update(fpc_result['stages_detected'])
    
    n_etapas = len(all_stages)
    cf = 1 - ((n_etapas - 1) / (ETAPAS_MAX - 1)) if ETAPAS_MAX > 1 else 1.0
    
    pfp_score = (n_ml / n_total) * cf if n_total > 0 else 0
    
    return {
        "total_modules": n_total,
        "ml_modules": n_ml,
        "unique_stages_found": n_etapas,
        "concentration_factor": round(cf, 4),
        "pfp_score": round(pfp_score, 4),
        "purity_level": self._get_purity_level(pfp_score)
    }

Evidencia

Estructura de Respuesta Esperada

{
  "analyzer_id": "PFP",
  "score": 7.85,
  "message_count": {
    "High": 2,
    "Moderate": 3,
    "Low": 1,
    "Very Low": 0
  },
  "module_count": 15,
  "details": {
    "packages": {
      "src/data": {
        "total_modules": 5,
        "ml_modules": 5,
        "unique_stages_found": 2,
        "concentration_factor": 0.75,
        "pfp_score": 0.75,
        "purity_level": "High"
      },
      "src/models": {
        "total_modules": 4,
        "ml_modules": 4,
        "unique_stages_found": 1,
        "concentration_factor": 1.0,
        "pfp_score": 1.0,
        "purity_level": "High"
      },
      "src/utils": {
        "total_modules": 6,
        "ml_modules": 3,
        "unique_stages_found": 4,
        "concentration_factor": 0.25,
        "pfp_score": 0.125,
        "purity_level": "Very Low"
      }
    }
  }
}

Interpretación de Resultados

  • total_modules: Cantidad total de módulos Python en el paquete
  • ml_modules: Módulos identificados con funcionalidad ML (tienen etapas detectadas por FPC)
  • unique_stages_found: Número de etapas únicas del pipeline ML presentes
  • concentration_factor: Factor de penalización por dispersión (0.0 - 1.0)
  • pfp_score: Score de pureza funcional (0.0 - 1.0)
  • purity_level: Clasificación cualitativa

Checklist

  • Código implementado y funcional
  • Pruebas locales ejecutadas exitosamente
  • Documentación técnica completa (docstrings)
  • Integración con sistema existente sin romper funcionalidades
  • Validación de dependencias implementada
  • Manejo de errores robusto
  • Adherencia a patrones de diseño del proyecto (BaseAnalyzer)

Notas Adicionales

Fundamentos Técnicos

Fórmula de Cálculo:

PFP = (n_ml / n_total) × CF

Donde:

  • n_ml: Número de módulos con funcionalidad ML
  • n_total: Total de módulos en el paquete
  • CF: Factor de concentración

Factor de Concentración:

CF = 1 - ((n_etapas - 1) / (ETAPAS_MAX - 1))

Donde:

  • n_etapas: Etapas únicas del pipeline detectadas
  • ETAPAS_MAX: Constante = 5 (etapas totales del pipeline ML)

Clasificación de Pureza

Rango de Score Clasificación Interpretación
0.80 - 1.00 High Alta cohesión funcional
0.60 - 0.79 Moderate Cohesión aceptable
0.40 - 0.59 Low Baja cohesión, revisar diseño
0.00 - 0.39 Very Low Múltiples responsabilidades

Decisiones de Diseño

  1. Dependencia de FPC: El analizador PFP requiere resultados previos del FPC analyzer para determinar qué módulos tienen funcionalidad ML. Esta validación se ejecuta en tiempo de análisis y genera error descriptivo si no se cumple.

  2. Constante ETAPAS_MAX = 5: Basada en las cinco etapas del pipeline ML definidas en pipeline_stages.json:

    • data_collection
    • data_cleaning
    • feature_engineering
    • model_training
    • model_evaluation
  3. Escala de Score: Se mantiene consistencia con otros analizadores multiplicando el score base (0-1) por 10 para obtener la escala final (0-10).

  4. Descubrimiento de Paquetes: Se identifica un paquete como cualquier directorio que contenga archivos Python. Los módulos se agrupan por su directorio padre.

@betooxx-dev
Copy link
Member

Problemas Identificados

Dependencia frágil

if not self._is_fpc_data_available():
    raise AnalyzerError("PFP requires FPC to run first")

Problema: Orden de ejecución hardcodeado. Si usuario llama ["pfp", "fpc"] funciona?

Solución propuesta:

# En AnalyzerFactory o SessionManager
ANALYZER_DEPENDENCIES = {
    "pfp": ["fpc"]  # PFP depende de FPC
}

def run_analysis(self):
    # Ordenar topológicamente antes de ejecutar
    sorted_analyzers = self._topological_sort(self.analyzer_types)

Violación DRY: Stages detectados dos veces

FPC ya parsea ASTs y detecta stages. PFP solo lee resultados cacheados, pero si FPC no corrió antes, PFP debería:

  • Auto-ejecutar FPC internamente

Esto donde se hace y cómo?

ToDos

1. Implementar gestión de dependencias

# core/analyzer_dependencies.py
class AnalyzerDependencyResolver:
    GRAPH = {
        "pfp": ["fpc"],
        "future_analyzer": ["pfp", "radon_cc"]
    }
    
    @classmethod
    def resolve_order(cls, requested: List[str]) -> List[str]:
        """Retorna orden topológico + dependencias faltantes."""
        # Implementar Kahn's algorithm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants