"""
Module implementing a dense phased matrix and associated error checking routines.
"""
__all__ = [
"DensePhasedMatrix",
"check_is_DensePhasedMatrix",
]
import copy
import numpy
from typing import Optional
from typing import Sequence
from typing import Union
from numpy.typing import ArrayLike
from pybrops.core.error.error_type_python import check_is_array_like
from pybrops.core.error.error_attr_python import check_is_iterable
from pybrops.core.error.error_attr_python import error_readonly
from pybrops.core.error.error_generic_python import generic_check_isinstance
from pybrops.core.mat.Matrix import Matrix
from pybrops.core.util.array import get_axis
from pybrops.core.mat.DenseMutableMatrix import DenseMutableMatrix
from pybrops.core.mat.PhasedMatrix import PhasedMatrix
class DensePhasedMatrix(
        DenseMutableMatrix,
        PhasedMatrix,
    ):
    """
    A concrete class implementing dense phased matrices.
    A phased matrix is defined as a matrix with a third dimension.
    Dense phased matrices utilize numpy.ndarray's for data storage.

    The purpose of this concrete class is to implement base functionality for:
        1) Dense matrix phase manipulation routines.

    Notes
    -----
    The phase axis is fixed at axis 0 (see ``phase_axis``). Every axis-generic
    method in this class (``adjoin``, ``delete``, ``insert``, ``select``,
    ``concat``, ``append``, ``remove``, ``incorp``) only dispatches to its
    ``*_phase`` counterpart and raises ``ValueError`` for any other axis.
    """

    ########################## Special Object Methods ##########################
    def __init__(
            self,
            mat: numpy.ndarray,
            **kwargs: dict
        ) -> None:
        """
        Constructor for the concrete class DensePhasedMatrix.

        Parameters
        ----------
        mat : numpy.ndarray
            Matrix used to construct the object.
        kwargs : dict
            Additional keyword arguments. These are forwarded unchanged to the
            parent class constructor.
        """
        super(DensePhasedMatrix, self).__init__(
            mat=mat,
            **kwargs
        )

    #################### Matrix copying ####################
    def __copy__(
            self
        ) -> 'DensePhasedMatrix':
        """
        Make a shallow copy of the matrix.

        Returns
        -------
        out : DensePhasedMatrix
            A new object of the same class as ``self`` constructed from
            ``copy.copy(self.mat)``.
        """
        return self.__class__(
            mat=copy.copy(self.mat),
        )

    def __deepcopy__(
            self,
            memo: Optional[dict] = None
        ) -> 'DensePhasedMatrix':
        """
        Make a deep copy of the matrix.

        Parameters
        ----------
        memo : dict, None
            Dictionary of memo metadata, passed through to ``copy.deepcopy``.

        Returns
        -------
        out : DensePhasedMatrix
            A new object of the same class as ``self`` constructed from
            ``copy.deepcopy(self.mat, memo)``.
        """
        return self.__class__(
            mat=copy.deepcopy(self.mat, memo),
        )

    ############################ Object Properties #############################

    ############## Phase Metadata Properties ###############
    @property
    def nphase(self) -> int:
        """Number of chromosome phases represented by the matrix."""
        # length of the underlying array along the phase axis
        return self._mat.shape[self.phase_axis]

    @property
    def phase_axis(self) -> int:
        """Axis along which phases are stored."""
        return 0

    ############################## Object Methods ##############################

    ######### Matrix element copy-on-manipulation ##########
    def adjoin(
            self,
            values: Union[Matrix,numpy.ndarray],
            axis: int = -1,
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Add additional elements to the end of the DensePhasedMatrix along an axis.

        Parameters
        ----------
        values : DensePhasedMatrix, numpy.ndarray
            Values to adjoin to the matrix.
        axis : int
            The axis along which values are adjoined. Only the phase axis is
            supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``adjoin_phase``.

        Returns
        -------
        out : DensePhasedMatrix
            A copy of mat with values appended to axis. Note that adjoin does
            not occur in-place: a new DensePhasedMatrix is allocated and filled.

        Raises
        ------
        ValueError
            If the resolved axis is not the phase axis.
        """
        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, self.mat_ndim)

        # only the phase axis can be handled by this class
        if axis != self.phase_axis:
            raise ValueError("cannot adjoin along axis {0}".format(axis))

        return self.adjoin_phase(
            values=values,
            **kwargs
        )

    def adjoin_phase(
            self,
            values: Union[Matrix,numpy.ndarray],
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Adjoin values along the phase axis.

        Parameters
        ----------
        values : Matrix or numpy.ndarray
            Values to adjoin along the phase axis. Matrix objects are only
            accepted when they are instances of the calling object's class.
        kwargs : dict
            Additional keyword arguments. Forwarded to the constructor of the
            output object.

        Returns
        -------
        out : DensePhasedMatrix
            A copy of the DensePhasedMatrix with values adjoined along the phase axis.
            Note that adjoin does not occur in-place: a new DensePhasedMatrix is allocated and filled.

        Raises
        ------
        ValueError
            If ``values`` is neither an instance of the calling object's class
            nor a numpy.ndarray, if its number of dimensions differs from this
            matrix, or if any non-phase axis length differs.
        """
        # extract mat values
        if isinstance(values, self.__class__):
            values = values.mat
        elif not isinstance(values, numpy.ndarray):
            raise ValueError("cannot adjoin: 'values' must be of type {0} or numpy.ndarray".format(self.__class__))

        # perform error checks before allocating memory
        if values.ndim != self.mat_ndim:
            raise ValueError("cannot adjoin: 'values' must have ndim == {0}".format(self.mat_ndim))
        for i,(j,k) in enumerate(zip(values.shape, self.mat_shape)):
            # every axis except the phase axis must match in length
            if (i != self.phase_axis) and (j != k):
                raise ValueError("cannot adjoin: axis lengths incompatible for axis {0}".format(i))

        # adjoin values into a newly allocated array
        mat = numpy.append(self._mat, values, axis=self.phase_axis)

        return self.__class__(
            mat=mat,
            **kwargs
        )

    def delete(
            self,
            obj: Union[int,slice,Sequence],
            axis: int = -1,
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Delete sub-arrays along an axis.

        Parameters
        ----------
        obj : int, slice, or Sequence of ints
            Indicate indices of sub-arrays to remove along the specified axis.
        axis: int
            The axis along which to delete the subarray defined by obj. Only
            the phase axis is supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``delete_phase``.

        Returns
        -------
        out : DensePhasedMatrix
            A DensePhasedMatrix with deleted elements. Note that delete does not occur
            in-place: a new DensePhasedMatrix is allocated and filled.

        Raises
        ------
        ValueError
            If the resolved axis is not the phase axis.
        """
        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, self.mat_ndim)

        # only the phase axis can be handled by this class
        if axis != self.phase_axis:
            raise ValueError("cannot delete along axis {0}".format(axis))

        return self.delete_phase(obj=obj, **kwargs)

    def delete_phase(
            self,
            obj: Union[int,slice,Sequence],
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Delete sub-arrays along the phase axis.

        Parameters
        ----------
        obj : int, slice, or Sequence of ints
            Indicate indices of sub-arrays to remove along the phase axis.
        kwargs : dict
            Additional keyword arguments. Forwarded to the constructor of the
            output object.

        Returns
        -------
        out : DensePhasedMatrix
            A matrix with deleted elements. Note that delete does not occur
            in-place: a new matrix is allocated and filled.
        """
        # delete values; numpy.delete returns a new array
        mat = numpy.delete(self._mat, obj, axis=self.phase_axis)

        return self.__class__(
            mat=mat,
            **kwargs
        )

    def insert(
            self,
            obj: Union[int,slice,Sequence],
            values: Union[Matrix,numpy.ndarray],
            axis: int = -1,
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Insert values along the given axis before the given indices.

        Parameters
        ----------
        obj: int, slice, or Sequence of ints
            Object that defines the index or indices before which values is
            inserted.
        values : Matrix, numpy.ndarray
            Values to insert into the matrix.
        axis : int
            The axis along which values are inserted. Only the phase axis is
            supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``insert_phase``.

        Returns
        -------
        out : DensePhasedMatrix
            A DensePhasedMatrix with values inserted. Note that insert does not occur
            in-place: a new DensePhasedMatrix is allocated and filled.

        Raises
        ------
        ValueError
            If the resolved axis is not the phase axis.
        """
        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, self.mat_ndim)

        # only the phase axis can be handled by this class
        if axis != self.phase_axis:
            raise ValueError("cannot insert along axis {0}".format(axis))

        return self.insert_phase(
            obj=obj,
            values=values,
            **kwargs
        )

    def insert_phase(
            self,
            obj: Union[int,slice,Sequence],
            values: Union[Matrix,numpy.ndarray],
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Insert values along the phase axis before the given indices.

        Parameters
        ----------
        obj: int, slice, or Sequence of ints
            Object that defines the index or indices before which values is
            inserted.
        values : Matrix, numpy.ndarray
            Values to insert into the matrix. Matrix objects are only accepted
            when they are instances of the calling object's class.
        kwargs : dict
            Additional keyword arguments. Forwarded to the constructor of the
            output object.

        Returns
        -------
        out : DensePhasedMatrix
            A DensePhasedMatrix with values inserted. Note that insert does not occur
            in-place: a new DensePhasedMatrix is allocated and filled.

        Raises
        ------
        ValueError
            If ``values`` is neither an instance of the calling object's class
            nor a numpy.ndarray, if its number of dimensions differs from this
            matrix, or if any non-phase axis length differs.
        """
        # extract mat values
        if isinstance(values, self.__class__):
            values = values.mat
        elif not isinstance(values, numpy.ndarray):
            raise ValueError("'values' must be of type {0} or numpy.ndarray".format(self.__class__))

        # perform error checks before allocating memory
        if values.ndim != self.mat_ndim:
            raise ValueError("cannot insert: 'values' must have ndim == {0}".format(self.mat_ndim))
        for i,(j,k) in enumerate(zip(values.shape, self.mat_shape)):
            # every axis except the phase axis must match in length
            if (i != self.phase_axis) and (j != k):
                raise ValueError("cannot insert: axis lengths incompatible for axis {0}".format(i))

        # insert values into a newly allocated array
        mat = numpy.insert(self._mat, obj, values, axis=self.phase_axis)

        # create output
        return self.__class__(
            mat=mat,
            **kwargs
        )

    def select(
            self,
            indices: ArrayLike,
            axis: int = -1,
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Select certain values from the DensePhasedMatrix.

        Parameters
        ----------
        indices : ArrayLike (Nj, ...)
            The indices of the values to select.
        axis : int
            The axis along which values are selected. Only the phase axis is
            supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``select_phase``.

        Returns
        -------
        out : DensePhasedMatrix
            The output DensePhasedMatrix with values selected. Note that select does not
            occur in-place: a new DensePhasedMatrix is allocated and filled.

        Raises
        ------
        ValueError
            If the resolved axis is not the phase axis.
        """
        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, self.mat_ndim)

        # only the phase axis can be handled by this class
        if axis != self.phase_axis:
            raise ValueError("cannot select along axis {0}".format(axis))

        return self.select_phase(indices=indices, **kwargs)

    def select_phase(
            self,
            indices: ArrayLike,
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Select certain values from the DensePhasedMatrix along the phase axis.

        Parameters
        ----------
        indices : ArrayLike (Nj, ...)
            The indices of the values to select.
        kwargs : dict
            Additional keyword arguments. Forwarded to the constructor of the
            output object.

        Returns
        -------
        out : DensePhasedMatrix
            The output DensePhasedMatrix with values selected. Note that select does not
            occur in-place: a new DensePhasedMatrix is allocated and filled.
        """
        # check for array_like
        check_is_array_like(indices, "indices")

        # select values along the phase axis
        mat = numpy.take(self._mat, indices, axis=self.phase_axis)

        return self.__class__(
            mat=mat,
            **kwargs
        )

    @classmethod
    def concat(
            cls,
            mats: Sequence,
            axis: int = -1,
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Concatenate a sequence of Matrix together along an axis.

        Parameters
        ----------
        mats : Sequence of Matrix
            List of Matrix to concatenate. The matrices must have the same
            shape, except in the dimension corresponding to axis.
        axis : int
            The axis along which the arrays will be joined. Only the phase
            axis is supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``concat_phase``.

        Returns
        -------
        out : DensePhasedMatrix
            The concatenated DensePhasedMatrix. Note that concat does not occur in-place:
            a new DensePhasedMatrix is allocated and filled.

        Raises
        ------
        ValueError
            If ``mats`` is empty or if the resolved axis is not the phase axis.
        """
        # the first element supplies the dimensionality and phase axis, so an
        # empty sequence must be rejected before it is indexed
        if len(mats) == 0:
            raise ValueError("need at least one Matrix to concatenate")

        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, mats[0].mat_ndim)

        # only the phase axis can be handled by this class
        if axis != mats[0].phase_axis:
            raise ValueError("cannot concat along axis {0}".format(axis))

        return cls.concat_phase(mats, **kwargs)

    @classmethod
    def concat_phase(
            cls,
            mats: Sequence,
            **kwargs: dict
        ) -> 'DensePhasedMatrix':
        """
        Concatenate list of Matrix together along the phase axis.

        Parameters
        ----------
        mats : Sequence of Matrix
            List of Matrix to concatenate. The matrices must have the same
            shape, except in the dimension corresponding to the phase axis.
            Every element must be an instance of the class this method is
            called on.
        kwargs : dict
            Additional keyword arguments. Forwarded to the constructor of the
            output object.

        Returns
        -------
        out : DensePhasedMatrix
            The concatenated matrix. Note that concat does not occur in-place:
            a new matrix is allocated and filled.

        Raises
        ------
        ValueError
            If ``mats`` is empty, if the matrices do not all have the same
            number of dimensions, or if their shapes differ along any
            non-phase axis.
        """
        # ensure that we have an iterable object
        check_is_iterable(mats, "mats")

        # ensure that we have a sequence of length >= 1
        if len(mats) <= 0:
            raise ValueError("need at least one Matrix to concatenate")

        # ensure that all items in mats are instances of this class
        for i,v in enumerate(mats):
            generic_check_isinstance(v, "mats[{0}]".format(i), cls)

        # make sure dimensions are all identical to first element in mats
        if any(m.mat_ndim != mats[0].mat_ndim for m in mats):
            raise ValueError("cannot concat: not all matrices have the same number of dimensions")

        # transpose the shapes: one tuple of axis lengths per axis
        shape_t = tuple(zip(*[m.mat.shape for m in mats]))

        # test matrix compatibility (same axis length along non-phase axes)
        for i,v in enumerate(shape_t):
            if (i != mats[0].phase_axis) and any(l != v[0] for l in v):
                raise ValueError("cannot concat: matrix shapes do not all align along axis {0}".format(i))

        # create matrix lists
        mat_ls = [m.mat for m in mats]

        # concatenate items along the phase axis of the first element
        mat = numpy.concatenate(mat_ls, axis=mats[0].phase_axis)

        # no metadata is carried over from the inputs; the output is built
        # from the concatenated array and the caller-supplied kwargs only
        return cls(
            mat=mat,
            **kwargs
        )

    ######### Matrix element in-place-manipulation #########
    def append(
            self,
            values: Union[Matrix,numpy.ndarray],
            axis: int = -1,
            **kwargs: dict
        ) -> None:
        """
        Append values to the matrix.

        Parameters
        ----------
        values : DensePhasedMatrix, numpy.ndarray
            Values to append to the matrix.
        axis : int
            The axis along which values are appended. Only the phase axis is
            supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``append_phase``.

        Raises
        ------
        ValueError
            If the resolved axis is not the phase axis.
        """
        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, self.mat_ndim)

        # only the phase axis can be handled by this class
        if axis != self.phase_axis:
            raise ValueError("cannot append along axis {0}".format(axis))

        self.append_phase(values, **kwargs)

    def append_phase(
            self,
            values: Union[Matrix,numpy.ndarray],
            **kwargs: dict
        ) -> None:
        """
        Append values to the Matrix along the phase axis.

        Parameters
        ----------
        values : Matrix, numpy.ndarray
            Values to append to the matrix. Matrix objects are only accepted
            when they are instances of the calling object's class.
        kwargs : dict
            Additional keyword arguments. Not used by this implementation.

        Raises
        ------
        ValueError
            If ``values`` is neither an instance of the calling object's class
            nor a numpy.ndarray, if its number of dimensions differs from this
            matrix, or if any non-phase axis length differs.
        """
        # if given an object of this class, extract its *.mat values
        if isinstance(values, self.__class__):
            values = values.mat
        elif not isinstance(values, numpy.ndarray):
            raise ValueError("'values' must be of type {0} or numpy.ndarray".format(self.__class__))

        # perform error checks before allocating memory
        if values.ndim != self.mat_ndim:
            raise ValueError("cannot append: 'values' must have ndim == {0}".format(self.mat_ndim))
        for i,(j,k) in enumerate(zip(values.shape, self.mat_shape)):
            # every axis except the phase axis must match in length
            if (i != self.phase_axis) and (j != k):
                raise ValueError("cannot append: axis lengths incompatible for axis {0}".format(i))

        # append values; the stored array is replaced by the new array
        self._mat = numpy.append(self._mat, values, axis=self.phase_axis)

    def remove(
            self,
            obj: Union[int,slice,Sequence],
            axis: int = -1,
            **kwargs: dict
        ) -> None:
        """
        Remove sub-arrays along an axis.

        Parameters
        ----------
        obj : int, slice, or Sequence of ints
            Indicate indices of sub-arrays to remove along the specified axis.
        axis: int
            The axis along which to remove the subarray defined by obj. Only
            the phase axis is supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``remove_phase``.

        Raises
        ------
        ValueError
            If the resolved axis is not the phase axis.
        """
        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, self.mat_ndim)

        # only the phase axis can be handled by this class
        if axis != self.phase_axis:
            raise ValueError("cannot remove along axis {0}".format(axis))

        self.remove_phase(obj=obj, **kwargs)

    def remove_phase(
            self,
            obj: Union[int,slice,Sequence],
            **kwargs: dict
        ) -> None:
        """
        Remove sub-arrays along the phase axis.

        Parameters
        ----------
        obj : int, slice, or Sequence of ints
            Indicate indices of sub-arrays to remove along the phase axis.
        kwargs : dict
            Additional keyword arguments. Not used by this implementation.
        """
        # delete values; the stored array is replaced by the new array
        self._mat = numpy.delete(self._mat, obj, axis=self.phase_axis)

    def incorp(
            self,
            obj: Union[int,slice,Sequence],
            values: ArrayLike,
            axis: int = -1,
            **kwargs: dict
        ) -> None:
        """
        Incorporate values along the given axis before the given indices.

        Parameters
        ----------
        obj: int, slice, or Sequence of ints
            Object that defines the index or indices before which values is
            incorporated.
        values : array_like
            Values to incorporate into the matrix.
        axis : int
            The axis along which values are incorporated. Only the phase axis
            is supported by this class.
        kwargs : dict
            Additional keyword arguments. Forwarded to ``incorp_phase``.

        Raises
        ------
        ValueError
            If the resolved axis is not the phase axis.
        """
        # resolve the requested axis with get_axis before dispatching
        axis = get_axis(axis, self.mat_ndim)

        # only the phase axis can be handled by this class
        if axis != self.phase_axis:
            raise ValueError("cannot incorp along axis {0}".format(axis))

        # dispatch to the phase-specific worker; calling incorp() itself here
        # would re-enter this dispatcher with the default axis instead of
        # performing the insertion
        self.incorp_phase(
            obj=obj,
            values=values,
            **kwargs
        )

    def incorp_phase(
            self,
            obj: Union[int,slice,Sequence],
            values: Union[Matrix,numpy.ndarray],
            **kwargs: dict
        ) -> None:
        """
        Incorporate values along the phase axis before the given indices.

        Parameters
        ----------
        obj: int, slice, or Sequence of ints
            Object that defines the index or indices before which values is
            incorporated.
        values : Matrix, numpy.ndarray
            Values to incorporate into the matrix. Matrix objects are only
            accepted when they are instances of the calling object's class.
        kwargs : dict
            Additional keyword arguments. Not used by this implementation.

        Raises
        ------
        ValueError
            If ``values`` is neither an instance of the calling object's class
            nor a numpy.ndarray, if its number of dimensions differs from this
            matrix, or if any non-phase axis length differs.
        """
        # if given an object of this class, extract its *.mat values
        if isinstance(values, self.__class__):
            values = values.mat
        elif not isinstance(values, numpy.ndarray):
            raise ValueError("'values' must be of type {0} or numpy.ndarray".format(self.__class__))

        # perform error checks before allocating memory
        if values.ndim != self.mat_ndim:
            raise ValueError("cannot incorp: 'values' must have ndim == {0}".format(self.mat_ndim))
        for i,(j,k) in enumerate(zip(values.shape, self.mat_shape)):
            # every axis except the phase axis must match in length
            if (i != self.phase_axis) and (j != k):
                raise ValueError("cannot incorp: axis lengths incompatible for axis {0}".format(i))

        # insert values; the stored array is replaced by the new array
        self._mat = numpy.insert(self._mat, obj, values, axis=self.phase_axis)

################################## Utilities ###################################
def check_is_DensePhasedMatrix(v: object, vname: str) -> None:
    """
    Verify that an object is a DensePhasedMatrix instance.

    Parameters
    ----------
    v : object
        Any Python object to test.
    vname : str
        Name of the variable, used in the TypeError message.

    Raises
    ------
    TypeError
        If ``v`` is not an instance of DensePhasedMatrix.
    """
    # nothing to report when the object already has the expected type
    if isinstance(v, DensePhasedMatrix):
        return

    # gather the type names reported in the error message
    expected_name = DensePhasedMatrix.__name__
    received_name = type(v).__name__
    message = "variable '{0}' must be a of type '{1}' but received type '{2}'".format(
        vname,
        expected_name,
        received_name
    )
    raise TypeError(message)