"""
Evaluation module for clustering quality assessment.
This module provides tools for evaluating the quality and characteristics of clusters
generated by Bayesian nonparametric clustering algorithms. It implements established
metrics for cluster validation in the context of text data clustering, with a focus
on power-law analysis and similarity-based metrics.
Key components:
- :class:`ClusterEvaluator`: Main class for evaluating clustering results
- :class:`NumpyEncoder`: Custom JSON encoder for handling NumPy data types
- :func:`save_evaluation_report`: Function to save evaluation results to JSON
The evaluation process assesses:
1. Cluster cohesion and separation (silhouette score)
2. Intra-cluster vs. inter-cluster similarity
3. Power-law characteristics of cluster size distributions
4. Potential outliers in the clustering results
5. Cluster size distribution
This module is typically used after running clustering with the Dirichlet Process
and Pitman-Yor Process models to compare their performance and understand the
statistical properties of the generated clusters.
"""
from __future__ import annotations
import json
import os
from typing import TYPE_CHECKING
import numpy as np
from sklearn.metrics import silhouette_score # type: ignore
from sklearn.metrics.pairwise import cosine_similarity # type: ignore
from sklearn.neighbors import NearestNeighbors # type: ignore
from clusx.errors import EvaluationError
from clusx.logging import get_logger
if TYPE_CHECKING:
from typing import Any, Union
import numpy # pylint: disable=reimported
logger = get_logger(__name__)
[docs]
class NumpyEncoder(json.JSONEncoder):
    """
    Custom JSON encoder that handles NumPy data types.

    This encoder converts NumPy types to their Python equivalents for proper JSON
    serialization. It's used when saving evaluation reports to ensure all NumPy
    values are properly converted to standard Python types.

    Conversions:

    - :class:`numpy.ndarray` → :class:`list`
    - :class:`numpy.bool_` → :class:`bool`
    - :class:`numpy.integer` subclasses (``intc``, ``int_``, ...) → :class:`int`
    - :class:`numpy.floating` subclasses (``single``, ``double``, ...) →
      :class:`float`
    - Other objects exposing a callable ``item()`` → the value it returns

    Anything else is delegated to :meth:`json.JSONEncoder.default`, which raises
    :class:`TypeError`.
    """

    def default(self, o):
        """
        Convert NumPy types to their Python equivalents for JSON serialization.

        Args:
            o: The object the standard encoder could not serialize.

        Returns:
            A JSON-serializable Python equivalent of ``o``.

        Raises:
            TypeError: If ``o`` is not a supported type.
        """
        if isinstance(o, np.ndarray):
            return o.tolist()
        if isinstance(o, np.bool_):
            return bool(o)
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)

        # Fallback for remaining scalar-like objects that expose item().
        item = getattr(o, "item", None)
        if callable(item):
            try:
                return item()
            except (AttributeError, ValueError, TypeError):
                pass

        # Never hand the object back unchanged: the encoder would call default()
        # on it again and fail with "Circular reference detected" (a ValueError)
        # instead of the TypeError that callers expect for unsupported types.
        return super().default(o)
[docs]
class ClusterEvaluator:
    """
    Evaluates the quality and characteristics of text clusters using metrics.

    This class provides methods to assess clustering results through various metrics:

    - Silhouette Score: Measures how similar an object is to its own cluster
      compared to other clusters
    - Similarity Metrics: Evaluates intra-cluster vs inter-cluster similarity
    - Power-law Analysis: Determines if cluster sizes follow a power-law distribution
    - Outlier Detection: Identifies potential outliers in the clustering results
    - Cluster Size Distribution: Calculates the distribution of cluster sizes

    Used for post-processing analysis of Bayesian nonparametric clustering results.

    Note:
        Parameters like alpha and sigma in clustering algorithms significantly impact
        the resulting cluster distributions.
    """

    def __init__(
        self,
        texts: list[str],
        embeddings: numpy.ndarray,  # TODO: replace with the uniform type
        cluster_assignments: list[int],
        model_name: str,
        alpha: float,
        sigma: float,
        kappa: float,
        random_state: Union[int, None] = None,
    ):
        """
        Initialize the cluster evaluator.

        Args:
            texts: List of text strings that were clustered
            embeddings: Numpy array of embeddings for each text
            cluster_assignments: List of cluster IDs for each text
            model_name: Name of the clustering model (e.g., "DP", "PYP")
            alpha: Concentration parameter
            sigma: Discount parameter for Pitman-Yor Process
            kappa: Kappa parameter for likelihood model
            random_state: Random seed for reproducible evaluation (default: None)

        Raises:
            EvaluationError: If texts, embeddings and cluster_assignments do not
                all have the same length
        """
        # Validate first so that a bad call neither stores state nor reseeds the
        # global NumPy random generator.
        if len(texts) != len(embeddings) or len(texts) != len(cluster_assignments):
            raise EvaluationError(
                "Length mismatch: texts, embeddings, and cluster_assignments "
                f"must have the same length, got {len(texts)}, {len(embeddings)}, "
                f"and {len(cluster_assignments)} respectively",
            )

        self.texts = texts
        self.embeddings = embeddings
        self.cluster_assignments = cluster_assignments
        self.model_name = model_name
        self.alpha = alpha
        self.sigma = sigma
        self.kappa = kappa
        self.random_state = random_state
        self.unique_clusters = sorted(set(cluster_assignments))

        # Set random state for reproducibility if provided.
        # NOTE: this seeds NumPy's *global* generator.
        if random_state is not None:
            np.random.seed(random_state)

        logger.info(
            "Initialized cluster evaluator for %s with %d texts and %d clusters",
            model_name,
            len(texts),
            len(self.unique_clusters),
        )

    def _cluster_sizes(self) -> dict[int, int]:
        """Count the members of every cluster in one pass over the assignments."""
        sizes: dict[int, int] = {}
        for cluster_id in self.cluster_assignments:
            sizes[cluster_id] = sizes.get(cluster_id, 0) + 1
        return sizes

    def calculate_silhouette_score(self) -> float:
        """
        Calculate the silhouette score for the clustering data.

        This method calculates the silhouette score only for valid clusters
        (those with ≥2 samples). Invalid clusters are excluded from the calculation.
        Cosine distance is used because the data is represented by text embeddings.

        The silhouette score measures how similar an object is to its own cluster
        compared to other clusters. The score ranges from -1 to 1, where:

        - A high value (close to 1) indicates the object is well-matched to its cluster
        - A value near 0 indicates the object is on or very close to the decision
          boundary
        - A negative value indicates the object might be assigned to the wrong cluster

        This method returns 0.0 when:

        - There are fewer than 2 valid clusters
        - An error occurs during calculation (the error is logged)

        Returns:
            float: Silhouette score as a float between -1 and 1, or 0.0 if calculation
            is not possible
        """
        cluster_counts = self._cluster_sizes()

        # Identify valid clusters (those with at least 2 samples)
        valid_clusters = {c for c, count in cluster_counts.items() if count >= 2}

        # We need at least 2 valid clusters for silhouette score
        if len(valid_clusters) < 2:
            logger.warning(
                "Cannot calculate silhouette score: fewer than 2 valid clusters found"
            )
            return 0.0

        # Filter embeddings and assignments to include only those in valid clusters
        valid_indices = [
            i for i, c in enumerate(self.cluster_assignments) if c in valid_clusters
        ]
        valid_assignments = [self.cluster_assignments[i] for i in valid_indices]

        try:
            valid_embeddings = np.asarray(self.embeddings)[valid_indices]
            score = silhouette_score(
                valid_embeddings, valid_assignments, metric="cosine"
            )
            logger.info(
                "Silhouette score for %s: %.4f (using %d/%d samples in %d/%d clusters)",
                self.model_name,
                score,
                len(valid_indices),
                len(self.cluster_assignments),
                len(valid_clusters),
                len(self.unique_clusters),
            )
            return float(score)
        except Exception as err:  # pylint: disable=broad-except
            # Deliberate best effort: a failed score must not abort the report.
            logger.error("Error calculating silhouette score: %s", err)
            return 0.0

    def calculate_similarity_metrics(
        self,
    ) -> dict[str, Union[float, numpy.floating, dict[str, int]]]:
        """Calculate cluster-aware similarity metrics.

        This method computes three key metrics using cosine similarity:

        - Intra-cluster similarity: Average similarity between texts in the same
          cluster (higher values indicate more cohesive clusters)
        - Inter-cluster similarity: Average similarity between texts in different
          clusters (lower values indicate better separation between clusters)
        - Silhouette-like score: Difference between intra-cluster and inter-cluster
          similarity (similar to silhouette score but calculated differently)

        Only clusters with ≥2 members take part in either average.

        The averages are computed in closed form rather than by materialising
        pairwise similarity matrices. With unit-normalised rows, the sum of all
        pairwise cosine similarities between two sets of rows equals the dot
        product of the two sets' row sums, so time is linear in the number of
        embedding values and extra memory is bounded by the largest cluster.
        All-zero rows have similarity 0 to everything.

        Returns:
            dict[str, Union[float, numpy.floating]]: Dictionary with the following keys:

            - ``intra_cluster_similarity``: Average similarity within clusters
            - ``inter_cluster_similarity``: Average similarity between clusters
            - ``silhouette_like_score``: Difference between intra and inter similarity
            - ``valid_cluster_ratio``: Fraction of valid clusters
            - ``analyzed_pairs``: Number of analyzed intra and inter cluster pairs
              (``intra``: ordered intra-cluster pairs, ``inter``: inter-cluster pairs)

        Raises:
            EvaluationError: If the metrics cannot be computed (for example when
                the embeddings are not a finite 2-D array)
        """
        default_results = {
            "intra_cluster_similarity": 0.0,
            "inter_cluster_similarity": 0.0,
            "silhouette_like_score": 0.0,
            "valid_cluster_ratio": 0.0,
            "analyzed_pairs": {
                "intra": 0,
                "inter": 0,
            },
        }

        # Group sample indices by cluster in a single pass.
        members: dict[int, list[int]] = {}
        for index, cluster_id in enumerate(self.cluster_assignments):
            members.setdefault(cluster_id, []).append(index)

        # Valid clusters have ≥2 members.
        valid_members = [
            members[cid]
            for cid in self.unique_clusters
            if len(members.get(cid, ())) >= 2
        ]

        # No valid clusters case
        if not valid_members:
            logger.warning("No valid clusters found for similarity metrics")
            return default_results

        try:
            embeddings = np.asarray(self.embeddings)

            intra_sum = 0.0  # sum of off-diagonal similarities over all clusters
            intra_pairs = 0  # ordered pairs (i, j), i != j, within a cluster
            cluster_sq_norms = 0.0  # sum over clusters of ||S_c||^2
            grand_sum = None  # sum over clusters of S_c
            total_points = 0
            squared_sizes = 0

            for indices in valid_members:
                rows = np.asarray(embeddings[indices], dtype=np.float64)
                if rows.ndim != 2:
                    raise ValueError("embeddings must be a 2-D array")
                if not np.isfinite(rows).all():
                    raise ValueError("embeddings contain NaN or infinite values")

                # Unit-normalise; zero rows stay zero.
                norms = np.linalg.norm(rows, axis=1, keepdims=True)
                norms[norms == 0.0] = 1.0
                rows = rows / norms

                unit_sum = rows.sum(axis=0)  # S_c
                sq_norm = float(unit_sum @ unit_sum)  # sum of ALL pairs incl. i == j
                self_sim = float(np.einsum("ij,ij->", rows, rows))  # the diagonal
                size = len(indices)

                intra_sum += sq_norm - self_sim
                intra_pairs += size * (size - 1)
                cluster_sq_norms += sq_norm
                grand_sum = unit_sum if grand_sum is None else grand_sum + unit_sum
                total_points += size
                squared_sizes += size * size

            # Pairs of points that lie in two different valid clusters; each
            # unordered cluster pair is counted once.
            inter_pairs = (total_points * total_points - squared_sizes) // 2
            inter_sum = (float(grand_sum @ grand_sum) - cluster_sq_norms) / 2.0

            intra_cluster_similarity = intra_sum / intra_pairs if intra_pairs else 0.0
            inter_cluster_similarity = inter_sum / inter_pairs if inter_pairs else 0.0
            silhouette_like = intra_cluster_similarity - inter_cluster_similarity

            return {
                "intra_cluster_similarity": float(intra_cluster_similarity),
                "inter_cluster_similarity": float(inter_cluster_similarity),
                "silhouette_like_score": float(silhouette_like),
                "valid_cluster_ratio": len(valid_members) / len(self.unique_clusters),
                "analyzed_pairs": {
                    "intra": intra_pairs,
                    "inter": inter_pairs,
                },
            }
        except Exception as err:  # pylint: disable=broad-except
            raise EvaluationError(
                f"Error calculating similarity metrics: {err}"
            ) from err

    def detect_powerlaw_distribution(self) -> dict[str, Any]:
        """
        Detect if the cluster size distribution follows a power-law.

        The analysis:

        1. Collects the size of each cluster
        2. Requires at least 5 clusters and at least 2 distinct sizes
        3. Fits a power-law distribution using the ``powerlaw`` package
        4. Compares the power-law fit to an exponential distribution

        The distribution is reported as power-law when the normalised
        log-likelihood ratio favours the power law (ratio > 0) with a p-value
        below 0.1.

        Any failure (including ``powerlaw`` not being installed or a fit that
        yields NaN parameters) is logged and the default result is returned.

        Returns:
            dict[str, Any]: A dictionary with power-law parameters:

            - ``alpha``: Power-law exponent
            - ``xmin``: Minimum value for which power-law holds
            - ``is_powerlaw``: Boolean indicating if distribution follows power-law
            - ``sigma_error``: Standard error of the alpha estimate
            - ``p_value``: P-value from comparison with exponential distribution

            When no fit is possible every numeric entry is ``None`` and
            ``is_powerlaw`` is ``False``.
        """
        default_powerlaw_results = {
            "alpha": None,
            "xmin": None,
            "is_powerlaw": False,
            "sigma_error": None,
            "p_value": None,
        }

        try:
            import powerlaw  # type: ignore

            # 1. Get cluster sizes (single pass instead of list.count per cluster)
            size_by_cluster = self._cluster_sizes()
            cluster_sizes = [
                size_by_cluster.get(cluster_id, 0)
                for cluster_id in self.unique_clusters
            ]

            # 2. Check if there are enough clusters and unique sizes for the analysis.
            if len(cluster_sizes) < 5:
                logger.warning("Not enough clusters to detect power-law distribution")
                return default_powerlaw_results
            if len(set(cluster_sizes)) < 2:
                logger.warning(
                    "Not enough unique cluster sizes to detect power-law distribution"
                )
                return default_powerlaw_results

            # 3. Fit power-law distribution
            fit = powerlaw.Fit(cluster_sizes, discrete=True, verbose=False)
            alpha = fit.alpha
            xmin = fit.xmin
            sigma = getattr(fit, "sigma", 0.0)

            # Check for NaN values
            if alpha is None or np.isnan(alpha) or xmin is None or np.isnan(xmin):
                logger.warning("Power-law fit returned NaN values")
                return default_powerlaw_results

            # 4. Compare to exponential distribution
            try:
                ratio, p_value = fit.distribution_compare(
                    "power_law", "exponential", normalized_ratio=True
                )
                # Positive ratio means power_law is better. bool() keeps NumPy
                # booleans out of the report.
                is_powerlaw = bool(ratio > 0 and p_value < 0.1)
            except Exception as err:  # pylint: disable=broad-except
                logger.error("Error comparing distributions: %s", err)
                p_value = None
                is_powerlaw = False

            return {
                "alpha": float(alpha),
                "xmin": float(xmin),
                "is_powerlaw": is_powerlaw,
                "sigma_error": (
                    float(sigma) if sigma is not None and not np.isnan(sigma) else None
                ),
                "p_value": (
                    float(p_value)
                    if p_value is not None and not np.isnan(p_value)
                    else None
                ),
            }
        except Exception as err:  # pylint: disable=broad-except
            logger.error("Error detecting power-law distribution: %s", err)
            return default_powerlaw_results

    def find_outliers(self, n_neighbors: int = 5) -> dict[str, float]:
        """
        Score every sample by its mean distance to its nearest neighbours.

        Neighbours are searched across all embeddings, not per cluster. A larger
        score means the sample lies further from its neighbours and is a more
        likely outlier.

        Args:
            n_neighbors: Number of neighbors to consider (default: 5)

        Returns:
            dict[str, float]: Mapping of sample index (as a string) to its
            outlier score. Empty if there are fewer than ``n_neighbors + 1``
            samples or if an error occurs (the error is logged).
        """
        try:
            # Each sample needs n_neighbors *other* samples to compare against.
            if len(self.embeddings) < n_neighbors + 1:
                logger.warning("Not enough samples to detect outliers")
                return {}

            nn = NearestNeighbors(n_neighbors=n_neighbors)
            nn.fit(self.embeddings)

            # Query without X: the neighbours of each indexed point are
            # returned and the point itself is not counted as its own
            # neighbour. Passing the training data back in would put a
            # zero-distance self match into every mean.
            distances, _ = nn.kneighbors()

            # Outlier score is the mean distance to the neighbours.
            outlier_scores = distances.mean(axis=1)
            return {str(i): float(score) for i, score in enumerate(outlier_scores)}
        except Exception as err:  # pylint: disable=broad-except
            logger.error("Error detecting outliers: %s", err)
            return {}

    def calculate_cluster_size_distribution(self) -> dict[str, int]:
        """
        Calculate the distribution of cluster sizes across all clusters.

        This method counts the number of texts assigned to each cluster and returns
        a mapping of cluster IDs to their respective sizes, ordered as in
        ``unique_clusters``.

        The cluster IDs are converted to strings in the returned dictionary to ensure
        compatibility with JSON serialization.

        Returns:
            dict[str, int]: Dictionary mapping cluster IDs (as strings) to their sizes,
            where size represents the number of texts in each cluster
        """
        size_by_cluster = self._cluster_sizes()
        return {
            str(cluster_id): size_by_cluster.get(cluster_id, 0)
            for cluster_id in self.unique_clusters
        }

    def generate_report(self) -> "dict[str, Any]":
        """
        Generate a comprehensive evaluation report.

        Returns:
            dict[str, Any]: Dictionary with the keys ``model_name``,
            ``parameters`` (alpha, sigma, kappa, random_state), ``cluster_stats``
            (num_clusters, num_texts, cluster_sizes) and ``metrics``
            (silhouette_score, similarity, powerlaw, outliers)

        Raises:
            EvaluationError: If the similarity metrics cannot be computed
        """
        # Calculate all metrics
        silhouette = self.calculate_silhouette_score()
        similarity_metrics = self.calculate_similarity_metrics()
        powerlaw_metrics = self.detect_powerlaw_distribution()
        outliers = self.find_outliers()
        cluster_sizes = self.calculate_cluster_size_distribution()

        # Compile the report
        return {
            "model_name": self.model_name,
            "parameters": {
                "alpha": self.alpha,
                "sigma": self.sigma,
                "kappa": self.kappa,
                "random_state": self.random_state,
            },
            "cluster_stats": {
                "num_clusters": len(self.unique_clusters),
                "num_texts": len(self.texts),
                "cluster_sizes": cluster_sizes,
            },
            "metrics": {
                "silhouette_score": silhouette,
                "similarity": similarity_metrics,
                "powerlaw": powerlaw_metrics,
                "outliers": outliers,
            },
        }
def _sanitize_for_json(obj):
"""Convert NumPy types to Python types for JSON serialization."""
if isinstance(obj, dict):
return {k: _sanitize_for_json(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_sanitize_for_json(item) for item in obj]
if isinstance(obj, (np.integer, np.floating, np.bool_)): # type: ignore
return obj.item()
if isinstance(obj, np.ndarray):
return obj.tolist()
return obj
def _debug_json_error(report: dict[str, Any]) -> None:
    """
    Debug JSON serialization errors by identifying problematic values.

    Each top-level entry of ``report`` is serialized on its own with
    :class:`NumpyEncoder`. For every entry that fails, its name is logged and,
    if the entry is a dictionary, each of its items is retried individually so
    the offending key and value type can be logged.

    Args:
        report: Mapping of model name to that model's report.
    """
    # json raises TypeError for unsupported types and ValueError for circular
    # references; both mean "this value cannot be written".
    serialization_errors = (TypeError, ValueError)

    for model_name, model_report in report.items():
        try:
            json.dumps(model_report, cls=NumpyEncoder)
            continue
        except serialization_errors:
            logger.error("Problem in model report: %s", model_name)

        # A non-dict entry has no items to drill into; report it directly
        # instead of failing on .items().
        if not isinstance(model_report, dict):
            logger.error(
                "Problem with key: %s, value type: %s",
                model_name,
                str(type(model_report)),
            )
            continue

        for key, value in model_report.items():
            try:
                json.dumps({key: value}, cls=NumpyEncoder)
            except serialization_errors:
                logger.error(
                    "Problem with key: %s, value type: %s",
                    key,
                    str(type(value)),
                )
def _create_simplified_report(report: dict[str, Any]) -> dict[str, Any]:
"""Create a simplified version of the report with only basic metrics."""
simplified_report = {}
for model_name, model_report in report.items():
simplified_report[model_name] = {
"basic_metrics": model_report.get("basic_metrics", {}),
"silhouette_score": model_report.get("silhouette_score", 0.0),
}
return simplified_report
[docs]
def save_evaluation_report(
    report: dict[str, Any],
    output_dir: str,
    filename: str = "evaluation_report.json",
) -> str:
    """
    Save the evaluation report to a JSON file.

    The report is sanitized, serialized with :class:`NumpyEncoder` and written
    to ``output_dir/filename``. Serialization happens in memory before the file
    is opened, so a failure never leaves a truncated file behind.

    If the full report cannot be serialized, the problem is logged, the
    offending values are reported, and a simplified version of the report with
    only basic metrics is written instead.

    The target directory is created if it does not exist.

    Args:
        report: Dictionary containing the evaluation report for different clustering
            models
        output_dir: Directory to save the report
        filename: Name of the output file (default: "evaluation_report.json")

    Returns:
        str: Path to the saved report file

    Raises:
        TypeError: If JSON serialization fails even after simplification attempts
        OSError: If the directory or file cannot be written
    """
    output_path = os.path.join(output_dir, filename)

    simplified = False
    try:
        payload = json.dumps(_sanitize_for_json(report), indent=2, cls=NumpyEncoder)
    except (TypeError, ValueError) as err:
        # TypeError: unsupported value; ValueError: e.g. circular reference.
        logger.error("JSON serialization error: %s", err)
        _debug_json_error(report)
        payload = json.dumps(
            _create_simplified_report(report), indent=2, cls=NumpyEncoder
        )
        simplified = True

    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(payload)

    if simplified:
        logger.info(
            "Saved simplified report to %s due to serialization issues", output_path
        )
    else:
        logger.info("Evaluation report saved to %s", output_path)
    return output_path