Skip to content

Reader - API Reference

warprec.data.reader.base_reader.ReaderFactory

Factory class for creating Reader instances based on configuration.

Source code in warprec/data/reader/base_reader.py
class ReaderFactory:  # pylint: disable=C0415, R0903
    """Build the concrete Reader selected by a WarpRec configuration."""

    @classmethod
    def get_reader(cls, config: WarpRecConfiguration) -> Reader:
        """Instantiate the Reader matching ``config.reader.reading_method``.

        Args:
            config (WarpRecConfiguration): Configuration object; its
                ``reader.reading_method`` selects the reader and its
                ``general.backend`` is forwarded to the reader constructor.

        Returns:
            Reader: A ``LocalReader`` or an ``AzureBlobReader`` instance.

        Raises:
            ValueError: If the reading method matches none of the known values.
        """
        reader_type = config.reader.reading_method
        engine = config.general.backend

        # Each reader class is imported only inside the branch that needs it.
        if reader_type == ReadingMethods.LOCAL:
            from warprec.data.reader import LocalReader

            return LocalReader(backend=engine)

        if reader_type == ReadingMethods.AZURE_BLOB:
            from warprec.data.reader import AzureBlobReader

            azure_settings = config.general.azure
            return AzureBlobReader(
                storage_account_name=azure_settings.storage_account_name,
                container_name=azure_settings.container_name,
                backend=engine,
            )

        raise ValueError(f"Unknown reader type: {reader_type}")

get_reader(config) classmethod

Factory method to get the appropriate Reader instance based on the configuration.

Parameters:

Name Type Description Default
config WarpRecConfiguration

Configuration file.

required

Returns:

Name Type Description
Reader Reader

An instance of a class that extends the Reader abstract class.

Raises:

Type Description
ValueError

If the reading method specified in the configuration is unknown.

Source code in warprec/data/reader/base_reader.py
@classmethod
def get_reader(cls, config: WarpRecConfiguration) -> Reader:
    """Instantiate the Reader matching ``config.reader.reading_method``.

    Args:
        config (WarpRecConfiguration): Configuration object; its
            ``reader.reading_method`` selects the reader and its
            ``general.backend`` is forwarded to the reader constructor.

    Returns:
        Reader: A ``LocalReader`` or an ``AzureBlobReader`` instance.

    Raises:
        ValueError: If the reading method matches none of the known values.
    """
    reader_type = config.reader.reading_method
    engine = config.general.backend

    # Each reader class is imported only inside the branch that needs it.
    if reader_type == ReadingMethods.LOCAL:
        from warprec.data.reader import LocalReader

        return LocalReader(backend=engine)

    if reader_type == ReadingMethods.AZURE_BLOB:
        from warprec.data.reader import AzureBlobReader

        azure_settings = config.general.azure
        return AzureBlobReader(
            storage_account_name=azure_settings.storage_account_name,
            container_name=azure_settings.container_name,
            backend=engine,
        )

    raise ValueError(f"Unknown reader type: {reader_type}")

warprec.data.reader.local_reader.LocalReader

Bases: Reader

This class extends Reader and handles data reading from a local machine.

Source code in warprec/data/reader/local_reader.py
class LocalReader(Reader):
    """This class extends Reader and handles data reading from a local machine."""

    def _empty_frame_for_backend(self) -> DataFrame[Any]:
        """Build the empty frame returned when a requested file is missing.

        Returns:
            DataFrame[Any]: An empty Narwhals DataFrame wrapping a Polars frame
                when ``self.backend`` is ``"polars"``, a Pandas frame otherwise.
        """
        if self.backend == "polars":
            # Polars is imported only when that backend is actually selected.
            import polars as pl

            return nw.from_native(pl.DataFrame())
        return nw.from_native(pd.DataFrame())

    def read_tabular(
        self,
        local_path: str,
        column_names: Optional[List[str]] = None,
        dtypes: Optional[Dict[str, str]] = None,
        sep: str = "\t",
        header: bool = True,
        *args: Any,
        **kwargs: Any,
    ) -> DataFrame[Any]:
        """Reads tabular data (e.g., CSV, TSV) from a local file.

        The path is handed to the inherited `_process_tabular_data`, which
        performs the actual parsing.

        Args:
            local_path (str): The local file path to the tabular data.
            column_names (Optional[List[str]]): A list of expected column names.
            dtypes (Optional[Dict[str, str]]): A dict of data types corresponding to `column_names`.
            sep (str): The delimiter character used in the file. Defaults to tab `\\t`.
            header (bool): A boolean indicating if the file has a header row. Defaults to `True`.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            DataFrame[Any]: A DataFrame containing the tabular data. Returns an empty
                DataFrame matching the configured backend if the file is not found.
        """
        path = Path(local_path)
        if not path.exists():
            # A missing file is not an error here: callers get an empty frame.
            return self._empty_frame_for_backend()

        return self._process_tabular_data(
            source=path,
            sep=sep,
            header=header,
            desired_cols=column_names,
            desired_dtypes=dtypes,
        )

    def read_parquet(
        self,
        local_path: str,
        column_names: Optional[List[str]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> DataFrame[Any]:
        """Reads data from a local parquet file.

        Args:
            local_path (str): The local file path to the parquet data.
            column_names (Optional[List[str]]): A list of specific columns to read.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            DataFrame[Any]: A Narwhals DataFrame containing the data. Returns an empty
                DataFrame matching the configured backend if the file is not found.
        """
        path = Path(local_path)
        if not path.exists():
            # Return empty compatible with the backend
            return self._empty_frame_for_backend()
        return self._process_parquet_data(source=path, desired_cols=column_names)

    def read_tabular_split(
        self,
        local_path: str,
        column_names: Optional[List[str]],
        dtypes: Optional[Dict[str, str]],
        sep: str = "\t",
        ext: str = ".tsv",
        header: bool = True,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[
        DataFrame[Any],
        Optional[List[Tuple[DataFrame[Any], DataFrame[Any]]] | DataFrame[Any]],
        DataFrame[Any],
    ]:
        """Reads a tabular split stored on the local machine.

        Delegates to the inherited `_process_tabular_split`, marking the
        location as local (`is_remote=False`).

        Args:
            local_path (str): Base location of the split, forwarded as `base_location`.
            column_names (Optional[List[str]]): A list of expected column names.
            dtypes (Optional[Dict[str, str]]): A dict of data types corresponding to `column_names`.
            sep (str): The delimiter character used in the files. Defaults to tab `\\t`.
            ext (str): The file extension forwarded to the parent. Defaults to `.tsv`.
            header (bool): A boolean indicating if the files have a header row. Defaults to `True`.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            Tuple: The three-element tuple produced by `_process_tabular_split`
                (presumably train, validation and test data; confirm against
                the parent implementation).
        """
        return super()._process_tabular_split(
            base_location=local_path,
            column_names=column_names,
            dtypes=dtypes,
            sep=sep,
            ext=ext,
            header=header,
            is_remote=False,  # Specify local path handling
        )

    def read_parquet_split(
        self,
        local_path: str,
        column_names: Optional[List[str]] = None,
        ext: str = ".parquet",
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[
        DataFrame[Any],
        Optional[List[Tuple[DataFrame[Any], DataFrame[Any]]] | DataFrame[Any]],
        DataFrame[Any],
    ]:
        """Reads a parquet split stored on the local machine.

        Delegates to the inherited `_process_parquet_split`, marking the
        location as local (`is_remote=False`).

        Args:
            local_path (str): Base location of the split, forwarded as `base_location`.
            column_names (Optional[List[str]]): A list of specific columns to read.
            ext (str): The file extension forwarded to the parent. Defaults to `.parquet`.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            Tuple: The three-element tuple produced by `_process_parquet_split`
                (presumably train, validation and test data; confirm against
                the parent implementation).
        """
        return super()._process_parquet_split(
            base_location=local_path,
            column_names=column_names,
            ext=ext,
            is_remote=False,  # Specify local path handling
        )

    def read_json(self, *args, **kwargs):
        """This method will read the json data from the source.

        Raises:
            NotImplementedError: Always; JSON reading is not implemented here.
        """
        raise NotImplementedError

    def read_json_split(self, *args, **kwargs):
        """This method will read the json split data from the source.

        Raises:
            NotImplementedError: Always; JSON split reading is not implemented here.
        """
        raise NotImplementedError

    def load_model_state(self, local_path: str) -> dict:
        """Loads a model state from a given path.

        Args:
            local_path (str): The path to the model state file.

        Returns:
            dict: The deserialized information of the model (e.g., weights, hyperparameters)
                loaded using `joblib`.

        Raises:
            FileNotFoundError: If the model state was not found in the provided path.
        """
        path = Path(local_path)
        if path.exists():
            # NOTE(review): joblib deserialization is pickle-based, so only
            # model files from a trusted origin should be loaded here.
            return joblib.load(path)
        raise FileNotFoundError(f"Model state not found in {path}")

load_model_state(local_path)

Loads a model state from a given path.

Parameters:

Name Type Description Default
local_path str

The path to the model state file.

required

Returns:

Name Type Description
dict dict

The deserialized information of the model (e.g., weights, hyperparameters) loaded using joblib.

Raises:

Type Description
FileNotFoundError

If the model state was not found in the provided path.

Source code in warprec/data/reader/local_reader.py
def load_model_state(self, local_path: str) -> dict:
    """Deserialize a model state stored on the local filesystem.

    Args:
        local_path (str): Location of the model state file.

    Returns:
        dict: Whatever `joblib.load` produces for that file (e.g., weights,
            hyperparameters).

    Raises:
        FileNotFoundError: If nothing exists at `local_path`.
    """
    path = Path(local_path)

    # Guard clause: fail early when the file is absent.
    if not path.exists():
        raise FileNotFoundError(f"Model state not found in {path}")

    return joblib.load(path)

read_json(*args, **kwargs)

This method will read the json data from the source.

Source code in warprec/data/reader/local_reader.py
def read_json(self, *args, **kwargs):
    """Read JSON data from the source (not implemented).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

read_json_split(*args, **kwargs)

This method will read the json split data from the source.

Source code in warprec/data/reader/local_reader.py
def read_json_split(self, *args, **kwargs):
    """Read JSON split data from the source (not implemented).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

read_parquet(local_path, column_names=None, *args, **kwargs)

Reads data from a local parquet file.

Parameters:

Name Type Description Default
local_path str

The local file path to the parquet data.

required
column_names Optional[List[str]]

A list of specific columns to read.

None
*args Any

The additional arguments.

()
**kwargs Any

The additional keyword arguments.

{}

Returns:

Type Description
DataFrame[Any]

DataFrame[Any]: A Narwhals DataFrame containing the data.

Source code in warprec/data/reader/local_reader.py
def read_parquet(
    self,
    local_path: str,
    column_names: Optional[List[str]] = None,
    *args: Any,
    **kwargs: Any,
) -> DataFrame[Any]:
    """Reads data from a local parquet file.

    Args:
        local_path (str): The local file path to the parquet data.
        column_names (Optional[List[str]]): A list of specific columns to read.
        *args (Any): Additional positional arguments (accepted but unused).
        **kwargs (Any): Additional keyword arguments (accepted but unused).

    Returns:
        DataFrame[Any]: A Narwhals DataFrame containing the data. Returns an empty
            DataFrame matching the configured backend if the file is not found.
    """
    path = Path(local_path)
    if not path.exists():
        # Return empty compatible with the backend
        if self.backend == "polars":
            # Polars is imported only when that backend is actually selected.
            import polars as pl

            return nw.from_native(pl.DataFrame())
        return nw.from_native(pd.DataFrame())
    return self._process_parquet_data(source=path, desired_cols=column_names)

read_tabular(local_path, column_names=None, dtypes=None, sep='\t', header=True, *args, **kwargs)

Reads tabular data (e.g., CSV, TSV) from a local file.

The file path is passed to the parent's tabular processor (_process_tabular_data), which parses it.

Parameters:

Name Type Description Default
local_path str

The local file path to the tabular data.

required
column_names Optional[List[str]]

A list of expected column names.

None
dtypes Optional[Dict[str, str]]

A dict of data types corresponding to column_names.

None
sep str

The delimiter character used in the file. Defaults to tab (\t).

'\t'
header bool

A boolean indicating if the file has a header row. Defaults to True.

True
*args Any

The additional arguments.

()
**kwargs Any

The additional keyword arguments.

{}

Returns:

Type Description
DataFrame[Any]

DataFrame[Any]: A DataFrame containing the tabular data. Returns an empty DataFrame if the file is not found.

Source code in warprec/data/reader/local_reader.py
def read_tabular(
    self,
    local_path: str,
    column_names: Optional[List[str]] = None,
    dtypes: Optional[Dict[str, str]] = None,
    sep: str = "\t",
    header: bool = True,
    *args: Any,
    **kwargs: Any,
) -> DataFrame[Any]:
    """Reads tabular data (e.g., CSV, TSV) from a local file.

    The path is handed to the inherited `_process_tabular_data`, which
    performs the actual parsing.

    Args:
        local_path (str): The local file path to the tabular data.
        column_names (Optional[List[str]]): A list of expected column names.
        dtypes (Optional[Dict[str, str]]): A dict of data types corresponding to `column_names`.
        sep (str): The delimiter character used in the file. Defaults to tab `\\t`.
        header (bool): A boolean indicating if the file has a header row. Defaults to `True`.
        *args (Any): Additional positional arguments (accepted but unused).
        **kwargs (Any): Additional keyword arguments (accepted but unused).

    Returns:
        DataFrame[Any]: A DataFrame containing the tabular data. Returns an empty
            DataFrame matching the configured backend if the file is not found.
    """
    path = Path(local_path)
    if not path.exists():
        # A missing file is not an error here: callers get an empty frame
        # built with the same backend the reader was configured for.
        if self.backend == "polars":
            import polars as pl

            return nw.from_native(pl.DataFrame())
        return nw.from_native(pd.DataFrame())

    return self._process_tabular_data(
        source=path,
        sep=sep,
        header=header,
        desired_cols=column_names,
        desired_dtypes=dtypes,
    )

warprec.data.reader.azureblob_reader.AzureBlobReader

Bases: Reader

This class extends Reader and handles data reading from an Azure Blob Storage container.

It uses DefaultAzureCredential to authenticate, which relies on environment variables or other standard Azure identity sources.

Parameters:

Name Type Description Default
storage_account_name str

The name of the Azure Storage Account.

required
container_name str

The name of the container where data is stored.

required
backend str

The backend to use for reading data.

'polars'
Source code in warprec/data/reader/azureblob_reader.py
class AzureBlobReader(Reader):
    """This class extends Reader and handles data reading from an Azure Blob Storage container.

    It uses DefaultAzureCredential to authenticate, which relies on
    environment variables or other standard Azure identity sources.

    Args:
        storage_account_name (str): The name of the Azure Storage Account.
        container_name (str): The name of the container where data is stored.
        backend (str): The backend to use for reading data.
    """

    def __init__(
        self,
        storage_account_name: str,
        container_name: str,
        backend: str = "polars",
    ) -> None:
        super().__init__(backend=backend)

        # Retrieve Azure credentials from the environment
        credential = DefaultAzureCredential()

        # Create the BlobService client
        account_url = f"https://{storage_account_name}.blob.core.windows.net"
        self.blob_service_client = BlobServiceClient(
            account_url=account_url, credential=credential
        )

        # Retrieve the container client
        self.container_name = container_name
        self.container_client = self.blob_service_client.get_container_client(
            self.container_name
        )
        logger.msg(
            f"AzureBlobReader initialized for container '{self.container_name}'."
        )

    def _empty_frame_for_backend(self) -> DataFrame[Any]:
        """Build the empty frame returned when a requested blob is missing.

        Returns:
            DataFrame[Any]: An empty Narwhals DataFrame wrapping a Polars frame
                when ``self.backend`` is ``"polars"``, a Pandas frame otherwise.
        """
        if self.backend == "polars":
            # Polars is imported only when that backend is actually selected.
            import polars as pl

            return nw.from_native(pl.DataFrame())
        return nw.from_native(pd.DataFrame())

    def _download_blob_content(
        self, blob_name: str, as_bytes: bool = False
    ) -> str | bytes | None:
        """Helper to download a blob's content.

        Args:
            blob_name (str): The full path/name of the blob within the container.
            as_bytes (bool): If True, returns content as raw bytes; otherwise,
                decodes to UTF-8 string.

        Returns:
            str | bytes | None: The blob content as a string or bytes,
                or None if the resource is not found.
        """
        try:
            blob_client = self.container_client.get_blob_client(blob_name)
            downloader = blob_client.download_blob()
            if as_bytes:
                return downloader.readall()
            return downloader.readall().decode("utf-8")
        except ResourceNotFoundError:
            # A missing blob is reported to the caller as None, not as an exception.
            logger.attention(
                f"Blob '{blob_name}' not found in container '{self.container_name}'."
            )
            return None

    def read_tabular(
        self,
        blob_name: str,
        column_names: Optional[List[str]],
        dtypes: Optional[Dict[str, str]],
        sep: str = "\t",
        header: bool = True,
        *args: Any,
        **kwargs: Any,
    ) -> DataFrame[Any]:
        """Reads tabular data from a blob by feeding it to the parent tabular processor.

        The blob is downloaded as raw bytes when the backend is ``"polars"`` and
        as a UTF-8 string otherwise, wrapped in an in-memory stream and passed
        to the inherited `_process_tabular_data`.

        Args:
            blob_name (str): The path/name of the blob containing the tabular data.
            column_names (Optional[List[str]]): A list of expected column names.
            dtypes (Optional[Dict[str, str]]): A dict of data types corresponding to `column_names`.
            sep (str): The delimiter character used in the file. Defaults to tab `\\t`.
            header (bool): A boolean indicating if the file has a header row. Defaults to `True`.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            DataFrame[Any]: A Narwhals DataFrame containing the tabular data. Returns an
                empty DataFrame matching the configured backend if the blob is not found.
        """
        as_bytes = self.backend == "polars"

        content = self._download_blob_content(blob_name, as_bytes=as_bytes)

        if content is None:
            # Same backend-aware empty result that read_parquet returns.
            return self._empty_frame_for_backend()

        stream: Union[BytesIO, StringIO]
        if isinstance(content, bytes):
            stream = BytesIO(content)
        else:
            stream = StringIO(content)

        return self._process_tabular_data(
            source=stream,
            sep=sep,
            header=header,
            desired_cols=column_names,
            desired_dtypes=dtypes,
        )

    def read_parquet(
        self,
        blob_name: str,
        column_names: Optional[List[str]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> DataFrame[Any]:
        """Reads parquet data from a blob.

        Downloads the blob content as bytes and uses the inherited `_process_parquet_data`
        for parsing.

        Args:
            blob_name (str): The path/name of the blob containing the parquet data.
            column_names (Optional[List[str]]): A list of specific columns to read.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            DataFrame[Any]: A Narwhals DataFrame containing the data. Returns an empty
                DataFrame matching the configured backend if the blob is not found.
        """
        content = self._download_blob_content(blob_name, as_bytes=True)

        if content is None:
            return self._empty_frame_for_backend()

        stream = BytesIO(content)  # type: ignore[arg-type]

        return self._process_parquet_data(source=stream, desired_cols=column_names)

    def read_tabular_split(
        self,
        blob_prefix: str,
        column_names: Optional[List[str]],
        dtypes: Optional[Dict[str, str]],
        sep: str = "\t",
        ext: str = ".tsv",
        header: bool = True,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[
        DataFrame[Any],
        Optional[List[Tuple[DataFrame[Any], DataFrame[Any]]] | DataFrame[Any]],
        DataFrame[Any],
    ]:
        """Reads a tabular split stored under a blob prefix.

        Delegates to the inherited `_process_tabular_split`, marking the
        location as remote (`is_remote=True`).

        Args:
            blob_prefix (str): Base location of the split, forwarded as `base_location`.
            column_names (Optional[List[str]]): A list of expected column names.
            dtypes (Optional[Dict[str, str]]): A dict of data types corresponding to `column_names`.
            sep (str): The delimiter character used in the files. Defaults to tab `\\t`.
            ext (str): The file extension forwarded to the parent. Defaults to `.tsv`.
            header (bool): A boolean indicating if the files have a header row. Defaults to `True`.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            Tuple: The three-element tuple produced by `_process_tabular_split`
                (presumably train, validation and test data; confirm against
                the parent implementation).
        """
        return super()._process_tabular_split(
            base_location=blob_prefix,
            column_names=column_names,
            dtypes=dtypes,
            sep=sep,
            ext=ext,
            header=header,
            is_remote=True,  # Specify remote path handling
        )

    def read_parquet_split(
        self,
        blob_prefix: str,
        column_names: Optional[List[str]] = None,
        ext: str = ".parquet",
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[
        DataFrame[Any],
        Optional[List[Tuple[DataFrame[Any], DataFrame[Any]]] | DataFrame[Any]],
        DataFrame[Any],
    ]:
        """Reads a parquet split stored under a blob prefix.

        Delegates to the inherited `_process_parquet_split`, marking the
        location as remote (`is_remote=True`).

        Args:
            blob_prefix (str): Base location of the split, forwarded as `base_location`.
            column_names (Optional[List[str]]): A list of specific columns to read.
            ext (str): The file extension forwarded to the parent. Defaults to `.parquet`.
            *args (Any): Additional positional arguments (accepted but unused).
            **kwargs (Any): Additional keyword arguments (accepted but unused).

        Returns:
            Tuple: The three-element tuple produced by `_process_parquet_split`
                (presumably train, validation and test data; confirm against
                the parent implementation).
        """
        return super()._process_parquet_split(
            base_location=blob_prefix,
            column_names=column_names,
            ext=ext,
            is_remote=True,  # Specify remote path handling
        )

    def read_json(self, *args, **kwargs):
        """This method will read the json data from the source.

        Raises:
            NotImplementedError: Always; JSON reading is not implemented here.
        """
        raise NotImplementedError

    def read_json_split(self, *args, **kwargs):
        """This method will read the json split data from the source.

        Raises:
            NotImplementedError: Always; JSON split reading is not implemented here.
        """
        raise NotImplementedError

    def load_model_state(self, blob_name: str) -> dict:
        """This method will load a model state from a source.

        Downloads the blob content as bytes and uses `joblib.load` to deserialize the model state.

        Args:
            blob_name (str): The path/name of the blob containing the serialized model state (e.g., a `.joblib` file).

        Returns:
            dict: A dictionary representing the loaded model state (e.g., weights, hyperparameters).

        Raises:
            FileNotFoundError: If the model state blob is not found.
        """
        logger.msg(f"Loading model state from blob: {blob_name}")
        blob_content_bytes = self._download_blob_content(blob_name, as_bytes=True)

        if blob_content_bytes is None:
            raise FileNotFoundError(
                f"Model state blob '{blob_name}' not found in container '{self.container_name}'."
            )

        buffer = BytesIO(blob_content_bytes)  # type: ignore[arg-type]
        # NOTE(review): joblib deserialization is pickle-based, so the container
        # must only hold model states from a trusted origin.
        model_state = joblib.load(buffer)
        logger.msg("Model state loaded successfully.")
        return model_state

load_model_state(blob_name)

This method will load a model state from a source.

Downloads the blob content as bytes and uses joblib.load to deserialize the model state.

Parameters:

Name Type Description Default
blob_name str

The path/name of the blob containing the serialized model state (e.g., a .joblib file).

required

Returns:

Name Type Description
dict dict

A dictionary representing the loaded model state (e.g., weights, hyperparameters).

Raises:

Type Description
FileNotFoundError

If the model state blob is not found.

Source code in warprec/data/reader/azureblob_reader.py
def load_model_state(self, blob_name: str) -> dict:
    """Fetch a serialized model state from the container and deserialize it.

    The blob is downloaded as raw bytes, wrapped in an in-memory buffer and
    passed to `joblib.load`.

    Args:
        blob_name (str): The path/name of the blob holding the serialized model
            state (e.g., a `.joblib` file).

    Returns:
        dict: The object produced by `joblib.load` (e.g., weights, hyperparameters).

    Raises:
        FileNotFoundError: If the download helper reports the blob as missing.
    """
    logger.msg(f"Loading model state from blob: {blob_name}")

    raw_state = self._download_blob_content(blob_name, as_bytes=True)
    if raw_state is None:
        raise FileNotFoundError(
            f"Model state blob '{blob_name}' not found in container '{self.container_name}'."
        )

    # The helper's return type is a union, hence the type-checker suppression.
    loaded_state = joblib.load(BytesIO(raw_state))  # type: ignore[arg-type]
    logger.msg("Model state loaded successfully.")
    return loaded_state

read_json(*args, **kwargs)

This method will read the json data from the source.

Source code in warprec/data/reader/azureblob_reader.py
def read_json(self, *args, **kwargs):
    """Read JSON data from the source (not implemented).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

read_json_split(*args, **kwargs)

This method will read the json split data from the source.

Source code in warprec/data/reader/azureblob_reader.py
def read_json_split(self, *args, **kwargs):
    """Read JSON split data from the source (not implemented).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

read_parquet(blob_name, column_names=None, *args, **kwargs)

Reads parquet data from a blob.

Downloads the blob content as bytes and uses the inherited _process_parquet_data for parsing.

Parameters:

Name Type Description Default
blob_name str

The path/name of the blob containing the parquet data.

required
column_names Optional[List[str]]

A list of specific columns to read.

None
*args Any

The additional arguments.

()
**kwargs Any

The additional keyword arguments.

{}

Returns:

Type Description
DataFrame[Any]

DataFrame[Any]: A Narwhals DataFrame containing the data.

Source code in warprec/data/reader/azureblob_reader.py
def read_parquet(
    self,
    blob_name: str,
    column_names: Optional[List[str]] = None,
    *args: Any,
    **kwargs: Any,
) -> DataFrame[Any]:
    """Load a parquet blob into a DataFrame.

    The blob is fetched as raw bytes, wrapped in an in-memory buffer and
    handed to the inherited `_process_parquet_data`.

    Args:
        blob_name (str): The path/name of the blob containing the parquet data.
        column_names (Optional[List[str]]): A list of specific columns to read.
        *args (Any): Additional positional arguments (accepted but unused).
        **kwargs (Any): Additional keyword arguments (accepted but unused).

    Returns:
        DataFrame[Any]: A Narwhals DataFrame containing the data, or an empty
            one (Polars-backed for the "polars" backend, Pandas-backed
            otherwise) when the blob is not found.
    """
    payload = self._download_blob_content(blob_name, as_bytes=True)

    if payload is not None:
        buffer = BytesIO(payload)  # type: ignore[arg-type]
        return self._process_parquet_data(source=buffer, desired_cols=column_names)

    # Blob missing: hand back an empty frame built with the active backend.
    empty_native: Any
    if self.backend == "polars":
        import polars as pl

        empty_native = pl.DataFrame()
    else:
        empty_native = pd.DataFrame()
    return nw.from_native(empty_native)

read_tabular(blob_name, column_names, dtypes, sep='\t', header=True, *args, **kwargs)

Reads tabular data from a blob by feeding it to the parent stream processor.

Downloads the blob content (raw bytes for the polars backend, a UTF-8 string otherwise) and passes it to the inherited _process_tabular_data for robust tabular parsing.

Parameters:

Name Type Description Default
blob_name str

The path/name of the blob containing the tabular data.

required
column_names Optional[List[str]]

A list of expected column names.

required
dtypes Optional[Dict[str, str]]

A dict of data types corresponding to column_names.

required
sep str

The delimiter character used in the file. Defaults to tab (\t).

'\t'
header bool

A boolean indicating if the file has a header row. Defaults to True.

True
*args Any

The additional arguments.

()
**kwargs Any

The additional keyword arguments.

{}

Returns:

Name Type Description
DataFrame DataFrame

A Narwhals DataFrame containing the tabular data. Returns an empty DataFrame if the blob is not found.

Source code in warprec/data/reader/azureblob_reader.py
def read_tabular(
    self,
    blob_name: str,
    column_names: Optional[List[str]],
    dtypes: Optional[Dict[str, str]],
    sep: str = "\t",
    header: bool = True,
    *args: Any,
    **kwargs: Any,
) -> DataFrame[Any]:
    """Reads tabular data from a blob by feeding it to the parent tabular processor.

    The blob is downloaded as raw bytes when the backend is ``"polars"`` and
    as a UTF-8 string otherwise, wrapped in an in-memory stream and passed
    to the inherited `_process_tabular_data`.

    Args:
        blob_name (str): The path/name of the blob containing the tabular data.
        column_names (Optional[List[str]]): A list of expected column names.
        dtypes (Optional[Dict[str, str]]): A dict of data types corresponding to `column_names`.
        sep (str): The delimiter character used in the file. Defaults to tab `\\t`.
        header (bool): A boolean indicating if the file has a header row. Defaults to `True`.
        *args (Any): Additional positional arguments (accepted but unused).
        **kwargs (Any): Additional keyword arguments (accepted but unused).

    Returns:
        DataFrame[Any]: A Narwhals DataFrame containing the tabular data. Returns an
            empty DataFrame matching the configured backend if the blob is not found.
    """
    as_bytes = self.backend == "polars"

    content = self._download_blob_content(blob_name, as_bytes=as_bytes)

    if content is None:
        # Build the empty result with the active backend, as read_parquet does.
        if self.backend == "polars":
            import polars as pl

            return nw.from_native(pl.DataFrame())
        return nw.from_native(pd.DataFrame())

    stream: Union[BytesIO, StringIO]
    if isinstance(content, bytes):
        stream = BytesIO(content)
    else:
        stream = StringIO(content)

    return self._process_tabular_data(
        source=stream,
        sep=sep,
        header=header,
        desired_cols=column_names,
        desired_dtypes=dtypes,
    )