Source code for pybrops.core.util.haplo

"""
Module containing haplotype calculation utility functions.
"""

from numbers import Integral
import numpy

__all__ = [
    "nhaploblk_chrom", 
    "haplobin", 
    "haplobin_bounds",
    "haplomat",
]

[docs] def nhaploblk_chrom( nhaploblk: int, genpos: numpy.ndarray, chrgrp_stix: numpy.ndarray, chrgrp_spix: numpy.ndarray ) -> numpy.ndarray: """ Given a total number of haplotype blocks to assign across the genome, determine the number of haplotype blocks to give to each chromosome. Parameters ---------- nhaploblk : int Total number of haplotype blocks to assign across the genome. genpos : numpy.ndarray Array of shape ``(p,)`` containing genetic positions of markers across the genome. Where: - ``p`` is the number of markers. Input contraints: - Must be sorted in ascending order. - Must be grouped based on chromosome. The chromosome boundary start and stop indices must be provided in ``chrgrp_stix`` and ``chrgrp_spix``, respectively. chrgrp_stix : numpy.ndarray Chromosome boundary start indices array of shape ``(c,)``. Where: - ``c`` is the number of chromosomes. chrgrp_spix : numpy.ndarray Chromosome boundary stop indices array of shape ``(c,)``. Where: - ``c`` is the number of chromosomes. Returns ------- out : numpy.ndarray Array of shape ``(c,)`` containing the number of haplotype blocks assigned to each chromosome. Where: - ``c`` is the number of chromosomes. """ # get number of chromosomes nchr = len(chrgrp_stix) # make sure that number of haplotype blocks >= number of chromosomes if nhaploblk < nchr: raise ValueError( "Number of haplotype blocks (nhaploblk = {0}) ".format(nhaploblk) + "is less than the number of chromosomes (nchr = {1})".format(nchr) ) # calculate genetic lengths of each chromosome in Morgans/cM/other genlen = genpos[chrgrp_spix-1] - genpos[chrgrp_stix] # calculate ideal number of markers per chromosome nhaploblk_ideal = (nhaploblk / genlen.sum()) * genlen # calculate number of chromosome markers, assuming at least one per chromosome nhaploblk_chrom = numpy.ones(nchr, dtype = "int") # start with min of one # greedily increment number of haplotype blocks assigned to chromosomes for i in range(nhaploblk - nchr): # forces conformance to self.nhaploblk diff = nhaploblk_chrom - nhaploblk_ideal # take actual - ideal ix = diff.argmin() # get index of lowest difference nhaploblk_chrom[ix] += 1 # increment at lowest index return nhaploblk_chrom
[docs] def haplobin( nhaploblk_chrom: numpy.ndarray, genpos: numpy.ndarray, chrgrp_stix: numpy.ndarray, chrgrp_spix: numpy.ndarray ) -> numpy.ndarray: """ Given the number of haplotype blocks to give to each chromosome across the genome, assign bins for each marker Parameters ---------- nhaploblk_chrom : numpy.ndarray Array of shape ``(c,)`` containing the total number of haplotype blocks to assign to each chromosome across the genome. Where: - ``c`` is the number of chromosomes. genpos : numpy.ndarray Array of shape ``(p,)`` containing genetic positions of markers across the genome. Where: - ``p`` is the number of markers. Input contraints: - Must be sorted in ascending order. - Must be grouped based on chromosome. The chromosome boundary start and stop indices must be provided in ``chrgrp_stix`` and ``chrgrp_spix``, respectively. chrgrp_stix : numpy.ndarray Chromosome boundary start indices array of shape ``(c,)``. Where: - ``c`` is the number of chromosomes. chrgrp_spix : numpy.ndarray Chromosome boundary stop indices array of shape ``(c,)``. Where: - ``c`` is the number of chromosomes. Returns ------- out : numpy.ndarray Array of shape ``(p,)`` containing haplotype bin assignments for each marker across the genome. Bins are assigned starting at ``0`` and strictly increase as the index in the array increases due to constraints placed on ``genpos``. Where: - ``p`` is the number of markers. Example output:: [0,0,0,0,0,1,1,1,1,1,1,1,2,2,2,2,2,2,3,3,3,3,3,3,3,3,3,3,3] """ # initialize output array haplobin = numpy.empty( # create empty array len(genpos), # same length as number of markers dtype = "int" # integer array ) # get number of chromosomes nchr = len(chrgrp_stix) k = 0 # haplotype block counter for i in range(nchr): # for each chromosome nhap = nhaploblk_chrom[i] # get number of haplotype blocks on chromosome stix = chrgrp_stix[i] # get chromosome start index spix = chrgrp_spix[i] # get chromosome stop index hbound = numpy.linspace( # calculate haplotype boundaries genpos[stix], # start genetic position on chromosome genpos[spix-1], # stop genetic position on chromosome nhap+1 # nhap + 1 for chromosome tips ) for j in range(nhap): # for each haplotype block chrmap = genpos[stix:spix] # get chromosome genetic positions lmask = chrmap >= hbound[j] # select all >= lower haplotype block bound umask = chrmap <= hbound[j+1] # select all <= upper haplotype block bound mask = lmask & umask # select all: lower <= markers <= upper haplobin[stix:spix][mask] = k # set haplotype bin to 'k' k += 1 # increment bin count return haplobin
[docs] def haplobin_bounds( haplobin: numpy.ndarray ) -> tuple: """ Calculate haplotype bin boundaries and lengths. Parameters ---------- haplobin : numpy.ndarray Array of shape ``(p,)`` containing haplotype bin assignments for each marker across the genome. Bins are assigned starting at ``0`` and strictly increase as the index in the array increases. Where: - ``p`` is the number of markers. Example input:: [0,0,0,0,0,1,1,1,1,1,1,1,2,2,2,2,2,2,3,3,3,3,3,3,3,3,3,3,3] Returns ------- out : tuple A tuple of length 3 containing ``(hstix, hspix, hlen)``. Where: - ``hstix`` is a ``numpy.ndarray`` containing haplotype bin boundary start indices. - ``hspix`` is a ``numpy.ndarray`` containing haplotype bin boundary stop indices. - ``hlen`` is a ``numpy.ndarray`` containing haplotype bin lengths. """ hstix = [0] # starting indices hspix = [] # stopping indices prev = haplobin[0] # get first group label for i in range(1, len(haplobin)): # for each group label if haplobin[i] != prev: # if the label is different hspix.append(i) # add the stop index for prev prev = haplobin[i] # get next label hstix.append(i) # add the start index for next prev hspix.append(len(haplobin)) # append last stop index hstix = numpy.int_(hstix) # convert to ndarray hspix = numpy.int_(hspix) # convert to ndarray hlen = hspix - hstix # get lengths of each haplotype group return hstix, hspix, hlen
[docs] def haplomat( nhaploblk: int, genomemat: numpy.ndarray, genpos: numpy.ndarray, chrgrp_stix: numpy.ndarray, chrgrp_spix: numpy.ndarray, chrgrp_len: numpy.ndarray, u_a: numpy.ndarray, ) -> numpy.ndarray: """ Calculate a haplotype matrix from a genome matrix and model. Parameters ---------- nhaploblk : Integral Number of haplotype blocks genomemat : numpy.ndarray A genome matrix of shape ``(m,n,p)``. Where: - ``m`` is the number of chromsome phases. - ``n`` is the number of individuals. - ``p`` is the number of markers. genpos : numpy.ndarray Array of shape ``(p,)`` containing genetic positions of markers across the genome. Where: - ``p`` is the number of markers. Input contraints: - Must be sorted in ascending order. - Must be grouped based on chromosome. The chromosome boundary start and stop indices must be provided in ``chrgrp_stix`` and ``chrgrp_spix``, respectively. chrgrp_stix : numpy.ndarray Chromosome boundary start indices array of shape ``(c,)``. Where: - ``c`` is the number of chromosomes. chrgrp_spix : numpy.ndarray Chromosome boundary stop indices array of shape ``(c,)``. Where: - ``c`` is the number of chromosomes. chrgrp_len : numpy.ndarray Chromosome length array of shape ``(c,)``. Where: - ``c`` is the number of chromosomes. u_a : numpy.ndarray Array of shape ``(p,t)`` containing additive marker effect estimates. Where: - ``p`` is the number of markers. - ``t`` is the number of traits. Returns ------- out : numpy.ndarray A haplotype effect matrix of shape ``(m,n,b,t)``. Where: - ``m`` is the number of chromsome phases. - ``n`` is the number of individuals. - ``b`` is the number of haplotype blocks. - ``t`` is the number of traits. """ if (chrgrp_stix is None) or (chrgrp_spix is None): raise RuntimeError("markers are not sorted by chromosome position") # get number of chromosomes nchr = len(chrgrp_stix) if nhaploblk < nchr: raise RuntimeError("number of haplotype blocks is less than the number of chromosomes") # calculate number of marker blocks to assign to each chromosome nblk = nhaploblk_chrom(nhaploblk, genpos, chrgrp_stix, chrgrp_spix) # ensure there are enough markers per chromosome if numpy.any(nblk > chrgrp_len): raise RuntimeError( "number of haplotype blocks assigned to a chromosome greater than number of available markers" ) # calculate haplotype bins hbin = haplobin(nblk, genpos, chrgrp_stix, chrgrp_spix) # define shape s = (genomemat.shape[0], genomemat.shape[1], nhaploblk, u_a.shape[1]) # (m,n,b,t) # allocate haplotype matrix hmat = numpy.empty(s, dtype = u_a.dtype) # (m,n,b,t) # get boundary indices hstix, hspix, hlen = haplobin_bounds(hbin) # OPTIMIZE: perhaps eliminate one loop using dot function # fill haplotype matrix for i in range(hmat.shape[3]): # for each trait for j,(st,sp) in enumerate(zip(hstix,hspix)): # for each haplotype block hmat[:,:,j,i] = genomemat[:,:,st:sp].dot(u_a[st:sp,i]) # take dot product and fill return hmat