Skip to content

Writer - API Reference

warprec.data.writer.base_writer.WriterFactory

Factory class for creating Writer instances based on configuration.

Attributes:

Name Type Description
config TrainConfiguration | EvalConfiguration | EstimateConfiguration

The configuration of the experiment.

Source code in warprec/data/writer/base_writer.py
class WriterFactory:  # pylint: disable=C0415, R0903
    """Build the Writer implementation selected by an experiment configuration.

    Attributes:
        config (TrainConfiguration | EvalConfiguration | EstimateConfiguration): The configuration of the experiment.
    """

    config: TrainConfiguration | EvalConfiguration | EstimateConfiguration = None

    @classmethod
    def get_writer(
        cls, config: TrainConfiguration | EvalConfiguration | EstimateConfiguration
    ) -> Writer:
        """Return the Writer that matches ``config.writer.writing_method``.

        Args:
            config (TrainConfiguration | EvalConfiguration | EstimateConfiguration): The configuration
                of the experiment.

        Returns:
            Writer: A ``LocalWriter`` when the method is ``WritingMethods.LOCAL``,
                an ``AzureBlobWriter`` when it is ``WritingMethods.AZURE_BLOB``.

        Raises:
            ValueError: If the configured writing method is neither of the above.
        """
        writer_type = config.writer.writing_method

        # Each concrete writer is imported inside the branch that builds it.
        if writer_type == WritingMethods.LOCAL:
            from warprec.data.writer import LocalWriter

            return LocalWriter(
                dataset_name=config.writer.dataset_name,
                local_path=config.writer.local_experiment_path,
            )

        if writer_type == WritingMethods.AZURE_BLOB:
            from warprec.data.writer import AzureBlobWriter

            # Account and container come from the general Azure settings,
            # dataset name and path prefix from the writer settings.
            return AzureBlobWriter(
                storage_account_name=config.general.azure.storage_account_name,
                container_name=config.general.azure.container_name,
                dataset_name=config.writer.dataset_name,
                blob_experiment_container=(
                    config.writer.azure_blob_experiment_container
                ),
            )

        raise ValueError(f"Unknown writer type: {writer_type}")

get_writer(config) classmethod

Factory method to get the appropriate Writer instance based on the configuration.

Parameters:

Name Type Description Default
config TrainConfiguration | EvalConfiguration | EstimateConfiguration

The configuration of the experiment.

required

Returns:

Name Type Description
Writer Writer

An instance of a class that extends the Writer abstract class.

Raises:

Type Description
ValueError

If the writing method specified in the configuration is unknown.

Source code in warprec/data/writer/base_writer.py
@classmethod
def get_writer(
    cls, config: TrainConfiguration | EvalConfiguration | EstimateConfiguration
) -> Writer:
    """Return the Writer that matches ``config.writer.writing_method``.

    Args:
        config (TrainConfiguration | EvalConfiguration | EstimateConfiguration): The configuration
            of the experiment.

    Returns:
        Writer: A ``LocalWriter`` when the method is ``WritingMethods.LOCAL``,
            an ``AzureBlobWriter`` when it is ``WritingMethods.AZURE_BLOB``.

    Raises:
        ValueError: If the configured writing method is neither of the above.
    """
    writer_type = config.writer.writing_method

    # Each concrete writer is imported inside the branch that builds it.
    if writer_type == WritingMethods.LOCAL:
        from warprec.data.writer import LocalWriter

        return LocalWriter(
            dataset_name=config.writer.dataset_name,
            local_path=config.writer.local_experiment_path,
        )

    if writer_type == WritingMethods.AZURE_BLOB:
        from warprec.data.writer import AzureBlobWriter

        # Account and container come from the general Azure settings,
        # dataset name and path prefix from the writer settings.
        return AzureBlobWriter(
            storage_account_name=config.general.azure.storage_account_name,
            container_name=config.general.azure.container_name,
            dataset_name=config.writer.dataset_name,
            blob_experiment_container=(
                config.writer.azure_blob_experiment_container
            ),
        )

    raise ValueError(f"Unknown writer type: {writer_type}")

warprec.data.writer.local_writer.LocalWriter

Bases: Writer

LocalWriter saves the experiment results to the local filesystem.

Parameters:

Name Type Description Default
dataset_name str

The name of the dataset.

required
local_path str

The root path for saving experiments.

required
Source code in warprec/data/writer/local_writer.py
class LocalWriter(Writer):
    """Writer implementation that stores experiment results on the local filesystem.

    Args:
        dataset_name (str): The name of the dataset.
        local_path (str): The root path for saving experiments.
    """

    def __init__(self, dataset_name: str, local_path: str):
        super().__init__()

        # The experiment root is <local_path>/<dataset_name>; every kind of
        # artifact gets its own subfolder underneath it.
        root = Path(self._path_join(local_path, dataset_name))
        self.experiment_path = root
        self.experiment_evaluation_path = Path(self._path_join(root, "evaluation"))
        self.experiment_recommendation_path = Path(self._path_join(root, "recs"))
        self.experiment_serialized_models_path = Path(
            self._path_join(root, "serialized")
        )
        self.experiment_params_path = Path(self._path_join(root, "params"))
        self.experiment_split_path = Path(self._path_join(root, "split"))

        # Directories are created as part of construction.
        self.setup_experiment()

    def _path_join(self, *args) -> str:
        """Join the given components into one local filesystem path string."""
        first = args[0]
        return str(Path(first, *args[1:]))

    def setup_experiment(self):
        """Create the experiment root and every artifact subdirectory on disk.

        Missing parents are created and already-existing directories are accepted.
        """
        logger.msg("Setting up experiment local folder.")
        directories = (
            self.experiment_path,
            self.experiment_evaluation_path,
            self.experiment_recommendation_path,
            self.experiment_serialized_models_path,
            self.experiment_params_path,
            self.experiment_split_path,
        )
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
        logger.msg("Experiment folder created successfully.")

    def write_recs(
        self,
        model: Recommender,
        dataset: Dataset,
        k: int,
        sep: str = "\t",
        ext: str = ".tsv",
        header: bool = True,
        user_label: str = "user_id",
        item_label: str = "item_id",
        rating_label: str = "rating",
    ):
        """Write the model's recommendations to a delimited file, batch by batch.

        The file is named ``<model.name>_<timestamp><ext>`` and placed in the
        recommendations folder. ``OSError`` and ``csv.Error`` are logged rather
        than raised.

        Args:
            model (Recommender): The model whose recommendations are written.
            dataset (Dataset): The dataset passed to the batch generator.
            k (int): Forwarded to the batch generator.
            sep (str): Field delimiter.
            ext (str): File extension, including the leading dot.
            header (bool): Whether to emit a header row first.
            user_label (str): Header label of the first column.
            item_label (str): Header label of the second column.
            rating_label (str): Header label of the third column.
        """
        path = self._path_join(
            self.experiment_recommendation_path,
            f"{model.name}_{self._timestamp}{ext}",
        )

        try:
            with open(path, "w", newline="", encoding="utf-8") as handle:
                out = csv.writer(handle, delimiter=sep)
                if header:
                    out.writerow([user_label, item_label, rating_label])

                # Each item produced by the generator is a whole batch of rows.
                for rows in self._generate_recommendation_batches(model, dataset, k):
                    out.writerows(rows)
            logger.msg(f"Recommendations successfully written to {path}")
        except (OSError, csv.Error) as err:
            logger.negative(f"Error writing recommendations to {path}: {err}")

    def _write_text(self, path: str, content: str) -> None:
        """Write ``content`` to the local file at ``path`` as UTF-8 text."""
        target = Path(path)
        target.write_text(content, encoding="utf-8")

    def _read_text(self, path: str) -> Optional[str]:
        """Return the UTF-8 text of the file at ``path``, or None if it is missing or empty."""
        source = Path(path)
        if not source.exists() or source.stat().st_size <= 0:
            return None
        return source.read_text(encoding="utf-8")

    def _write_bytes(self, path: str, content: bytes) -> None:
        """Write ``content`` to the local file at ``path`` as raw bytes."""
        target = Path(path)
        target.write_bytes(content)

setup_experiment()

Creates all necessary local directories for the experiment.

Source code in warprec/data/writer/local_writer.py
def setup_experiment(self):
    """Create the experiment root and every artifact subdirectory on disk.

    Missing parents are created and already-existing directories are accepted.
    """
    logger.msg("Setting up experiment local folder.")
    directories = (
        self.experiment_path,
        self.experiment_evaluation_path,
        self.experiment_recommendation_path,
        self.experiment_serialized_models_path,
        self.experiment_params_path,
        self.experiment_split_path,
    )
    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)
    logger.msg("Experiment folder created successfully.")

write_recs(model, dataset, k, sep='\t', ext='.tsv', header=True, user_label='user_id', item_label='item_id', rating_label='rating')

Writes recommendations to a local file in a streaming fashion.

Source code in warprec/data/writer/local_writer.py
def write_recs(
    self,
    model: Recommender,
    dataset: Dataset,
    k: int,
    sep: str = "\t",
    ext: str = ".tsv",
    header: bool = True,
    user_label: str = "user_id",
    item_label: str = "item_id",
    rating_label: str = "rating",
):
    """Write the model's recommendations to a delimited file, batch by batch.

    The file is named ``<model.name>_<timestamp><ext>`` and placed in the
    recommendations folder. ``OSError`` and ``csv.Error`` are logged rather
    than raised.

    Args:
        model (Recommender): The model whose recommendations are written.
        dataset (Dataset): The dataset passed to the batch generator.
        k (int): Forwarded to the batch generator.
        sep (str): Field delimiter.
        ext (str): File extension, including the leading dot.
        header (bool): Whether to emit a header row first.
        user_label (str): Header label of the first column.
        item_label (str): Header label of the second column.
        rating_label (str): Header label of the third column.
    """
    path = self._path_join(
        self.experiment_recommendation_path,
        f"{model.name}_{self._timestamp}{ext}",
    )

    try:
        with open(path, "w", newline="", encoding="utf-8") as handle:
            out = csv.writer(handle, delimiter=sep)
            if header:
                out.writerow([user_label, item_label, rating_label])

            # Each item produced by the generator is a whole batch of rows.
            for rows in self._generate_recommendation_batches(model, dataset, k):
                out.writerows(rows)
        logger.msg(f"Recommendations successfully written to {path}")
    except (OSError, csv.Error) as err:
        logger.negative(f"Error writing recommendations to {path}: {err}")

warprec.data.writer.azureblob_writer.AzureBlobWriter

Bases: Writer

AzureBlobWriter saves the experiment results to Azure Blob Storage.

Parameters:

Name Type Description Default
storage_account_name str

The storage account name of the Azure Blob Storage.

required
container_name str

The name of the blob container where results will be stored.

required
dataset_name str

The name of the dataset.

required
blob_experiment_container str

The root path prefix (virtual folder) inside the container under which experiments are stored. Defaults to "experiments".

'experiments'
Source code in warprec/data/writer/azureblob_writer.py
class AzureBlobWriter(Writer):
    """Writer implementation that stores experiment results in Azure Blob Storage.

    Args:
        storage_account_name (str): The storage account name of the Azure Blob Storage.
        container_name (str): The name of the blob container where results will be stored.
        dataset_name (str): The name of the dataset.
        blob_experiment_container (str): Path prefix joined in front of the dataset
            name for every experiment blob. Defaults to "experiments".
    """

    def __init__(
        self,
        storage_account_name: str,
        container_name: str,
        dataset_name: str,
        blob_experiment_container: str = "experiments",
    ):
        super().__init__()

        # Service client for the storage account, authenticated through
        # DefaultAzureCredential.
        self.blob_service_client = BlobServiceClient(
            account_url=f"https://{storage_account_name}.blob.core.windows.net",
            credential=DefaultAzureCredential(),
        )
        self.container_name = container_name
        self.container_client = self.blob_service_client.get_container_client(
            self.container_name
        )

        # Blob names always use POSIX-style separators.
        root = posixpath.join(blob_experiment_container, dataset_name)
        self.experiment_path = root
        self.experiment_evaluation_path = posixpath.join(root, "evaluation")
        self.experiment_recommendation_path = posixpath.join(root, "recs")
        self.experiment_serialized_models_path = posixpath.join(root, "serialized")
        self.experiment_params_path = posixpath.join(root, "params")
        self.experiment_split_path = posixpath.join(root, "split")

        # The container is ensured as part of construction.
        self.setup_experiment()

    def _path_join(self, *args) -> str:
        """Join the given components into one POSIX-style blob path."""
        return posixpath.join(*args)

    def setup_experiment(self):
        """Create the blob container, accepting the case where it already exists."""
        logger.msg(
            f"Setting up experiment on Azure Blob container '{self.container_name}'."
        )
        try:
            self.container_client.create_container()
        except ResourceExistsError:
            logger.msg(f"Container '{self.container_name}' already exists.")
        else:
            logger.msg(f"Container '{self.container_name}' created.")
        logger.msg("Experiment setup on Azure completed successfully.")

    def write_recs(
        self,
        model: Recommender,
        dataset: Dataset,
        k: int,
        sep: str = "\t",
        ext: str = ".tsv",
        header: bool = True,
        user_label: str = "user_id",
        item_label: str = "item_id",
        rating_label: str = "rating",
    ) -> None:
        """Upload the model's recommendations as a delimited blob, batch by batch.

        The blob is named ``<model.name>_<timestamp><ext>`` under the
        recommendations prefix. ``AzureError`` is logged rather than raised.

        Args:
            model (Recommender): The model whose recommendations are uploaded.
            dataset (Dataset): The dataset passed to the batch generator.
            k (int): Forwarded to the batch generator.
            sep (str): Field delimiter.
            ext (str): Blob name extension, including the leading dot.
            header (bool): Whether to emit a header row first.
            user_label (str): Header label of the first column.
            item_label (str): Header label of the second column.
            rating_label (str): Header label of the third column.
        """
        path = self._path_join(
            self.experiment_recommendation_path,
            f"{model.name}_{self._timestamp}{ext}",
        )
        blob_client = self.container_client.get_blob_client(path)

        # Source of row batches, consumed by the chunk generator below.
        batch_generator = self._generate_recommendation_batches(model, dataset, k)

        def encoded_chunks() -> Generator[bytes, None, None]:
            """Yield the CSV output piece by piece as UTF-8 encoded bytes."""
            buffer = StringIO()
            out = csv.writer(buffer, delimiter=sep)

            def drain() -> bytes:
                # Hand back what has been written so far and empty the buffer
                # so it can be reused for the next piece.
                chunk = buffer.getvalue().encode("utf-8")
                buffer.seek(0)
                buffer.truncate(0)
                return chunk

            if header:
                out.writerow([user_label, item_label, rating_label])
                yield drain()

            for rows in batch_generator:
                out.writerows(rows)
                yield drain()

            buffer.close()

        try:
            # The upload consumes the generator, so the full file is never
            # assembled in this process.
            blob_client.upload_blob(encoded_chunks(), overwrite=True)
            logger.msg(f"Recommendations successfully written to blob: {path}")
        except AzureError as err:
            logger.negative(f"Error writing recommendations to blob {path}: {err}")

    def _write_text(self, path: str, content: str) -> None:
        """Upload ``content`` as UTF-8 text to the blob at ``path``, with overwrite enabled."""
        blob_client = self.container_client.get_blob_client(path)
        blob_client.upload_blob(content, overwrite=True, encoding="utf-8")

    def _read_text(self, path: str) -> Optional[str]:
        """Return the UTF-8 text of the blob at ``path``, or None if the blob is not found."""
        try:
            download = self.container_client.get_blob_client(path).download_blob(
                encoding="utf-8"
            )
            return download.readall()
        except ResourceNotFoundError:
            return None

    def _write_bytes(self, path: str, content: bytes) -> None:
        """Upload ``content`` as raw bytes to the blob at ``path``, with overwrite enabled."""
        blob_client = self.container_client.get_blob_client(path)
        blob_client.upload_blob(content, overwrite=True)

setup_experiment()

Ensures the Azure Blob container exists.

Source code in warprec/data/writer/azureblob_writer.py
def setup_experiment(self):
    """Create the blob container, accepting the case where it already exists."""
    logger.msg(
        f"Setting up experiment on Azure Blob container '{self.container_name}'."
    )
    try:
        self.container_client.create_container()
    except ResourceExistsError:
        logger.msg(f"Container '{self.container_name}' already exists.")
    else:
        logger.msg(f"Container '{self.container_name}' created.")
    logger.msg("Experiment setup on Azure completed successfully.")

write_recs(model, dataset, k, sep='\t', ext='.tsv', header=True, user_label='user_id', item_label='item_id', rating_label='rating')

Uploads recommendations to Azure Blob Storage in a streaming fashion.

Source code in warprec/data/writer/azureblob_writer.py
def write_recs(
    self,
    model: Recommender,
    dataset: Dataset,
    k: int,
    sep: str = "\t",
    ext: str = ".tsv",
    header: bool = True,
    user_label: str = "user_id",
    item_label: str = "item_id",
    rating_label: str = "rating",
) -> None:
    """Upload the model's recommendations as a delimited blob, batch by batch.

    The blob is named ``<model.name>_<timestamp><ext>`` under the
    recommendations prefix. ``AzureError`` is logged rather than raised.

    Args:
        model (Recommender): The model whose recommendations are uploaded.
        dataset (Dataset): The dataset passed to the batch generator.
        k (int): Forwarded to the batch generator.
        sep (str): Field delimiter.
        ext (str): Blob name extension, including the leading dot.
        header (bool): Whether to emit a header row first.
        user_label (str): Header label of the first column.
        item_label (str): Header label of the second column.
        rating_label (str): Header label of the third column.
    """
    path = self._path_join(
        self.experiment_recommendation_path,
        f"{model.name}_{self._timestamp}{ext}",
    )
    blob_client = self.container_client.get_blob_client(path)

    # Source of row batches, consumed by the chunk generator below.
    batch_generator = self._generate_recommendation_batches(model, dataset, k)

    def encoded_chunks() -> Generator[bytes, None, None]:
        """Yield the CSV output piece by piece as UTF-8 encoded bytes."""
        buffer = StringIO()
        out = csv.writer(buffer, delimiter=sep)

        def drain() -> bytes:
            # Hand back what has been written so far and empty the buffer
            # so it can be reused for the next piece.
            chunk = buffer.getvalue().encode("utf-8")
            buffer.seek(0)
            buffer.truncate(0)
            return chunk

        if header:
            out.writerow([user_label, item_label, rating_label])
            yield drain()

        for rows in batch_generator:
            out.writerows(rows)
            yield drain()

        buffer.close()

    try:
        # The upload consumes the generator, so the full file is never
        # assembled in this process.
        blob_client.upload_blob(encoded_chunks(), overwrite=True)
        logger.msg(f"Recommendations successfully written to blob: {path}")
    except AzureError as err:
        logger.negative(f"Error writing recommendations to blob {path}: {err}")