Fairness Metrics - API Reference

Auto-generated documentation for fairness metric classes.

warprec.evaluation.metrics.fairness.biasdisparitybd.BiasDisparityBD

Bases: TopKMetric

Bias Disparity (BD) metric.

This metric measures the relative disparity in bias between the distribution of recommended items and the distribution of items in the training set, aggregated over user and item clusters. It is computed as the relative difference between BiasDisparityBR (bias in the recommendations) and BiasDisparityBS (bias in the training set), evaluated for each (user cluster, item cluster) pair: BD = (BR - BS) / BS.
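As a minimal sketch of this relative difference, the following dependency-free function applies (BR - BS) / BS element-wise to two small matrices indexed as [user cluster][item cluster]. The function name `bias_disparity`, the sample values, and the choice to report 0.0 where BS is zero are assumptions made for illustration; the class itself works on tensors.

```python
def bias_disparity(bs, br):
    """Relative change from source bias (BS) to recommendation bias (BR).

    Both arguments are nested lists indexed as [user_cluster][item_cluster].
    """
    bd = []
    for bs_row, br_row in zip(bs, br):
        row = []
        for s, r in zip(bs_row, br_row):
            # Assumption for this sketch: report 0.0 when the source bias is 0.
            row.append((r - s) / s if s != 0 else 0.0)
        bd.append(row)
    return bd


# Hypothetical values for 2 user clusters x 2 item clusters.
bs = [[1.0, 1.0], [0.5, 2.0]]
br = [[1.5, 0.5], [0.5, 3.0]]
bd = bias_disparity(bs, br)
print(bd)
```

A positive entry means the recommender amplifies the bias already present in the source data for that cluster pair; a negative entry means it dampens it.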

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| k | int | Cutoff for top-k recommendations (used by BiasDisparityBR). | required |
| num_items | int | Number of items in the training set. | required |
| user_cluster | Tensor | Lookup tensor of user clusters. | required |
| item_cluster | Tensor | Lookup tensor of item clusters. | required |
| dist_sync_on_step | bool | Whether to synchronize metric state across distributed processes. | False |
| **kwargs | Any | Additional keyword arguments. | {} |

Source code in warprec/evaluation/metrics/fairness/biasdisparitybd.py
@metric_registry.register("BiasDisparityBD")
class BiasDisparityBD(TopKMetric):
    """Bias Disparity (BD) metric.

    This metric measures the relative disparity in bias between the distribution of
    recommended items and the distribution of items in the training set,
    aggregated over user and item clusters.
    It is computed as the relative difference between BiasDisparityBR (bias in recommendations)
    and BiasDisparityBS (bias in the training set): BD = (BR - BS) / BS.

    Args:
        k (int): Cutoff for top-k recommendations (used by BiasDisparityBR).
        num_items (int): Number of items in the training set.
        user_cluster (Tensor): Lookup tensor of user clusters.
        item_cluster (Tensor): Lookup tensor of item clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    def __init__(
        self,
        k: int,
        num_items: int,
        user_cluster: Tensor,
        item_cluster: Tensor,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k=k, dist_sync_on_step=dist_sync_on_step)

        # Instantiate sub-metrics
        self.bs_metric = metric_registry.get(
            "BiasDisparityBS",
            num_items=num_items,
            user_cluster=user_cluster,
            item_cluster=item_cluster,
            dist_sync_on_step=dist_sync_on_step,
            **kwargs,
        )
        self.br_metric = metric_registry.get(
            "BiasDisparityBR",
            k=k,
            num_items=num_items,
            user_cluster=user_cluster,
            item_cluster=item_cluster,
            dist_sync_on_step=dist_sync_on_step,
            **kwargs,
        )

        # Union of required components
        self._REQUIRED_COMPONENTS = (
            self.bs_metric._REQUIRED_COMPONENTS | self.br_metric._REQUIRED_COMPONENTS
        )

        # Metadata for formatting
        self.n_user_effective_clusters = self.bs_metric.n_user_effective_clusters
        self.n_item_effective_clusters = self.bs_metric.n_item_effective_clusters

    def update(self, preds: Tensor, **kwargs: Any):
        # Update sub-metrics inner states
        self.bs_metric.update(preds, **kwargs)
        self.br_metric.update(preds, **kwargs)

    def compute(self):
        # Compute BS Tensor
        bs_safe_total = self.bs_metric.total_sum.unsqueeze(1).clamp(min=1.0)
        bs_tensor = (
            self.bs_metric.category_sum / bs_safe_total
        ) / self.bs_metric.PC.unsqueeze(0)

        # Compute BR Tensor
        br_safe_total = self.br_metric.total_sum.unsqueeze(1).clamp(min=1.0)
        br_tensor = (
            self.br_metric.category_sum / br_safe_total
        ) / self.br_metric.PC.unsqueeze(0)

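        # Compute BD. nan_to_num(0.0) maps NaN (0 / 0) to 0.0; +/-inf (non-zero / 0)
        # is replaced by the largest-magnitude finite value of the dtype, not by 0.0.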
        bd_tensor = ((br_tensor - bs_tensor) / bs_tensor).nan_to_num(0.0)

        results = {}

        # Format output per cluster combination
        for uc in range(self.n_user_effective_clusters):
            for ic in range(self.n_item_effective_clusters):
                key = f"{self.name}_UC{uc + 1}_IC{ic + 1}"
                results[key] = bd_tensor[uc + 1, ic + 1].item()

        return results

warprec.evaluation.metrics.fairness.biasdisparitybr.BiasDisparityBR

Bases: TopKMetric

The BiasDisparityBR@K (Bias Disparity - Bias Recommendations) metric.

This metric computes the disparity between the distribution of recommended items and the global item distribution per user cluster, averaged over users in the cluster.

Attributes:

| Name | Type | Description |
|---|---|---|
| user_clusters | Tensor | Tensor mapping each user to a user cluster. |
| item_clusters | Tensor | Tensor mapping each item to an item cluster. |
| PC | Tensor | Global distribution of items across item clusters. |
| category_sum | Tensor | Accumulator tensor of shape (n_user_clusters, n_item_clusters) counting recommended items per user-item cluster pair. |
| total_sum | Tensor | Accumulator tensor of shape (n_user_clusters,) counting total recommendations per user cluster. |

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| k | int | Cutoff for top-k recommendations. | required |
| num_items | int | Number of items in the training set. | required |
| user_cluster | Tensor | Lookup tensor of user clusters. | required |
| item_cluster | Tensor | Lookup tensor of item clusters. | required |
| dist_sync_on_step | bool | Whether to synchronize metric state across distributed processes. | False |
| **kwargs | Any | Additional keyword arguments. | {} |

Source code in warprec/evaluation/metrics/fairness/biasdisparitybr.py
@metric_registry.register("BiasDisparityBR")
class BiasDisparityBR(TopKMetric):
    """The BiasDisparityBR@K (Bias Disparity - Bias Recommendations) metric.

    This metric computes the disparity between the distribution of recommended items and the global
    item distribution per user cluster, averaged over users in the cluster.

    Attributes:
        user_clusters (Tensor): Tensor mapping each user to a user cluster.
        item_clusters (Tensor): Tensor mapping each item to an item cluster.
        PC (Tensor): Global distribution of items across item clusters.
        category_sum (Tensor): Accumulator tensor of shape (n_user_clusters, n_item_clusters)
            counting recommended items per user-item cluster pair.
        total_sum (Tensor): Accumulator tensor counting total recommendations per user cluster.

    Args:
        k (int): Cutoff for top-k recommendations.
        num_items (int): Number of items in the training set.
        user_cluster (Tensor): Lookup tensor of user clusters.
        item_cluster (Tensor): Lookup tensor of item clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    _REQUIRED_COMPONENTS: Set[MetricBlock] = {MetricBlock.TOP_K_INDICES}

    user_clusters: Tensor
    item_clusters: Tensor
    PC: Tensor
    category_sum: Tensor
    total_sum: Tensor

    def __init__(
        self,
        k: int,
        num_items: int,
        user_cluster: Tensor,
        item_cluster: Tensor,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k=k, dist_sync_on_step=dist_sync_on_step)

        # Register static buffers
        self.register_buffer("user_clusters", user_cluster)
        self.register_buffer("item_clusters", item_cluster)

        self.n_user_effective_clusters = int(user_cluster.max().item())
        self.n_user_clusters = self.n_user_effective_clusters + 1

        self.n_item_effective_clusters = int(item_cluster.max().item())
        self.n_item_clusters = self.n_item_effective_clusters + 1

        # Global distribution of items (P_global)
        pc = torch.bincount(item_cluster, minlength=self.n_item_clusters).float()
        pc = pc / float(num_items)
        self.register_buffer("PC", pc)

        # Accumulators
        self.add_state(
            "category_sum",
            default=torch.zeros(self.n_user_clusters, self.n_item_clusters),
            dist_reduce_fx="sum",
        )
        self.add_state(
            "total_sum", default=torch.zeros(self.n_user_clusters), dist_reduce_fx="sum"
        )

    def update(self, preds: Tensor, user_indices: Tensor, **kwargs: Any):
        top_k_indices = kwargs.get(f"top_{self.k}_indices")

        # Remap top_k_indices to global
        item_indices = kwargs.get("item_indices")
        top_k_indices = self.remap_indices(top_k_indices, item_indices)

        # Get User Clusters (expanded to match top-k shape)
        # user_indices: [Batch] -> [Batch, 1] -> [Batch, K] -> [Batch * K]
        batch_user_clusters = self.user_clusters[user_indices]
        flat_user_clusters = (
            batch_user_clusters.unsqueeze(1).expand(-1, self.k).reshape(-1)
        )

        # Get Item Clusters
        flat_item_clusters = self.item_clusters[top_k_indices.reshape(-1)]

        # Vectorized Counting using bincount on flattened 2D coordinates
        # Index = user_cluster * n_item_cols + item_cluster
        combined_indices = (
            flat_user_clusters * self.n_item_clusters + flat_item_clusters
        )

        counts = torch.bincount(
            combined_indices, minlength=self.n_user_clusters * self.n_item_clusters
        ).float()

        # Reshape back to matrix form
        counts_matrix = counts.reshape(self.n_user_clusters, self.n_item_clusters)

        # Update states
        self.category_sum += counts_matrix
        self.total_sum += counts_matrix.sum(dim=1)

    def compute(self):
        # P_rec(u, c) / P_rec(u) / P_global(c)
        safe_total = self.total_sum.unsqueeze(1).clamp(min=1.0)

        bias_rec = (self.category_sum / safe_total) / self.PC.unsqueeze(0)

        results = {}
        for uc in range(self.n_user_effective_clusters):
            for ic in range(self.n_item_effective_clusters):
                # +1 because cluster 0 is usually padding/unknown
                key = f"{self.name}_UC{uc + 1}_IC{ic + 1}"
                results[key] = bias_rec[uc + 1, ic + 1].item()
        return results
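
The update method above counts (user cluster, item cluster) pairs by encoding each pair as a single integer and taking one histogram over the flattened indices. A dependency-free sketch of the same idea, using plain lists instead of tensors, is shown here; the function name `count_cluster_pairs` and the sample batch are hypothetical.

```python
def count_cluster_pairs(user_clusters, item_clusters, n_user_clusters, n_item_clusters):
    """Count (user_cluster, item_cluster) pairs with a single flat histogram."""
    flat = [0] * (n_user_clusters * n_item_clusters)
    for uc, ic in zip(user_clusters, item_clusters):
        # Encode the 2D coordinate (uc, ic) as one row-major integer index.
        flat[uc * n_item_clusters + ic] += 1
    # Reshape the flat histogram into a matrix with one row per user cluster.
    return [
        flat[row * n_item_clusters:(row + 1) * n_item_clusters]
        for row in range(n_user_clusters)
    ]


# Hypothetical batch: 2 users with k = 2, already expanded to one entry
# per recommended item (cluster ID 0 is left unused here).
flat_user_clusters = [1, 1, 2, 2]
flat_item_clusters = [1, 2, 2, 2]
counts = count_cluster_pairs(
    flat_user_clusters, flat_item_clusters, n_user_clusters=3, n_item_clusters=3
)
totals = [sum(row) for row in counts]  # recommendations per user cluster
print(counts, totals)
```

Summing each row of the resulting matrix gives the per-user-cluster totals, which is how the listing derives total_sum from the same counts.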

warprec.evaluation.metrics.fairness.biasdisparitybs.BiasDisparityBS

Bases: BaseMetric

BiasDisparityBS measures the bias in observed interactions across user and item clusters.

This metric quantifies how the distribution of items that users positively interacted with deviates from the global item distribution within each user cluster. It helps to identify whether certain user groups interact disproportionately with specific item categories compared to the overall share of those categories in the catalog. It serves as the source-data counterpart of BiasDisparityBR, which applies the same ratio to recommended items.
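A minimal pure-Python sketch of this computation follows. It assumes that the ground truth is a dense user-by-item matrix whose non-zero entries are positive interactions, that cluster ID 0 is reserved for padding/unknown (as the comments in the source listing suggest), and that every other item cluster contains at least one item. The function name `source_bias` and the sample data are hypothetical.

```python
def source_bias(ground, user_clusters, item_clusters):
    """Per (user cluster, item cluster) interaction share over global item share."""
    n_user_clusters = max(user_clusters) + 1
    n_item_clusters = max(item_clusters) + 1
    num_items = len(item_clusters)

    # Global share of each item cluster (PC in the class attributes).
    item_counts = [0] * n_item_clusters
    for ic in item_clusters:
        item_counts[ic] += 1
    pc = [count / num_items for count in item_counts]

    # Count positive interactions per cluster pair and per user cluster.
    category_sum = [[0.0] * n_item_clusters for _ in range(n_user_clusters)]
    total_sum = [0.0] * n_user_clusters
    for user, row in enumerate(ground):
        for item, value in enumerate(row):
            if value != 0:
                uc, ic = user_clusters[user], item_clusters[item]
                category_sum[uc][ic] += 1.0
                total_sum[uc] += 1.0

    bias = {}
    for uc in range(1, n_user_clusters):  # cluster 0 is skipped
        for ic in range(1, n_item_clusters):
            share = category_sum[uc][ic] / max(total_sum[uc], 1.0)
            bias[(uc, ic)] = share / pc[ic]
    return bias


ground = [[1, 1, 1, 0], [0, 0, 1, 1]]  # 2 users x 4 items
user_clusters = [1, 2]
item_clusters = [1, 1, 2, 2]
bias = source_bias(ground, user_clusters, item_clusters)
print(bias)
```

A value above 1 means the user cluster interacts with that item cluster more than its share of the catalog would suggest; a value below 1 means less.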

Attributes:

| Name | Type | Description |
|---|---|---|
| user_clusters | Tensor | Tensor mapping each user to a user cluster. |
| item_clusters | Tensor | Tensor mapping each item to an item cluster. |
| PC | Tensor | Global distribution of items across item clusters. |
| category_sum | Tensor | Accumulated counts of positive interactions per user-item cluster pair. |
| total_sum | Tensor | Accumulated counts of positive interactions per user cluster. |

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| num_items | int | Number of items in the training set. | required |
| user_cluster | Tensor | Lookup tensor of user clusters. | required |
| item_cluster | Tensor | Lookup tensor of item clusters. | required |
| dist_sync_on_step | bool | Whether to synchronize metric state across distributed processes. | False |
| **kwargs | Any | Additional keyword arguments. | {} |

Source code in warprec/evaluation/metrics/fairness/biasdisparitybs.py
@metric_registry.register("BiasDisparityBS")
class BiasDisparityBS(BaseMetric):
    """BiasDisparityBS measures the bias in observed interactions across user and item clusters.

    This metric quantifies how the distribution of items that users positively interacted with
    deviates from the global item distribution within each user cluster. It helps to identify
    whether certain user groups interact disproportionately with specific item categories
    compared to the overall share of those categories in the catalog.

    Attributes:
        user_clusters (Tensor): Tensor mapping each user to a user cluster.
        item_clusters (Tensor): Tensor mapping each item to an item cluster.
        PC (Tensor): Global distribution of items across item clusters.
        category_sum (Tensor): Accumulated counts of positive interactions per user-item cluster pair.
        total_sum (Tensor): Accumulated counts of positive interactions per user cluster.

    Args:
        num_items (int): Number of items in the training set.
        user_cluster (Tensor): Lookup tensor of user clusters.
        item_cluster (Tensor): Lookup tensor of item clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    user_clusters: Tensor
    item_clusters: Tensor
    PC: Tensor
    category_sum: Tensor
    total_sum: Tensor

    def __init__(
        self,
        num_items: int,
        user_cluster: Tensor,
        item_cluster: Tensor,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(dist_sync_on_step=dist_sync_on_step)

        # Register static buffers
        self.register_buffer("user_clusters", user_cluster)
        self.register_buffer("item_clusters", item_cluster)

        self.n_user_effective_clusters = int(user_cluster.max().item())
        self.n_user_clusters = self.n_user_effective_clusters + 1

        self.n_item_effective_clusters = int(item_cluster.max().item())
        self.n_item_clusters = self.n_item_effective_clusters + 1

        # Global distribution of items (P_global)
        pc = torch.bincount(item_cluster, minlength=self.n_item_clusters).float()
        pc = pc / float(num_items)
        self.register_buffer("PC", pc)

        # Accumulators
        self.add_state(
            "category_sum",
            default=torch.zeros(self.n_user_clusters, self.n_item_clusters),
            dist_reduce_fx="sum",
        )
        self.add_state(
            "total_sum", default=torch.zeros(self.n_user_clusters), dist_reduce_fx="sum"
        )

    def update(self, preds: Tensor, user_indices: Tensor, **kwargs: Any):
        # Retrieve Ground Truth (Binary or Raw)
        target = kwargs.get("ground")

        # Find positive interactions in the batch
        user_idx_local, item_idx_local = target.nonzero(as_tuple=True)

        if user_idx_local.numel() == 0:
            return

        # Map User Indices: Batch -> Global
        user_idx_global = user_indices[user_idx_local]

        # Map Item Indices: Local -> Global
        item_indices = kwargs.get("item_indices")
        if item_indices is not None:
            item_idx_global = item_indices[user_idx_local, item_idx_local]
        else:
            item_idx_global = item_idx_local

        # Get Clusters
        u_clusters = self.user_clusters[user_idx_global]
        i_clusters = self.item_clusters[item_idx_global]

        # Accumulate
        # We use index_put_ with accumulate=True for efficient scatter add
        self.category_sum.index_put_(
            (u_clusters, i_clusters),
            torch.ones_like(u_clusters, dtype=torch.float),
            accumulate=True,
        )
        self.total_sum.index_put_(
            (u_clusters,),
            torch.ones_like(u_clusters, dtype=torch.float),
            accumulate=True,
        )

    def compute(self):
        # P_train(u, c) / P_global(c)
        # Avoid division by zero for total_sum
        safe_total = self.total_sum.unsqueeze(1).clamp(min=1.0)

        bias_src = (self.category_sum / safe_total) / self.PC.unsqueeze(0)

        results = {}
        for uc in range(self.n_user_effective_clusters):
            for ic in range(self.n_item_effective_clusters):
                # +1 because cluster 0 is usually padding/unknown
                key = f"{self.name}_UC{uc + 1}_IC{ic + 1}"
                results[key] = bias_src[uc + 1, ic + 1].item()
        return results

warprec.evaluation.metrics.fairness.itemmadranking.ItemMADRanking

Bases: TopKMetric

Item MAD Ranking (ItemMADRanking) metric.

This metric measures the disparity in item exposure across different item clusters in the top-k recommendations, by computing the Mean Absolute Deviation (MAD) of the average discounted relevance scores per cluster. The goal is to evaluate whether some item clusters receive consistently higher or lower exposure than others.
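The pairwise form of MAD used by this metric can be sketched in a few lines. The version below is an illustrative, dependency-free function over a plain list of per-cluster means; the function name is hypothetical, and averaging over all unordered cluster pairs mirrors the upper-triangle sum in the source listing for this class.

```python
from itertools import combinations


def mean_absolute_pairwise_difference(cluster_means):
    """Average |a - b| over all unordered pairs of cluster means."""
    if len(cluster_means) < 2:
        # With fewer than two clusters there is nothing to compare.
        return 0.0
    diffs = [abs(a - b) for a, b in combinations(cluster_means, 2)]
    return sum(diffs) / len(diffs)


# Hypothetical average discounted gains for three item clusters.
mad = mean_absolute_pairwise_difference([0.5, 0.25, 1.0])
print(mad)
```

A value of 0 means every cluster receives the same average gain; larger values indicate a wider exposure gap between clusters.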

Attributes:

| Name | Type | Description |
|---|---|---|
| num_items | int | Number of items in the training set. |
| item_clusters | Tensor | Tensor mapping each item to an item cluster. |
| item_counts | Tensor | Per-item count of appearances in the top-k recommendations. |
| item_gains | Tensor | Per-item sum of discounted relevance (gain) over those appearances. |

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| k | int | Cutoff for top-k recommendations. | required |
| num_items | int | Number of items in the training set. | required |
| item_cluster | Tensor | Lookup tensor of item clusters. | required |
| dist_sync_on_step | bool | Whether to synchronize metric state across distributed processes. | False |
| **kwargs | Any | Additional keyword arguments. | {} |

Source code in warprec/evaluation/metrics/fairness/itemmadranking.py
@metric_registry.register("ItemMADRanking")
class ItemMADRanking(TopKMetric):
    """Item MAD Ranking (ItemMADRanking) metric.

    This metric measures the disparity in item exposure across different item clusters
    in the top-k recommendations, by computing the Mean Absolute Deviation (MAD) of the average
    discounted relevance scores per cluster. The goal is to evaluate whether some item clusters
    receive consistently higher or lower exposure than others.

    Attributes:
        num_items (int): Number of items in the training set.
        item_clusters (Tensor): Tensor mapping each item to an item cluster.
        item_counts (Tensor): Per-item count of appearances in the top-k recommendations.
        item_gains (Tensor): Per-item sum of discounted relevance (gain) over those appearances.

    Args:
        k (int): Cutoff for top-k recommendations.
        num_items (int): Number of items in the training set.
        item_cluster (Tensor): Lookup tensor of item clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    _REQUIRED_COMPONENTS: Set[MetricBlock] = {
        MetricBlock.DISCOUNTED_RELEVANCE,
        MetricBlock.TOP_K_DISCOUNTED_RELEVANCE,
        MetricBlock.TOP_K_INDICES,
    }

    num_items: int
    item_clusters: Tensor
    item_counts: Tensor
    item_gains: Tensor

    def __init__(
        self,
        k: int,
        num_items: int,
        item_cluster: Tensor,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k=k, dist_sync_on_step=dist_sync_on_step)
        self.num_items = num_items

        # Register item clusters as buffer
        self.register_buffer("item_clusters", item_cluster)
        self.n_item_clusters = int(item_cluster.max().item()) + 1

        # Initialize accumulators
        self.add_state("item_counts", torch.zeros(num_items), dist_reduce_fx="sum")
        self.add_state("item_gains", torch.zeros(num_items), dist_reduce_fx="sum")

    def update(self, preds: Tensor, **kwargs: Any):
        top_k_gains = kwargs.get(f"top_{self.k}_discounted_relevance")
        top_k_indices = kwargs.get(f"top_{self.k}_indices")

        # Remap top_k_indices to global
        item_indices = kwargs.get("item_indices")
        top_k_indices = self.remap_indices(top_k_indices, item_indices)

        # Flatten for accumulation
        flat_indices = top_k_indices.flatten()
        flat_gains = top_k_gains.flatten().float()

        # Accumulate counts (1 for every appearance in top-k)
        self.item_counts.index_add_(
            0, flat_indices, torch.ones_like(flat_indices, dtype=torch.float)
        )

        # Accumulate gains (discounted relevance from GT)
        self.item_gains.index_add_(0, flat_indices, flat_gains)

    def compute(self):
        # Compute average gain per item (only for recommended items)
        recommended_mask = self.item_counts > 0

        if not recommended_mask.any():
            return {self.name: 0.0}  # plain float, consistent with mad.item() below

        item_avg_gains = torch.zeros_like(self.item_gains)
        item_avg_gains[recommended_mask] = (
            self.item_gains[recommended_mask] / self.item_counts[recommended_mask]
        )

        # Aggregate per cluster
        # Get clusters for recommended items
        rec_indices = torch.where(recommended_mask)[0]
        rec_clusters = self.item_clusters[rec_indices]
        rec_gains = item_avg_gains[rec_indices]

        # Sum gains and counts per cluster
        cluster_sum_gains = torch.zeros(self.n_item_clusters, device=self.device)
        cluster_counts = torch.zeros(self.n_item_clusters, device=self.device)

        cluster_sum_gains.index_add_(0, rec_clusters, rec_gains)
        cluster_counts.index_add_(0, rec_clusters, torch.ones_like(rec_gains))

        # Compute Mean per cluster
        valid_clusters = cluster_counts > 0

        if not valid_clusters.any():
            return {self.name: 0.0}  # plain float, consistent with mad.item() below

        cluster_means = (
            cluster_sum_gains[valid_clusters] / cluster_counts[valid_clusters]
        )

        # Compute MAD (Pairwise differences)
        if cluster_means.numel() < 2:
            mad = torch.tensor(0.0, device=self.device)
        else:
            # Vectorized pairwise absolute difference
            diffs = (cluster_means.unsqueeze(0) - cluster_means.unsqueeze(1)).abs()
            # Sum upper triangle
            pairwise_sum = diffs.triu(diagonal=1).sum()
            num_pairs = cluster_means.numel() * (cluster_means.numel() - 1) / 2
            mad = pairwise_sum / num_pairs

        return {self.name: mad.item()}

warprec.evaluation.metrics.fairness.itemmadrating.ItemMADRating

Bases: TopKMetric

Item MAD Rating (ItemMADRating) metric.

This metric measures the disparity in the average rating received by items across different item clusters, considering only items that were recommended in the top-k and are relevant to the user who received them. It computes the Mean Absolute Deviation (MAD) of the average rating per item cluster. The goal is to evaluate whether some item clusters receive consistently higher or lower average ratings when they are successfully recommended (i.e., recommended to a user for whom the item is relevant).
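The filtering and averaging steps can be sketched without tensors as follows. This is an illustration under stated assumptions, not the library's implementation: the function name `cluster_mean_scores`, the nested-list inputs, and the sample scores are hypothetical, and the "rating" is taken to be the score attached to each top-k entry.

```python
from collections import defaultdict


def cluster_mean_scores(top_k_items, top_k_scores, top_k_relevant, item_clusters):
    """Mean per-item average score for each item cluster, relevant hits only."""
    item_sum = defaultdict(float)
    item_count = defaultdict(int)
    for items, scores, flags in zip(top_k_items, top_k_scores, top_k_relevant):
        for item, score, relevant in zip(items, scores, flags):
            if relevant:  # keep only recommended items that are relevant
                item_sum[item] += score
                item_count[item] += 1

    cluster_sum = defaultdict(float)
    cluster_count = defaultdict(int)
    for item, total in item_sum.items():
        cluster = item_clusters[item]
        cluster_sum[cluster] += total / item_count[item]  # per-item average
        cluster_count[cluster] += 1
    return {c: cluster_sum[c] / cluster_count[c] for c in cluster_sum}


item_clusters = [1, 1, 2]                 # cluster ID of items 0, 1, 2
top_k_items = [[0, 2], [0, 1]]            # two users, k = 2
top_k_scores = [[4.0, 1.0], [2.0, 5.0]]
top_k_relevant = [[1, 1], [1, 0]]         # item 1 is recommended but not relevant
means = cluster_mean_scores(top_k_items, top_k_scores, top_k_relevant, item_clusters)
print(means)
```

The MAD is then taken over the resulting per-cluster means; here the two clusters differ by 2.0.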

Attributes:

| Name | Type | Description |
|---|---|---|
| num_items | int | Number of items in the training set. |
| item_clusters | Tensor | Tensor mapping each item to an item cluster. |
| item_counts | Tensor | Per-item count of top-k appearances in which the item was relevant. |
| item_gains | Tensor | Per-item sum of ratings/relevance over those relevant appearances. |

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| k | int | Cutoff for top-k recommendations. | required |
| num_items | int | Number of items in the training set. | required |
| item_cluster | Tensor | Lookup tensor of item clusters. | required |
| dist_sync_on_step | bool | Whether to synchronize metric state across distributed processes. | False |
| **kwargs | Any | Additional keyword arguments. | {} |

Source code in warprec/evaluation/metrics/fairness/itemmadrating.py
@metric_registry.register("ItemMADRating")
class ItemMADRating(TopKMetric):
    """Item MAD Rating (ItemMADRating) metric.

    This metric measures the disparity in the average rating received by items
    across different item clusters, considering only the items that were recommended
    and were relevant to the user. It computes the Mean Absolute Deviation (MAD)
    of the average rating per item cluster. The goal is to evaluate whether some item
    clusters receive consistently higher or lower average ratings when they are
    successfully recommended (i.e., recommended to a user for whom the item is relevant).

    Attributes:
        num_items (int): Number of items in the training set.
        item_clusters (Tensor): Tensor mapping each item to an item cluster.
        item_counts (Tensor): Per-item count of top-k appearances in which the item was relevant.
        item_gains (Tensor): Per-item sum of ratings/relevance over those relevant appearances.

    Args:
        k (int): Cutoff for top-k recommendations.
        num_items (int): Number of items in the training set.
        item_cluster (Tensor): Lookup tensor of item clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    _REQUIRED_COMPONENTS: Set[MetricBlock] = {
        MetricBlock.BINARY_RELEVANCE,
        MetricBlock.TOP_K_BINARY_RELEVANCE,
        MetricBlock.TOP_K_INDICES,
        MetricBlock.TOP_K_VALUES,
    }

    num_items: int
    item_clusters: Tensor
    item_counts: Tensor
    item_gains: Tensor

    def __init__(
        self,
        k: int,
        num_items: int,
        item_cluster: Tensor,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k=k, dist_sync_on_step=dist_sync_on_step)
        self.num_items = num_items

        self.register_buffer("item_clusters", item_cluster)
        self.n_item_clusters = int(item_cluster.max().item()) + 1

        # Initialize accumulators
        self.add_state("item_counts", torch.zeros(num_items), dist_reduce_fx="sum")
        self.add_state("item_gains", torch.zeros(num_items), dist_reduce_fx="sum")

    def update(self, preds: Tensor, **kwargs: Any):
        top_k_values = kwargs.get(f"top_{self.k}_values")
        top_k_indices = kwargs.get(f"top_{self.k}_indices")
        top_k_rel = kwargs.get(f"top_{self.k}_binary_relevance")

        # Create relevance mask (only consider relevant items)
        rel_mask = top_k_rel.bool()

        # Remap top_k_indices to global
        item_indices = kwargs.get("item_indices")
        top_k_indices = self.remap_indices(top_k_indices, item_indices)

        # Keep only relevant items (true positives):
        # select the elements of top_k_indices and top_k_values where rel_mask is True
        relevant_indices = torch.masked_select(top_k_indices, rel_mask)
        relevant_scores = torch.masked_select(top_k_values, rel_mask)

        if relevant_indices.numel() > 0:
            # Accumulate counts (1 for every relevant appearance)
            self.item_counts.index_add_(
                0, relevant_indices, torch.ones_like(relevant_scores)
            )

            # Accumulate gains (predicted scores of relevant items)
            self.item_gains.index_add_(0, relevant_indices, relevant_scores)

    def compute(self):
        # Compute average rating per item (only for recommended and relevant items)
        recommended_mask = self.item_counts > 0

        if not recommended_mask.any():
            return {self.name: 0.0}  # plain float, consistent with mad.item() below

        item_avg_ratings = torch.zeros_like(self.item_gains)
        item_avg_ratings[recommended_mask] = (
            self.item_gains[recommended_mask] / self.item_counts[recommended_mask]
        )

        # Aggregate per cluster
        rec_indices = torch.where(recommended_mask)[0]
        rec_clusters = self.item_clusters[rec_indices]
        rec_ratings = item_avg_ratings[rec_indices]

        cluster_sum_ratings = torch.zeros(self.n_item_clusters, device=self.device)
        cluster_counts = torch.zeros(self.n_item_clusters, device=self.device)

        cluster_sum_ratings.index_add_(0, rec_clusters, rec_ratings)
        cluster_counts.index_add_(0, rec_clusters, torch.ones_like(rec_ratings))

        # Compute Mean per cluster
        valid_clusters = cluster_counts > 0

        if not valid_clusters.any():
            return {self.name: 0.0}  # plain float, consistent with mad.item() below

        cluster_means = (
            cluster_sum_ratings[valid_clusters] / cluster_counts[valid_clusters]
        )

        # Compute MAD
        if cluster_means.numel() < 2:
            mad = torch.tensor(0.0, device=self.device)
        else:
            diffs = (cluster_means.unsqueeze(0) - cluster_means.unsqueeze(1)).abs()
            pairwise_sum = diffs.triu(diagonal=1).sum()
            num_pairs = cluster_means.numel() * (cluster_means.numel() - 1) / 2
            mad = pairwise_sum / num_pairs

        return {self.name: mad.item()}

warprec.evaluation.metrics.fairness.reo.REO

Bases: TopKMetric

Ranking-based Equal Opportunity (REO) metric.

This metric evaluates the fairness of a recommender system by comparing the proportion of recommended items from different item clusters (or groups) among the relevant items in the ground truth. It calculates the standard deviation of these proportions divided by their mean, providing a measure of how equally the system recommends relevant items across different groups.
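The ratio described above (standard deviation of the per-cluster proportions divided by their mean) can be sketched as below. The source listing on this page is cut off before the final computation, so two details are assumptions of this sketch rather than confirmed library behavior: it uses the population standard deviation, and it ignores clusters with no relevant items in the ground truth. The function name `reo` and the sample counts are hypothetical.

```python
from statistics import mean, pstdev


def reo(recommended_relevant, total_relevant):
    """Coefficient of variation of per-cluster hit proportions."""
    # Proportion of relevant items that were recommended, for each cluster
    # that has at least one relevant item in the ground truth.
    probs = [r / t for r, t in zip(recommended_relevant, total_relevant) if t > 0]
    if not probs:
        return 0.0
    avg = mean(probs)
    # Assumption: population standard deviation (pstdev), not the sample one.
    return pstdev(probs) / avg if avg > 0 else 0.0


# Hypothetical per-cluster counts; index 0 is the empty fallback cluster.
value = reo(recommended_relevant=[0, 2, 1], total_relevant=[0, 4, 4])
print(value)
```

A value of 0 means relevant items from every cluster are recommended at the same rate; larger values mean some clusters are favored.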

Attributes:

| Name | Type | Description |
|---|---|---|
| item_clusters | Tensor | A tensor mapping item index to its cluster ID. |
| cluster_recommendations | Tensor | Accumulator for the total count of relevant recommended items per cluster. |
| cluster_total_items | Tensor | Accumulator for the total count of relevant items per cluster in the ground truth. |
| n_effective_clusters | int | Number of item clusters excluding the fallback cluster 0 (the maximum cluster ID). |
| n_item_clusters | int | Number of item clusters including the fallback cluster 0 (n_effective_clusters + 1). |

Parameters:

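| Name | Type | Description | Default |
|---|---|---|---|
| k | int | Cutoff for top-k recommendations. | required |
| *args | Any | Additional positional arguments. | () |
| item_cluster | Tensor | Lookup tensor of item clusters. Keyword-only; the default is None, but the constructor calls item_cluster.max(), so a tensor must be supplied. | None |
| dist_sync_on_step | bool | Whether to synchronize metric state across distributed processes. | False |
| **kwargs | Any | Additional keyword arguments. | {} |
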
Source code in warprec/evaluation/metrics/fairness/reo.py
@metric_registry.register("REO")
class REO(TopKMetric):
    """Ranking-based Equal Opportunity (REO) metric.

    This metric evaluates the fairness of a recommender system by comparing the
    proportion of recommended items from different item clusters (or groups)
    among the relevant items in the ground truth. It calculates the standard
    deviation of these proportions divided by their mean, providing a measure
    of how equally the system recommends relevant items across different groups.

    Attributes:
        item_clusters (Tensor): A tensor mapping item index to its cluster ID.
        cluster_recommendations (Tensor): Accumulator for the total count of relevant recommended items per cluster.
        cluster_total_items (Tensor): Accumulator for the total count of relevant items per cluster in the ground truth.
        n_effective_clusters (int): Number of item clusters excluding the fallback cluster 0.
        n_item_clusters (int): Number of item clusters including the fallback cluster 0.

    Args:
        k (int): Cutoff for top-k recommendations.
        *args (Any): The argument list.
        item_cluster (Tensor): Lookup tensor of item clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    _REQUIRED_COMPONENTS: Set[MetricBlock] = {
        MetricBlock.BINARY_RELEVANCE,
        MetricBlock.TOP_K_INDICES,
        MetricBlock.TOP_K_BINARY_RELEVANCE,
    }

    item_clusters: Tensor
    cluster_recommendations: Tensor
    cluster_total_items: Tensor
    n_effective_clusters: int
    n_item_clusters: int

    def __init__(
        self,
        k: int,
        *args: Any,
        item_cluster: Tensor = None,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k, dist_sync_on_step)
        self.register_buffer("item_clusters", item_cluster)
        self.n_effective_clusters = int(item_cluster.max().item())
        self.n_item_clusters = (
            self.n_effective_clusters + 1
        )  # Take into account the zero cluster

        # Per-cluster accumulators
        self.add_state(
            "cluster_recommendations",
            torch.zeros(self.n_item_clusters),
            dist_reduce_fx="sum",
        )
        self.add_state(
            "cluster_total_items",
            torch.zeros(self.n_item_clusters),
            dist_reduce_fx="sum",
        )

    def update(self, preds: Tensor, **kwargs: Any):
        target = kwargs.get("binary_relevance")
        top_k_indices = kwargs.get(f"top_{self.k}_indices")
        top_k_rel = kwargs.get(f"top_{self.k}_binary_relevance")
        item_indices = kwargs.get("item_indices")

        # Remap top_k_indices to global item indices
        top_k_indices = self.remap_indices(top_k_indices, item_indices)

        # Identify global indices of the ground-truth positives
        if item_indices is not None:
            rows, cols = target.nonzero(as_tuple=True)
            positive_indices_global = item_indices[rows, cols]
        else:
            _, positive_indices_global = target.nonzero(as_tuple=True)

        # Identify Relevant Recommended Items
        rel_mask = top_k_rel > 0
        relevant_rec_indices_global = top_k_indices[rel_mask]

        # Map to Clusters
        rec_clusters = self.item_clusters[relevant_rec_indices_global]
        gt_clusters = self.item_clusters[positive_indices_global]

        # Accumulate Counts
        batch_rec_counts = torch.bincount(
            rec_clusters, minlength=self.n_item_clusters
        ).float()

        batch_total_counts = torch.bincount(
            gt_clusters, minlength=self.n_item_clusters
        ).float()

        self.cluster_recommendations += batch_rec_counts
        self.cluster_total_items += batch_total_counts

    def compute(self):
        # Mask for clusters that exist in the ground truth
        # We assume cluster 0 is padding/unknown and usually ignore it if it has no items
        valid_mask = self.cluster_total_items > 0

        if not valid_mask.any():
            return {self.name: 0.0}

        # Calculate probabilities for ALL clusters (keep 0 for invalid ones to maintain index alignment)
        # Avoid division by zero
        safe_denom = self.cluster_total_items.clone()
        safe_denom[~valid_mask] = 1.0

        probs = self.cluster_recommendations / safe_denom
        probs[~valid_mask] = 0.0  # Ensure invalid clusters are 0

        # Calculate global stats based ONLY on valid clusters
        valid_probs = probs[valid_mask]

        if valid_probs.numel() <= 1:
            std_prob = torch.tensor(0.0)
            mean_prob = torch.tensor(1.0)  # Avoid div/0
        else:
            std_prob = torch.std(valid_probs, unbiased=False)
            mean_prob = torch.mean(valid_probs)

        results = {}

        # Populate per-cluster probability
        for ic in range(1, self.n_effective_clusters + 1):
            key = f"{self.name}_IC{ic}"
            if valid_mask[ic]:
                results[key] = probs[ic].item()
            else:
                results[key] = float("nan")

        # Aggregate Score
        if mean_prob == 0:
            results[self.name] = 0.0
        else:
            results[self.name] = (std_prob / mean_prob).item()

        return results
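
The aggregate score returned by `compute()` above is a coefficient of variation: the population standard deviation of the per-cluster hit rates divided by their mean. The following is a minimal plain-Python sketch of that arithmetic. The function name `cluster_hit_rate_cv` and the sample counts are illustrative assumptions and not part of the library, which performs the same steps with tensor operations.

```python
from statistics import fmean, pstdev


def cluster_hit_rate_cv(hits, totals):
    """Return (per-cluster rates, std/mean) for illustrative count lists.

    hits[c]   : relevant items of cluster c that reached the top-k
    totals[c] : relevant items of cluster c in the ground truth
    Index 0 is the fallback cluster; it is left out of the per-cluster
    report but still counts toward the aggregate if it has items.
    """
    rates = {
        c: (hits[c] / totals[c] if totals[c] > 0 else float("nan"))
        for c in range(1, len(totals))
    }
    # Only clusters that appear in the ground truth enter the aggregate
    valid = [hits[c] / totals[c] for c in range(len(totals)) if totals[c] > 0]
    if len(valid) <= 1:
        return rates, 0.0
    mean = fmean(valid)
    # pstdev is the population standard deviation (unbiased=False)
    return rates, (pstdev(valid) / mean if mean else 0.0)


# Hypothetical counts: cluster 1 has 3 of 4 relevant items recommended,
# cluster 2 has 1 of 4.
rates, cv = cluster_hit_rate_cv(hits=[0, 3, 1], totals=[0, 4, 4])
# rates == {1: 0.75, 2: 0.25}, cv == 0.5
```

A score of 0 means every valid cluster has the same hit rate; larger values indicate a wider spread between clusters.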

warprec.evaluation.metrics.fairness.rsp.RSP

Bases: TopKMetric

Ranking-based Statistical Parity (RSP) metric.

This metric evaluates the fairness of a recommender system by comparing the proportion of recommended items from different item clusters (or groups) out of the pool of items not seen during training. It calculates the standard deviation of these proportions divided by their mean, providing a measure of how equally the system recommends items across different groups, regardless of relevance in the test set.
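
To make the definition concrete, here is a small plain-Python sketch that computes the per-cluster proportions exactly, by enumerating each user's unseen items. The function `rsp_exact` and the toy data are assumptions made for illustration. The sketch also skips the fallback cluster 0 for brevity, and it is not the library's implementation, which accumulates counts in tensors and estimates the pool sizes.

```python
from statistics import fmean, pstdev


def rsp_exact(top_k, train_items, item_cluster, n_clusters):
    """Std/mean of per-cluster recommendation proportions.

    top_k[u]        : item ids recommended to user u
    train_items[u]  : set of item ids user u interacted with in training
    item_cluster[i] : cluster id (1..n_clusters) of item i
    """
    recommended = [0] * (n_clusters + 1)
    pool = [0] * (n_clusters + 1)
    for recs, seen in zip(top_k, train_items):
        for item in recs:
            recommended[item_cluster[item]] += 1
        # The candidate pool is every item the user has not seen in training
        for item, cluster in enumerate(item_cluster):
            if item not in seen:
                pool[cluster] += 1
    probs = [
        recommended[c] / pool[c] for c in range(1, n_clusters + 1) if pool[c] > 0
    ]
    if len(probs) <= 1:
        return 0.0
    mean = fmean(probs)
    return pstdev(probs) / mean if mean else 0.0


# Four items in two clusters, two users with one training item each.
score = rsp_exact(
    top_k=[[1, 2], [0, 1]],
    train_items=[{0}, {2}],
    item_cluster=[1, 1, 2, 2],
    n_clusters=2,
)
# Cluster 1: 3 recommendations out of 3 candidates; cluster 2: 1 out of 3.
# Proportions are 1.0 and 1/3, so std/mean is 0.5 up to rounding.
```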

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `item_clusters` | `Tensor` | A tensor mapping item index to its cluster ID. |
| `cluster_recommendations` | `Tensor` | Accumulator for the total count of recommended items per cluster in the top-k. |
| `denominator_counts` | `Tensor` | Estimated total count of items per cluster not in the training set across all users, derived in `compute()` from the catalog and training counts. |
| `n_effective_clusters` | `int` | The number of item clusters excluding the fallback cluster 0 (taken as the highest cluster ID). |
| `n_item_clusters` | `int` | The total number of unique item clusters, including the fallback cluster. |
| `user_interactions` | `Tensor` | Accumulator for counting how many times each user has been evaluated. |

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `k` | `int` | Cutoff for top-k recommendations. | required |
| `num_users` | `int` | Number of users in the training set. | required |
| `item_interactions` | `Tensor` | Tensor containing counts of item interactions in the training set. | required |
| `item_cluster` | `Tensor` | Lookup tensor of item clusters. | `None` |
| `dist_sync_on_step` | `bool` | Whether to synchronize metric state across distributed processes. | `False` |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |

Source code in warprec/evaluation/metrics/fairness/rsp.py
@metric_registry.register("RSP")
class RSP(TopKMetric):
    """Ranking-based Statistical Parity (RSP) metric.

    This metric evaluates the fairness of a recommender system by comparing the
    proportion of recommended items from different item clusters (or groups)
    out of the pool of items not seen during training. It calculates the standard
    deviation of these proportions divided by their mean, providing a measure
    of how equally the system recommends items across different groups, regardless
    of relevance in the test set.

    Attributes:
        item_clusters (Tensor): A tensor mapping item index to its cluster ID.
        cluster_recommendations (Tensor): Accumulator for the total count of recommended items per cluster in the top-k.
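        denominator_counts (Tensor): Estimated total count of items per cluster not in the training set across all users, derived in compute() from the catalog and training counts.
        n_effective_clusters (int): The number of item clusters excluding the fallback cluster 0 (taken as the highest cluster ID).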
        n_item_clusters (int): The total number of unique item clusters, including fallback cluster.
        user_interactions (Tensor): Accumulator for counting how many times each user has been evaluated.

    Args:
        k (int): Cutoff for top-k recommendations.
        num_users (int): Number of users in the training set.
        item_interactions (Tensor): Tensor containing counts of item interactions in the training set.
        item_cluster (Tensor): Lookup tensor of item clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    _REQUIRED_COMPONENTS: Set[MetricBlock] = {
        MetricBlock.TOP_K_INDICES,
        MetricBlock.VALID_USERS,
    }

    item_clusters: Tensor
    cluster_recommendations: Tensor
    denominator_counts: Tensor
    n_effective_clusters: int
    n_item_clusters: int
    user_interactions: Tensor

    def __init__(
        self,
        k: int,
        num_users: int,
        item_interactions: Tensor,
        item_cluster: Tensor = None,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
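        super().__init__(k, dist_sync_on_step)
        # item_cluster defaults to None in the signature but is needed below
        if item_cluster is None:
            raise ValueError("item_cluster must be provided.")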
        self.register_buffer("item_clusters", item_cluster)
        self.n_effective_clusters = int(item_cluster.max().item())
        self.n_item_clusters = self.n_effective_clusters + 1

        # Count cluster of items in the catalog
        self.register_buffer(
            "cluster_item_counts",
            torch.bincount(item_cluster, minlength=self.n_item_clusters).float(),
        )

        # Global count of items per cluster in the training set
        cluster_train_counts = torch.zeros(
            self.n_item_clusters, dtype=torch.float, device=item_cluster.device
        )
        cluster_train_counts.index_add_(0, item_cluster, item_interactions.float())
        self.register_buffer("cluster_train_interaction_counts", cluster_train_counts)

        # Accumulators
        self.add_state(
            "cluster_recommendations",
            torch.zeros(self.n_item_clusters, dtype=torch.float),
            dist_reduce_fx="sum",
        )
        self.add_state(
            "user_interactions",
            default=torch.zeros(num_users, dtype=torch.float),
            dist_reduce_fx="sum",
        )

    def update(self, preds: Tensor, user_indices: Tensor, **kwargs: Any):
        users = kwargs.get("valid_users")
        top_k_indices = kwargs.get(f"top_{self.k}_indices")

        # Remap top_k_indices to global
        item_indices = kwargs.get("item_indices")
        top_k_indices = self.remap_indices(top_k_indices, item_indices)

        # Accumulate cluster recommendations for numerator
        flat_indices = top_k_indices.flatten()
        rec_clusters = self.item_clusters[flat_indices]
        batch_rec_counts = torch.bincount(
            rec_clusters, minlength=self.n_item_clusters
        ).float()
        self.cluster_recommendations += batch_rec_counts

        # Accumulate user interactions for denominator
        self.user_interactions.index_add_(0, user_indices, users.float())

    def compute(self):
        # Total number of user evaluations accumulated across all batches
        total_interactions = self.user_interactions.sum()

        if total_interactions == 0:
            return {self.name: 0.0}

        # Candidate items per cluster summed over all evaluations,
        # before removing items seen in training
        total_potential = total_interactions * self.cluster_item_counts

        # Estimate training (masked) items per cluster, scaled by the
        # average number of evaluations per user
        num_total_users = self.user_interactions.size(0)
        scaling_factor = total_interactions / num_total_users
        estimated_masked_items = scaling_factor * self.cluster_train_interaction_counts

        # Final denominator counts
        denominator_counts = total_potential - estimated_masked_items

        # Safety clamp to avoid negative values
        denominator_counts = torch.clamp(denominator_counts, min=0.0)

        # Valid clusters for computation
        valid_mask = denominator_counts > 0

        if not valid_mask.any():
            return {self.name: 0.0}

        # Compute probabilities per cluster
        probs = torch.zeros_like(self.cluster_recommendations)
        probs[valid_mask] = (
            self.cluster_recommendations[valid_mask] / denominator_counts[valid_mask]
        )

        valid_probs = probs[valid_mask]

        if valid_probs.numel() <= 1:
            std_prob = 0.0
            mean_prob = 1.0
        else:
            std_prob = torch.std(valid_probs, unbiased=False).item()
            mean_prob = torch.mean(valid_probs).item()

        results = {}

        # Populate per-cluster probability
        for ic in range(1, self.n_effective_clusters + 1):
            key = f"{self.name}_IC{ic}"
            if valid_mask[ic]:
                results[key] = probs[ic].item()
            else:
                results[key] = float("nan")

        # Aggregate Score
        if mean_prob == 0:
            results[self.name] = 0.0
        else:
            results[self.name] = std_prob / mean_prob

        return results
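
The denominator used in `compute()` above is an estimate rather than an exact per-user count: the catalog size of each cluster is multiplied by the number of evaluations, and the training interactions of that cluster are subtracted after scaling by the average number of evaluations per user. A plain-Python sketch of that arithmetic, with a hypothetical function name and made-up counts, might look like this:

```python
def estimated_pool_sizes(
    evaluated_events, num_users, cluster_item_counts, cluster_train_interactions
):
    """Estimate, per cluster, how many unseen candidate items were available.

    evaluated_events           : total number of user evaluations
    num_users                  : number of users in the training set
    cluster_item_counts        : catalog items per cluster
    cluster_train_interactions : training interactions per cluster
    """
    scale = evaluated_events / num_users
    return [
        # Clamp at zero, as the source does, to guard against negative estimates
        max(evaluated_events * n_items - scale * n_train, 0.0)
        for n_items, n_train in zip(cluster_item_counts, cluster_train_interactions)
    ]


# Two users evaluated once each; clusters 1 and 2 hold two items each and
# received one training interaction each (index 0 is the fallback cluster).
pools = estimated_pool_sizes(2, 2, [0, 2, 2], [0, 1, 1])
# pools == [0.0, 3.0, 3.0]
```

When every user is evaluated exactly once, the scaling factor is 1 and the estimate equals the exact number of unseen (user, item) pairs per cluster. With partial or repeated evaluation it is an approximation.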

warprec.evaluation.metrics.fairness.usermadranking.UserMADRanking

Bases: UserAverageTopKMetric

User MAD Ranking (UserMADRanking) metric.

This metric measures the disparity in user exposure across different user clusters in the top-k recommendations, by computing the Mean Absolute Deviation (MAD) of the average per-user nDCG scores per cluster. The MAD is computed as the mean of absolute differences between every pair of cluster-level averages.
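
The pairwise form of the MAD described above can be sketched in a few lines of plain Python. The helper name `pairwise_mad` and the sample values are illustrative assumptions; the library computes the same quantity from the upper triangle of a difference matrix.

```python
from itertools import combinations


def pairwise_mad(cluster_means):
    """Mean absolute difference over every unordered pair of cluster means."""
    pairs = list(combinations(cluster_means, 2))
    if not pairs:
        # Fewer than two clusters: no disparity can be measured
        return 0.0
    return sum(abs(a - b) for a, b in pairs) / len(pairs)


# Hypothetical average nDCG for three user clusters.
mad = pairwise_mad([0.5, 0.25, 0.75])
# Pair differences are 0.25, 0.25 and 0.5, so the result is 1/3.
```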

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `user_clusters` | `Tensor` | Tensor mapping each user to a user cluster. |

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `k` | `int` | Cutoff for top-k recommendations. | required |
| `num_users` | `int` | Number of users in the training set. | required |
| `user_cluster` | `Tensor` | Lookup tensor of user clusters. | required |
| `dist_sync_on_step` | `bool` | Whether to synchronize metric state across distributed processes. | `False` |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |

Source code in warprec/evaluation/metrics/fairness/usermadranking.py
@metric_registry.register("UserMADRanking")
class UserMADRanking(UserAverageTopKMetric):
    """User MAD Ranking (UserMADRanking) metric.

    This metric measures the disparity in user exposure across different user clusters
    in the top-k recommendations, by computing the Mean Absolute Deviation (MAD)
    of the average per-user nDCG scores per cluster. The MAD is computed as the mean
    of absolute differences between every pair of cluster-level averages.

    Attributes:
        user_clusters (Tensor): Tensor mapping each user to a user cluster.

    Args:
        k (int): Cutoff for top-k recommendations.
        num_users (int): Number of users in the training set.
        user_cluster (Tensor): Lookup tensor of user clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    _REQUIRED_COMPONENTS: Set[MetricBlock] = {
        MetricBlock.DISCOUNTED_RELEVANCE,
        MetricBlock.TOP_K_DISCOUNTED_RELEVANCE,
        MetricBlock.VALID_USERS,
    }

    user_clusters: Tensor

    def __init__(
        self,
        k: int,
        num_users: int,
        user_cluster: Tensor,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k=k, num_users=num_users, dist_sync_on_step=dist_sync_on_step)

        self.register_buffer("user_clusters", user_cluster)
        self.n_user_clusters = int(user_cluster.max().item()) + 1

    def unpack_inputs(
        self, preds: Tensor, **kwargs: Any
    ) -> Tuple[Tensor, Tensor, Tensor]:
        target = kwargs.get("discounted_relevance", torch.zeros_like(preds))
        users = kwargs.get("valid_users", self.valid_users(target))
        top_k_rel = kwargs.get(
            f"top_{self.k}_discounted_relevance",
            self.top_k_relevance(preds, target, self.k),
        )
        return target, users, top_k_rel

    def compute_scores(
        self, preds: Tensor, target: Tensor, top_k_rel: Tensor, **kwargs: Any
    ) -> Tensor:
        # Gather relevance at top-k (DCG component)
        dcg_score = self.dcg(top_k_rel)

        # Compute ideal relevance (IDCG component)
        ideal_rel = torch.topk(target, self.k, dim=1, largest=True, sorted=True).values
        idcg_score = self.dcg(ideal_rel).clamp(min=1e-10)

        # nDCG per user
        return (dcg_score / idcg_score).nan_to_num(0)

    def compute(self):
        # Calculate average nDCG per user
        mask = self.user_interactions > 0

        if not mask.any():
            return {self.name: 0.0}

        user_vals = torch.zeros_like(self.scores)
        user_vals[mask] = self.scores[mask] / self.user_interactions[mask]

        # Aggregate per cluster
        sum_cluster = torch.zeros(self.n_user_clusters, device=self.device)
        count_cluster = torch.zeros(
            self.n_user_clusters, dtype=torch.long, device=self.device
        )

        # Scatter add to sum values for each cluster
        sum_cluster.scatter_add_(0, self.user_clusters, user_vals)
        count_cluster.scatter_add_(0, self.user_clusters, mask.long())

        # Mean per cluster
        # Filter out clusters with no users
        valid_clusters = count_cluster > 0

        if not valid_clusters.any():
            return {self.name: 0.0}

        mean_cluster = sum_cluster[valid_clusters] / count_cluster[valid_clusters]

        # Pairwise absolute differences (MAD)
        m = mean_cluster.numel()
        if m < 2:
            mad = torch.tensor(0.0, device=self.device)
        else:
            # Vectorized pairwise diffs
            # [1, m] - [m, 1] -> [m, m] matrix of differences
            diffs = (mean_cluster.unsqueeze(0) - mean_cluster.unsqueeze(1)).abs()

            # Sum of upper triangle (excluding diagonal)
            pairwise_sum = diffs.triu(diagonal=1).sum()

            # Number of pairs: m * (m - 1) / 2
            num_pairs = m * (m - 1) / 2
            mad = pairwise_sum / num_pairs

        return {self.name: mad.item()}
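
The per-user score computed in `compute_scores` above is an nDCG: the DCG of the recommended top-k divided by the DCG of the ideal ordering, with the ideal DCG clamped away from zero. The sketch below assumes the common `1 / log2(position + 1)` discount, because the `dcg` helper used by the source is not shown here and may differ. The function names are likewise illustrative.

```python
import math


def dcg(gains):
    """Discounted cumulative gain, assuming a log2 position discount."""
    # enumerate() is zero-based, so rank 1 maps to log2(2) == 1
    return sum(gain / math.log2(pos + 2) for pos, gain in enumerate(gains))


def ndcg_at_k(top_k_gains, all_gains, k):
    """nDCG@k for one user.

    top_k_gains : gains of the recommended items, in ranked order
    all_gains   : gains of every candidate item for that user
    """
    ideal = sorted(all_gains, reverse=True)[:k]
    # Clamp mirrors the source: avoids division by zero when nothing is relevant
    idcg = max(dcg(ideal), 1e-10)
    return dcg(top_k_gains[:k]) / idcg


# One relevant item, recommended at rank 2 instead of rank 1.
score_rank2 = ndcg_at_k([0.0, 1.0], [1.0, 0.0, 0.0, 0.0], k=2)
# A user with no relevant items gets 0.0 rather than NaN.
score_empty = ndcg_at_k([0.0, 0.0], [0.0, 0.0, 0.0], k=2)
```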

warprec.evaluation.metrics.fairness.usermadrating.UserMADRating

Bases: UserAverageTopKMetric

User MAD Rating (UserMADRating) metric.

This metric measures the disparity in the average rating/score received by users across different user clusters, considering the average rating of their top-k recommended items. It averages each user's top-k scores, averages those per-user values within each user cluster, and computes the Mean Absolute Deviation (MAD) of the resulting cluster-level averages. The MAD is computed as the mean of absolute differences between every pair of cluster-level averages.
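
The two averaging levels that precede the MAD can be sketched in plain Python as follows. The function `cluster_mean_topk_score` and the sample scores are assumptions for illustration only; the library performs the same aggregation with scatter-add operations on tensors.

```python
def cluster_mean_topk_score(top_k_scores, user_cluster):
    """Average top-k score per user, then average those values per cluster.

    top_k_scores[u] : predicted scores of the top-k items of user u
    user_cluster[u] : cluster id of user u
    """
    sums, counts = {}, {}
    for scores, cluster in zip(top_k_scores, user_cluster):
        user_avg = sum(scores) / len(scores)
        sums[cluster] = sums.get(cluster, 0.0) + user_avg
        counts[cluster] = counts.get(cluster, 0) + 1
    # Clusters without any evaluated user never appear in the result
    return {c: sums[c] / counts[c] for c in sorted(sums)}


# Three users with k=2; the first two belong to cluster 0, the third to cluster 1.
means = cluster_mean_topk_score(
    [[4.0, 2.0], [5.0, 3.0], [1.0, 1.0]],
    [0, 0, 1],
)
# means == {0: 3.5, 1: 1.0}
```

With only two clusters the MAD reduces to the absolute difference of the two means, which is 2.5 for this sample.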

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `user_clusters` | `Tensor` | Tensor mapping each user to a user cluster. |

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `k` | `int` | Cutoff for top-k recommendations. | required |
| `num_users` | `int` | Number of users in the training set. | required |
| `user_cluster` | `Tensor` | Lookup tensor of user clusters. | required |
| `dist_sync_on_step` | `bool` | Whether to synchronize metric state across distributed processes. | `False` |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |

Source code in warprec/evaluation/metrics/fairness/usermadrating.py
@metric_registry.register("UserMADRating")
class UserMADRating(UserAverageTopKMetric):
    """User MAD Rating (UserMADRating) metric.

    This metric measures the disparity in the average rating/score received by users
    across different user clusters, considering the average rating of their top-k
    recommended items. It averages each user's top-k scores, averages those per-user
    values within each user cluster, and computes the Mean Absolute Deviation (MAD)
    of the resulting cluster-level averages. The MAD is computed as the mean
    of absolute differences between every pair of cluster-level averages.

    Attributes:
        user_clusters (Tensor): Tensor mapping each user to a user cluster.

    Args:
        k (int): Cutoff for top-k recommendations.
        num_users (int): Number of users in the training set.
        user_cluster (Tensor): Lookup tensor of user clusters.
        dist_sync_on_step (bool): Whether to synchronize metric state across distributed processes.
        **kwargs (Any): Additional keyword arguments.
    """

    _REQUIRED_COMPONENTS: Set[MetricBlock] = {
        MetricBlock.BINARY_RELEVANCE,
        MetricBlock.TOP_K_VALUES,
        MetricBlock.VALID_USERS,
    }

    user_clusters: Tensor

    def __init__(
        self,
        k: int,
        num_users: int,
        user_cluster: Tensor,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k=k, num_users=num_users, dist_sync_on_step=dist_sync_on_step)

        self.register_buffer("user_clusters", user_cluster)
        self.n_user_clusters = int(user_cluster.max().item()) + 1

    def compute_scores(
        self, preds: Tensor, target: Tensor, top_k_rel: Tensor, **kwargs: Any
    ) -> Tensor:
        top_k_values = kwargs.get(f"top_{self.k}_values")

        # Average score of the top-k items for each user
        return top_k_values.mean(dim=1)

    def compute(self):
        # Calculate average rating per user
        mask = self.user_interactions > 0

        if not mask.any():
            return {self.name: 0.0}

        user_vals = torch.zeros_like(self.scores)
        user_vals[mask] = self.scores[mask] / self.user_interactions[mask]

        # Aggregate per cluster
        sum_cluster = torch.zeros(self.n_user_clusters, device=self.device)
        count_cluster = torch.zeros(
            self.n_user_clusters, dtype=torch.long, device=self.device
        )

        sum_cluster.scatter_add_(0, self.user_clusters, user_vals)
        count_cluster.scatter_add_(0, self.user_clusters, mask.long())

        # Mean per cluster
        valid_clusters = count_cluster > 0

        if not valid_clusters.any():
            return {self.name: 0.0}

        mean_cluster = sum_cluster[valid_clusters] / count_cluster[valid_clusters]

        # Pairwise absolute differences (MAD)
        m = mean_cluster.numel()
        if m < 2:
            mad = torch.tensor(0.0, device=self.device)
        else:
            diffs = (mean_cluster.unsqueeze(0) - mean_cluster.unsqueeze(1)).abs()
            pairwise_sum = diffs.triu(diagonal=1).sum()
            num_pairs = m * (m - 1) / 2
            mad = pairwise_sum / num_pairs

        return {self.name: mad.item()}