Skip to content

Multiobjective Metrics - API Reference

Auto-generated documentation for multiobjective metric classes.

warprec.evaluation.metrics.multiobjective.euclideandistance.EucDistance

Bases: TopKMetric

Computes the Euclidean Distance between the model's performance and an Utopia Point (Ideal Point).

Usually, a lower Euclidean Distance indicates a better performance as it is closer to the ideal objectives.

Attributes:

Name Type Description
utopia_tensor Tensor

Tensor of utopia points of sub metrics.

Parameters:

Name Type Description Default
k int

The number of top recommendations to consider (cutoff).

required
num_users int

Number of users in the training set.

required
num_items int

Number of items in the training set.

required
metric_names List[str]

List of sub metrics to compute.

required
utopia_points List[float]

List of utopia points of the sub metrics.

required
*args Any

Additional arguments to pass to the parent class.

()
dist_sync_on_step bool

Torchmetrics parameter.

False
**kwargs Any

Additional keyword arguments to pass to the parent class.

{}

Raises:

Type Description
ValueError

If the provided lists have different lengths.

Source code in warprec/evaluation/metrics/multiobjective/euclideandistance.py
@metric_registry.register("EucDistance")
class EucDistance(TopKMetric):
    """Computes the Euclidean Distance between the model's performance and an Utopia Point (Ideal Point).

    Usually, a lower Euclidean Distance indicates a better performance as it is closer
    to the ideal objectives.

    Attributes:
        utopia_tensor (Tensor): Tensor of utopia points of sub metrics.

    Args:
        k (int): The number of top recommendations to consider (cutoff).
        num_users (int): Number of users in the training set.
        num_items (int): Number of items in the training set.
        metric_names (List[str]): List of sub metrics to compute.
        utopia_points (List[float]): List of utopia points of the sub metrics.
        *args (Any): Additional arguments to pass to the parent class.
        dist_sync_on_step (bool): Torchmetrics parameter.
        **kwargs (Any): Additional keyword arguments to pass to the parent class.

    Raises:
        ValueError: If the provided lists have different lengths.
    """

    utopia_tensor: Tensor

    def __init__(
        self,
        k: int,
        num_users: int,
        num_items: int,
        metric_names: List[str],
        utopia_points: List[float],
        *args: Any,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k, dist_sync_on_step)
        self.num_users = num_users

        if not (len(metric_names) == len(utopia_points)):
            raise ValueError(
                "The lists (names, utopia points) must have the same length."
            )

        # Register utopia points as a buffer to ensure they are moved to the correct device (CPU/GPU) automatically
        self.register_buffer(
            "utopia_tensor", torch.tensor(utopia_points, dtype=torch.float32)
        )

        # Dynamically load the component metrics from the registry
        self.sub_metrics: List[BaseMetric] = []
        required_blocks = set()

        for m_name in metric_names:
            m_inst = metric_registry.get(
                m_name,
                k=k,
                num_users=num_users,
                num_items=num_items,
                dist_sync_on_step=dist_sync_on_step,
                **kwargs,
            )
            self.sub_metrics.append(m_inst)
            # Collect and union all required data components from sub-metrics
            required_blocks |= m_inst._REQUIRED_COMPONENTS

        self._REQUIRED_COMPONENTS = required_blocks

    def update(self, preds: Tensor, user_indices: Tensor, **kwargs: Any):
        """Updates the metric state with the new batch of predictions."""
        for metric in self.sub_metrics:
            metric.update(preds, user_indices, **kwargs)

    def compute(self):
        """Computes the Euclidean distance for each user towards the Utopia Point."""
        sum_squared_diff = torch.zeros(self.num_users, device=self.utopia_tensor.device)

        for i, metric in enumerate(self.sub_metrics):
            # Compute the sub-metric score
            res = metric.compute()

            # Extract the specific score tensor for the current metric
            score = res.get(
                metric.name,
                torch.zeros(self.num_users, device=self.utopia_tensor.device),
            )

            utopia = self.utopia_tensor[i]

            # Compute squared difference
            # NOTE: Optimization direction is technically handled by the Utopia Point value itself
            # (e.g., Utopia is 1.0 for Precision, 0.0 for Error), but we keep the logic consistent.
            diff = score - utopia
            sum_squared_diff += torch.pow(diff, 2)

        # Final Euclidean distance: sqrt(sum(diff^2))
        euc_dist = torch.sqrt(sum_squared_diff)

        return {self.name: euc_dist}

    @property
    def name(self) -> str:
        """Generates a dynamic name for the metric identifying its components,
        utopia points, and directions."""
        m_str = ", ".join([m.name for m in self.sub_metrics])
        u_str = ", ".join(map(str, self.utopia_tensor.tolist()))
        return f"EucDistance[{m_str}][{u_str}]"

name property

Generates a dynamic name for the metric identifying its components, utopia points, and directions.

compute()

Computes the Euclidean distance for each user towards the Utopia Point.

Source code in warprec/evaluation/metrics/multiobjective/euclideandistance.py
def compute(self):
    """Computes the Euclidean distance for each user towards the Utopia Point."""
    sum_squared_diff = torch.zeros(self.num_users, device=self.utopia_tensor.device)

    for i, metric in enumerate(self.sub_metrics):
        # Compute the sub-metric score
        res = metric.compute()

        # Extract the specific score tensor for the current metric
        score = res.get(
            metric.name,
            torch.zeros(self.num_users, device=self.utopia_tensor.device),
        )

        utopia = self.utopia_tensor[i]

        # Compute squared difference
        # NOTE: Optimization direction is technically handled by the Utopia Point value itself
        # (e.g., Utopia is 1.0 for Precision, 0.0 for Error), but we keep the logic consistent.
        diff = score - utopia
        sum_squared_diff += torch.pow(diff, 2)

    # Final Euclidean distance: sqrt(sum(diff^2))
    euc_dist = torch.sqrt(sum_squared_diff)

    return {self.name: euc_dist}

update(preds, user_indices, **kwargs)

Updates the metric state with the new batch of predictions.

Source code in warprec/evaluation/metrics/multiobjective/euclideandistance.py
def update(self, preds: Tensor, user_indices: Tensor, **kwargs: Any):
    """Updates the metric state with the new batch of predictions."""
    for metric in self.sub_metrics:
        metric.update(preds, user_indices, **kwargs)

warprec.evaluation.metrics.multiobjective.hypervolume.Hypervolume

Bases: TopKMetric

Computes the multi-objective Hypervolume (HV) metric based on a list of individual metrics.

The Hypervolume measures the volume of the objective space dominated by the model's performance relative to a reference (nadir) point. In this implementation, it is calculated as the product of the distances between the scores and the nadir points.

Attributes:

Name Type Description
nadir_tensor Tensor

Tensor of nadir points of sub metrics.

Parameters:

Name Type Description Default
k int

The number of top recommendations to consider (cutoff).

required
num_users int

Number of users in the training set.

required
num_items int

Number of items in the training set.

required
metric_names List[str]

List of sub metrics to compute.

required
nadir_points List[float]

List of nadir points of the sub metrics.

required
higher_is_better List[bool]

List of booleans that defines wether a sub metric wants to be maximized.

required
*args Any

Additional arguments to pass to the parent class.

()
dist_sync_on_step bool

Torchmetrics parameter.

False
**kwargs Any

Additional keyword arguments to pass to the parent class.

{}

Raises:

Type Description
ValueError

If the provided lists have different lengths.

Source code in warprec/evaluation/metrics/multiobjective/hypervolume.py
@metric_registry.register("Hypervolume")
class Hypervolume(TopKMetric):
    """Computes the multi-objective Hypervolume (HV) metric based on a list of individual metrics.

    The Hypervolume measures the volume of the objective space dominated by the model's
    performance relative to a reference (nadir) point. In this implementation, it is
    calculated as the product of the distances between the scores and the nadir points.

    Attributes:
        nadir_tensor (Tensor): Tensor of nadir points of sub metrics.

    Args:
        k (int): The number of top recommendations to consider (cutoff).
        num_users (int): Number of users in the training set.
        num_items (int): Number of items in the training set.
        metric_names (List[str]): List of sub metrics to compute.
        nadir_points (List[float]): List of nadir points of the sub metrics.
        higher_is_better (List[bool]): List of booleans that defines wether
            a sub metric wants to be maximized.
        *args (Any): Additional arguments to pass to the parent class.
        dist_sync_on_step (bool): Torchmetrics parameter.
        **kwargs (Any): Additional keyword arguments to pass to the parent class.

    Raises:
        ValueError: If the provided lists have different lengths.
    """

    nadir_tensor: Tensor

    def __init__(
        self,
        k: int,
        num_users: int,
        num_items: int,
        metric_names: List[str],
        nadir_points: List[float],
        higher_is_better: List[bool],
        *args: Any,
        dist_sync_on_step: bool = False,
        **kwargs: Any,
    ):
        super().__init__(k, dist_sync_on_step)
        self.num_users = num_users
        self.higher_better = higher_is_better

        # Ensure all input lists have the same length to avoid indexing errors
        if not (len(metric_names) == len(nadir_points) == len(higher_is_better)):
            raise ValueError(
                "The lists (names, nadir points, directions) must have the same length."
            )

        # Register nadir points as a buffer to ensure they are moved to the correct device (CPU/GPU) automatically
        self.register_buffer(
            "nadir_tensor", torch.tensor(nadir_points, dtype=torch.float32)
        )

        # Dynamically load the component metrics from the registry
        self.sub_metrics: List[BaseMetric] = []
        required_blocks = set()

        for m_name in metric_names:
            m_inst = metric_registry.get(
                m_name,
                k=k,
                num_users=num_users,
                num_items=num_items,
                dist_sync_on_step=dist_sync_on_step,
                **kwargs,
            )
            self.sub_metrics.append(m_inst)
            # Collect and union all required data components from sub-metrics
            required_blocks |= m_inst._REQUIRED_COMPONENTS

        self._REQUIRED_COMPONENTS = required_blocks

    def update(self, preds: Tensor, user_indices: Tensor, **kwargs: Any):
        """Updates the metric state with the new batch of predictions."""
        for metric in self.sub_metrics:
            metric.update(preds, user_indices, **kwargs)

    def compute(self):
        """Computes the Hypervolume for each user.

        HV is calculated as the product of positive distances from the nadir point
        across all defined objectives.
        """
        hv_per_user = None

        for i, metric in enumerate(self.sub_metrics):
            # Compute the sub-metric score
            res = metric.compute()

            # Extract the specific score tensor for the current metric
            score = res.get(
                metric.name,
                torch.zeros(self.num_users, device=self.nadir_tensor.device),
            )
            nadir = self.nadir_tensor[i]

            # Calculate the distance from the nadir based on the optimization direction
            if self.higher_better[i]:
                # Maximization: distance = score - nadir (we want to be above the nadir)
                dist = score - nadir
            else:
                # Minimization: distance = nadir - score (we want to be below the nadir)
                dist = nadir - score

            # Clamp distance to 0: if the score is worse than the nadir, the volume contribution is zero
            dist = torch.clamp(dist, min=0.0)

            # Cumulative product: HV is the volume of the hyper-rectangle
            if hv_per_user is None:
                hv_per_user = dist
            else:
                hv_per_user = hv_per_user * dist

        return {self.name: hv_per_user}

    @property
    def name(self) -> str:
        """Generates a dynamic name for the metric identifying its components,
        nadir points, and directions.
        """
        m_str = ", ".join([m.name for m in self.sub_metrics])
        n_str = ", ".join(map(str, self.nadir_tensor.tolist()))
        h_str = ", ".join([str(h).lower() for h in self.higher_better])
        return f"Hypervolume[{m_str}][{n_str}][{h_str}]"

name property

Generates a dynamic name for the metric identifying its components, nadir points, and directions.

compute()

Computes the Hypervolume for each user.

HV is calculated as the product of positive distances from the nadir point across all defined objectives.

Source code in warprec/evaluation/metrics/multiobjective/hypervolume.py
def compute(self):
    """Computes the Hypervolume for each user.

    HV is calculated as the product of positive distances from the nadir point
    across all defined objectives.
    """
    hv_per_user = None

    for i, metric in enumerate(self.sub_metrics):
        # Compute the sub-metric score
        res = metric.compute()

        # Extract the specific score tensor for the current metric
        score = res.get(
            metric.name,
            torch.zeros(self.num_users, device=self.nadir_tensor.device),
        )
        nadir = self.nadir_tensor[i]

        # Calculate the distance from the nadir based on the optimization direction
        if self.higher_better[i]:
            # Maximization: distance = score - nadir (we want to be above the nadir)
            dist = score - nadir
        else:
            # Minimization: distance = nadir - score (we want to be below the nadir)
            dist = nadir - score

        # Clamp distance to 0: if the score is worse than the nadir, the volume contribution is zero
        dist = torch.clamp(dist, min=0.0)

        # Cumulative product: HV is the volume of the hyper-rectangle
        if hv_per_user is None:
            hv_per_user = dist
        else:
            hv_per_user = hv_per_user * dist

    return {self.name: hv_per_user}

update(preds, user_indices, **kwargs)

Updates the metric state with the new batch of predictions.

Source code in warprec/evaluation/metrics/multiobjective/hypervolume.py
def update(self, preds: Tensor, user_indices: Tensor, **kwargs: Any):
    """Updates the metric state with the new batch of predictions."""
    for metric in self.sub_metrics:
        metric.update(preds, user_indices, **kwargs)