Skip to content

Model Interfaces - API Reference

warprec.recommenders.base_recommender.Recommender

Bases: Module, ABC

Abstract class that defines the basic functionalities of a recommendation model.

Parameters:

Name Type Description Default
params dict

The dictionary with the model params.

required
info dict

The dictionary containing dataset information.

required
*args Any

Argument for PyTorch nn.Module.

()
seed int

The seed to use for reproducibility.

42
**kwargs Any

Keyword argument for PyTorch nn.Module.

{}

Attributes:

Name Type Description
DATALOADER_TYPE Optional[DataLoaderType]

The type of dataloader used by this model. This value will be used to pre-compute the required data structure before starting the training process.

Raises:

Type Description
ValueError

If the info dictionary does not contain the number of items and users of the dataset.

Source code in warprec/recommenders/base_recommender.py
class Recommender(nn.Module, ABC):
    """Abstract class that defines the basic functionalities of a recommendation model.

    Args:
        params (dict): The dictionary with the model params.
        info (dict): The dictionary containing dataset information.
        *args (Any): Argument for PyTorch nn.Module.
        seed (int): The seed to use for reproducibility.
        **kwargs (Any): Keyword argument for PyTorch nn.Module.

    Attributes:
        DATALOADER_TYPE (Optional[DataLoaderType]): The type of dataloader used
            by this model. This value will be used to pre-compute the required
            data structure before starting the training process.

    Raises:
        ValueError: If the info dictionary does not contain the number of items
            and users of the dataset.
    """

    DATALOADER_TYPE: Optional[DataLoaderType] = None

    def __init__(
        self,
        params: dict,
        info: dict,
        *args: Any,
        seed: int = 42,
        **kwargs: Any,
    ):
        super().__init__()
        self.init_params(params)
        self.set_seed(seed)
        self.info = info

        # Initialize the dataset dimensions
        self.n_users = info.get("n_users")
        self.n_items = info.get("n_items")
        if not self.n_users or not self.n_items:
            raise ValueError(
                f"Incorrect initialization: 'n_users' ({self.n_users}) e 'n_items' ({self.n_items}) "
                "must be present in the 'info' dictionary."
            )

    @no_type_check
    @abstractmethod
    def predict(
        self,
        *args: Any,
        item_indices: Optional[Tensor] = None,
        **kwargs: Any,
    ) -> Tensor:
        """This method will produce the final predictions in the form of
        a dense Tensor.

        Args:
            *args (Any): List of arguments.
            item_indices (Optional[Tensor]): The batch of item indices. If None,
                full prediction will be produced.
            **kwargs (Any): The dictionary of keyword arguments.

        Returns:
            Tensor: The score matrix {user x item}.
        """

    def init_params(self, params: dict):
        """This method sets up the model with the correct parameters.

        Args:
            params (dict): The dictionary with the model params.
        """
        for ann, _ in self.__class__.__annotations__.items():
            if ann in params:
                setattr(self, ann, params[ann])

    def get_params(self) -> dict:
        """Get the model parameters as a dictionary.

        Returns:
            dict: The dictionary containing the model parameters.
        """
        params = {}
        for ann, _ in self.__class__.__annotations__.items():
            params[ann] = getattr(self, ann)
        return params

    def set_seed(self, seed: int):
        """Set random seed for reproducibility.

        Args:
            seed (int): The seed value to be used.
        """
        self.seed = seed
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)
            torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    def get_state(self) -> Dict[str, Any]:
        """Returns the enriched state_dict of the WarpRec model.

        The returned dictionary contains all the information required to
        fully restore the model state, including additional metadata
        beyond the standard PyTorch state_dict.

        Returns:
            Dict[str, Any]: An enriched dictionary representing the model state.
        """
        state = {
            "name": self.name,
            "params": self.get_params(),
            "info": self.info,
            "state_dict": self.state_dict(),
        }
        return state

    @classmethod
    def estimate_space(
        cls,
        params: dict,
        info: dict,
        interactions: Optional[Interactions] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Estimate the train memory footprint of the model in MB."""
        raise NotImplementedError(
            f"Model '{cls.__name__}' does not implement estimate_space()."
        )

    @staticmethod
    def _require_interactions_for_estimate(
        interactions: Optional[Interactions], model_name: str
    ) -> Interactions:
        if interactions is None:
            raise ValueError(f"{model_name} requires interactions to estimate space.")
        return interactions

    @staticmethod
    def _bytes_to_mb(value: float) -> float:
        return float(value) / 1024**2

    @staticmethod
    def _dense_size_mb(shape: tuple, dtype: Any) -> float:
        return Recommender._bytes_to_mb(np.prod(shape) * np.dtype(dtype).itemsize)

    @staticmethod
    def _csr_size_mb(matrix: Any) -> float:
        return Recommender._bytes_to_mb(
            matrix.data.nbytes + matrix.indices.nbytes + matrix.indptr.nbytes
        )

    @staticmethod
    def _sparse_size_mb(matrix: Any) -> float:
        if matrix is None:
            return 0.0

        total_bytes = matrix.data.nbytes if hasattr(matrix, "data") else 0
        total_bytes += matrix.indices.nbytes if hasattr(matrix, "indices") else 0
        total_bytes += matrix.indptr.nbytes if hasattr(matrix, "indptr") else 0
        total_bytes += matrix.row.nbytes if hasattr(matrix, "row") else 0
        total_bytes += matrix.col.nbytes if hasattr(matrix, "col") else 0
        return Recommender._bytes_to_mb(total_bytes)

    @staticmethod
    def _compressed_sparse_size_mb(
        nnz: int,
        ptr_len: int,
        data_dtype: Any,
        index_dtype: Any = np.int32,
    ) -> float:
        total_bytes = (
            nnz * np.dtype(data_dtype).itemsize
            + nnz * np.dtype(index_dtype).itemsize
            + ptr_len * np.dtype(index_dtype).itemsize
        )
        return Recommender._bytes_to_mb(total_bytes)

    @staticmethod
    def _estimated_sparse_square_size_mb(
        source_nnz: int,
        side_len: int,
        data_dtype: Any,
        overlap: float = 0.9,
        index_dtype: Any = np.int32,
    ) -> float:
        estimated_nnz = max(
            side_len,
            int(np.ceil(source_nnz * overlap)),
        )
        estimated_nnz = min(side_len * side_len, estimated_nnz)
        return Recommender._compressed_sparse_size_mb(
            nnz=estimated_nnz,
            ptr_len=side_len + 1,
            data_dtype=data_dtype,
            index_dtype=index_dtype,
        )

    @staticmethod
    def _coo_size_mb(nnz: int, data_dtype: Any, index_dtype: Any = np.int32) -> float:
        total_bytes = (
            nnz * np.dtype(data_dtype).itemsize
            + 2 * nnz * np.dtype(index_dtype).itemsize
        )
        return Recommender._bytes_to_mb(total_bytes)

    @staticmethod
    def _peak_size_mb(*values: float) -> float:
        return float(max(values, default=0.0))

    @classmethod
    def from_checkpoint(
        cls, checkpoint: Any, strict: bool = True, **kwargs: Any
    ) -> "Recommender":
        """Load a WarpRec checkpoint model state with custom parameters.

        Args:
            checkpoint (Any): The checkpoint containing the model state and
                other parameter required for initialization.
            strict (bool): Wether or not to load the model using strict mode.
            **kwargs (Any): The additional keyword arguments.

        Returns:
            Recommender: The Recommender model instance.

        Raises:
            ValueError: When trying to load a model checkpoint of a different model.
        """
        if checkpoint["name"] != cls.__name__:
            raise ValueError(
                f"Warning: Loading a {checkpoint['name']} checkpoint into {cls.__name__} class."
            )

        # Common initialization params + additional parameters
        init_args = {
            "params": checkpoint["params"],
            "info": checkpoint["info"],
            **kwargs,
        }

        # Initialize the model and return the instance
        model = cls(**init_args)
        model.load_state_dict(checkpoint["state_dict"], strict=strict)
        return model

    def _apply_topk_filtering(self, sim_matrix: Tensor, k: int) -> Tensor:
        """Keep only top-k similarities per item.

        Args:
            sim_matrix (Tensor): The similarity tensor to filter.
            k (int): The top k values to filter.

        Returns:
            Tensor: The filtered similarity tensor.
        """
        # Safety check for k size
        k = min(k, sim_matrix.size(1) - 1)

        # Get top-k values and indices
        values, indices = torch.topk(sim_matrix, k=k, dim=1)

        # Create sparse similarity matrix with top-k values
        return torch.zeros_like(sim_matrix).scatter_(1, indices, values)

    @classmethod
    def get_name_from_params(cls, params: dict) -> str:
        """Generates a deterministic coolname based on a dictionary of parameters."""
        # Add model name to params to ensure different models with same params get different names
        params_with_model = {"model": cls.__name__, **params}

        # Create a reproducible json dump of model parameters
        param_str = json.dumps(params_with_model, sort_keys=True, default=str)

        # Use the hash of the model hyperparameter as seed for the name generation
        hash_hex = hashlib.md5(
            param_str.encode("utf-8"),
            usedforsecurity=False,
        ).hexdigest()
        seed_int = int(hash_hex, 16)

        # Coolname uses the random seed
        current_state = random.getstate()
        fun_extension = "DefaultCoolName"  # Default name in case of failure

        # After the name generation the random seed
        # will be reverted to the experiment seed
        try:
            random.seed(seed_int)
            words = coolname.generate(3)
            fun_extension = "".join(word.capitalize() for word in words)
        finally:
            random.setstate(current_state)

        return f"{cls.__name__}_{fun_extension}"

    @property
    def name(self):
        """The name of the model."""
        return self.__class__.__name__

    @property
    def name_param(self):
        """The name of the model with a deterministic coolname extension.

        The name is generated based on the hash of the model's parameters,
        ensuring that the same parameters always yield the same name.
        """
        return self.get_name_from_params(self.get_params())

    @property
    def device(self) -> torch.device:
        """Get the device where the model is located.

        Returns:
            torch.device: The device of the model.
        """
        # Search through parameters
        try:
            return next(self.parameters()).device
        except StopIteration:
            pass

        # If no parameter found, search through buffers
        try:
            return next(self.buffers()).device
        except StopIteration:
            pass

        # Fallback: Device will be cpu
        return torch.device("cpu")

device property

Get the device where the model is located.

Returns:

Type Description
device

torch.device: The device of the model.

name property

The name of the model.

name_param property

The name of the model with a deterministic coolname extension.

The name is generated based on the hash of the model's parameters, ensuring that the same parameters always yield the same name.

estimate_space(params, info, interactions=None, **kwargs) classmethod

Estimate the train memory footprint of the model in MB.

Source code in warprec/recommenders/base_recommender.py
@classmethod
def estimate_space(
    cls,
    params: dict,
    info: dict,
    interactions: Optional[Interactions] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Estimate the train memory footprint of the model in MB."""
    raise NotImplementedError(
        f"Model '{cls.__name__}' does not implement estimate_space()."
    )

from_checkpoint(checkpoint, strict=True, **kwargs) classmethod

Load a WarpRec checkpoint model state with custom parameters.

Parameters:

Name Type Description Default
checkpoint Any

The checkpoint containing the model state and other parameter required for initialization.

required
strict bool

Wether or not to load the model using strict mode.

True
**kwargs Any

The additional keyword arguments.

{}

Returns:

Name Type Description
Recommender Recommender

The Recommender model instance.

Raises:

Type Description
ValueError

When trying to load a model checkpoint of a different model.

Source code in warprec/recommenders/base_recommender.py
@classmethod
def from_checkpoint(
    cls, checkpoint: Any, strict: bool = True, **kwargs: Any
) -> "Recommender":
    """Load a WarpRec checkpoint model state with custom parameters.

    Args:
        checkpoint (Any): The checkpoint containing the model state and
            other parameter required for initialization.
        strict (bool): Wether or not to load the model using strict mode.
        **kwargs (Any): The additional keyword arguments.

    Returns:
        Recommender: The Recommender model instance.

    Raises:
        ValueError: When trying to load a model checkpoint of a different model.
    """
    if checkpoint["name"] != cls.__name__:
        raise ValueError(
            f"Warning: Loading a {checkpoint['name']} checkpoint into {cls.__name__} class."
        )

    # Common initialization params + additional parameters
    init_args = {
        "params": checkpoint["params"],
        "info": checkpoint["info"],
        **kwargs,
    }

    # Initialize the model and return the instance
    model = cls(**init_args)
    model.load_state_dict(checkpoint["state_dict"], strict=strict)
    return model

get_name_from_params(params) classmethod

Generates a deterministic coolname based on a dictionary of parameters.

Source code in warprec/recommenders/base_recommender.py
@classmethod
def get_name_from_params(cls, params: dict) -> str:
    """Generates a deterministic coolname based on a dictionary of parameters."""
    # Add model name to params to ensure different models with same params get different names
    params_with_model = {"model": cls.__name__, **params}

    # Create a reproducible json dump of model parameters
    param_str = json.dumps(params_with_model, sort_keys=True, default=str)

    # Use the hash of the model hyperparameter as seed for the name generation
    hash_hex = hashlib.md5(
        param_str.encode("utf-8"),
        usedforsecurity=False,
    ).hexdigest()
    seed_int = int(hash_hex, 16)

    # Coolname uses the random seed
    current_state = random.getstate()
    fun_extension = "DefaultCoolName"  # Default name in case of failure

    # After the name generation the random seed
    # will be reverted to the experiment seed
    try:
        random.seed(seed_int)
        words = coolname.generate(3)
        fun_extension = "".join(word.capitalize() for word in words)
    finally:
        random.setstate(current_state)

    return f"{cls.__name__}_{fun_extension}"

get_params()

Get the model parameters as a dictionary.

Returns:

Name Type Description
dict dict

The dictionary containing the model parameters.

Source code in warprec/recommenders/base_recommender.py
def get_params(self) -> dict:
    """Get the model parameters as a dictionary.

    Returns:
        dict: The dictionary containing the model parameters.
    """
    params = {}
    for ann, _ in self.__class__.__annotations__.items():
        params[ann] = getattr(self, ann)
    return params

get_state()

Returns the enriched state_dict of the WarpRec model.

The returned dictionary contains all the information required to fully restore the model state, including additional metadata beyond the standard PyTorch state_dict.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: An enriched dictionary representing the model state.

Source code in warprec/recommenders/base_recommender.py
def get_state(self) -> Dict[str, Any]:
    """Returns the enriched state_dict of the WarpRec model.

    The returned dictionary contains all the information required to
    fully restore the model state, including additional metadata
    beyond the standard PyTorch state_dict.

    Returns:
        Dict[str, Any]: An enriched dictionary representing the model state.
    """
    state = {
        "name": self.name,
        "params": self.get_params(),
        "info": self.info,
        "state_dict": self.state_dict(),
    }
    return state

init_params(params)

This method sets up the model with the correct parameters.

Parameters:

Name Type Description Default
params dict

The dictionary with the model params.

required
Source code in warprec/recommenders/base_recommender.py
def init_params(self, params: dict):
    """This method sets up the model with the correct parameters.

    Args:
        params (dict): The dictionary with the model params.
    """
    for ann, _ in self.__class__.__annotations__.items():
        if ann in params:
            setattr(self, ann, params[ann])

predict(*args, item_indices=None, **kwargs) abstractmethod

This method will produce the final predictions in the form of a dense Tensor.

Parameters:

Name Type Description Default
*args Any

List of arguments.

()
item_indices Optional[Tensor]

The batch of item indices. If None, full prediction will be produced.

None
**kwargs Any

The dictionary of keyword arguments.

{}

Returns:

Name Type Description
Tensor Tensor

The score matrix {user x item}.

Source code in warprec/recommenders/base_recommender.py
@no_type_check
@abstractmethod
def predict(
    self,
    *args: Any,
    item_indices: Optional[Tensor] = None,
    **kwargs: Any,
) -> Tensor:
    """This method will produce the final predictions in the form of
    a dense Tensor.

    Args:
        *args (Any): List of arguments.
        item_indices (Optional[Tensor]): The batch of item indices. If None,
            full prediction will be produced.
        **kwargs (Any): The dictionary of keyword arguments.

    Returns:
        Tensor: The score matrix {user x item}.
    """

set_seed(seed)

Set random seed for reproducibility.

Parameters:

Name Type Description Default
seed int

The seed value to be used.

required
Source code in warprec/recommenders/base_recommender.py
def set_seed(self, seed: int):
    """Set random seed for reproducibility.

    Args:
        seed (int): The seed value to be used.
    """
    self.seed = seed
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

warprec.recommenders.base_recommender.IterativeRecommender

Bases: Recommender, LightningModule

Interface for recommendation model that use an iterative approach to be trained.

Parameters:

Name Type Description Default
params dict

The dictionary with the model params.

required
info dict

The dictionary containing dataset information.

required
*args Any

Argument for PyTorch LightningModule.

()
seed int

The seed to use for reproducibility.

42
**kwargs Any

Keyword argument for PyTorch LightningModule.

{}

Attributes:

Name Type Description
epochs int

The number of epochs used to train the model.

learning_rate float

The learning rate using during optimization.

Source code in warprec/recommenders/base_recommender.py
class IterativeRecommender(Recommender, L.LightningModule):
    """Interface for recommendation model that use
    an iterative approach to be trained.

    Args:
        params (dict): The dictionary with the model params.
        info (dict): The dictionary containing dataset information.
        *args (Any): Argument for PyTorch LightningModule.
        seed (int): The seed to use for reproducibility.
        **kwargs (Any): Keyword argument for PyTorch LightningModule.

    Attributes:
        epochs (int): The number of epochs used to
            train the model.
        learning_rate (float): The learning rate using
            during optimization.
    """

    epochs: int
    learning_rate: float

    def __init__(
        self, params: dict, info: dict, *args: Any, seed: int = 42, **kwargs: Any
    ):
        super().__init__(params, info, *args, seed=seed, **kwargs)
        self.save_hyperparameters(params)

        # Optimization parameters
        self._optimizer_name: str = "Adam"
        self._optimizer_kwargs: Dict[str, Any] = {}
        self._lr_scheduler_name: Optional[str] = None
        self._lr_scheduler_kwargs: Dict[str, Any] = {}

    def set_optimization_parameters(
        self,
        optimizer_config: Optional[OptimizerConfig] = None,
        lr_scheduler_config: Optional[LRSchedulerConfig] = None,
    ):
        # Set the optimizer values
        if optimizer_config:
            self._optimizer_name = optimizer_config.name
            self._optimizer_kwargs = optimizer_config.params

        # Set the scheduler values
        if lr_scheduler_config:
            self._lr_scheduler_name = lr_scheduler_config.name
            self._lr_scheduler_kwargs = lr_scheduler_config.params

    def configure_optimizers(self):
        """Standard Lightning method to define optimizers.

        This method separates parameters into two groups:
        1. Decay Group:
           - Dense layers weights (Linear, Conv).
           - Structural embeddings (e.g., Positional Embeddings).
        2. No-Decay Group:
           - Sparse Entity Embeddings (User/Item) -> Handled manually by EmbLoss.
           - Biases -> Standard DL practice (no decay).
           - LayerNorm weights -> Standard Transformer practice (no decay).
        """
        # Identify parameters that belong to nn.Embedding modules
        embedding_param_ids = set()
        for module in self.modules():
            if isinstance(module, nn.Embedding):
                for param in module.parameters():
                    embedding_param_ids.add(id(param))

        # Separate parameters into groups
        decay_params = []
        no_decay_params = []

        for name, param in self.named_parameters():
            if not param.requires_grad:
                continue

            # We disable optimizer weight decay for:
            # A. Biases (standard practice)
            # B. LayerNorm parameters (standard Transformer practice)
            # C. Sparse Embeddings (User/Item), because we use EmbLoss for them.
            #    EXCEPTION: Positional Embeddings should have weight decay applied.

            is_bias = "bias" in name
            is_layernorm = "layernorm" in name or "norm" in name
            is_embedding = id(param) in embedding_param_ids
            is_positional = "position" in name  # Heuristic to catch position_embedding

            if is_bias or is_layernorm or (is_embedding and not is_positional):
                no_decay_params.append(param)
            else:
                # Linear weights, Conv weights, and Positional Embeddings go here
                decay_params.append(param)

        # Finalize the Optimizer with correct groups
        decay = getattr(self, "weight_decay", 0.0)

        optimizer_grouped_parameters = [
            {
                "params": decay_params,
                "weight_decay": decay,
            },
            {
                "params": no_decay_params,
                "weight_decay": 0.0,
            },
        ]

        # Create the optimizer instance
        optimizer = optimizer_registry.get(
            name=self._optimizer_name,
            params=optimizer_grouped_parameters,
            lr=self.learning_rate,
            **self._optimizer_kwargs,
        )

        # No learning rate scheduler provided -> Return only the optimizer
        if self._lr_scheduler_name is None:
            return optimizer

        # Create the instance of the learning rate scheduler
        scheduler = lr_scheduler_registry.get(
            name=self._lr_scheduler_name,
            optimizer=optimizer,
            **self._lr_scheduler_kwargs,
        )
        lr_scheduler_config = {
            "scheduler": scheduler,
            "interval": "epoch",
            "frequency": 1,
        }
        return {"optimizer": optimizer, "lr_scheduler": lr_scheduler_config}

    def _init_weights(self, module: nn.Module):
        """A comprehensive default weight initialization method.
        This method is called recursively by `self.apply(self._init_weights)`
        and handles the most common layer types found in recommendation models.

        It can be overridden by subclasses for model-specific initialization.

        The default strategies are:
        - Xavier Normal for Linear, Embedding, and Convolutional layers.
        - Xavier Uniform for Recurrent layers (GRU, LSTM).
        - Identity-like initialization for LayerNorm.
        - Zeros for all biases.

        Args:
            module (nn.Module): The module to initialize.
        """
        # Layers with standard weight matrices
        if isinstance(module, (nn.Linear, nn.Conv1d, nn.Conv2d)):
            xavier_normal_(module.weight.data)
            if hasattr(module, "bias") and module.bias is not None:
                constant_(module.bias.data, 0)

        # Embedding Layer
        elif isinstance(module, nn.Embedding):
            xavier_normal_(module.weight.data)

        # Recurrent Layers
        elif isinstance(module, (nn.GRU, nn.LSTM, nn.RNN)):
            for name, param in module.named_parameters():
                if "weight_ih" in name or "weight_hh" in name:
                    xavier_uniform_(param.data)
                elif "bias" in name:
                    constant_(param.data, 0)

        # Normalization Layers
        elif isinstance(module, nn.LayerNorm):
            constant_(module.bias.data, 0)
            constant_(module.weight.data, 1.0)

    @abstractmethod
    def forward(self, *args: Any, **kwargs: Any):
        """This method process a forward step of the model.

        All recommendation models that implement a neural network or any
        kind of backpropagation must implement this method.

        Args:
            *args (Any): List of arguments.
            **kwargs (Any): The dictionary of keyword arguments.
        """

    @abstractmethod
    def get_dataloader(
        self, interactions: Interactions, sessions: Sessions, **kwargs: Any
    ) -> DataLoader:
        """Returns a PyTorch DataLoader for the given interactions.

        The DataLoader should provide batches suitable for the model's training.

        Args:
            interactions (Interactions): The interaction of users with items.
            sessions (Sessions): The sessions of the users.
            **kwargs (Any): Additional keyword arguments.

        Returns:
            DataLoader: The dataloader that will be used by the model during train.
        """

    @abstractmethod
    def training_step(self, batch: Any, batch_idx: int) -> Tensor:
        """Performs a single training step for a given batch.

        This is a standard method defined by PyTorch Lightning.

        Args:
            batch (Any): A single batch of data from the DataLoader.
            batch_idx (int): The current current batch index.

        Returns:
            Tensor: The computed loss for the batch.
        """

    def validation_step(self, batch: Any, batch_idx: int) -> Any:
        """PyTorch Lightning needs this method to be implemented
        to correctly perform the on_validation_epoch_end callback.

        Args:
            batch (Any): A single batch of data from the DataLoader.
            batch_idx (int): The current current batch index.

        Returns:
            Any: The batch of data from the DataLoader.
        """
        return batch

    def on_save_checkpoint(self, checkpoint: Dict[str, Any]):
        """PyTorch Lightning hook used during checkpoint saving.

        Args:
            checkpoint (Dict[str, Any]): The dictionary containing the
                checkpoint information.
        """
        checkpoint["name"] = self.name
        checkpoint["params"] = self.get_params()
        checkpoint["info"] = self.info

configure_optimizers()

Standard Lightning method to define optimizers.

This method separates parameters into two groups: 1. Decay Group: - Dense layers weights (Linear, Conv). - Structural embeddings (e.g., Positional Embeddings). 2. No-Decay Group: - Sparse Entity Embeddings (User/Item) -> Handled manually by EmbLoss. - Biases -> Standard DL practice (no decay). - LayerNorm weights -> Standard Transformer practice (no decay).

Source code in warprec/recommenders/base_recommender.py
def configure_optimizers(self):
    """Standard Lightning method to define optimizers.

    This method separates parameters into two groups:
    1. Decay Group:
       - Dense layers weights (Linear, Conv).
       - Structural embeddings (e.g., Positional Embeddings).
    2. No-Decay Group:
       - Sparse Entity Embeddings (User/Item) -> Handled manually by EmbLoss.
       - Biases -> Standard DL practice (no decay).
       - LayerNorm weights -> Standard Transformer practice (no decay).
    """
    # Identify parameters that belong to nn.Embedding modules
    embedding_param_ids = set()
    for module in self.modules():
        if isinstance(module, nn.Embedding):
            for param in module.parameters():
                embedding_param_ids.add(id(param))

    # Separate parameters into groups
    decay_params = []
    no_decay_params = []

    for name, param in self.named_parameters():
        if not param.requires_grad:
            continue

        # We disable optimizer weight decay for:
        # A. Biases (standard practice)
        # B. LayerNorm parameters (standard Transformer practice)
        # C. Sparse Embeddings (User/Item), because we use EmbLoss for them.
        #    EXCEPTION: Positional Embeddings should have weight decay applied.

        is_bias = "bias" in name
        is_layernorm = "layernorm" in name or "norm" in name
        is_embedding = id(param) in embedding_param_ids
        is_positional = "position" in name  # Heuristic to catch position_embedding

        if is_bias or is_layernorm or (is_embedding and not is_positional):
            no_decay_params.append(param)
        else:
            # Linear weights, Conv weights, and Positional Embeddings go here
            decay_params.append(param)

    # Finalize the Optimizer with correct groups
    decay = getattr(self, "weight_decay", 0.0)

    optimizer_grouped_parameters = [
        {
            "params": decay_params,
            "weight_decay": decay,
        },
        {
            "params": no_decay_params,
            "weight_decay": 0.0,
        },
    ]

    # Create the optimizer instance
    optimizer = optimizer_registry.get(
        name=self._optimizer_name,
        params=optimizer_grouped_parameters,
        lr=self.learning_rate,
        **self._optimizer_kwargs,
    )

    # No learning rate scheduler provided -> Return only the optimizer
    if self._lr_scheduler_name is None:
        return optimizer

    # Create the instance of the learning rate scheduler
    scheduler = lr_scheduler_registry.get(
        name=self._lr_scheduler_name,
        optimizer=optimizer,
        **self._lr_scheduler_kwargs,
    )
    lr_scheduler_config = {
        "scheduler": scheduler,
        "interval": "epoch",
        "frequency": 1,
    }
    return {"optimizer": optimizer, "lr_scheduler": lr_scheduler_config}

forward(*args, **kwargs) abstractmethod

This method process a forward step of the model.

All recommendation models that implement a neural network or any kind of backpropagation must implement this method.

Parameters:

Name Type Description Default
*args Any

List of arguments.

()
**kwargs Any

The dictionary of keyword arguments.

{}
Source code in warprec/recommenders/base_recommender.py
@abstractmethod
def forward(self, *args: Any, **kwargs: Any):
    """This method process a forward step of the model.

    All recommendation models that implement a neural network or any
    kind of backpropagation must implement this method.

    Args:
        *args (Any): List of arguments.
        **kwargs (Any): The dictionary of keyword arguments.
    """

get_dataloader(interactions, sessions, **kwargs) abstractmethod

Returns a PyTorch DataLoader for the given interactions.

The DataLoader should provide batches suitable for the model's training.

Parameters:

Name Type Description Default
interactions Interactions

The interaction of users with items.

required
sessions Sessions

The sessions of the users.

required
**kwargs Any

Additional keyword arguments.

{}

Returns:

Name Type Description
DataLoader DataLoader

The dataloader that will be used by the model during train.

Source code in warprec/recommenders/base_recommender.py
@abstractmethod
def get_dataloader(
    self, interactions: Interactions, sessions: Sessions, **kwargs: Any
) -> DataLoader:
    """Returns a PyTorch DataLoader for the given interactions.

    The DataLoader should provide batches suitable for the model's training.

    Args:
        interactions (Interactions): The interaction of users with items.
        sessions (Sessions): The sessions of the users.
        **kwargs (Any): Additional keyword arguments.

    Returns:
        DataLoader: The dataloader that will be used by the model during train.
    """

on_save_checkpoint(checkpoint)

PyTorch Lightning hook used during checkpoint saving.

Parameters:

Name Type Description Default
checkpoint Dict[str, Any]

The dictionary containing the checkpoint information.

required
Source code in warprec/recommenders/base_recommender.py
def on_save_checkpoint(self, checkpoint: Dict[str, Any]):
    """PyTorch Lightning hook used during checkpoint saving.

    Args:
        checkpoint (Dict[str, Any]): The dictionary containing the
            checkpoint information.
    """
    checkpoint["name"] = self.name
    checkpoint["params"] = self.get_params()
    checkpoint["info"] = self.info

training_step(batch, batch_idx) abstractmethod

Performs a single training step for a given batch.

This is a standard method defined by PyTorch Lightning.

Parameters:

Name Type Description Default
batch Any

A single batch of data from the DataLoader.

required
batch_idx int

The current current batch index.

required

Returns:

Name Type Description
Tensor Tensor

The computed loss for the batch.

Source code in warprec/recommenders/base_recommender.py
@abstractmethod
def training_step(self, batch: Any, batch_idx: int) -> Tensor:
    """Performs a single training step for a given batch.

    This is a standard method defined by PyTorch Lightning.

    Args:
        batch (Any): A single batch of data from the DataLoader.
        batch_idx (int): The current current batch index.

    Returns:
        Tensor: The computed loss for the batch.
    """

validation_step(batch, batch_idx)

PyTorch Lightning needs this method to be implemented to correctly perform the on_validation_epoch_end callback.

Parameters:

Name Type Description Default
batch Any

A single batch of data from the DataLoader.

required
batch_idx int

The current current batch index.

required

Returns:

Name Type Description
Any Any

The batch of data from the DataLoader.

Source code in warprec/recommenders/base_recommender.py
def validation_step(self, batch: Any, batch_idx: int) -> Any:
    """PyTorch Lightning needs this method to be implemented
    to correctly perform the on_validation_epoch_end callback.

    Args:
        batch (Any): A single batch of data from the DataLoader.
        batch_idx (int): The current current batch index.

    Returns:
        Any: The batch of data from the DataLoader.
    """
    return batch

warprec.recommenders.base_recommender.ContextRecommenderUtils

Bases: Module, ABC

Common definition for context-aware recommenders.

This Mixin handles
  1. Initialization of context dimensions.
  2. Creation of standard Biases (Global, User, Item, Context).
  3. Creation of Context Embeddings (to avoid boilerplate loops in models).
  4. Helper methods for Linear computation and Regularization.

Parameters:

Name Type Description Default
params dict

Model parameters.

required
info dict

The dictionary containing dataset information.

required
*args Any

Variable length argument list.

()
interactions Optional[Interactions]

The training interactions.

None
**kwargs Any

Arbitrary keyword arguments.

{}

Attributes:

Name Type Description
n_users int

Number of users.

n_items int

Number of items.

embedding_size int

The size of the latent vectors.

batch_size int

The batch size used for training.

neg_samples int

Number of negative samples for training.

merged_feature_embedding Optional[Embedding]

Single feature embedding.

merged_feature_bias Optional[Embedding]

Single feature bias.

feature_offsets Optional[Tensor]

Offset buffer to index the single embedding.

merged_context_embedding Optional[Embedding]

Single context embedding.

merged_context_bias Optional[Embedding]

Single context bias.

context_offsets Optional[Tensor]

Offset buffer to index the single context.

Source code in warprec/recommenders/base_recommender.py
class ContextRecommenderUtils(nn.Module, ABC):
    """Common definition for context-aware recommenders.

    This Mixin handles:
        1. Initialization of context dimensions.
        2. Creation of standard Biases (Global, User, Item, Context).
        3. Creation of Context Embeddings (to avoid boilerplate loops in models).
        4. Helper methods for Linear computation and Regularization.

    Args:
        params (dict): Model parameters.
        info (dict): The dictionary containing dataset information.
        *args (Any): Variable length argument list.
        interactions (Optional[Interactions]): The training interactions.
        **kwargs (Any): Arbitrary keyword arguments.

    Attributes:
        n_users (int): Number of users.
        n_items (int): Number of items.
        embedding_size (int): The size of the latent vectors.
        batch_size (int): The batch size used for training.
        neg_samples (int): Number of negative samples for training.
        merged_feature_embedding (Optional[nn.Embedding]): Single feature embedding.
        merged_feature_bias (Optional[nn.Embedding]): Single feature bias.
        feature_offsets (Optional[Tensor]): Offset buffer to index the single embedding.
        merged_context_embedding (Optional[nn.Embedding]): Single context embedding.
        merged_context_bias (Optional[nn.Embedding]): Single context bias.
        context_offsets (Optional[Tensor]): Offset buffer to index the single context.
    """

    # Type hints used in general mixin implementations
    n_users: int
    n_items: int
    embedding_size: int
    batch_size: int
    neg_samples: int

    # Explicit Type Hinting for Dynamic Attributes to fix Linting errors
    merged_feature_embedding: Optional[nn.Embedding]
    merged_feature_bias: Optional[nn.Embedding]
    feature_offsets: Optional[Tensor]

    merged_context_embedding: Optional[nn.Embedding]
    merged_context_bias: Optional[nn.Embedding]
    context_offsets: Optional[Tensor]

    def __init__(
        self,
        params: dict,
        info: dict,
        *args: Any,
        interactions: Optional[Interactions] = None,
        **kwargs: Any,
    ):
        # Feature info extraction
        self.feature_dims: dict = info.get("feature_dims", {})
        self.feature_labels = list(self.feature_dims.keys())

        # Context info extraction
        self.context_dims: dict = info.get("context_dims", {})
        self.context_labels = list(self.context_dims.keys())

        # Call super init to populate n_users, n_items, embedding_size
        super().__init__(params, info, *args, **kwargs)  # type: ignore[call-arg]

        # Define Embeddings (Latent Factors)
        self.user_embedding = nn.Embedding(self.n_users, self.embedding_size)
        self.item_embedding = nn.Embedding(
            self.n_items + 1, self.embedding_size, padding_idx=self.n_items
        )

        # Context Embeddings
        if self.context_dims:
            ctx_dims_list = [self.context_dims[name] for name in self.context_labels]
            self.total_ctx_dim = sum(ctx_dims_list)

            # Offsets: [0, dim_0, dim_0+dim_1, ...]
            ctx_offsets = torch.tensor([0] + ctx_dims_list[:-1]).cumsum(0)
            self.register_buffer("context_offsets", ctx_offsets)

            self.merged_context_embedding = nn.Embedding(
                self.total_ctx_dim, self.embedding_size
            )
            self.merged_context_bias = nn.Embedding(self.total_ctx_dim, 1)
        else:
            self.register_buffer("context_offsets", None)
            self.merged_context_embedding = None
            self.merged_context_bias = None

        # Feature Embeddings
        if self.feature_dims:
            feat_dims_list = [self.feature_dims[name] for name in self.feature_labels]
            self.total_feat_dim = sum(feat_dims_list)

            feat_offsets = torch.tensor([0] + feat_dims_list[:-1]).cumsum(0)
            self.register_buffer("feature_offsets", feat_offsets)

            self.merged_feature_embedding = nn.Embedding(
                self.total_feat_dim, self.embedding_size
            )
            self.merged_feature_bias = nn.Embedding(self.total_feat_dim, 1)
        else:
            self.register_buffer("feature_offsets", None)
            self.merged_feature_embedding = None
            self.merged_feature_bias = None

        # Define Biases (Standard Linear Infrastructure)
        self.global_bias = nn.Parameter(torch.zeros(1))
        self.user_bias = nn.Embedding(self.n_users, 1)
        self.item_bias = nn.Embedding(self.n_items + 1, 1, padding_idx=self.n_items)

        # Fixed feature lookup Tensor
        if interactions is not None:
            item_features = interactions.get_side_tensor()
            self.register_buffer("item_features", item_features)
        else:
            self.register_buffer(
                "item_features", torch.zeros(self.n_items + 1, dtype=torch.long)
            )

    def get_dataloader(
        self,
        interactions: Interactions,
        sessions: Sessions,
        **kwargs: Any,
    ) -> DataLoader:
        """Common dataloader retrieval used by contextual models.

        Args:
            interactions (Interactions): The interaction of users with items.
            sessions (Sessions): The sessions of the users.
            **kwargs (Any): Additional keyword arguments.

        Returns:
            DataLoader: The appropriate dataloader for the training.
        """
        return interactions.get_pointwise_dataloader(
            neg_samples=self.neg_samples,
            include_side_info=bool(self.feature_dims),
            include_context=bool(self.context_dims),
            batch_size=self.batch_size,
            **kwargs,
        )

    def compute_first_order(
        self,
        user: Tensor,
        item: Tensor,
        features: Optional[Tensor],
        contexts: Optional[Tensor],
    ) -> Tensor:
        """Computes the First-Order Linear part.

        Formula: global_bias + user_bias + item_bias + sum(feature_biases) + sum(context_biases)

        Args:
            user (Tensor): User indices.
            item (Tensor): Item indices.
            features (Optional[Tensor]): Feature indices [batch_size, n_features].
            contexts (Optional[Tensor]): Context indices [batch_size, n_contexts].

        Returns:
            Tensor: The linear score [batch_size].
        """
        linear_part = (
            self.global_bias
            + self.user_bias(user).squeeze(-1)
            + self.item_bias(item).squeeze(-1)
        )

        # Add feature biases
        if features is not None and self.merged_feature_bias is not None:
            global_indices = features + self.feature_offsets
            feat_bias = self.merged_feature_bias(global_indices).sum(dim=1).squeeze(-1)
            linear_part += feat_bias

        # Add context biases
        if contexts is not None and self.merged_context_bias is not None:
            global_indices = contexts + self.context_offsets
            ctx_bias = self.merged_context_bias(global_indices).sum(dim=1).squeeze(-1)
            linear_part += ctx_bias

        return linear_part

    def get_reg_params(
        self,
        user: Tensor,
        item: Tensor,
        features: Optional[Tensor],
        contexts: Optional[Tensor],
    ) -> List[Tensor]:
        """Helper to extract ALL embeddings and biases for regularization.

        Args:
            user (Tensor): User indices.
            item (Tensor): Item indices.
            features (Optional[Tensor]): Feature indices.
            contexts (Optional[Tensor]): Context indices.

        Returns:
            List[Tensor]: List of embeddings and biases to be passed to the Reg Loss.
        """
        reg_params = [
            self.user_embedding(user),
            self.item_embedding(item),
            self.user_bias(user),
            self.item_bias(item),
        ]

        if features is not None and self.merged_feature_embedding is not None:
            global_indices = features + self.feature_offsets
            reg_params.append(self.merged_feature_embedding(global_indices))
            reg_params.append(self.merged_feature_bias(global_indices))

        if contexts is not None and self.merged_context_embedding is not None:
            global_indices = contexts + self.context_offsets
            reg_params.append(self.merged_context_embedding(global_indices))
            reg_params.append(self.merged_context_bias(global_indices))

        return reg_params

    def _get_feature_embeddings(self, target_items: Tensor) -> Tensor:
        """Helper to retrieve feature embeddings for a set of items."""
        if not self.feature_dims or self.item_features is None:
            return None

        # Indices Lookup
        flat_items = target_items.view(-1).cpu()
        raw_indices = self.item_features[flat_items].to(target_items.device)  # type: ignore[index]

        # Apply Offsets
        global_indices = raw_indices + self.feature_offsets

        # Single Lookup
        embeddings = self.merged_feature_embedding(global_indices)

        # Reshape to match input
        target_shape = target_items.shape
        return embeddings.view(
            *target_shape, len(self.feature_labels), self.embedding_size
        )

    def _get_context_embeddings(self, contexts: Tensor) -> Optional[Tensor]:
        """Retrieves context embeddings as a single Tensor."""
        if not self.context_dims or self.merged_context_embedding is None:
            return None

        global_indices = contexts + self.context_offsets
        return self.merged_context_embedding(global_indices)

    def _get_feature_bias(self, target_items: Tensor) -> Tensor:
        """Helper to retrieve the sum of feature biases for a set of items."""
        if not self.feature_dims or self.item_features is None:
            return torch.zeros(target_items.shape, device=target_items.device)

        flat_items = target_items.view(-1).cpu()
        raw_indices = self.item_features[flat_items].to(target_items.device)  # type: ignore[index]

        global_indices = raw_indices + self.feature_offsets

        # Lookup & Sum
        biases = self.merged_feature_bias(global_indices).sum(dim=1).squeeze(-1)

        return biases.view(target_items.shape)

compute_first_order(user, item, features, contexts)

Computes the First-Order Linear part.

Formula: global_bias + user_bias + item_bias + sum(feature_biases) + sum(context_biases)

Parameters:

Name Type Description Default
user Tensor

User indices.

required
item Tensor

Item indices.

required
features Optional[Tensor]

Feature indices [batch_size, n_features].

required
contexts Optional[Tensor]

Context indices [batch_size, n_contexts].

required

Returns:

Name Type Description
Tensor Tensor

The linear score [batch_size].

Source code in warprec/recommenders/base_recommender.py
def compute_first_order(
    self,
    user: Tensor,
    item: Tensor,
    features: Optional[Tensor],
    contexts: Optional[Tensor],
) -> Tensor:
    """Computes the First-Order Linear part.

    Formula: global_bias + user_bias + item_bias + sum(feature_biases) + sum(context_biases)

    Args:
        user (Tensor): User indices.
        item (Tensor): Item indices.
        features (Optional[Tensor]): Feature indices [batch_size, n_features].
        contexts (Optional[Tensor]): Context indices [batch_size, n_contexts].

    Returns:
        Tensor: The linear score [batch_size].
    """
    linear_part = (
        self.global_bias
        + self.user_bias(user).squeeze(-1)
        + self.item_bias(item).squeeze(-1)
    )

    # Add feature biases
    if features is not None and self.merged_feature_bias is not None:
        global_indices = features + self.feature_offsets
        feat_bias = self.merged_feature_bias(global_indices).sum(dim=1).squeeze(-1)
        linear_part += feat_bias

    # Add context biases
    if contexts is not None and self.merged_context_bias is not None:
        global_indices = contexts + self.context_offsets
        ctx_bias = self.merged_context_bias(global_indices).sum(dim=1).squeeze(-1)
        linear_part += ctx_bias

    return linear_part

get_dataloader(interactions, sessions, **kwargs)

Common dataloader retrieval used by contextual models.

Parameters:

Name Type Description Default
interactions Interactions

The interaction of users with items.

required
sessions Sessions

The sessions of the users.

required
**kwargs Any

Additional keyword arguments.

{}

Returns:

Name Type Description
DataLoader DataLoader

The appropriate dataloader for the training.

Source code in warprec/recommenders/base_recommender.py
def get_dataloader(
    self,
    interactions: Interactions,
    sessions: Sessions,
    **kwargs: Any,
) -> DataLoader:
    """Common dataloader retrieval used by contextual models.

    Args:
        interactions (Interactions): The interaction of users with items.
        sessions (Sessions): The sessions of the users.
        **kwargs (Any): Additional keyword arguments.

    Returns:
        DataLoader: The appropriate dataloader for the training.
    """
    return interactions.get_pointwise_dataloader(
        neg_samples=self.neg_samples,
        include_side_info=bool(self.feature_dims),
        include_context=bool(self.context_dims),
        batch_size=self.batch_size,
        **kwargs,
    )

get_reg_params(user, item, features, contexts)

Helper to extract ALL embeddings and biases for regularization.

Parameters:

Name Type Description Default
user Tensor

User indices.

required
item Tensor

Item indices.

required
features Optional[Tensor]

Feature indices.

required
contexts Optional[Tensor]

Context indices.

required

Returns:

Type Description
List[Tensor]

List[Tensor]: List of embeddings and biases to be passed to the Reg Loss.

Source code in warprec/recommenders/base_recommender.py
def get_reg_params(
    self,
    user: Tensor,
    item: Tensor,
    features: Optional[Tensor],
    contexts: Optional[Tensor],
) -> List[Tensor]:
    """Helper to extract ALL embeddings and biases for regularization.

    Args:
        user (Tensor): User indices.
        item (Tensor): Item indices.
        features (Optional[Tensor]): Feature indices.
        contexts (Optional[Tensor]): Context indices.

    Returns:
        List[Tensor]: List of embeddings and biases to be passed to the Reg Loss.
    """
    reg_params = [
        self.user_embedding(user),
        self.item_embedding(item),
        self.user_bias(user),
        self.item_bias(item),
    ]

    if features is not None and self.merged_feature_embedding is not None:
        global_indices = features + self.feature_offsets
        reg_params.append(self.merged_feature_embedding(global_indices))
        reg_params.append(self.merged_feature_bias(global_indices))

    if contexts is not None and self.merged_context_embedding is not None:
        global_indices = contexts + self.context_offsets
        reg_params.append(self.merged_context_embedding(global_indices))
        reg_params.append(self.merged_context_bias(global_indices))

    return reg_params

warprec.recommenders.base_recommender.SequentialRecommenderUtils

Bases: ABC

Common definition for sequential recommenders.

Collection of common method used by all sequential recommenders.

Attributes:

Name Type Description
max_seq_len int

This value will be used to truncate user sequences. More recent transaction will have priority over older ones in case a sequence needs to be truncated. If a sequence is smaller than the max_seq_len, it will be padded.

Source code in warprec/recommenders/base_recommender.py
class SequentialRecommenderUtils(ABC):
    """Common definition for sequential recommenders.

    Collection of common method used by all sequential recommenders.

    Attributes:
        max_seq_len (int): This value will be used to truncate user sequences.
            More recent transaction will have priority over older ones in case
            a sequence needs to be truncated. If a sequence is smaller than the
            max_seq_len, it will be padded.
    """

    max_seq_len: int = 0

    def _gather_indexes(self, output: Tensor, gather_index: Tensor) -> Tensor:
        """Gathers the output from specific indexes for each batch.

        Args:
            output (Tensor): The tensor to gather the indices from.
            gather_index (Tensor): The indices to gather.

        Returns:
            Tensor: The gathered values flattened.
        """
        gather_index = gather_index.view(-1, 1, 1).expand(-1, 1, output.shape[-1])
        output_flatten = output.gather(dim=1, index=gather_index)
        return output_flatten.squeeze(1)

    def _generate_square_subsequent_mask(self, seq_len: int) -> Tensor:
        """Generate a square mask for the sequence.

        Args:
            seq_len (int): Length of the sequence.

        Returns:
            Tensor: A square mask of shape [seq_len, seq_len] with True for positions
                    that should not be attended to.
        """
        mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1)
        return mask.bool()

warprec.recommenders.base_recommender.ItemSimRecommender

Bases: Recommender

ItemSimilarity common interface.

Parameters:

Name Type Description Default
params dict

The dictionary with the model params.

required
info dict

The dictionary containing dataset information.

required
interactions Interactions

The training interactions.

required
*args Any

Argument for PyTorch nn.Module.

()
seed int

The seed to use for reproducibility.

42
**kwargs Any

Keyword argument for PyTorch nn.Module.

{}

Raises:

Type Description
ValueError

If the items value was not passed through the info dict.

Source code in warprec/recommenders/base_recommender.py
class ItemSimRecommender(Recommender):
    """ItemSimilarity common interface.

    Args:
        params (dict): The dictionary with the model params.
        info (dict): The dictionary containing dataset information.
        interactions (Interactions): The training interactions.
        *args (Any): Argument for PyTorch nn.Module.
        seed (int): The seed to use for reproducibility.
        **kwargs (Any): Keyword argument for PyTorch nn.Module.

    Raises:
        ValueError: If the items value was not passed through the info dict.
    """

    def __init__(
        self,
        params: dict,
        info: dict,
        interactions: Interactions,
        *args: Any,
        seed: int = 42,
        **kwargs: Any,
    ):
        super().__init__(params, info, seed=seed, *args, **kwargs)
        self.n_items = info.get("n_items", None)
        if not self.n_items:
            raise ValueError(
                "Items value must be provided to correctly initialize the model."
            )
        self.train_matrix = interactions.get_sparse()
        self.item_similarity = np.zeros(self.n_items)

    def predict(
        self,
        user_indices: Tensor,
        *args: Any,
        item_indices: Optional[Tensor] = None,
        **kwargs: Any,
    ) -> Tensor:
        """Prediction in the form of X@B where B is a {item x item} similarity matrix.

        Args:
            user_indices (Tensor): The batch of user indices.
            *args (Any): List of arguments.
            item_indices (Optional[Tensor]): The batch of item indices. If None,
                full prediction will be produced.
            **kwargs (Any): The dictionary of keyword arguments.

        Returns:
            Tensor: The score matrix {user x item}.
        """
        # Compute predictions and convert to Tensor
        predictions = self.train_matrix[user_indices.tolist(), :] @ self.item_similarity
        predictions = torch.from_numpy(predictions)

        # Return full or sampled predictions
        if item_indices is None:
            # Case 'full': prediction on all items
            return predictions  # [batch_size, n_items]

        # Case 'sampled': prediction on a sampled set of items
        return predictions.gather(
            1,
            item_indices.to(predictions.device).clamp(
                max=self.n_items - 1
            ),  # [batch_size, pad_seq]
        )

predict(user_indices, *args, item_indices=None, **kwargs)

Prediction in the form of X@B where B is a {item x item} similarity matrix.

Parameters:

Name Type Description Default
user_indices Tensor

The batch of user indices.

required
*args Any

List of arguments.

()
item_indices Optional[Tensor]

The batch of item indices. If None, full prediction will be produced.

None
**kwargs Any

The dictionary of keyword arguments.

{}

Returns:

Name Type Description
Tensor Tensor

The score matrix {user x item}.

Source code in warprec/recommenders/base_recommender.py
def predict(
    self,
    user_indices: Tensor,
    *args: Any,
    item_indices: Optional[Tensor] = None,
    **kwargs: Any,
) -> Tensor:
    """Prediction in the form of X@B where B is a {item x item} similarity matrix.

    Args:
        user_indices (Tensor): The batch of user indices.
        *args (Any): List of arguments.
        item_indices (Optional[Tensor]): The batch of item indices. If None,
            full prediction will be produced.
        **kwargs (Any): The dictionary of keyword arguments.

    Returns:
        Tensor: The score matrix {user x item}.
    """
    # Compute predictions and convert to Tensor
    predictions = self.train_matrix[user_indices.tolist(), :] @ self.item_similarity
    predictions = torch.from_numpy(predictions)

    # Return full or sampled predictions
    if item_indices is None:
        # Case 'full': prediction on all items
        return predictions  # [batch_size, n_items]

    # Case 'sampled': prediction on a sampled set of items
    return predictions.gather(
        1,
        item_indices.to(predictions.device).clamp(
            max=self.n_items - 1
        ),  # [batch_size, pad_seq]
    )

warprec.recommenders.collaborative_filtering_recommender.graph_based.graph_utils.GraphRecommenderUtils

Bases: Module

Common definition for graph recommenders.

Collection of common method used by all graph recommenders.

Source code in warprec/recommenders/collaborative_filtering_recommender/graph_based/graph_utils.py
class GraphRecommenderUtils(nn.Module):
    """Common definition for graph recommenders.

    Collection of common method used by all graph recommenders.
    """

    # Cache storage
    _cached_user_emb: Optional[Tensor]
    _cached_item_emb: Optional[Tensor]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Initialize local cache
        self._cached_user_emb = None
        self._cached_item_emb = None

    def train(self, mode=True):
        """Override train mode to empty the cache when switching to training."""
        super().train(mode)

        if mode:
            # We are in training mode, embeddings will change. Empty the cache
            self._cached_user_emb = None
            self._cached_item_emb = None

    def eval(self):
        """Override eval mode to empty the cache when switching to evaluation."""
        super().eval()

        # We are in evaluation mode, embeddings will be cached. Empty the cache
        self._cached_user_emb = None
        self._cached_item_emb = None

    def propagate_embeddings(self) -> Tuple[Tensor, Tensor]:
        """Retrieve the propagate user and item embeddings.

        Subsequent calls will return the cached values, speeding up the
        evaluation process.

        Returns:
            Tuple[Tensor, Tensor]: (User Embeddings, Item Embeddings)
        """
        # Safety check
        if self.training:
            return self.forward()[:2]

        # Check if values are cached
        if self._cached_user_emb is None or self._cached_item_emb is None:
            with torch.no_grad():
                # Unpack the forward
                ret = self.forward()
                self._cached_user_emb = ret[0]
                self._cached_item_emb = ret[1]

        return self._cached_user_emb, self._cached_item_emb

    def get_adj_mat(
        self,
        interaction_matrix: coo_matrix,
        n_users: int,
        n_items: int,
        normalize: bool = False,
    ) -> SparseTensor:
        """Get the normalized interaction matrix of users and items.

        Args:
            interaction_matrix (coo_matrix): The full interaction matrix in coo format.
            n_users (int): The number of users.
            n_items (int): The number of items.
            normalize (bool): Wether or not to normalize the sparse adjacency matrix.

        Returns:
            SparseTensor: The sparse adjacency matrix.
        """
        # Extract user and items nodes
        user_nodes = interaction_matrix.row
        item_nodes = interaction_matrix.col + n_users

        # Unify arcs in both directions
        row = np.concatenate([user_nodes, item_nodes])
        col = np.concatenate([item_nodes, user_nodes])

        # Create the edge tensor
        edge_index_np = np.vstack([row, col])  # Efficient solution

        # Creating a tensor directly from a numpy array instead of lists
        edge_index = torch.tensor(edge_index_np, dtype=torch.int64)

        # Create the SparseTensor using the edge indexes.
        adj = SparseTensor(
            row=edge_index[0],
            col=edge_index[1],
            sparse_sizes=(n_users + n_items, n_users + n_items),
        )

        # Normalize the SparseTensor if requested
        if normalize:
            adj = self._symmetric_normalization(adj)

        return adj

    def _symmetric_normalization(self, adj: SparseTensor) -> SparseTensor:
        """Applies symmetric normalization: D^-0.5 * A * D^-0.5."""
        # Calculate degree (sum of rows)
        deg = adj.sum(dim=1)
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt.masked_fill_(deg_inv_sqrt == float("inf"), 0.0)

        # Apply normalization efficiently on the values
        row, col, _ = adj.coo()
        norm_vals = deg_inv_sqrt[row] * deg_inv_sqrt[col]

        return adj.set_value(norm_vals, layout="coo")

    def get_ego_embeddings(
        self, user_embedding: nn.Embedding, item_embedding: nn.Embedding
    ) -> Tensor:
        """Get the initial embedding of users and items and combine to an embedding matrix.

        Args:
            user_embedding (nn.Embedding): The user embeddings.
            item_embedding (nn.Embedding): The item embeddings.

        Returns:
            Tensor: Combined user and item embeddings.
        """
        user_embeddings = user_embedding.weight
        item_embeddings = item_embedding.weight
        ego_embeddings = torch.cat([user_embeddings, item_embeddings], dim=0)
        return ego_embeddings

eval()

Override eval mode to empty the cache when switching to evaluation.

Source code in warprec/recommenders/collaborative_filtering_recommender/graph_based/graph_utils.py
def eval(self):
    """Override eval mode to empty the cache when switching to evaluation."""
    super().eval()

    # We are in evaluation mode, embeddings will be cached. Empty the cache
    self._cached_user_emb = None
    self._cached_item_emb = None

get_adj_mat(interaction_matrix, n_users, n_items, normalize=False)

Get the normalized interaction matrix of users and items.

Parameters:

Name Type Description Default
interaction_matrix coo_matrix

The full interaction matrix in coo format.

required
n_users int

The number of users.

required
n_items int

The number of items.

required
normalize bool

Wether or not to normalize the sparse adjacency matrix.

False

Returns:

Name Type Description
SparseTensor SparseTensor

The sparse adjacency matrix.

Source code in warprec/recommenders/collaborative_filtering_recommender/graph_based/graph_utils.py
def get_adj_mat(
    self,
    interaction_matrix: coo_matrix,
    n_users: int,
    n_items: int,
    normalize: bool = False,
) -> SparseTensor:
    """Get the normalized interaction matrix of users and items.

    Args:
        interaction_matrix (coo_matrix): The full interaction matrix in coo format.
        n_users (int): The number of users.
        n_items (int): The number of items.
        normalize (bool): Wether or not to normalize the sparse adjacency matrix.

    Returns:
        SparseTensor: The sparse adjacency matrix.
    """
    # Extract user and items nodes
    user_nodes = interaction_matrix.row
    item_nodes = interaction_matrix.col + n_users

    # Unify arcs in both directions
    row = np.concatenate([user_nodes, item_nodes])
    col = np.concatenate([item_nodes, user_nodes])

    # Create the edge tensor
    edge_index_np = np.vstack([row, col])  # Efficient solution

    # Creating a tensor directly from a numpy array instead of lists
    edge_index = torch.tensor(edge_index_np, dtype=torch.int64)

    # Create the SparseTensor using the edge indexes.
    adj = SparseTensor(
        row=edge_index[0],
        col=edge_index[1],
        sparse_sizes=(n_users + n_items, n_users + n_items),
    )

    # Normalize the SparseTensor if requested
    if normalize:
        adj = self._symmetric_normalization(adj)

    return adj

get_ego_embeddings(user_embedding, item_embedding)

Get the initial embedding of users and items and combine to an embedding matrix.

Parameters:

Name Type Description Default
user_embedding Embedding

The user embeddings.

required
item_embedding Embedding

The item embeddings.

required

Returns:

Name Type Description
Tensor Tensor

Combined user and item embeddings.

Source code in warprec/recommenders/collaborative_filtering_recommender/graph_based/graph_utils.py
def get_ego_embeddings(
    self, user_embedding: nn.Embedding, item_embedding: nn.Embedding
) -> Tensor:
    """Get the initial embedding of users and items and combine to an embedding matrix.

    Args:
        user_embedding (nn.Embedding): The user embeddings.
        item_embedding (nn.Embedding): The item embeddings.

    Returns:
        Tensor: Combined user and item embeddings.
    """
    user_embeddings = user_embedding.weight
    item_embeddings = item_embedding.weight
    ego_embeddings = torch.cat([user_embeddings, item_embeddings], dim=0)
    return ego_embeddings

propagate_embeddings()

Retrieve the propagate user and item embeddings.

Subsequent calls will return the cached values, speeding up the evaluation process.

Returns:

Type Description
Tuple[Tensor, Tensor]

Tuple[Tensor, Tensor]: (User Embeddings, Item Embeddings)

Source code in warprec/recommenders/collaborative_filtering_recommender/graph_based/graph_utils.py
def propagate_embeddings(self) -> Tuple[Tensor, Tensor]:
    """Retrieve the propagate user and item embeddings.

    Subsequent calls will return the cached values, speeding up the
    evaluation process.

    Returns:
        Tuple[Tensor, Tensor]: (User Embeddings, Item Embeddings)
    """
    # Safety check
    if self.training:
        return self.forward()[:2]

    # Check if values are cached
    if self._cached_user_emb is None or self._cached_item_emb is None:
        with torch.no_grad():
            # Unpack the forward
            ret = self.forward()
            self._cached_user_emb = ret[0]
            self._cached_item_emb = ret[1]

    return self._cached_user_emb, self._cached_item_emb

train(mode=True)

Override train mode to empty the cache when switching to training.

Source code in warprec/recommenders/collaborative_filtering_recommender/graph_based/graph_utils.py
def train(self, mode=True):
    """Override train mode to empty the cache when switching to training."""
    super().train(mode)

    if mode:
        # We are in training mode, embeddings will change. Empty the cache
        self._cached_user_emb = None
        self._cached_item_emb = None