Skip to content

Build design matrix

Module to regroup the steps to build the design matrix for DESeq2.

build_design_matrix

BuildDesignMatrix

Bases: AggMergeDesignColumnsBuildContrast, AggMergeDesignLevels, LocGetLocalFactors, LocSetLocalDesign, LocOderDesignComputeLogMean

Mixin class to implement the computation of the design matrix.

Methods:

Name Description
build_design_matrix

The method to build the design matrix, that must be used in the main pipeline.

check_design_matrix

The method to check the design matrix, that must be used in the main pipeline while we are testing.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/build_design_matrix.py
class BuildDesignMatrix(
    AggMergeDesignColumnsBuildContrast,
    AggMergeDesignLevels,
    LocGetLocalFactors,
    LocSetLocalDesign,
    LocOderDesignComputeLogMean,
):
    """Mixin chaining the substeps that build the design matrix.

    The local and aggregation substeps themselves come from the base mixins;
    this class only runs them in sequence.

    Methods
    -------
    build_design_matrix
        Run the local/aggregation rounds that build the design matrix. Meant
        to be called from the main pipeline.
    """

    # NOTE(review): earlier documentation of this class also listed a
    # ``check_design_matrix`` method; it is not defined in this class body --
    # confirm whether it is provided elsewhere.

    def build_design_matrix(
        self, train_data_nodes, aggregation_node, local_states, round_idx, clean_models
    ):
        """Run the rounds that build the design matrix.

        The sequence is: collect local factor levels, merge them, build the
        local design matrices, merge their columns (and build the contrast),
        then reorder the local columns and compute the local log means.

        Parameters
        ----------
        train_data_nodes: list
            List of TrainDataNode.

        aggregation_node: AggregationNode
            The aggregation node.

        local_states: dict
            Local states. Required to propagate intermediate results. An empty
            container means that no previous step has been run.

        round_idx:
            The current round.

        clean_models: bool
            Whether to clean the models after the computation.

        Returns
        -------
        local_states: dict
            Local states. Required to propagate intermediate results.

        shared_states: dict
            Shared states returned by the last local step
            (``order_design_cols_compute_local_log_mean``), i.e. the "log_mean"
            and "n_samples" entries needed to start the next step of the
            pipeline, which is computing the size factors.

        round_idx: int
            The updated round.

        """
        # Without any previous local state there is no reference dds yet: this
        # is the first step of the pipeline and nothing has to be propagated.
        previous_local_states = None if len(local_states) == 0 else local_states
        aggregation_id = aggregation_node.organization_id

        # ---- Round 1 (local): levels of each design factor, per center ---- #
        local_states, level_shared_states, round_idx = local_step(
            local_method=self.get_local_factors,
            train_data_nodes=train_data_nodes,
            output_local_states=local_states,
            round_idx=round_idx,
            input_local_states=previous_local_states,
            input_shared_state=None,
            aggregation_id=aggregation_id,
            description="Computing local design factor levels",
            clean_models=clean_models,
        )

        # ---- Round 2 (aggregation): unique levels of each design factor ---- #
        merged_levels_state, round_idx = aggregation_step(
            aggregation_method=self.merge_design_levels,
            train_data_nodes=train_data_nodes,
            aggregation_node=aggregation_node,
            input_shared_states=level_shared_states,
            round_idx=round_idx,
            description="Merging design levels",
            clean_models=clean_models,
        )

        # ---- Round 3 (local): initialize the design matrix in each center ---- #
        local_states, column_shared_states, round_idx = local_step(
            local_method=self.set_local_design,
            train_data_nodes=train_data_nodes,
            output_local_states=local_states,
            round_idx=round_idx,
            input_local_states=local_states,
            input_shared_state=merged_levels_state,
            aggregation_id=aggregation_id,
            description="Setting local design matrices",
            clean_models=clean_models,
        )

        # ---- Round 4 (aggregation): merge design columns, build contrast ---- #
        merged_columns_state, round_idx = aggregation_step(
            aggregation_method=self.merge_design_columns_and_build_contrast,
            train_data_nodes=train_data_nodes,
            aggregation_node=aggregation_node,
            input_shared_states=column_shared_states,
            round_idx=round_idx,
            description="Merge local design matrix columns",
            clean_models=clean_models,
        )

        # ---- Round 5 (local): order the columns, compute local log means ---- #
        local_states, shared_states, round_idx = local_step(
            local_method=self.order_design_cols_compute_local_log_mean,
            train_data_nodes=train_data_nodes,
            output_local_states=local_states,
            input_local_states=local_states,
            round_idx=round_idx,
            input_shared_state=merged_columns_state,
            aggregation_id=aggregation_id,
            description="Computing local log means",
            clean_models=clean_models,
        )

        return local_states, shared_states, round_idx

build_design_matrix(train_data_nodes, aggregation_node, local_states, round_idx, clean_models)

Build the design matrix.

Parameters:

Name Type Description Default
train_data_nodes

List of TrainDataNode.

required
aggregation_node

The aggregation node.

required
local_states

Local states. Required to propagate intermediate results.

required
round_idx

The current round

required
clean_models

Whether to clean the models after the computation.

required

Returns:

Name Type Description
local_states dict

Local states. Required to propagate intermediate results.

shared_states dict

Shared states containing the necessary local information to start the next step of the pipeline, which is computing the size factors. They contain a "log_mean" key and a "n_samples" key.

round_idx int

The updated round

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/build_design_matrix.py
def build_design_matrix(
    self, train_data_nodes, aggregation_node, local_states, round_idx, clean_models
):
    """Run the rounds that build the design matrix.

    The sequence is: collect local factor levels, merge them, build the
    local design matrices, merge their columns (and build the contrast),
    then reorder the local columns and compute the local log means.

    Parameters
    ----------
    train_data_nodes: list
        List of TrainDataNode.

    aggregation_node: AggregationNode
        The aggregation node.

    local_states: dict
        Local states. Required to propagate intermediate results. An empty
        container means that no previous step has been run.

    round_idx:
        The current round.

    clean_models: bool
        Whether to clean the models after the computation.

    Returns
    -------
    local_states: dict
        Local states. Required to propagate intermediate results.

    shared_states: dict
        Shared states returned by the last local step
        (``order_design_cols_compute_local_log_mean``), i.e. the "log_mean"
        and "n_samples" entries needed to start the next step of the
        pipeline, which is computing the size factors.

    round_idx: int
        The updated round.

    """
    # Without any previous local state there is no reference dds yet: this is
    # the first step of the pipeline and nothing has to be propagated.
    previous_local_states = None if len(local_states) == 0 else local_states
    aggregation_id = aggregation_node.organization_id

    # ---- Round 1 (local): levels of each design factor, per center ---- #
    local_states, level_shared_states, round_idx = local_step(
        local_method=self.get_local_factors,
        train_data_nodes=train_data_nodes,
        output_local_states=local_states,
        round_idx=round_idx,
        input_local_states=previous_local_states,
        input_shared_state=None,
        aggregation_id=aggregation_id,
        description="Computing local design factor levels",
        clean_models=clean_models,
    )

    # ---- Round 2 (aggregation): unique levels of each design factor ---- #
    merged_levels_state, round_idx = aggregation_step(
        aggregation_method=self.merge_design_levels,
        train_data_nodes=train_data_nodes,
        aggregation_node=aggregation_node,
        input_shared_states=level_shared_states,
        round_idx=round_idx,
        description="Merging design levels",
        clean_models=clean_models,
    )

    # ---- Round 3 (local): initialize the design matrix in each center ---- #
    local_states, column_shared_states, round_idx = local_step(
        local_method=self.set_local_design,
        train_data_nodes=train_data_nodes,
        output_local_states=local_states,
        round_idx=round_idx,
        input_local_states=local_states,
        input_shared_state=merged_levels_state,
        aggregation_id=aggregation_id,
        description="Setting local design matrices",
        clean_models=clean_models,
    )

    # ---- Round 4 (aggregation): merge design columns, build contrast ---- #
    merged_columns_state, round_idx = aggregation_step(
        aggregation_method=self.merge_design_columns_and_build_contrast,
        train_data_nodes=train_data_nodes,
        aggregation_node=aggregation_node,
        input_shared_states=column_shared_states,
        round_idx=round_idx,
        description="Merge local design matrix columns",
        clean_models=clean_models,
    )

    # ---- Round 5 (local): order the columns, compute local log means ---- #
    local_states, shared_states, round_idx = local_step(
        local_method=self.order_design_cols_compute_local_log_mean,
        train_data_nodes=train_data_nodes,
        output_local_states=local_states,
        input_local_states=local_states,
        round_idx=round_idx,
        input_shared_state=merged_columns_state,
        aggregation_id=aggregation_id,
        description="Computing local log means",
        clean_models=clean_models,
    )

    return local_states, shared_states, round_idx

substeps

Module containing the substeps for the computation of design matrices.

This module contains all these substeps as mixin classes.

AggMergeDesignColumnsBuildContrast

Mixin to merge the columns of the design matrices and build contrast.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
class AggMergeDesignColumnsBuildContrast:
    """Aggregation mixin pooling design-matrix columns and building the contrast."""

    design_factors: list[str]
    continuous_factors: list[str] | None
    contrast: list[str] | None

    @remote
    @log_remote
    def merge_design_columns_and_build_contrast(self, shared_states):
        """Merge the columns of the design matrices and build the contrast.

        Parameters
        ----------
        shared_states : list
            List of results from training nodes, each one exposing its design
            columns under the "design_columns" key.

        Returns
        -------
        dict
            Shared state containing:
            - merged_columns: the union of all the "design_columns" received,
              i.e. the names of the columns that the local design matrices
              should have.
            - contrast: the value returned by ``build_contrast`` for these
              columns, to be used for the DESeq2 model.
        """
        # Accumulate the union of the columns reported by every node.
        all_columns = pd.Index([])
        for node_state in shared_states:
            all_columns = all_columns.union(node_state["design_columns"])

        aggregated = {"merged_columns": all_columns}

        # Once the full set of columns is known, the contrast can be built.
        aggregated["contrast"] = build_contrast(
            self.design_factors,
            all_columns,
            self.continuous_factors,
            self.contrast,
        )
        return aggregated

merge_design_columns_and_build_contrast(shared_states)

Merge the columns of the design matrices and build contrasts.

Parameters:

Name Type Description Default
shared_states list

List of results (dictionaries of design columns) from training nodes.

required

Returns:

Type Description
dict

Shared state containing: - merged_columns: the names of the columns that the local design matrices should have. - contrast: the contrast (in a list of strings form) to be used for the DESeq2 model.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
@remote
@log_remote
def merge_design_columns_and_build_contrast(self, shared_states):
    """Merge the columns of the design matrices and build the contrast.

    Parameters
    ----------
    shared_states : list
        List of results from training nodes, each one exposing its design
        columns under the "design_columns" key.

    Returns
    -------
    dict
        Shared state containing:
        - merged_columns: the union of all the "design_columns" received,
          i.e. the names of the columns that the local design matrices
          should have.
        - contrast: the value returned by ``build_contrast`` for these
          columns, to be used for the DESeq2 model.
    """
    # Accumulate the union of the columns reported by every node.
    all_columns = pd.Index([])
    for node_state in shared_states:
        all_columns = all_columns.union(node_state["design_columns"])

    aggregated = {"merged_columns": all_columns}

    # Once the full set of columns is known, the contrast can be built.
    aggregated["contrast"] = build_contrast(
        self.design_factors,
        all_columns,
        self.continuous_factors,
        self.contrast,
    )
    return aggregated

AggMergeDesignLevels

Mixin to merge the levels of the design factors.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
class AggMergeDesignLevels:
    """Aggregation mixin pooling the levels of the categorical design factors."""

    categorical_factors: list[str]

    @remote
    @log_remote
    def merge_design_levels(self, shared_states):
        """Merge the levels of the design factors.

        Parameters
        ----------
        shared_states : list
            List of results from training nodes, each one exposing a
            "local_levels" mapping from factor name to its local levels.

        Returns
        -------
        dict
            A dictionary with a single "merged_levels" key, mapping each
            factor of ``self.categorical_factors`` to an array of its unique
            levels over all nodes.
        """
        levels_per_factor = {}
        for factor in self.categorical_factors:
            # Pool the levels seen by every node for this factor.
            pooled = set()
            for node_state in shared_states:
                pooled = set(node_state["local_levels"][factor]) | pooled
            levels_per_factor[factor] = np.array(list(pooled))

        return {"merged_levels": levels_per_factor}

merge_design_levels(shared_states)

Merge the levels of the design factors.

Parameters:

Name Type Description Default
shared_states list

List of results (dictionaries of local_levels) from training nodes.

required

Returns:

Type Description
dict

Dictionary of unique levels for each factor.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
@remote
@log_remote
def merge_design_levels(self, shared_states):
    """Merge the levels of the design factors.

    Parameters
    ----------
    shared_states : list
        List of results from training nodes, each one exposing a
        "local_levels" mapping from factor name to its local levels.

    Returns
    -------
    dict
        A dictionary with a single "merged_levels" key, mapping each
        factor of ``self.categorical_factors`` to an array of its unique
        levels over all nodes.
    """
    levels_per_factor = {}
    for factor in self.categorical_factors:
        # Pool the levels seen by every node for this factor.
        pooled = set()
        for node_state in shared_states:
            pooled = set(node_state["local_levels"][factor]) | pooled
        levels_per_factor[factor] = np.array(list(pooled))

    return {"merged_levels": levels_per_factor}

LocGetLocalFactors

Mixin to get the list of unique levels for each categorical design factor.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
class LocGetLocalFactors:
    """Local mixin listing the unique levels of each categorical design factor."""

    categorical_factors: list[str]

    @remote_data
    @log_remote_data
    @reconstruct_adatas
    def get_local_factors(
        self, data_from_opener, shared_state=None
    ):  # pylint: disable=unused-argument
        """Get the list of unique levels for each categorical design factor.

        A copy of the opener data is stored as ``self.local_adata`` and the
        levels are read from its ``obs`` table.

        Parameters
        ----------
        data_from_opener : ad.AnnData
            AnnData returned by the opener. Copied into ``self.local_adata``.

        shared_state : None, optional
            Not used.

        Returns
        -------
        dict
            A dictionary with a single "local_levels" key, mapping each
            factor of ``self.categorical_factors`` to its unique local values.
        """
        self.local_adata = data_from_opener.copy()

        local_levels = {}
        for factor in self.categorical_factors:
            local_levels[factor] = self.local_adata.obs[factor].unique()

        return {"local_levels": local_levels}

get_local_factors(data_from_opener, shared_state=None)

Get the list of unique levels for each categorical design factor.

Parameters:

Name Type Description Default
data_from_opener AnnData

AnnData returned by the opener. Copied in local anndata objects.

required
shared_state None

Not used.

None

Returns:

Type Description
dict

A dictionary of unique local levels for each factor.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
@remote_data
@log_remote_data
@reconstruct_adatas
def get_local_factors(
    self, data_from_opener, shared_state=None
):  # pylint: disable=unused-argument
    """Get the list of unique levels for each categorical design factor.

    A copy of the opener data is stored as ``self.local_adata`` and the
    levels are read from its ``obs`` table.

    Parameters
    ----------
    data_from_opener : ad.AnnData
        AnnData returned by the opener. Copied into ``self.local_adata``.

    shared_state : None, optional
        Not used.

    Returns
    -------
    dict
        A dictionary with a single "local_levels" key, mapping each
        factor of ``self.categorical_factors`` to its unique local values.
    """
    self.local_adata = data_from_opener.copy()

    local_levels = {}
    for factor in self.categorical_factors:
        local_levels[factor] = self.local_adata.obs[factor].unique()

    return {"local_levels": local_levels}

LocOderDesignComputeLogMean

Mixin to order design cols and compute the local log mean.

Attributes:

Name Type Description
local_adata AnnData

The local AnnData.

Methods:

Name Description
order_design_cols_compute_local_log_mean

Order design columns and compute the local log mean.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
class LocOderDesignComputeLogMean:
    """Local mixin ordering the design columns and computing the local log mean.

    Attributes
    ----------
    local_adata : ad.AnnData
        The local AnnData.

    Methods
    -------
    order_design_cols_compute_local_log_mean
        Order design columns and compute the local log mean.

    """

    local_adata: ad.AnnData

    @remote_data
    @log_remote_data
    @reconstruct_adatas
    def order_design_cols_compute_local_log_mean(
        self, data_from_opener, shared_state=None
    ):
        """Order design columns and compute the local log mean.

        Besides the returned values, this method stores the contrast in
        ``self.local_adata.uns["contrast"]``, aligns the local design matrix
        on the merged columns (columns absent locally are filled with 0) and
        stores the number of design columns in
        ``self.local_adata.uns["n_params"]``.

        Parameters
        ----------
        data_from_opener : ad.AnnData
            AnnData returned by the opener. Its ``X`` and ``n_obs`` are read
            to compute the returned values.

        shared_state : dict
            Shared state with:
            - "merged_columns": the names of the columns that the design
              matrix should have, in the order to apply.
            - "contrast": the contrast to be used for the DESeq2 model.
            Although the default is None, the keys above are read
            unconditionally.

        Returns
        -------
        dict
            "log_mean": the mean over axis 0 of ``log(data_from_opener.X)``,
            and "n_samples": ``data_from_opener.n_obs``.
        """
        #### ----Step 1: Order design columns---- ####

        self.local_adata.uns["contrast"] = shared_state["contrast"]

        merged_columns = shared_state["merged_columns"]

        # Columns known globally but absent locally are added, filled with 0.
        for column in merged_columns:
            known_columns = self.local_adata.obsm["design_matrix"].columns
            if column in known_columns:
                continue
            self.local_adata.obsm["design_matrix"][column] = 0

        # Use the same column order in every center.
        self.local_adata.obsm["design_matrix"] = self.local_adata.obsm[
            "design_matrix"
        ][merged_columns]

        # Keep the number of params in an uns field for easy access.
        n_params = self.local_adata.obsm["design_matrix"].shape[1]
        self.local_adata.uns["n_params"] = n_params

        #### ----Step 2: Compute local log mean---- ####

        with np.errstate(divide="ignore"):  # ignore division by zero warnings
            log_values = np.log(data_from_opener.X)
            local_log_mean = log_values.mean(axis=0)

        return {
            "log_mean": local_log_mean,
            "n_samples": data_from_opener.n_obs,
        }

order_design_cols_compute_local_log_mean(data_from_opener, shared_state=None)

Order design columns and compute the local log mean.

This function also sets the contrast in the local AnnData, and saves the number of parameters in the uns field.

Parameters:

Name Type Description Default
data_from_opener AnnData

AnnData returned by the opener. Its `X` matrix and `n_obs` are read to compute the local log mean and the number of samples.

required
shared_state dict

Shared state with two keys: "merged_columns", the names of the columns that the design matrix should have (in the order to apply), and "contrast", the contrast to be used for the DESeq2 model.

None

Returns:

Type Description
dict

Local mean of logs and number of samples.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
@remote_data
@log_remote_data
@reconstruct_adatas
def order_design_cols_compute_local_log_mean(
    self, data_from_opener, shared_state=None
):
    """Order design columns and compute the local log mean.

    Besides the returned values, this method stores the contrast in
    ``self.local_adata.uns["contrast"]``, aligns the local design matrix
    on the merged columns (columns absent locally are filled with 0) and
    stores the number of design columns in
    ``self.local_adata.uns["n_params"]``.

    Parameters
    ----------
    data_from_opener : ad.AnnData
        AnnData returned by the opener. Its ``X`` and ``n_obs`` are read
        to compute the returned values.

    shared_state : dict
        Shared state with:
        - "merged_columns": the names of the columns that the design
          matrix should have, in the order to apply.
        - "contrast": the contrast to be used for the DESeq2 model.
        Although the default is None, the keys above are read
        unconditionally.

    Returns
    -------
    dict
        "log_mean": the mean over axis 0 of ``log(data_from_opener.X)``,
        and "n_samples": ``data_from_opener.n_obs``.
    """
    #### ----Step 1: Order design columns---- ####

    self.local_adata.uns["contrast"] = shared_state["contrast"]

    merged_columns = shared_state["merged_columns"]

    # Columns known globally but absent locally are added, filled with 0.
    for column in merged_columns:
        known_columns = self.local_adata.obsm["design_matrix"].columns
        if column in known_columns:
            continue
        self.local_adata.obsm["design_matrix"][column] = 0

    # Use the same column order in every center.
    self.local_adata.obsm["design_matrix"] = self.local_adata.obsm[
        "design_matrix"
    ][merged_columns]

    # Keep the number of params in an uns field for easy access.
    n_params = self.local_adata.obsm["design_matrix"].shape[1]
    self.local_adata.uns["n_params"] = n_params

    #### ----Step 2: Compute local log mean---- ####

    with np.errstate(divide="ignore"):  # ignore division by zero warnings
        log_values = np.log(data_from_opener.X)
        local_log_mean = log_values.mean(axis=0)

    return {
        "log_mean": local_log_mean,
        "n_samples": data_from_opener.n_obs,
    }

LocSetLocalDesign

Mixin to set the design matrices in centers.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
class LocSetLocalDesign:
    """Local mixin building the design matrix of each center."""

    local_adata: ad.AnnData
    design_factors: list[str]
    continuous_factors: list[str] | None
    ref_levels: dict[str, str] | None

    @remote_data
    @log_remote_data
    @reconstruct_adatas
    def set_local_design(
        self,
        data_from_opener,
        shared_state,
    ):
        # pylint: disable=unused-argument
        """
        Set the design matrices in centers.

        The matrix is built from ``self.local_adata.obs`` and stored in
        ``self.local_adata.obsm["design_matrix"]``. Its columns are returned
        so that they can be harmonized across centers.

        Parameters
        ----------
        data_from_opener : ad.AnnData
            AnnData returned by the opener. Not used.

        shared_state : dict
            Shared state with a "merged_levels" key, passed as the ``levels``
            argument of the module-level ``build_design_matrix`` function
            (presumably the unique levels of each design factor).

        Returns
        -------
        dict
            A dictionary with a single "design_columns" key holding the
            columns of the local design matrix.
        """
        design_matrix = build_design_matrix(
            metadata=self.local_adata.obs,
            design_factors=self.design_factors,
            continuous_factors=self.continuous_factors,
            levels=shared_state["merged_levels"],
            ref_levels=self.ref_levels,
        )
        self.local_adata.obsm["design_matrix"] = design_matrix

        # Read the columns back from the stored matrix.
        stored_design = self.local_adata.obsm["design_matrix"]
        return {"design_columns": stored_design.columns}

set_local_design(data_from_opener, shared_state)

Set the design matrices in centers.

Returns their columns in order to harmonize them.

Parameters:

Name Type Description Default
data_from_opener AnnData

AnnData returned by the opener. Not used.

required
shared_state dict

Shared state with a "merged_levels" key containing a dictionary with, for each design factor, the names of its unique levels.

required

Returns:

Type Description
dict

Local design columns.

Source code in fedpydeseq2/core/deseq2_core/build_design_matrix/substeps.py
@remote_data
@log_remote_data
@reconstruct_adatas
def set_local_design(
    self,
    data_from_opener,
    shared_state,
):
    # pylint: disable=unused-argument
    """
    Set the design matrices in centers.

    The matrix is built from ``self.local_adata.obs`` and stored in
    ``self.local_adata.obsm["design_matrix"]``. Its columns are returned
    so that they can be harmonized across centers.

    Parameters
    ----------
    data_from_opener : ad.AnnData
        AnnData returned by the opener. Not used.

    shared_state : dict
        Shared state with a "merged_levels" key, passed as the ``levels``
        argument of the module-level ``build_design_matrix`` function
        (presumably the unique levels of each design factor).

    Returns
    -------
    dict
        A dictionary with a single "design_columns" key holding the
        columns of the local design matrix.
    """
    design_matrix = build_design_matrix(
        metadata=self.local_adata.obs,
        design_factors=self.design_factors,
        continuous_factors=self.continuous_factors,
        levels=shared_state["merged_levels"],
        ref_levels=self.ref_levels,
    )
    self.local_adata.obsm["design_matrix"] = design_matrix

    # Read the columns back from the stored matrix.
    stored_design = self.local_adata.obsm["design_matrix"]
    return {"design_columns": stored_design.columns}