# Source code for clusx.evaluation

"""
Evaluation module for clustering quality assessment.

This module provides tools for evaluating the quality and characteristics of clusters
generated by Bayesian nonparametric clustering algorithms. It implements established
metrics for cluster validation in the context of text data clustering, with a focus
on power-law analysis and similarity-based metrics.

Key components:

- :class:`ClusterEvaluator`: Main class for evaluating clustering results
- :class:`NumpyEncoder`: Custom JSON encoder for handling NumPy data types
- :func:`save_evaluation_report`: Function to save evaluation results to JSON

The evaluation process assesses:

1. Cluster cohesion and separation (silhouette score)
2. Intra-cluster vs. inter-cluster similarity
3. Power-law characteristics of cluster size distributions
4. Potential outliers in the clustering results
5. Cluster size distribution

This module is typically used after running clustering with the Dirichlet Process
and Pitman-Yor Process models to compare their performance and understand the
statistical properties of the generated clusters.
"""

from __future__ import annotations

import json
import os
from collections import Counter
from typing import TYPE_CHECKING

import numpy as np
from sklearn.metrics import silhouette_score  # type: ignore
from sklearn.metrics.pairwise import cosine_similarity  # type: ignore
from sklearn.neighbors import NearestNeighbors  # type: ignore

from clusx.errors import EvaluationError
from clusx.logging import get_logger

if TYPE_CHECKING:
    from typing import Any, Union

    import numpy  # pylint: disable=reimported

# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's dotted name.
logger = get_logger(__name__)


class NumpyEncoder(json.JSONEncoder):
    """
    Custom JSON encoder that handles NumPy data types.

    This encoder converts NumPy types to their Python equivalents for proper
    JSON serialization. It's used when saving evaluation reports to ensure all
    NumPy values are properly converted to standard Python types.

    Conversions:

    - :class:`numpy.ndarray` → :class:`list`
    - :class:`numpy.bool_` → :class:`bool`
    - any :class:`numpy.integer` (e.g. ``numpy.intc``, ``numpy.int_``)
      → :class:`int`
    - any :class:`numpy.floating` (e.g. ``numpy.single``, ``numpy.double``)
      → :class:`float`
    - Other objects exposing a callable ``item()`` → the value it returns

    Anything else is delegated to :meth:`json.JSONEncoder.default`, which
    raises :class:`TypeError`.
    """

    # Ordered (type, converter) pairs tried by ``default``.
    _CONVERTERS = (
        (np.ndarray, lambda arr: arr.tolist()),
        (np.bool_, bool),
        (np.integer, int),
        (np.floating, float),
    )

    def default(self, o):
        """
        Convert NumPy types to their Python equivalents for JSON serialization.

        Args:
            o: The object the standard encoder could not serialize.

        Returns:
            A JSON-serializable Python equivalent of ``o``.

        Raises:
            TypeError: If ``o`` is not a supported type (raised by the base
                class implementation).
        """
        # Try direct type conversions first
        for type_class, converter in self._CONVERTERS:
            if isinstance(o, type_class):
                return converter(o)

        # Try the item() method as fallback for other NumPy-like scalars
        item = getattr(o, "item", None)
        if callable(item):
            try:
                return item()
            except (AttributeError, ValueError, TypeError):
                pass

        # Never hand ``o`` back unchanged: the encoder would call default()
        # on it again and fail with "Circular reference detected" (ValueError)
        # instead of the TypeError that callers handle.
        return super().default(o)
class ClusterEvaluator:
    """
    Evaluates the quality and characteristics of text clusters using metrics.

    This class provides methods to assess clustering results through various
    metrics:

    - Silhouette Score: Measures how similar an object is to its own cluster
      compared to other clusters
    - Similarity Metrics: Evaluates intra-cluster vs inter-cluster similarity
    - Power-law Analysis: Determines if cluster sizes follow a power-law
      distribution
    - Outlier Detection: Scores each sample by its distance to its nearest
      neighbors
    - Cluster Size Distribution: Calculates the distribution of cluster sizes

    Used for post-processing analysis of Bayesian nonparametric clustering
    results.

    Note:
        Parameters like alpha and sigma in clustering algorithms significantly
        impact the resulting cluster distributions.
    """

    def __init__(
        self,
        texts: list[str],
        embeddings: numpy.ndarray,  # TODO: replace with the uniform type
        cluster_assignments: list[int],
        model_name: str,
        alpha: float,
        sigma: float,
        kappa: float,
        random_state: Union[int, None] = None,
    ):
        """
        Initialize the cluster evaluator.

        Args:
            texts: List of text strings that were clustered
            embeddings: Numpy array of embeddings for each text
            cluster_assignments: List of cluster IDs for each text
            model_name: Name of the clustering model (e.g., "DP", "PYP")
            alpha: Concentration parameter
            sigma: Discount parameter for Pitman-Yor Process
            kappa: Kappa parameter for likelihood model
            random_state: Random seed for reproducible evaluation
                (default: None). When given, it seeds NumPy's *global* RNG.

        Raises:
            EvaluationError: If texts, embeddings and cluster_assignments do
                not all have the same length.
        """
        # Validate inputs first so that a rejected call has no side effects
        # (in particular, it does not reseed the global NumPy RNG).
        if len(texts) != len(embeddings) or len(texts) != len(cluster_assignments):
            raise EvaluationError(
                "Length mismatch: texts, embeddings, and cluster_assignments "
                f"must have the same length, got {len(texts)}, {len(embeddings)}, "
                f"and {len(cluster_assignments)} respectively",
            )

        self.texts = texts
        self.embeddings = embeddings
        self.cluster_assignments = cluster_assignments
        self.model_name = model_name
        self.alpha = alpha
        self.sigma = sigma
        self.kappa = kappa
        self.random_state = random_state
        self.unique_clusters = sorted(set(cluster_assignments))

        # Set random state for reproducibility if provided
        if random_state is not None:
            np.random.seed(random_state)

        logger.info(
            "Initialized cluster evaluator for %s with %d texts and %d clusters",
            model_name,
            len(texts),
            len(self.unique_clusters),
        )

    def _count_cluster_sizes(self) -> Counter:
        """Return the number of samples per cluster ID, counted in one pass."""
        return Counter(self.cluster_assignments)

    def _valid_cluster_members(self) -> dict[int, list[int]]:
        """Map each cluster with ≥2 members to the sample indices it contains.

        Indices are grouped in a single pass over the assignments; the result
        is ordered like ``self.unique_clusters``.
        """
        members: dict[int, list[int]] = {}
        for index, cluster_id in enumerate(self.cluster_assignments):
            members.setdefault(cluster_id, []).append(index)
        return {
            cid: members[cid]
            for cid in self.unique_clusters
            if len(members.get(cid, ())) >= 2
        }

    def calculate_silhouette_score(self) -> float:
        """
        Calculate the silhouette score for the clustering data.

        This method calculates the silhouette score only for valid clusters
        (those with ≥2 samples). Invalid clusters are excluded from the
        calculation. Cosine distance is used because the data is represented
        by text embeddings.

        The silhouette score measures how similar an object is to its own
        cluster compared to other clusters. The score ranges from -1 to 1,
        where:

        - A high value (close to 1) indicates the object is well-matched to
          its cluster
        - A value near 0 indicates the object is on or very close to the
          decision boundary
        - A negative value indicates the object might be assigned to the
          wrong cluster

        Edge cases, both of which yield ``0.0``:

        - There are fewer than 2 valid clusters (a warning is logged)
        - An error occurs during calculation (the error is logged)

        Returns:
            float: Silhouette score as a float between -1 and 1, or 0.0 if
            calculation is not possible
        """
        # Count samples per cluster
        cluster_counts = self._count_cluster_sizes()

        # Identify valid clusters (those with at least 2 samples)
        valid_clusters = {c for c, count in cluster_counts.items() if count >= 2}

        # We need at least 2 valid clusters for silhouette score
        if len(valid_clusters) < 2:
            logger.warning(
                "Cannot calculate silhouette score: fewer than 2 valid clusters found"
            )
            return 0.0

        # Filter embeddings and assignments to include only those in valid clusters
        valid_indices = [
            i for i, c in enumerate(self.cluster_assignments) if c in valid_clusters
        ]
        valid_embeddings = self.embeddings[valid_indices]
        valid_assignments = [self.cluster_assignments[i] for i in valid_indices]

        try:
            score = silhouette_score(
                valid_embeddings, valid_assignments, metric="cosine"
            )
            logger.info(
                "Silhouette score for %s: %.4f (using %d/%d samples in %d/%d clusters)",
                self.model_name,
                score,
                len(valid_indices),
                len(self.cluster_assignments),
                len(valid_clusters),
                len(self.unique_clusters),
            )
            return float(score)
        except Exception as err:  # pylint: disable=broad-except
            logger.error("Error calculating silhouette score: %s", err)
            return 0.0

    def calculate_similarity_metrics(
        self,
    ) -> dict[str, Union[float, numpy.floating, dict[str, int]]]:
        """Calculate cluster-aware similarity metrics.

        This method computes three key metrics using cosine similarity:

        - Intra-cluster similarity: Average similarity between texts in the
          same cluster (higher values indicate more cohesive clusters)
        - Inter-cluster similarity: Average similarity between texts in
          different clusters (lower values indicate better separation)
        - Silhouette-like score: Difference between intra-cluster and
          inter-cluster similarity

        Only clusters with ≥2 members take part, for both the intra- and the
        inter-cluster figures. If there is no such cluster, a warning is
        logged and an all-zero result is returned.

        Note:
            Every pairwise similarity between members of valid clusters is
            materialized before averaging, so time and memory grow
            quadratically with the number of samples.

        Returns:
            dict[str, Union[float, numpy.floating, dict[str, int]]]:
            Dictionary with the following keys:

            - ``intra_cluster_similarity``: Average similarity within clusters
            - ``inter_cluster_similarity``: Average similarity between clusters
            - ``silhouette_like_score``: Difference between intra and inter
              similarity
            - ``valid_cluster_ratio``: Fraction of clusters that are valid
            - ``analyzed_pairs``: Number of similarity values analyzed
              (``intra``: off-diagonal entries of each cluster's similarity
              matrix, ``inter``: entries of each cross-cluster matrix)

        Raises:
            EvaluationError: If anything fails while computing the metrics.
        """
        default_results = {
            "intra_cluster_similarity": 0.0,
            "inter_cluster_similarity": 0.0,
            "silhouette_like_score": 0.0,
            "valid_cluster_ratio": 0.0,
            "analyzed_pairs": {
                "intra": 0,
                "inter": 0,
            },
        }

        # Get valid clusters with ≥2 members
        valid_clusters = self._valid_cluster_members()

        # No valid clusters case
        if not valid_clusters:
            logger.warning("No valid clusters found for similarity metrics")
            return default_results

        try:
            # Calculate intra-cluster similarities
            # TODO: On a file of 170000 lines at this point we die
            # (all pairwise similarities are kept in memory).
            intra_sims = []
            for cluster_indices in valid_clusters.values():
                cluster_embeddings = self.embeddings[cluster_indices]
                sim_matrix = cosine_similarity(cluster_embeddings)
                # Mask self-similarity so it does not inflate the average.
                np.fill_diagonal(sim_matrix, np.nan)
                intra_sims.append(sim_matrix[~np.isnan(sim_matrix)])

            # Flatten intra similarities
            intra_sims = np.concatenate(intra_sims) if intra_sims else np.array([])

            # Calculate inter-cluster similarities (each cluster pair once)
            inter_sims = []
            cluster_list = list(valid_clusters.values())
            for i, cluster_i in enumerate(cluster_list):
                embeds_i = self.embeddings[cluster_i]
                for cluster_j in cluster_list[i + 1 :]:
                    embeds_j = self.embeddings[cluster_j]
                    sims = cosine_similarity(embeds_i, embeds_j)
                    inter_sims.append(sims.flatten())

            inter_sims = np.concatenate(inter_sims) if inter_sims else np.array([])

            # Calculate metrics
            valid_cluster_count = len(valid_clusters)
            total_clusters = len(self.unique_clusters)

            intra_cluster_similarity = (
                np.nanmean(intra_sims) if intra_sims.size else 0.0
            )
            inter_cluster_similarity = (
                np.nanmean(inter_sims) if inter_sims.size else 0.0
            )
            silhouette_like = intra_cluster_similarity - inter_cluster_similarity

            return {
                "intra_cluster_similarity": float(intra_cluster_similarity),
                "inter_cluster_similarity": float(inter_cluster_similarity),
                "silhouette_like_score": float(silhouette_like),
                "valid_cluster_ratio": valid_cluster_count / total_clusters,
                "analyzed_pairs": {
                    "intra": len(intra_sims),
                    "inter": len(inter_sims),
                },
            }
        except Exception as err:  # pylint: disable=broad-except
            raise EvaluationError(
                f"Error calculating similarity metrics: {err}"
            ) from err

    def detect_powerlaw_distribution(self) -> dict[str, Any]:
        """
        Detect if the cluster size distribution follows a power-law.

        The analysis includes:

        1. Collecting the size of each cluster
        2. Validating that there are at least 5 clusters and at least 2
           distinct sizes
        3. Fitting a discrete power-law distribution using the ``powerlaw``
           package
        4. Comparing the power-law fit to an exponential distribution

        Every failure mode (too few clusters, NaN fit parameters, missing
        ``powerlaw`` package, any other exception) is logged and yields the
        default result: all values ``None`` and ``is_powerlaw`` ``False``.

        Returns:
            dict[str, Any]: A dictionary with power-law parameters:

            - ``alpha``: Power-law exponent
            - ``xmin``: Minimum value for which power-law holds
            - ``is_powerlaw``: ``True`` when the likelihood ratio favors the
              power-law over the exponential (ratio > 0) with p-value < 0.1
            - ``sigma_error``: Standard error of the alpha estimate
            - ``p_value``: P-value from comparison with exponential
              distribution
        """
        default_powerlaw_results = {
            "alpha": None,
            "xmin": None,
            "is_powerlaw": False,
            "sigma_error": None,
            "p_value": None,
        }

        try:
            import powerlaw  # type: ignore

            # 1. Get cluster sizes
            size_by_cluster = self._count_cluster_sizes()
            cluster_sizes = [size_by_cluster[cid] for cid in self.unique_clusters]

            # 2. Check if there are enough clusters and unique sizes for the
            # analysis.
            unique_sizes = set(cluster_sizes)

            if len(cluster_sizes) < 5:
                logger.warning("Not enough clusters to detect power-law distribution")
                return default_powerlaw_results

            if len(unique_sizes) < 2:
                logger.warning(
                    "Not enough unique cluster sizes to detect power-law distribution"
                )
                return default_powerlaw_results

            # 3. Fit power-law distribution
            fit = powerlaw.Fit(cluster_sizes, discrete=True, verbose=False)
            alpha = fit.alpha
            xmin = fit.xmin
            sigma = fit.sigma if hasattr(fit, "sigma") else 0.0

            # Check for NaN values
            if alpha is None or np.isnan(alpha) or xmin is None or np.isnan(xmin):
                logger.warning("Power-law fit returned NaN values")
                return default_powerlaw_results

            # Test if distribution follows power-law
            # Compare to exponential distribution
            try:
                ratio, p_value = fit.distribution_compare(
                    "power_law", "exponential", normalized_ratio=True
                )
                # Positive ratio means power_law is better. Coerce to a plain
                # bool so the report never carries a NumPy boolean.
                is_powerlaw = bool(ratio > 0 and p_value < 0.1)
            except Exception as err:  # pylint: disable=broad-except
                logger.error("Error comparing distributions: %s", err)
                p_value = None
                is_powerlaw = False

            return {
                "alpha": float(alpha),
                "xmin": float(xmin),
                "is_powerlaw": is_powerlaw,
                "sigma_error": (
                    float(sigma) if sigma is not None and not np.isnan(sigma) else None
                ),
                "p_value": (
                    float(p_value)
                    if p_value is not None and not np.isnan(p_value)
                    else None
                ),
            }
        except Exception as err:  # pylint: disable=broad-except
            logger.error("Error detecting power-law distribution: %s", err)
            return default_powerlaw_results

    def find_outliers(self, n_neighbors: int = 5) -> dict[str, float]:
        """
        Score every sample by its mean distance to its nearest neighbors.

        The neighbor search runs over all embeddings at once; cluster
        assignments are not taken into account. Larger scores mean the sample
        lies further from its neighborhood.

        Args:
            n_neighbors: Number of neighbors to consider (default: 5)

        Returns:
            dict[str, float]: Mapping from sample index (as a string) to its
            outlier score. Empty if there are fewer than ``n_neighbors + 1``
            samples or if an error occurs (both cases are logged).
        """
        try:
            # Skip if we have too few samples
            if len(self.embeddings) < n_neighbors + 1:
                logger.warning("Not enough samples to detect outliers")
                return {}

            # Fit nearest neighbors
            nn = NearestNeighbors(n_neighbors=n_neighbors)
            nn.fit(self.embeddings)

            # Get distances to nearest neighbors.
            # NOTE: the fitted data is passed back in as the query, and
            # scikit-learn only excludes a point from its own neighbors when
            # kneighbors() is called without X, so each sample's own zero
            # distance is part of the mean below.
            distances, _ = nn.kneighbors(self.embeddings)

            # Calculate outlier score as mean distance to neighbors
            outlier_scores = distances.mean(axis=1)

            # Create dictionary of outlier scores
            return {str(i): float(score) for i, score in enumerate(outlier_scores)}
        except Exception as err:  # pylint: disable=broad-except
            logger.error("Error detecting outliers: %s", err)
            return {}

    def calculate_cluster_size_distribution(self) -> dict[str, int]:
        """
        Calculate the distribution of cluster sizes across all clusters.

        This method counts the number of texts assigned to each cluster and
        returns a mapping of cluster IDs to their respective sizes.

        The cluster IDs are converted to strings in the returned dictionary
        to ensure compatibility with JSON serialization.

        Returns:
            dict[str, int]: Dictionary mapping cluster IDs (as strings) to
            their sizes, where size represents the number of texts in each
            cluster
        """
        size_by_cluster = self._count_cluster_sizes()
        return {str(cid): size_by_cluster[cid] for cid in self.unique_clusters}

    def generate_report(self) -> dict[str, Any]:
        """
        Generate a comprehensive evaluation report.

        Runs every metric of this evaluator and bundles the results with the
        model name, the model parameters and basic cluster statistics.

        Returns:
            dict[str, Any]: Dictionary with the keys ``model_name``,
            ``parameters``, ``cluster_stats`` and ``metrics``

        Raises:
            EvaluationError: Propagated from
                :meth:`calculate_similarity_metrics`.
        """
        # Calculate all metrics
        silhouette = self.calculate_silhouette_score()
        similarity_metrics = self.calculate_similarity_metrics()
        powerlaw_metrics = self.detect_powerlaw_distribution()
        outliers = self.find_outliers()
        cluster_sizes = self.calculate_cluster_size_distribution()

        # Compile the report
        return {
            "model_name": self.model_name,
            "parameters": {
                "alpha": self.alpha,
                "sigma": self.sigma,
                "kappa": self.kappa,
                "random_state": self.random_state,
            },
            "cluster_stats": {
                "num_clusters": len(self.unique_clusters),
                "num_texts": len(self.texts),
                "cluster_sizes": cluster_sizes,
            },
            "metrics": {
                "silhouette_score": silhouette,
                "similarity": similarity_metrics,
                "powerlaw": powerlaw_metrics,
                "outliers": outliers,
            },
        }
def _sanitize_for_json(obj): """Convert NumPy types to Python types for JSON serialization.""" if isinstance(obj, dict): return {k: _sanitize_for_json(v) for k, v in obj.items()} if isinstance(obj, list): return [_sanitize_for_json(item) for item in obj] if isinstance(obj, (np.integer, np.floating, np.bool_)): # type: ignore return obj.item() if isinstance(obj, np.ndarray): return obj.tolist() return obj def _debug_json_error(report: dict[str, Any]) -> None: """Debug JSON serialization errors by identifying problematic values.""" for model_name, model_report in report.items(): try: json.dumps(model_report, cls=NumpyEncoder) except TypeError: logger.error("Problem in model report: %s", model_name) for key, value in model_report.items(): try: json.dumps({key: value}, cls=NumpyEncoder) except TypeError: logger.error( "Problem with key: %s, value type: %s", key, str(type(value)), ) def _create_simplified_report(report: dict[str, Any]) -> dict[str, Any]: """Create a simplified version of the report with only basic metrics.""" simplified_report = {} for model_name, model_report in report.items(): simplified_report[model_name] = { "basic_metrics": model_report.get("basic_metrics", {}), "silhouette_score": model_report.get("silhouette_score", 0.0), } return simplified_report
def save_evaluation_report(
    report: dict[str, Any],
    output_dir: str,
    filename: str = "evaluation_report.json",
) -> str:
    """
    Save the evaluation report to a JSON file.

    The report is sanitized and serialized with :class:`NumpyEncoder`. The
    JSON text is produced in memory before the file is opened, so a failed
    serialization never leaves a half-written file behind. If serialization
    fails, the problem is logged in detail and a simplified version of the
    report with only basic metrics is written instead.

    Args:
        report: Dictionary containing the evaluation report for different
            clustering models
        output_dir: Directory to save the report (created if missing)
        filename: Name of the output file (default: "evaluation_report.json")

    Returns:
        str: Path to the saved report file

    Raises:
        TypeError: If JSON serialization fails even for the simplified report
        OSError: If the directory or file cannot be created or written
    """
    output_path = os.path.join(output_dir, filename)
    # An empty output_dir means "current directory"; makedirs("") would fail.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    simplified = False
    try:
        payload = json.dumps(_sanitize_for_json(report), indent=2, cls=NumpyEncoder)
    except (TypeError, ValueError) as err:
        # The encoder signals unsupported values with TypeError; ValueError
        # covers its circular-reference check.
        logger.error("JSON serialization error: %s", err)

        # Debug the error
        _debug_json_error(report)

        # Fall back to a simplified version
        payload = json.dumps(_create_simplified_report(report), indent=2)
        simplified = True

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(payload)

    if simplified:
        logger.info(
            "Saved simplified report to %s due to serialization issues", output_path
        )
    else:
        logger.info("Evaluation report saved to %s", output_path)
    return output_path