Module implementing classes and associated error checking routines for matrices
storing dense additive genetic variance estimates calculated using two-way DH

import math
from numbers import Integral
from numbers import Real
from pathlib import Path
from typing import Optional
from typing import Union
import h5py
import numpy
import pandas

from pybrops.core.error.error_type_numpy import check_is_ndarray
from pybrops.core.error.error_type_pandas import check_is_pandas_DataFrame
from pybrops.core.error.error_type_python import check_is_Integral
from pybrops.core.error.error_type_python import check_is_Integral_or_None
from pybrops.core.error.error_type_python import check_is_Integral_or_inf
from pybrops.core.error.error_type_python import check_is_str
from pybrops.core.error.error_type_python import check_is_str_or_Integral
from pybrops.core.error.error_value_numpy import check_ndarray_ndim
from pybrops.core.error.error_value_pandas import check_pandas_DataFrame_has_column
from pybrops.core.error.error_value_pandas import check_pandas_DataFrame_has_column_index
from pybrops.core.util.array import flattenix
from pybrops.core.util.subroutines import srange
from pybrops.model.gmod.AdditiveLinearGenomicModel import AdditiveLinearGenomicModel
from pybrops.model.gmod.AdditiveLinearGenomicModel import check_is_AdditiveLinearGenomicModel
from pybrops.model.gmod.GenomicModel import GenomicModel
from pybrops.model.gmod.GenomicModel import check_is_GenomicModel
from pybrops.model.vmat.util import cov_D1s
from pybrops.model.vmat.DenseAdditiveGeneticVarianceMatrix import DenseAdditiveGeneticVarianceMatrix
from pybrops.popgen.gmap.GeneticMapFunction import GeneticMapFunction
from pybrops.popgen.gmap.GeneticMapFunction import check_is_GeneticMapFunction
from pybrops.popgen.gmat.PhasedGenotypeMatrix import PhasedGenotypeMatrix
from pybrops.popgen.gmat.PhasedGenotypeMatrix import check_is_PhasedGenotypeMatrix

[docs] class DenseTwoWayDHAdditiveGeneticVarianceMatrix( DenseAdditiveGeneticVarianceMatrix, ): """ A concrete class for dense additive genetic variance matrices calculated for two-way DH progenies. The purpose of this concrete class is to implement functionality for: 1) Genetic variance estimation for two-way DH progenies. 2) I/O for two-way DH progeny variance matrices. """ ########################## Special Object Methods ########################## def __init__( self, mat: numpy.ndarray, taxa: Optional[numpy.ndarray] = None, taxa_grp: Optional[numpy.ndarray] = None, trait: Optional[numpy.ndarray] = None, **kwargs: dict ) -> None: """ Constructor for the concrete class DenseTwoWayDHAdditiveGeneticVarianceMatrix. Parameters ---------- mat : numpy.ndarray Array used to construct the object. taxa : numpy.ndarray, None Taxa names. taxa_grp : numpy.ndarray, None Taxa groupings. trait : numpy.ndarray, None Trait names. kwargs : dict Additional keyword arguments. """ super(DenseTwoWayDHAdditiveGeneticVarianceMatrix, self).__init__( mat = mat, taxa = taxa, taxa_grp = taxa_grp, trait = trait, **kwargs ) ############################ Object Properties ############################# ##################### Matrix Data ###################### @DenseAdditiveGeneticVarianceMatrix.mat.setter def mat(self, value: numpy.ndarray) -> None: """Set pointer to raw numpy.ndarray object.""" check_is_ndarray(value, "mat") check_ndarray_ndim(value, "mat", 3) # (ntaxa,ntaxa,ntrait) self._mat = value ############## Square Metadata Properties ############## @DenseAdditiveGeneticVarianceMatrix.square_axes.getter def square_axes(self) -> tuple: """Get axis indices for axes that are square""" return (0,1) # (female, male) #################### Trait metadata #################### @DenseAdditiveGeneticVarianceMatrix.trait_axis.getter def trait_axis(self) -> int: """Axis along which traits are stored.""" return 2 ######## Expected parental genome contributions ######## @DenseAdditiveGeneticVarianceMatrix.epgc.getter def epgc(self) -> tuple: """Get a tuple of the expected parental genome contributions.""" return (0.5, 0.5) ################# Parental dimensions ################## @property def nfemale(self) -> Integral: """Number of female parents.""" return self._mat.shape[self.female_axis] @property def female_axis(self) -> Integral: """Axis along which female parents are stored.""" return 0 @property def nmale(self) -> Integral: """Number of male parents.""" return self._mat.shape[self.male_axis] @property def male_axis(self) -> Integral: """Axis along which male parents are stored.""" return 1 ############################## Object Methods ############################## ###################### Matrix I/O ###################### # TODO: make exporting of specific female, male, trait values, not just all
[docs] def to_pandas( self, female_col: str = "female", female_grp_col: Optional[str] = "female_grp", male_col: str = "male", male_grp_col: Optional[str] = "male_grp", trait_col: str = "trait", variance_col: str = "variance", **kwargs: dict ) -> pandas.DataFrame: """ Export a DenseTwoWayDHAdditiveGeneticVarianceMatrix to a pandas.DataFrame. Parameters ---------- female_col : str, default = "female" Name of the column to which to write female taxa names. female_grp_col : str, None, default = "female_grp" Name of the column to which to write female taxa groups. male_col : str, default = "male" Name of the column to which to write male taxa names. male_grp_col : str, None, default = "male_grp" Name of the column to which to write male taxa groups. trait_col : str, default = "trait" Name of the column to which to write trait taxa names. variance_col : str, default = "variance" Name of the column to which to write variance taxa names. kwargs : dict Additional keyword arguments to use for dictating export to a pandas.DataFrame. Returns ------- out : pandas.DataFrame An output dataframe. """ # type checks check_is_str(female_col, "female_col") if female_grp_col is not None: check_is_str(female_grp_col, "female_grp_col") check_is_str(male_col, "male_col") if male_grp_col is not None: check_is_str(male_grp_col, "male_grp_col") check_is_str(trait_col, "trait_col") check_is_str(variance_col, "variance_col") # calculate how much zero fill we need taxazfill = math.ceil(math.log10(self.ntaxa))+1 traitzfill = math.ceil(math.log10(self.ntrait))+1 # get names for taxa if self.taxa is None: taxa = numpy.array( ["Taxon"+str(e).zfill(taxazfill) for e in range(self.ntaxa)], dtype = object ) else: taxa = self.taxa # get names for traits if self.trait is None: trait = numpy.array( ["Trait"+str(e).zfill(traitzfill) for e in range(self.ntrait)], dtype = object ) else: trait = self.trait # calculate flattened array and corresponding axis indices flatmat, (femaleix, maleix, traitix) = flattenix(self.mat) # make dictionary to store output columns in specific column ordering out_dict = {} out_dict.update({female_col: taxa[femaleix]}) if female_grp_col is not None: values = None if self.taxa_grp is None else self.taxa_grp[femaleix] out_dict.update({female_grp_col: values}) out_dict.update({male_col: taxa[maleix]}) if male_grp_col is not None: values = None if self.taxa_grp is None else self.taxa_grp[maleix] out_dict.update({male_grp_col: values}) out_dict.update({trait_col: trait[traitix]}) out_dict.update({variance_col: flatmat}) # create a pandas DataFrame from the data out = pandas.DataFrame(out_dict, **kwargs) return out
[docs] def to_csv( self, filename: str, female_col: str = "female", female_grp_col: Optional[str] = "female_grp", male_col: str = "male", male_grp_col: Optional[str] = "male_grp", trait_col: str = "trait", variance_col: str = "variance", sep: str = ',', header: bool = True, index: bool = False, **kwargs: dict ) -> None: """ Write a DenseTwoWayDHAdditiveGeneticVarianceMatrix to a CSV file. Parameters ---------- filename : str CSV file name to which to write. female_col : str, default = "female" Name of the column to which to write female taxa names. female_grp_col : str, None, default = "female_grp" Name of the column to which to write female taxa groups. male_col : str, default = "male" Name of the column to which to write male taxa names. male_grp_col : str, None, default = "male_grp" Name of the column to which to write male taxa groups. trait_col : str, default = "trait" Name of the column to which to write trait taxa names. variance_col : str, default = "variance" Name of the column to which to write variance taxa names. sep : str, default = "," Separator to use in the exported CSV file. header : bool, default = True Whether to save header names. index : bool, default = False Whether to save a row index in the exported CSV file. kwargs : dict Additional keyword arguments to use for dictating export to a CSV. """ # convert DenseTwoWayDHAdditiveGeneticVarianceMatrix to pandas.DataFrame df = self.to_pandas( female_col = female_col, female_grp_col = female_grp_col, male_col = male_col, male_grp_col = male_grp_col, trait_col = trait_col, variance_col = variance_col, ) # export using pandas df.to_csv( path_or_buf = filename, sep = sep, header = header, index = index, **kwargs )
[docs] def to_hdf5( self, filename: Union[str,Path,h5py.File], groupname: Optional[str] = None, overwrite: bool = True, ) -> None: """ Write ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` to an HDF5 file. Parameters ---------- filename : str, Path, h5py.File HDF5 file name or HDF5 file stream to which to write. groupname : str, None If ``str``, an HDF5 group name under which the ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` data is stored. If ``None``, the ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` is written to the base HDF5 group. overwrite : bool Whether to overwrite data fields if they are present in the HDF5 file. """ # call super function super(DenseTwoWayDHAdditiveGeneticVarianceMatrix, self).to_hdf5( filename = filename, groupname = groupname, overwrite = overwrite, )
############################## Class Methods ############################### ###################### Matrix I/O ######################
[docs] @classmethod def from_pandas( cls, df: pandas.DataFrame, female_col: Union[str,Integral] = "female", female_grp_col: Optional[Union[str,Integral]] = "female_grp", male_col: Union[str,Integral] = "male", male_grp_col: Optional[Union[str,Integral]] = "male_grp", trait_col: Union[str,Integral] = "trait", variance_col: Union[str,Integral] = "variance", **kwargs: dict ) -> 'DenseTwoWayDHAdditiveGeneticVarianceMatrix': """ Read a ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` from a ``pandas.DataFrame``. Parameters ---------- df : pandas.DataFrame Pandas dataframe from which to read. female_col : str, Integral, default = "female" Name or index of the column from which to read female taxa names. female_grp_col : str, Integral, None, default = "female" Name or index of the column from which to read female taxa groups. male_col : str, Integral, None, default = "male" Name or index of the column from which to read male taxa names. male_grp_col : str, Integral, None, default = "male" Name or index of the column from which to read male taxa names. trait_col : str, Integral, default = "trait" Name or index of the column from which to read trait taxa names. variance_col : str, Integral, default = "variance" Name or index of the column from which to read variance taxa names. kwargs : dict Additional keyword arguments to use for dictating importing from a ``pandas.DataFrame``. Returns ------- out : DenseTwoWayDHAdditiveGeneticVarianceMatrix A ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` read from a ``pandas.DataFrame``. """ ### type checks and get df indices # df check_is_pandas_DataFrame(df, "df") # female_col if isinstance(female_col, str): check_pandas_DataFrame_has_column(df, "df", female_col) female_colix = df.columns.get_loc(female_col) elif isinstance(female_col, Integral): check_pandas_DataFrame_has_column_index(df, "df", female_col) female_colix = female_col else: check_is_str_or_Integral(female_col, "female_col") # female_grp_col if female_grp_col is not None: if isinstance(female_grp_col, str): check_pandas_DataFrame_has_column(df, "df", female_grp_col) female_grp_colix = df.columns.get_loc(female_grp_col) elif isinstance(female_grp_col, Integral): check_pandas_DataFrame_has_column_index(df, "df", female_grp_col) female_grp_colix = female_grp_col else: check_is_str_or_Integral(female_grp_col, "female_grp_col") # male_col if isinstance(male_col, str): check_pandas_DataFrame_has_column(df, "df", male_col) male_colix = df.columns.get_loc(male_col) elif isinstance(male_col, Integral): check_pandas_DataFrame_has_column_index(df, "df", male_col) male_colix = male_col else: check_is_str_or_Integral(male_col, "male_col") # male_grp_col if male_grp_col is not None: if isinstance(male_grp_col, str): check_pandas_DataFrame_has_column(df, "df", male_grp_col) male_grp_colix = df.columns.get_loc(male_grp_col) elif isinstance(male_grp_col, Integral): check_pandas_DataFrame_has_column_index(df, "df", male_grp_col) male_grp_colix = male_grp_col else: check_is_str_or_Integral(male_grp_col, "male_grp_col") # trait_col if isinstance(trait_col, str): check_pandas_DataFrame_has_column(df, "df", trait_col) trait_colix = df.columns.get_loc(trait_col) elif isinstance(trait_col, Integral): check_pandas_DataFrame_has_column_index(df, "df", trait_col) trait_colix = trait_col else: check_is_str_or_Integral(trait_col, "trait_col") # variance_col if isinstance(variance_col, str): check_pandas_DataFrame_has_column(df, "df", variance_col) variance_colix = df.columns.get_loc(variance_col) elif isinstance(variance_col, Integral): check_pandas_DataFrame_has_column_index(df, "df", variance_col) variance_colix = variance_col else: check_is_str_or_Integral(variance_col, "variance_col") # get required data columns (type numpy.ndarray) female_data = df.iloc[:,female_colix ].to_numpy(dtype = object) male_data = df.iloc[:,male_colix ].to_numpy(dtype = object) trait_data = df.iloc[:,trait_colix ].to_numpy(dtype = object) variance_data = df.iloc[:,variance_colix].to_numpy(dtype = float) # get unique female, male taxa (type numpy.ndarray) female_taxa, female_taxaix = numpy.unique(female_data, return_index = True) male_taxa, male_taxaix = numpy.unique(male_data, return_index = True) # combine female and male taxa names (type numpy.ndarray) taxa = numpy.union1d(female_taxa, male_taxa) # allocate female, male indices femaleix = numpy.empty(len(female_data), dtype = int) maleix = numpy.empty(len(male_data ), dtype = int) # calculate female, male indices for i,taxon in enumerate(taxa): femaleix[female_data == taxon] = i maleix[male_data == taxon] = i # calculate unique trait values, trait indices trait, traitix = numpy.unique(trait_data, return_inverse = True) # get optional taxa group data taxa_grp = None if female_grp_col is not None: if taxa_grp is None: taxa_grp = numpy.empty(len(taxa), dtype = int) female_grp_data = df.iloc[:,female_grp_colix].to_numpy(dtype = int) for i,taxon in enumerate(taxa): if taxon in female_taxa: ix = female_taxaix[female_taxa == taxon][0] taxa_grp[i] = female_grp_data[ix] if male_grp_col is not None: if taxa_grp is None: taxa_grp = numpy.empty(len(taxa), dtype = int) male_grp_data = df.iloc[:,male_grp_colix].to_numpy(dtype = int) for i,taxon in enumerate(taxa): if taxon in male_taxa: ix = male_taxaix[male_taxa == taxon][0] taxa_grp[i] = male_grp_data[ix] # get array dimensions nfemale = len(taxa) nmale = len(taxa) ntrait = len(trait) # allocate NaN array for variance matrix mat = numpy.full((nfemale,nmale,ntrait), numpy.nan, dtype = float) # overwrite NaN values with variance values mat[femaleix,maleix,traitix] = variance_data # construct an object out = cls( mat = mat, taxa = taxa, taxa_grp = taxa_grp, trait = trait, ) return out
[docs] @classmethod def from_csv( cls, filename: str, female_col: Union[str,Integral] = "female", female_grp_col: Optional[Union[str,Integral]] = "female_grp", male_col: Union[str,Integral] = "male", male_grp_col: Optional[Union[str,Integral]] = "male_grp", trait_col: Union[str,Integral] = "trait", variance_col: Union[str,Integral] = "variance", sep: str = ',', header: int = 0, **kwargs: dict ) -> 'DenseTwoWayDHAdditiveGeneticVarianceMatrix': """ Read an object from a CSV file. Parameters ---------- filename : str CSV file name from which to read. female_col : str, Integral, default = "female" Name or index of the column from which to read female taxa names. female_grp_col : str, Integral, None, default = "female" Name or index of the column from which to read female taxa groups. male_col : str, Integral, None, default = "male" Name or index of the column from which to read male taxa names. male_grp_col : str, Integral, None, default = "male" Name or index of the column from which to read male taxa names. trait_col : str, Integral, default = "trait" Name or index of the column from which to read trait taxa names. variance_col : str, Integral, default = "variance" Name or index of the column from which to read variance taxa names. sep : str, default = "," Separator to use in the CSV file. header : int, default = 0 Row index of the header. kwargs : dict Additional keyword arguments to use for dictating importing from a CSV. Returns ------- out : CSVInputOutput An object read from a CSV file. """ df = pandas.read_csv( filepath_or_buffer = filename, sep = sep, header = header, **kwargs ) out = cls.from_pandas( df = df, female_col = female_col, female_grp_col = female_grp_col, male_col = male_col, male_grp_col = male_grp_col, trait_col = trait_col, variance_col = variance_col, ) return out
[docs] @classmethod def from_hdf5( cls, filename: Union[str,Path,h5py.File], groupname: Optional[str] = None ) -> 'DenseTwoWayDHAdditiveGeneticVarianceMatrix': """ Read ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` from an HDF5 file. Parameters ---------- filename : str, Path, h5py.File If ``str`` or ``Path``, an HDF5 file name from which to read. File is closed after reading. If ``h5py.File``, an opened HDF5 file from which to read. File is not closed after reading. groupname : str, None If ``str``, an HDF5 group name under which ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` data is stored. If ``None``, ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` is read from base HDF5 group. Returns ------- out : DenseTwoWayDHAdditiveGeneticVarianceMatrix A ``DenseTwoWayDHAdditiveGeneticVarianceMatrix`` read from file. """ # call super function return super(DenseTwoWayDHAdditiveGeneticVarianceMatrix, cls).from_hdf5( filename = filename, groupname = groupname, )
############# Matrix Factory Class Methods ############# # TODO: provide support for non-linear models
[docs] @classmethod def from_gmod( cls, gmod: GenomicModel, pgmat: PhasedGenotypeMatrix, nmating: Integral, nprogeny: Integral, nself: Union[Integral,Real], gmapfn: GeneticMapFunction, **kwargs: dict ) -> 'DenseTwoWayDHAdditiveGeneticVarianceMatrix': """ Calculate a symmetrical matrix of progeny variance for each pairwise 2-way cross between *inbred* individuals. Parameters ---------- gmod : GenomicModel Genomic Model with which to estimate genetic variances. pgmat : PhasedGenotypeMatrix Input genomes to use to estimate genetic variances. nmating : Integral Number of cross patterns to simulate for genetic variance estimation. nprogeny : Integral Number of progeny to simulate per cross to estimate genetic variance. nself : Integral, Real Number of selfing generations post-cross pattern before 'nprogeny' individuals are simulated. +-----------------+-------------------------+ | Example | Description | +=================+=========================+ | ``nself = 0`` | Derive gametes from F1 | +-----------------+-------------------------+ | ``nself = 1`` | Derive gametes from F2 | +-----------------+-------------------------+ | ``nself = 2`` | Derive gametes from F3 | +-----------------+-------------------------+ | ``...`` | etc. | +-----------------+-------------------------+ | ``nself = inf`` | Derive gametes from SSD | +-----------------+-------------------------+ gmapfn : GeneticMapFunction GeneticMapFunction to use to estimate covariance induced by recombination. kwargs : dict Additional keyword arguments. Returns ------- out : DenseTwoWayDHAdditiveGeneticVarianceMatrix A matrix of additive genetic variance estimations. """ # type checks check_is_GenomicModel(gmod, "gmod") check_is_PhasedGenotypeMatrix(pgmat, "pgmat") check_is_Integral(nmating, "nmating") check_is_Integral(nprogeny, "nprogeny") check_is_Integral_or_inf(nself, "nself") check_is_GeneticMapFunction(gmapfn, "gmapfn") # if genomic model is an additive linear genomic model, then use specialized routine if isinstance(gmod, AdditiveLinearGenomicModel): return cls.from_algmod( algmod = gmod, pgmat = pgmat, nmating = nmating, nprogeny = nprogeny, nself = nself, gmapfn = gmapfn, **kwargs ) # otherwise raise error since non-linear support hasn't been implemented yet else: raise NotImplementedError("support for non-linear models not implemented yet")
[docs] @classmethod def from_algmod( cls, algmod: AdditiveLinearGenomicModel, pgmat: PhasedGenotypeMatrix, nmating: Integral, nprogeny: Integral, nself: Union[Integral,Real], gmapfn: GeneticMapFunction, mem: Union[Integral,None] = 1024 ) -> 'DenseTwoWayDHAdditiveGeneticVarianceMatrix': """ Calculate a symmetrical matrix of progeny variance for each pairwise 2-way cross between *inbred* individuals. Calculations are derived from Osthushenrich et al. (2017). Parameters ---------- algmod : AdditiveLinearGenomicModel AdditiveLinearGenomicModel with which to estimate genetic variances. pgmat : PhasedGenotypeMatrix Input genomes to use to estimate genetic variances. nmating : Integral Number of cross patterns to simulate for genetic variance estimation. nprogeny : Integral Number of progeny to simulate per cross to estimate genetic variance. nself : Integral, Real Number of selfing generations post-cross pattern before 'nprogeny' individuals are simulated. +-----------------+-------------------------+ | Example | Description | +=================+=========================+ | ``nself = 0`` | Derive gametes from F1 | +-----------------+-------------------------+ | ``nself = 1`` | Derive gametes from F2 | +-----------------+-------------------------+ | ``nself = 2`` | Derive gametes from F3 | +-----------------+-------------------------+ | ``...`` | etc. | +-----------------+-------------------------+ | ``nself = inf`` | Derive gametes from SSD | +-----------------+-------------------------+ gmapfn : GeneticMapFunction GeneticMapFunction to use to estimate covariance induced by recombination. mem : Integral, default = 1024 Memory chunk size to use during matrix operations. If ``None``, then memory chunk size is not limited. WARNING: Setting ``mem = None`` might result in memory allocation errors! For reference, ``mem = 1024`` refers to a matrix of size 1024x1024, which needs about 8.5 MB of storage. Matrices of course need a quadratic amount of memory: :math:`O(n^2)`. Returns ------- out : GeneticVarianceMatrix A matrix of additive genetic variance estimations. """ # type checks check_is_AdditiveLinearGenomicModel(algmod, "algmod") check_is_PhasedGenotypeMatrix(pgmat, "pgmat") check_is_Integral(nmating, "nmating") check_is_Integral(nprogeny, "nprogeny") check_is_Integral_or_inf(nself, "nself") check_is_GeneticMapFunction(gmapfn, "gmapfn") check_is_Integral_or_None(mem, "mem") # check for chromosome grouping if not pgmat.is_grouped_vrnt(): raise ValueError("pgmat must be grouped along the vrnt axis") # check for genetic positions if pgmat.vrnt_genpos is None: raise ValueError("pgmat must have genetic positions") # gather shapes of data input ntrait = algmod.ntrait # number of traits (t) ntaxa = pgmat.ntaxa # number of individuals (n) # gather data pointers from genotypes and linear model geno = pgmat.mat # (m,n,p) genotype matrix chrgrp_stix = pgmat.vrnt_chrgrp_stix # (g,) chromosome group start indices chrgrp_spix = pgmat.vrnt_chrgrp_spix # (g,) chromosome group stop indices genpos = pgmat.vrnt_genpos # (p,) marker genetic positions u = algmod.u_a # (p,t) marker effect coefficients # allocate a square matrix for each pairwise variance var_A = numpy.zeros( (ntaxa, ntaxa, ntrait), # (n,n,t) variance matrix dtype = float ) # for each linkage group for lst, lsp in zip(chrgrp_stix, chrgrp_spix): # determine memory chunk step (for iterators) step = (lsp - lst) if mem is None else mem # for each computational chunk: rb = row block, cb = column block for rst,rsp in zip(range(lst,lsp,step),srange(lst+step,lsp,step)): for cst,csp in zip(range(lst,lsp,step),srange(lst+step,lsp,step)): # create sparse meshgrid indicating where genetic positions are gi, gj = numpy.meshgrid( genpos[rst:rsp], # row block genetic positions genpos[cst:csp], # column block genetic positions indexing='ij', # use ij indexing sparse=True # generate a spare array tuple for speed ) # calculate recombination probability matrix for chunk r = gmapfn.mapfn(numpy.abs(gi - gj)) # (rb,cb) covariance matrix # calculate a D1 recombination covariance matrix; this is specific to mating scheme D1 = cov_D1s(r, nself) # (rb,cb) covariance matrix # get marker coefficients for rows and columns ru = u[rst:rsp].T # (rb,t)' -> (t,rb) cu = u[cst:csp].T # (cb,t)' -> (t,cb) # for each mate pair (excluding selfs) for female in range(1,ntaxa): # varA row index for male in range(0,female): # varA col index # calculate genotype differences for row, col # phased genotype matrix must be coded in {0,1} # resulting difference matrix has values {-1,0,1} rdgeno = geno[0,female,rst:rsp] - geno[0,male,rst:rsp] # (rb,) cdgeno = geno[0,female,cst:csp] - geno[0,male,cst:csp] # (cb,) # calculate effect differences reffect = rdgeno * ru # (rb,)*(t,rb) -> (t,rb) ceffect = cdgeno * cu # (cb,)*(t,cb) -> (t,cb) # compute dot product for each trait to get partial variance sum # (t,rb)@(rb,cb) -> (t,cb) # (t,cb)*(t,cb) -> (t,cb) # (t,cb)[1] -> (t,) var_A_partial = (reffect @ D1 * ceffect).sum(1) # add this partial variance to the lower triangle var_A[female,male,:] += var_A_partial # since var_A matrix is symmetrical, copy lower triangle to the upper for female in range(1, ntaxa): for male in range(0, female): var_A[male,female,:] = var_A[female,male,:] # construct output out = cls( mat = var_A, taxa = pgmat.taxa, taxa_grp = pgmat.taxa_grp, trait = algmod.trait ) return out