"""
Module implementing matrix routines and associated error checking routines
for dense breeding value matrices.
"""
# Names exported by ``from <this module> import *``.
# NOTE(review): ``check_is_DenseBreedingValueMatrix`` is not defined in the
# visible portion of this file - presumably it is defined further down.
__all__ = [
"DenseBreedingValueMatrix",
"check_is_DenseBreedingValueMatrix",
]
import copy
from numbers import Integral
from numbers import Real
from pathlib import Path
from typing import Optional
from typing import Sequence
from typing import Union
import numpy
from numpy.typing import ArrayLike
import h5py
import pandas
from pybrops.core.error.error_type_pandas import check_is_pandas_DataFrame
from pybrops.core.error.error_type_python import check_is_array_like
from pybrops.core.error.error_type_python import check_is_bool
from pybrops.core.error.error_type_python import check_is_str
from pybrops.core.error.error_type_python import check_is_str_or_Integral
from pybrops.core.error.error_type_python import check_is_str_or_Sequence
from pybrops.core.error.error_type_numpy import check_is_ndarray
from pybrops.core.error.error_value_h5py import check_h5py_File_has_group
from pybrops.core.error.error_value_h5py import check_h5py_File_is_readable
from pybrops.core.error.error_value_h5py import check_h5py_File_is_writable
from pybrops.core.error.error_value_numpy import check_ndarray_all_gteq
from pybrops.core.error.error_value_numpy import check_ndarray_axis_len
from pybrops.core.error.error_value_numpy import check_ndarray_ndim
from pybrops.core.error.error_io_python import check_file_exists
from pybrops.core.error.error_value_python import check_is_gteq
from pybrops.core.error.error_value_python import check_len
from pybrops.core.error.error_value_python import check_str_value
from pybrops.core.mat.DenseTaxaTraitMatrix import DenseTaxaTraitMatrix
from pybrops.core.util.h5py import h5py_File_read_ndarray
from pybrops.core.util.h5py import h5py_File_read_ndarray_utf8
from pybrops.core.util.h5py import h5py_File_write_dict
from pybrops.popgen.bvmat.BreedingValueMatrix import BreedingValueMatrix
[docs]
class DenseBreedingValueMatrix(
DenseTaxaTraitMatrix,
BreedingValueMatrix,
):
"""
The DenseBreedingValueMatrix class uses a dense matrix to represent a
Multivariate Breeding Value.
Notes
-----
All elements within a BreedingValueMatrix are mean-centered and scaled to
unit variance for each trait.
.. math::
BV = \\frac{X - \\mu}{\\sigma}
Where:
- :math:`BV` is the breeding value.
- :math:`X` is the phenotype value.
- :math:`\\mu` is the mean (location) for :math:`X`.
- :math:`\\sigma` is the standard deviation (scale) for :math:`X`.
Phenotype values can be reconstituted using:
.. math::
X = \\sigma BV + \\mu
"""
########################## Special Object Methods ##########################
def __init__(
        self,
        mat: numpy.ndarray,
        location: Union[numpy.ndarray,Real] = 0.0,
        scale: Union[numpy.ndarray,Real] = 1.0,
        taxa: Optional[numpy.ndarray] = None,
        taxa_grp: Optional[numpy.ndarray] = None,
        trait: Optional[numpy.ndarray] = None,
        **kwargs: dict
    ) -> None:
    """
    Constructor for a DenseBreedingValueMatrix.

    Parameters
    ----------
    mat : numpy.ndarray
        Breeding values arranged as an array of shape ``(n,t)``.
        The constructor does not center or scale ``mat``; the caller is
        responsible for making sure the values agree with the supplied
        ``location`` and ``scale``.
    location : numpy.ndarray, Real
        Breeding value locations. Either an array of shape ``(t,)`` or a
        single ``Real`` which is repeated for each of the ``t`` traits.
    scale : numpy.ndarray, Real
        Breeding value scales. Either an array of shape ``(t,)`` or a
        single ``Real`` which is repeated for each of the ``t`` traits.
    taxa : numpy.ndarray, None
        Taxa names, shape ``(n,)``. ``None`` stores no taxa names.
    taxa_grp : numpy.ndarray, None
        Taxa groupings, shape ``(n,)``. ``None`` stores no taxa groups.
    trait : numpy.ndarray, None
        Trait names, shape ``(t,)``. ``None`` stores no trait names.
    kwargs : dict
        Extra keyword arguments forwarded unchanged to the parent class
        constructor (cooperative inheritance).
    """
    # arguments handled by the parent constructor
    parent_kwargs = dict(
        mat = mat,
        taxa = taxa,
        taxa_grp = taxa_grp,
        trait = trait,
    )
    super(DenseBreedingValueMatrix, self).__init__(**parent_kwargs, **kwargs)

    # location and scale are assigned after the parent constructor has run
    # because their setters read ``self.ntrait``
    self.location = location
    self.scale = scale
#################### Matrix copying ####################
def __copy__(
        self
    ) -> 'DenseBreedingValueMatrix':
    """
    Make a shallow copy of the matrix.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A new object of the same class whose fields are ``copy.copy``
        duplicates of this object's fields.
    """
    # constructor fields, copied in the order the constructor receives them
    ctor_fields = ("mat", "location", "scale", "taxa", "taxa_grp", "trait")
    ctor_kwargs = {name: copy.copy(getattr(self, name)) for name in ctor_fields}
    out = self.__class__(**ctor_kwargs)

    # taxa group metadata is not a constructor argument; copy it afterwards
    for name in ("taxa_grp_name", "taxa_grp_stix", "taxa_grp_spix", "taxa_grp_len"):
        setattr(out, name, copy.copy(getattr(self, name)))

    return out
def __deepcopy__(
        self,
        memo: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Make a deep copy of the matrix.

    Parameters
    ----------
    memo : dict
        A dictionary of objects already copied during the current copying
        pass. It is forwarded to every nested ``copy.deepcopy`` call so
        that objects referenced more than once are only copied once.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A deep copy of the matrix.
    """
    # create new object; every field (including location and scale) shares
    # the same memo so shared references are preserved within one copy pass
    out = self.__class__(
        mat = copy.deepcopy(self.mat, memo),
        location = copy.deepcopy(self.location, memo),
        scale = copy.deepcopy(self.scale, memo),
        taxa = copy.deepcopy(self.taxa, memo),
        taxa_grp = copy.deepcopy(self.taxa_grp, memo),
        trait = copy.deepcopy(self.trait, memo)
    )

    # copy taxa metadata (not constructor arguments)
    out.taxa_grp_name = copy.deepcopy(self.taxa_grp_name, memo)
    out.taxa_grp_stix = copy.deepcopy(self.taxa_grp_stix, memo)
    out.taxa_grp_spix = copy.deepcopy(self.taxa_grp_spix, memo)
    out.taxa_grp_len = copy.deepcopy(self.taxa_grp_len, memo)

    return out
########### Miscellaneous special functions ############
def __repr__(
        self
    ) -> str:
    """
    Return repr(self).

    Returns
    -------
    out : str
        A string containing the class name, the number of taxa, the number
        of traits, and the hexadecimal ``id`` of the object.
    """
    template = "<{0} of shape (ntaxa = {1}, ntrait = {2}) at {3}>"
    class_name = type(self).__name__
    address = hex(id(self))
    return template.format(class_name, self.ntaxa, self.ntrait, address)
############################ Object Properties #############################
################# Breeding Value Data ##################
@DenseTaxaTraitMatrix.mat.setter
def mat(self, value: numpy.ndarray) -> None:
    """
    Set the raw matrix.

    Parameters
    ----------
    value : numpy.ndarray
        Array to store. It is passed through ``check_is_ndarray`` and
        ``check_ndarray_ndim`` (expecting 2 dimensions) before assignment.
    """
    # validate type first, then dimensionality
    check_is_ndarray(value, "mat")
    check_ndarray_ndim(value, "mat", 2)
    # store the array itself; no copy is made here
    self._mat = value
@property
def location(self) -> numpy.ndarray:
    """Location (mean of the phenotype values) used to calculate breeding values."""
    return self._location

@location.setter
def location(self, value: Union[numpy.ndarray,Real]) -> None:
    """
    Set the location (mean of the phenotype values) used to calculate
    breeding values.

    An array must be one-dimensional with one entry per trait; a ``Real``
    is repeated ``self.ntrait`` times. Any other type raises ``TypeError``.
    """
    # reject anything that is neither an array nor a real scalar
    if not isinstance(value, (numpy.ndarray, Real)):
        raise TypeError("variable 'location' must be of type 'numpy.ndarray' or 'Real'")

    if isinstance(value, numpy.ndarray):
        # arrays are validated and stored as given
        check_ndarray_ndim(value, "location", 1)
        check_ndarray_axis_len(value, "location", 0, self.ntrait)
    else:
        # scalars are expanded to one value per trait
        value = numpy.repeat(value, self.ntrait)

    self._location = value
@property
def scale(self) -> numpy.ndarray:
    """Scale (standard deviation of the phenotype values) used to calculate breeding values."""
    return self._scale

@scale.setter
def scale(self, value: Union[numpy.ndarray,Real]) -> None:
    """
    Set the scale (standard deviation of the phenotype values) used to
    calculate breeding values.

    An array must be one-dimensional with one entry per trait and is
    checked against a lower bound of 0; a ``Real`` is checked against the
    same bound and then repeated ``self.ntrait`` times. Any other type
    raises ``TypeError``.
    """
    # reject anything that is neither an array nor a real scalar
    if not isinstance(value, (numpy.ndarray, Real)):
        raise TypeError("variable 'scale' must be of type 'numpy.ndarray' or 'Real'")

    if isinstance(value, numpy.ndarray):
        # arrays are validated and stored as given
        check_ndarray_ndim(value, "scale", 1)
        check_ndarray_axis_len(value, "scale", 0, self.ntrait)
        check_ndarray_all_gteq(value, "scale", 0)
    else:
        # scalars are validated, then expanded to one value per trait
        check_is_gteq(value, "scale", 0)
        value = numpy.repeat(value, self.ntrait)

    self._scale = value
############################## Object Methods ##############################
#################### Matrix copying ####################
[docs]
def copy(
        self
    ) -> 'DenseBreedingValueMatrix':
    """
    Make a shallow copy of the matrix.

    Returns
    -------
    out : DenseBreedingValueMatrix
        The result of ``copy.copy(self)``.
    """
    # delegate to the standard library's shallow copy routine
    shallow = copy.copy(self)
    return shallow
[docs]
def deepcopy(
        self,
        memo: Optional[dict] = None
    ) -> 'DenseBreedingValueMatrix':
    """
    Make a deep copy of the matrix.

    Parameters
    ----------
    memo : dict, None
        Dictionary of memo metadata, forwarded to ``copy.deepcopy``.

    Returns
    -------
    out : DenseBreedingValueMatrix
        The result of ``copy.deepcopy(self, memo)``.
    """
    # delegate to the standard library's deep copy routine
    duplicate = copy.deepcopy(self, memo)
    return duplicate
######### Matrix element copy-on-manipulation ##########
[docs]
def adjoin_taxa(
        self,
        values: Union[BreedingValueMatrix,numpy.ndarray],
        taxa: Optional[numpy.ndarray] = None,
        taxa_grp: Optional[numpy.ndarray] = None,
        **kwargs: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Append values to the end of the matrix along the taxa axis.
    Copy-on-manipulation routine.

    Parameters
    ----------
    values : BreedingValueMatrix, numpy.ndarray
        Values to append to the matrix.
        If an instance of this object's class, its unscaled values are
        used. If a ``numpy.ndarray``, it is treated as already unscaled.
    taxa : numpy.ndarray
        Taxa names to adjoin to the matrix.
        When ``values`` is an instance of this object's class, this
        argument takes precedence over ``values.taxa``.
    taxa_grp : numpy.ndarray
        Taxa groups to adjoin to the matrix.
        When ``values`` is an instance of this object's class, this
        argument takes precedence over ``values.taxa_grp``.
    kwargs : dict
        Additional keyword arguments forwarded to ``from_numpy``.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A new matrix built by ``from_numpy`` from the combined unscaled
        values. The adjoin does not occur in-place.

    Raises
    ------
    ValueError
        If ``values`` has an unsupported type, the wrong number of
        dimensions, or an incompatible length along a non-taxa axis.
    TypeError
        If this matrix has taxa groups but none are available for
        ``values``.
    """
    # normalize ``values`` to an unscaled numpy.ndarray
    if isinstance(values, self.__class__):
        # explicit arguments win over the fields stored in ``values``
        taxa = values.taxa if taxa is None else taxa
        taxa_grp = values.taxa_grp if taxa_grp is None else taxa_grp
        values = values.unscale()
    elif not isinstance(values, numpy.ndarray):
        raise ValueError("cannot adjoin: 'values' must be of type {0} or numpy.ndarray".format(self.__class__))

    # validate shapes before any allocation happens
    if values.ndim != self.mat_ndim:
        raise ValueError("cannot adjoin: 'values' must have ndim == {0}".format(self.mat_ndim))
    for axis, (new_len, cur_len) in enumerate(zip(values.shape, self.mat_shape)):
        if axis == self.taxa_axis:
            continue
        if new_len != cur_len:
            raise ValueError("cannot adjoin: axis lengths incompatible for axis {0}".format(axis))

    has_taxa = self._taxa is not None
    has_taxa_grp = self._taxa_grp is not None

    # missing taxa names are tolerated (filled with None); groups are not
    if has_taxa and (taxa is None):
        taxa = numpy.empty(values.shape[self.taxa_axis], dtype = "object")
    if has_taxa_grp and (taxa_grp is None):
        raise TypeError("cannot adjoin: 'taxa_grp' argument is required")

    # append on the unscaled values
    combined = numpy.append(self.unscale(), values, axis = self.taxa_axis)
    if has_taxa:
        taxa = numpy.append(self.taxa, taxa, axis = 0)
    if has_taxa_grp:
        taxa_grp = numpy.append(self.taxa_grp, taxa_grp, axis = 0)

    # from_numpy recomputes location and scale for the combined values
    return self.__class__.from_numpy(
        mat = combined,
        taxa = taxa,
        taxa_grp = taxa_grp,
        trait = self.trait,
        **kwargs
    )
[docs]
def delete_taxa(
        self,
        obj: Union[int,slice,Sequence],
        **kwargs: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Delete sub-arrays along the taxa axis.

    Parameters
    ----------
    obj : int, slice, or Sequence of ints
        Indices of the sub-arrays to remove along the taxa axis.
    kwargs : dict
        Additional keyword arguments forwarded to ``from_numpy``.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A new matrix built by ``from_numpy`` from the remaining unscaled
        values. The delete does not occur in-place.
    """
    # remove rows from the unscaled values
    remaining = numpy.delete(self.unscale(), obj, axis = self.taxa_axis)

    # remove the matching labels, when labels are stored
    taxa = self.taxa
    if taxa is not None:
        taxa = numpy.delete(taxa, obj, axis = 0)
    taxa_grp = self.taxa_grp
    if taxa_grp is not None:
        taxa_grp = numpy.delete(taxa_grp, obj, axis = 0)

    # from_numpy recomputes location and scale for the remaining values
    return self.__class__.from_numpy(
        mat = remaining,
        taxa = taxa,
        taxa_grp = taxa_grp,
        trait = self.trait,
        **kwargs
    )
[docs]
def insert_taxa(
        self,
        obj: Union[int,slice,Sequence],
        values: Union[BreedingValueMatrix,numpy.ndarray],
        taxa: Optional[numpy.ndarray] = None,
        taxa_grp: Optional[numpy.ndarray] = None,
        **kwargs: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Insert values along the taxa axis before the given indices.

    Parameters
    ----------
    obj : int, slice, or Sequence of ints
        Index or indices before which ``values`` is inserted.
    values : BreedingValueMatrix, numpy.ndarray
        Values to insert into the matrix.
        If an instance of this object's class, its unscaled values are
        used. If a ``numpy.ndarray``, it is inserted into the unscaled
        values of this matrix as given.
    taxa : numpy.ndarray
        Taxa names to insert into the matrix.
        When ``values`` is an instance of this object's class, this
        argument takes precedence over ``values.taxa``.
    taxa_grp : numpy.ndarray
        Taxa groups to insert into the matrix.
        When ``values`` is an instance of this object's class, this
        argument takes precedence over ``values.taxa_grp``.
    kwargs : dict
        Additional keyword arguments forwarded to ``from_numpy``.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A new matrix built by ``from_numpy`` from the combined unscaled
        values. The insert does not occur in-place.

    Raises
    ------
    ValueError
        If ``values`` has an unsupported type, the wrong number of
        dimensions, or an incompatible length along a non-taxa axis.
    TypeError
        If this matrix has taxa groups but none are available for
        ``values``.
    """
    # normalize ``values`` to an unscaled numpy.ndarray
    if isinstance(values, self.__class__):
        # explicit arguments win over the fields stored in ``values``
        taxa = values.taxa if taxa is None else taxa
        taxa_grp = values.taxa_grp if taxa_grp is None else taxa_grp
        values = values.unscale()
    elif not isinstance(values, numpy.ndarray):
        raise ValueError("'values' must be of type {0} or numpy.ndarray".format(self.__class__))

    # validate shapes before any allocation happens
    if values.ndim != self.mat_ndim:
        raise ValueError("cannot insert: 'values' must have ndim == {0}".format(self.mat_ndim))
    for axis, (new_len, cur_len) in enumerate(zip(values.shape, self.mat_shape)):
        if axis == self.taxa_axis:
            continue
        if new_len != cur_len:
            raise ValueError("cannot insert: axis lengths incompatible for axis {0}".format(axis))

    has_taxa = self._taxa is not None
    has_taxa_grp = self._taxa_grp is not None

    # missing taxa names are tolerated (filled with None); groups are not
    if has_taxa and (taxa is None):
        taxa = numpy.empty(values.shape[self.taxa_axis], dtype = "object")
    if has_taxa_grp and (taxa_grp is None):
        raise TypeError("cannot insert: 'taxa_grp' argument is required")

    # insert into the unscaled values
    combined = numpy.insert(self.unscale(), obj, values, axis = self.taxa_axis)
    if has_taxa:
        taxa = numpy.insert(self._taxa, obj, taxa, axis = 0)
    if has_taxa_grp:
        taxa_grp = numpy.insert(self._taxa_grp, obj, taxa_grp, axis = 0)

    # from_numpy recomputes location and scale for the combined values
    return self.__class__.from_numpy(
        mat = combined,
        taxa = taxa,
        taxa_grp = taxa_grp,
        trait = self.trait,
        **kwargs
    )
[docs]
def select_taxa(
        self,
        indices: ArrayLike,
        **kwargs: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Select certain values from the matrix along the taxa axis.

    The selected unscaled values are handed to ``from_numpy``, which
    re-centers and re-scales them.

    Parameters
    ----------
    indices : array_like (Nj, ...)
        The indices of the values to select.
    kwargs : dict
        Additional keyword arguments forwarded to ``from_numpy``.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A new matrix holding the selected values. The select does not
        occur in-place.
    """
    # validate input
    check_is_array_like(indices, "indices")

    # pick rows from the unscaled values
    chosen = numpy.take(self.unscale(), indices, axis = self.taxa_axis)

    # pick the matching labels, when labels are stored
    taxa = self.taxa
    if taxa is not None:
        taxa = numpy.take(taxa, indices, axis = 0)
    taxa_grp = self.taxa_grp
    if taxa_grp is not None:
        taxa_grp = numpy.take(taxa_grp, indices, axis = 0)

    # from_numpy conducts centering, scaling, etc.
    return self.__class__.from_numpy(
        mat = chosen,
        taxa = taxa,
        taxa_grp = taxa_grp,
        trait = self.trait,
        **kwargs
    )
############## Matrix summary statistics ###############
[docs]
def targmax(self) -> numpy.ndarray:
    """
    Return indices of the maximum values for each trait column (along the
    taxa axis).

    Returns
    -------
    out : numpy.ndarray
        An index array of shape ``(t,)`` containing indices of maximum
        values along the taxa axis.

        Where:

        - ``t`` is the number of traits.
    """
    taxa_axis = self.taxa_axis
    return self._mat.argmax(axis = taxa_axis)
[docs]
def targmin(self) -> numpy.ndarray:
    """
    Return indices of the minimum values for each trait column (along the
    taxa axis).

    Returns
    -------
    out : numpy.ndarray
        An index array of shape ``(t,)`` containing indices of minimum
        values along the taxa axis.

        Where:

        - ``t`` is the number of traits.
    """
    taxa_axis = self.taxa_axis
    return self._mat.argmin(axis = taxa_axis)
[docs]
def tmax(self, unscale: bool = False) -> numpy.ndarray:
    """
    Return the maximum for each trait column (along the taxa axis).

    Parameters
    ----------
    unscale : bool, default = False
        Whether to transform results to their unscaled values.

    Returns
    -------
    out : numpy.ndarray
        An array of shape ``(t,)`` containing maximum values along the taxa
        axis.

        Where:

        - ``t`` is the number of traits.
    """
    out = self._mat.max(axis = self.taxa_axis) # get maximum
    if unscale:
        # same expression as ``unscale()``; computed out-of-place because
        # an in-place update raises a casting error when the matrix has an
        # integer dtype and the scale/location are floating point
        out = (self._scale * out) + self._location
    return out
[docs]
def tmean(self, unscale: bool = False) -> numpy.ndarray:
    """
    Return the mean for each trait column (along the taxa axis).

    Parameters
    ----------
    unscale : bool, default = False
        Whether to transform results to their unscaled values.
        If ``True``, the stored location array is returned as-is.

    Returns
    -------
    out : numpy.ndarray
        An array of shape ``(t,)`` containing mean values along the taxa
        axis.

        Where:

        - ``t`` is the number of traits.
    """
    if unscale:
        # the unscaled mean is taken to be the stored location
        return self._location
    return self._mat.mean(axis = self.taxa_axis)
[docs]
def tmin(self, unscale: bool = False) -> numpy.ndarray:
    """
    Return the minimum for each trait column (along the taxa axis).

    Parameters
    ----------
    unscale : bool, default = False
        Whether to transform results to their unscaled values.

    Returns
    -------
    out : numpy.ndarray
        An array of shape ``(t,)`` containing minimum values along the
        taxa axis.

        Where:

        - ``t`` is the number of traits.
    """
    out = self._mat.min(axis = self.taxa_axis) # get minimum
    if unscale:
        # same expression as ``unscale()``; computed out-of-place because
        # an in-place update raises a casting error when the matrix has an
        # integer dtype and the scale/location are floating point
        out = (self._scale * out) + self._location
    return out
[docs]
def trange(self, unscale: bool = False) -> numpy.ndarray:
    """
    Return the range for each trait column (along the taxa axis).

    Parameters
    ----------
    unscale : bool, default = False
        Whether to transform results to their unscaled values.

    Returns
    -------
    out : numpy.ndarray
        An array of shape ``(t,)`` containing range values along the taxa
        axis.

        Where:

        - ``t`` is the number of traits.
    """
    out = numpy.ptp(self._mat, axis = self.taxa_axis) # get range
    if unscale:
        # a range is a difference, so only the scale applies (the location
        # cancels); computed out-of-place because an in-place multiply
        # raises a casting error when the matrix has an integer dtype
        out = self._scale * out
    return out
[docs]
def tstd(self, unscale: bool = False) -> numpy.ndarray:
    """
    Return the standard deviation for each trait column (along the taxa
    axis).

    Parameters
    ----------
    unscale : bool, default = False
        Whether to transform results to their unscaled values.
        If ``True``, the stored scale array is returned as-is.

    Returns
    -------
    out : numpy.ndarray
        An array of shape ``(t,)`` containing standard deviation values
        along the taxa axis.

        Where:

        - ``t`` is the number of traits.
    """
    if unscale:
        # the unscaled standard deviation is taken to be the stored scale
        return self._scale
    return self._mat.std(axis = self.taxa_axis)
[docs]
def tvar(self, unscale: bool = False) -> numpy.ndarray:
    """
    Return the variance for each trait column (along the taxa axis).

    Parameters
    ----------
    unscale : bool, default = False
        Whether to transform results to their unscaled values.
        If ``True``, the squared stored scale is returned.

    Returns
    -------
    out : numpy.ndarray
        An array of shape ``(t,)`` containing variance values along the
        taxa axis.

        Where:

        - ``t`` is the number of traits.
    """
    if unscale:
        # the unscaled variance is taken to be the square of the stored scale
        return self._scale**2
    return self._mat.var(axis = self.taxa_axis)
[docs]
def unscale(self) -> numpy.ndarray:
    """
    Transform values within the BreedingValueMatrix back to their unscaled
    and de-centered values.

    Returns
    -------
    out : numpy.ndarray
        A newly computed array holding ``scale * mat + location``.
        The stored matrix is not modified.
    """
    # undo the scaling first, then undo the centering
    rescaled = self._scale * self._mat
    return rescaled + self._location
################### Matrix File I/O ####################
[docs]
def to_pandas(
        self,
        taxa_col: Optional[str] = "taxa",
        taxa_grp_col: Optional[str] = "taxa_grp",
        trait_cols: Optional[Union[str,Sequence]] = "all",
        unscale: bool = False,
        **kwargs: dict
    ) -> pandas.DataFrame:
    """
    Export a DenseBreedingValueMatrix to a pandas.DataFrame.

    Parameters
    ----------
    taxa_col : str, None, default = "taxa"
        Name of the column to which to write taxa names.
        If ``str``, the column is given the name in ``taxa_col``.
        If ``None``, the column is not exported.
    taxa_grp_col : str, None, default = "taxa_grp"
        Name of the column to which to write taxa group names.
        If ``str``, the column is given the name in ``taxa_grp_col``.
        If ``None``, the column is not exported.
    trait_cols : Sequence, str, None, default = "all"
        Names of the trait columns to which to write breeding values.
        If ``Sequence``, column names are given by the entries of the
        ``trait_cols`` Sequence.
        If ``str``, must be equal to ``"all"``. Use the trait names in the
        ``trait`` property, or numeric column names if ``trait`` is
        ``None``.
        If ``None``, use numeric trait column names.
    unscale : bool, default = False
        Whether to transform breeding values to their unscaled values.
    kwargs : dict
        Additional keyword arguments. They are accepted but not used by
        this method.

    Returns
    -------
    out : pandas.DataFrame
        An output dataframe.
    """
    # type checks
    if taxa_col is not None:
        check_is_str(taxa_col, "taxa_col")
    if taxa_grp_col is not None:
        check_is_str(taxa_grp_col, "taxa_grp_col")
    if trait_cols is not None:
        if isinstance(trait_cols, str):
            check_str_value(trait_cols, "trait_cols", "all")
        elif isinstance(trait_cols, Sequence):
            check_len(trait_cols, "trait_cols", self.ntrait)
        else:
            check_is_str_or_Sequence(trait_cols, "trait_cols")
    check_is_bool(unscale, "unscale")

    # label columns come first, in taxa / taxa group order
    columns = {}
    if taxa_col is not None:
        columns[taxa_col] = self.taxa
    if taxa_grp_col is not None:
        columns[taxa_grp_col] = self.taxa_grp

    # resolve the trait column labels
    numeric_labels_needed = (trait_cols is None) or (
        isinstance(trait_cols, str) and self.trait is None
    )
    if numeric_labels_needed:
        trait_cols = numpy.arange(self.ntrait)
    elif isinstance(trait_cols, str):
        trait_cols = self.trait

    # one column of breeding values per trait
    bv = self.unscale() if unscale else self.mat
    columns.update(
        (label, bv[:,i]) for i, label in zip(range(self.ntrait), trait_cols)
    )

    return pandas.DataFrame(columns)
[docs]
def to_csv(
        self,
        filename: str,
        taxa_col: Optional[str] = "taxa",
        taxa_grp_col: Optional[str] = "taxa_grp",
        trait_cols: Optional[Union[str,Sequence]] = "all",
        unscale: bool = False,
        sep: str = ',',
        header: bool = True,
        index: bool = False,
        **kwargs: dict
    ) -> None:
    """
    Write a DenseBreedingValueMatrix to a CSV file.

    The matrix is first converted with ``to_pandas`` and the resulting
    dataframe is written with its ``to_csv`` method.

    Parameters
    ----------
    filename : str
        CSV file name to which to write.
    taxa_col : str, None, default = "taxa"
        Name of the column to which to write taxa names.
        If ``None``, the column is not exported.
    taxa_grp_col : str, None, default = "taxa_grp"
        Name of the column to which to write taxa group names.
        If ``None``, the column is not exported.
    trait_cols : Sequence, str, None, default = "all"
        Names of the trait columns to which to write breeding values.
        Interpreted by ``to_pandas``.
    unscale : bool, default = False
        Whether to transform breeding values to their unscaled values.
    sep : str, default = ","
        Separator to use in the exported CSV file.
    header : bool, default = True
        Whether to save header names.
    index : bool, default = False
        Whether to save a row index in the exported CSV file.
    kwargs : dict
        Additional keyword arguments forwarded to the dataframe's
        ``to_csv`` method.
    """
    # build the dataframe representation
    export_df = self.to_pandas(
        taxa_col = taxa_col,
        taxa_grp_col = taxa_grp_col,
        trait_cols = trait_cols,
        unscale = unscale,
    )

    # hand the explicit CSV options plus any extras to pandas
    csv_options = dict(
        path_or_buf = filename,
        sep = sep,
        header = header,
        index = index,
    )
    export_df.to_csv(**csv_options, **kwargs)
[docs]
def to_hdf5(
        self,
        filename: Union[str,Path,h5py.File],
        groupname: Optional[str] = None,
        overwrite: bool = True,
    ) -> None:
    """
    Write ``DenseBreedingValueMatrix`` to an HDF5 file.

    Parameters
    ----------
    filename : str, Path, h5py.File
        If ``str`` or ``Path``, an HDF5 file name to which to write. The
        file is opened in ``"a"`` mode and is closed before returning,
        including when an error occurs.
        If ``h5py.File``, an opened HDF5 file to which to write. File is
        not closed after writing.
    groupname : str, None
        If ``str``, an HDF5 group name under which
        ``DenseBreedingValueMatrix`` data is stored. A trailing ``'/'`` is
        added if missing.
        If ``None`` or an empty string, ``DenseBreedingValueMatrix`` is
        written to the base HDF5 group.
    overwrite : bool
        Whether to overwrite values in an HDF5 file if a field already
        exists.

    Raises
    ------
    TypeError
        If ``filename`` or ``groupname`` has an unsupported type.
    """
    ########################################################
    ############ process ``filename`` argument #############

    # only files opened here are closed here
    owns_file = isinstance(filename, (str,Path))

    # if we have a string or Path, open HDF5 file in append (``a``) mode
    if owns_file:
        h5file = h5py.File(filename, "a")
    # elif we have an h5py.File, make sure mode is writable, and copy pointer
    elif isinstance(filename, h5py.File):
        check_h5py_File_is_writable(filename)
        h5file = filename
    # else raise TypeError
    else:
        raise TypeError(
            "``filename`` must be of type ``str``, ``Path``, or ``h5py.File`` but received type ``{0}``".format(
                type(filename).__name__
            )
        )

    try:
        ########################################################
        ############ process ``groupname`` argument ############

        if isinstance(groupname, str):
            # non-empty names get a trailing '/'; an empty string is
            # treated like ``None`` instead of raising IndexError
            if groupname and not groupname.endswith('/'):
                groupname += '/'
        elif groupname is None:
            groupname = ""
        else:
            raise TypeError(
                "``groupname`` must be of type ``str`` or ``None`` but received type ``{0}``".format(
                    type(groupname).__name__
                )
            )

        ########################################################
        ############### write data to HDF5 file ################

        # data dictionary
        data = {
            "mat"           : self.mat,
            "location"      : self.location,
            "scale"         : self.scale,
            "taxa"          : self.taxa,
            "taxa_grp"      : self.taxa_grp,
            "trait"         : self.trait,
            # metadata
            "taxa_grp_name" : self.taxa_grp_name,
            "taxa_grp_stix" : self.taxa_grp_stix,
            "taxa_grp_spix" : self.taxa_grp_spix,
            "taxa_grp_len"  : self.taxa_grp_len,
        }

        # save data
        h5py_File_write_dict(h5file, groupname, data, overwrite)
    finally:
        # close the file, only if the provided filename was a string or
        # Path and not a h5py.File; runs on errors too so the handle is
        # never leaked
        if owns_file:
            h5file.close()
############################## Class Methods ###############################
################### Matrix File I/O ####################
[docs]
@classmethod
def from_numpy(
        cls,
        mat: numpy.ndarray,
        taxa: Optional[numpy.ndarray] = None,
        taxa_grp: Optional[numpy.ndarray] = None,
        trait: Optional[numpy.ndarray] = None,
        **kwargs: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Construct a DenseBreedingValueMatrix from a numpy.ndarray.
    Mean-centers each trait column and scales it to unit variance.

    Parameters
    ----------
    mat : numpy.ndarray
        A matrix of unscaled values of shape ``(n,t)``.

        Where:

        - ``n`` is the number of taxa.
        - ``t`` is the number of traits.
    taxa : numpy.ndarray
        An array of taxa names.
    taxa_grp : numpy.ndarray
        An array of taxa groups.
    trait : numpy.ndarray
        An array of trait names.
    kwargs : dict
        Additional keyword arguments forwarded to the class constructor.

    Returns
    -------
    out : DenseBreedingValueMatrix
        Output breeding value matrix. Its ``location`` and ``scale`` are
        the NaN-ignoring column means and standard deviations of ``mat``.
    """
    # check inputs
    check_ndarray_ndim(mat, "mat", 2)

    # per-trait location and scale, ignoring NaN entries: (n,t) -> (t,)
    location = numpy.nanmean(mat, axis = 0)
    scale = numpy.nanstd(mat, axis = 0)

    # a trait with zero spread is left unscaled rather than divided by zero
    zero_spread = (scale == 0.0)
    scale[zero_spread] = 1.0

    # center, then multiply by the reciprocal of the scale
    # (t,) operands broadcast across the n rows of the (n,t) matrix
    centered = mat - location[None,:]
    inv_scale = 1.0 / scale[None,:]
    mat = inv_scale * centered

    return cls(
        mat = mat,
        location = location,
        scale = scale,
        taxa = taxa,
        taxa_grp = taxa_grp,
        trait = trait,
        **kwargs
    )
[docs]
@classmethod
def from_pandas(
        cls,
        df: pandas.DataFrame,
        location: Union[numpy.ndarray,Real] = 0.0,
        scale: Union[numpy.ndarray,Real] = 1.0,
        taxa_col: Optional[Union[str,Integral]] = "taxa",
        taxa_grp_col: Optional[Union[str,Integral]] = "taxa_grp",
        trait_cols: Optional[Union[str,Sequence]] = "infer",
        **kwargs: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Read a DenseBreedingValueMatrix from a pandas.DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Pandas dataframe from which to read.
    location : numpy.ndarray, Real, default = 0.0
        Not used by this method: the output is built with ``from_numpy``,
        which computes the location from the data.
    scale : numpy.ndarray, Real, default = 1.0
        Not used by this method: the output is built with ``from_numpy``,
        which computes the scale from the data.
    taxa_col : str, Integral, None, default = "taxa"
        Column from which to read taxa names.
        If of type ``str``, taxa names are read from the column with that
        name.
        If of type ``Integral``, taxa names are read from the column with
        that position.
        If ``None``, taxa names are not imported.
    taxa_grp_col : str, Integral, None, default = "taxa_grp"
        Column from which to read taxa group names (converted to ``int``).
        If of type ``str``, taxa groups are read from the column with that
        name.
        If of type ``Integral``, taxa groups are read from the column with
        that position.
        If ``None``, taxa group names are not imported.
    trait_cols : Sequence, str, None, default = "infer"
        Trait columns from which to read breeding values.
        If ``Sequence``, columns are given by the strings (names) or
        integers (positions) in the ``trait_cols`` Sequence.
        If ``str``, must be equal to ``"infer"``. Use the columns not
        claimed by ``taxa_col`` / ``taxa_grp_col``.
        If ``None``, the current implementation behaves the same as
        ``"infer"``.
    kwargs : dict
        Additional keyword arguments forwarded to ``from_numpy``.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A DenseBreedingValueMatrix read from a pandas.DataFrame.
    """
    # type checks
    check_is_pandas_DataFrame(df, "df")
    if taxa_col is not None:
        check_is_str_or_Integral(taxa_col, "taxa_col")
    if taxa_grp_col is not None:
        check_is_str_or_Integral(taxa_grp_col, "taxa_grp_col")
    if trait_cols is not None:
        if isinstance(trait_cols, str):
            check_str_value(trait_cols, "trait_cols", "infer")
        elif not isinstance(trait_cols, Sequence):
            check_is_str_or_Sequence(trait_cols, "trait_cols")

    def _position(col):
        # column names are resolved to positions; positions pass through
        return df.columns.get_loc(col) if isinstance(col, str) else col

    # mask of columns holding trait data; label columns are removed below
    colmask = numpy.full(len(df.columns), True, dtype = bool)

    # extract taxa data
    taxa = None
    if taxa_col is not None:
        taxa_ix = _position(taxa_col)
        taxa = df.iloc[:,taxa_ix].to_numpy(dtype = object)
        colmask[taxa_ix] = False

    # extract taxa group data
    taxa_grp = None
    if taxa_grp_col is not None:
        taxa_grp_ix = _position(taxa_grp_col)
        taxa_grp = df.iloc[:,taxa_grp_ix].to_numpy(dtype = int)
        colmask[taxa_grp_ix] = False

    # an explicit (non-string) Sequence replaces the inferred mask
    if isinstance(trait_cols, Sequence) and not isinstance(trait_cols, str):
        colmask[:] = False
        for trait_col in trait_cols:
            colmask[_position(trait_col)] = True

    # extract trait names and matrix data
    trait = df.columns[colmask].to_numpy(dtype = object)
    mat = df.iloc[:,colmask].to_numpy(dtype = float)

    # from_numpy conducts centering and scaling
    return cls.from_numpy(
        mat = mat,
        taxa = taxa,
        taxa_grp = taxa_grp,
        trait = trait,
        **kwargs
    )
[docs]
@classmethod
def from_csv(
        cls,
        filename: str,
        location: Union[numpy.ndarray,Real] = 0.0,
        scale: Union[numpy.ndarray,Real] = 1.0,
        taxa_col: Optional[Union[str,Integral]] = "taxa",
        taxa_grp_col: Optional[Union[str,Integral]] = "taxa_grp",
        trait_cols: Optional[Union[str,Sequence]] = "infer",
        sep: str = ',',
        header: int = 0,
        **kwargs: dict
    ) -> 'DenseBreedingValueMatrix':
    """
    Build a DenseBreedingValueMatrix from the contents of a CSV file.

    The file is parsed with ``pandas.read_csv`` and the resulting
    ``pandas.DataFrame`` is handed to ``cls.from_pandas``, which performs the
    actual column extraction.

    Parameters
    ----------
    filename : str
        Name of the CSV file to read.
    location : numpy.ndarray, Real, default = 0.0
        Breeding value location(s), forwarded unchanged to ``from_pandas``.
        Either a ``numpy.ndarray`` of shape ``(t,)`` or a single ``Real``
        applied to every trait.
    scale : numpy.ndarray, Real, default = 1.0
        Breeding value scale(s), forwarded unchanged to ``from_pandas``.
        Either a ``numpy.ndarray`` of shape ``(t,)`` or a single ``Real``
        applied to every trait.
    taxa_col : str, Integral, None, default = "taxa"
        Column holding the taxa names.
        If ``str``, the column is selected by name.
        If ``Integral``, the column is selected by position.
        If ``None``, taxa names are not imported.
    taxa_grp_col : str, Integral, None, default = "taxa_grp"
        Column holding the taxa group labels.
        If ``str``, the column is selected by name.
        If ``Integral``, the column is selected by position.
        If ``None``, taxa group labels are not imported.
    trait_cols : Sequence, str, None, default = "infer"
        Columns holding the trait breeding values.
        If ``Sequence``, each element is a column name (``str``) or a column
        position (integer).
        If ``str``, must be equal to ``"infer"``: every column not used for
        taxa or taxa groups is read as a trait.
        If ``None``, handling is delegated to ``from_pandas``.
    sep : str, default = ','
        Field delimiter of the CSV file.
    header : int, default = 0
        Row number to use for the column names; passed to
        ``pandas.read_csv``.
    kwargs : dict
        Additional keyword arguments. They are passed to
        ``pandas.read_csv`` and, unchanged, to ``from_pandas`` as well.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A DenseBreedingValueMatrix read from the CSV file.
    """
    # parse the CSV file into a pandas.DataFrame
    frame = pandas.read_csv(
        filepath_or_buffer = filename,
        sep = sep,
        header = header,
        **kwargs
    )

    # NOTE(review): the same ``kwargs`` are sent to both ``read_csv`` (above)
    # and ``from_pandas`` (below); confirm that every key is accepted by both.
    # delegate construction of the breeding value matrix to ``from_pandas``
    return cls.from_pandas(
        df = frame,
        location = location,
        scale = scale,
        taxa_col = taxa_col,
        taxa_grp_col = taxa_grp_col,
        trait_cols = trait_cols,
        **kwargs
    )
[docs]
@classmethod
def from_hdf5(
        cls,
        filename: Union[str,Path,h5py.File],
        groupname: Optional[str] = None
    ) -> 'DenseBreedingValueMatrix':
    """
    Read a ``DenseBreedingValueMatrix`` from an HDF5 file.

    The fields ``mat``, ``location`` and ``scale`` must be present under
    ``groupname``. The fields ``taxa``, ``taxa_grp``, ``trait`` and the
    metadata fields ``taxa_grp_name``, ``taxa_grp_stix``, ``taxa_grp_spix``
    and ``taxa_grp_len`` are read when present and left as ``None``
    otherwise.

    Parameters
    ----------
    filename : str, Path, h5py.File
        If ``str`` or ``Path``, an HDF5 file name from which to read. The
        file is opened in read mode and is always closed before this method
        returns or raises.
        If ``h5py.File``, an opened HDF5 file from which to read. The file is
        not closed by this method.
    groupname : str, None
        If ``str``, an HDF5 group name under which the
        ``DenseBreedingValueMatrix`` data is stored. The values ``""`` and
        ``"/"`` are treated as the base HDF5 group.
        If ``None``, the ``DenseBreedingValueMatrix`` is read from the base
        HDF5 group.

    Returns
    -------
    out : DenseBreedingValueMatrix
        A ``DenseBreedingValueMatrix`` read from file.

    Raises
    ------
    TypeError
        If ``filename`` is not a ``str``, ``Path`` or ``h5py.File``, or if
        ``groupname`` is neither a ``str`` nor ``None``.

    Notes
    -----
    The ``check_*`` helpers called here raise their own errors when the file
    does not exist, is not readable, or lacks a required group or field.
    """
    ########################################################
    ############ process ``filename`` argument #############
    # remember whether this method opened the file, so that only files
    # opened here are closed here
    if isinstance(filename, (str,Path)):
        check_file_exists(filename)
        h5file = h5py.File(filename, "r")
        opened_here = True
    elif isinstance(filename, h5py.File):
        check_h5py_File_is_readable(filename)
        h5file = filename
        opened_here = False
    else:
        raise TypeError(
            "``filename`` must be of type ``str``, ``Path``, or ``h5py.File`` but received type ``{0}``".format(
                type(filename).__name__
            )
        )

    # everything that touches the file sits inside ``try`` so that a file
    # opened above is released even when a check or a read fails
    try:
        ########################################################
        ############ process ``groupname`` argument ############
        if groupname is None:
            prefix = ""
        elif isinstance(groupname, str):
            if groupname in ("", "/"):
                # both spellings denote the base group; indexing an empty
                # string with [-1] would raise IndexError
                prefix = ""
            else:
                # make sure the group exists before building field paths
                check_h5py_File_has_group(h5file, groupname)
                # field paths are built by concatenation: ensure one trailing '/'
                prefix = groupname if groupname.endswith('/') else groupname + '/'
        else:
            raise TypeError(
                "``groupname`` must be of type ``str`` or ``None`` but received type ``{0}``".format(
                    type(groupname).__name__
                )
            )

        ########################################################
        ######## check that we have all required fields ########
        for field in ("mat", "location", "scale"):
            check_h5py_File_has_group(h5file, prefix + field)

        ########################################################
        ################ read data from HDF5 file ##############
        # mandatory data fields
        mat = h5py_File_read_ndarray(h5file, prefix + "mat")
        location = h5py_File_read_ndarray(h5file, prefix + "location")
        scale = h5py_File_read_ndarray(h5file, prefix + "scale")

        # optional data and metadata fields, each paired with its reader;
        # ``taxa`` and ``trait`` use the UTF-8 reader, the rest the plain one
        optional_readers = (
            ("taxa",          h5py_File_read_ndarray_utf8),
            ("taxa_grp",      h5py_File_read_ndarray),
            ("trait",         h5py_File_read_ndarray_utf8),
            ("taxa_grp_name", h5py_File_read_ndarray),
            ("taxa_grp_stix", h5py_File_read_ndarray),
            ("taxa_grp_spix", h5py_File_read_ndarray),
            ("taxa_grp_len",  h5py_File_read_ndarray),
        )
        optional = {}
        for field, reader in optional_readers:
            path = prefix + field
            # absent fields stay ``None``
            optional[field] = reader(h5file, path) if path in h5file else None
    finally:
        # close the file only if the provided ``filename`` was a string or
        # Path and not an h5py.File owned by the caller
        if opened_here:
            h5file.close()

    ########################################################
    ################### Object creation ####################
    # create object from read data
    out = cls(
        mat = mat,
        location = location,
        scale = scale,
        taxa = optional["taxa"],
        taxa_grp = optional["taxa_grp"],
        trait = optional["trait"],
    )

    # copy metadata
    out.taxa_grp_name = optional["taxa_grp_name"]
    out.taxa_grp_stix = optional["taxa_grp_stix"]
    out.taxa_grp_spix = optional["taxa_grp_spix"]
    out.taxa_grp_len = optional["taxa_grp_len"]

    return out
################################## Utilities ###################################
def check_is_DenseBreedingValueMatrix(v: object, vname: str) -> None:
    """
    Verify that an object is a DenseBreedingValueMatrix instance.

    Parameters
    ----------
    v : object
        Object whose type is to be verified.
    vname : str
        Variable name inserted into the error message.

    Raises
    ------
    TypeError
        If ``v`` is not an instance of ``DenseBreedingValueMatrix``.
    """
    # accepted type: nothing to report
    if isinstance(v, DenseBreedingValueMatrix):
        return
    raise TypeError("variable '{0}' must be a DenseBreedingValueMatrix".format(vname))