Source code for pybrops.breed.prot.sel.RealSelectionProtocol

"""
Module defining a general class for real selection protocols.
"""

__all__ = [
    "RealSelectionProtocol",
    "check_is_RealSelectionProtocol",
]

from abc import ABCMeta
from abc import abstractmethod
from numbers import Integral
from numbers import Real
from typing import Callable
from typing import Optional
from typing import Union

import numpy
from numpy.random import Generator
from numpy.random import RandomState
import pandas

from pybrops.breed.prot.sel.SelectionProtocol import SelectionProtocol
from pybrops.breed.prot.sel.cfg.RealSelectionConfiguration import RealSelectionConfiguration
from pybrops.breed.prot.sel.prob.RealSelectionProblem import RealSelectionProblem
from pybrops.breed.prot.sel.soln.RealSelectionSolution import RealSelectionSolution
from pybrops.model.gmod.GenomicModel import GenomicModel
from pybrops.opt.algo.NSGA2RealGeneticAlgorithm import NSGA2RealGeneticAlgorithm
from pybrops.opt.algo.RealGeneticAlgorithm import RealGeneticAlgorithm
from pybrops.opt.algo.RealOptimizationAlgorithm import RealOptimizationAlgorithm
from pybrops.opt.algo.RealOptimizationAlgorithm import check_is_RealOptimizationAlgorithm
from pybrops.popgen.bvmat.BreedingValueMatrix import BreedingValueMatrix
from pybrops.popgen.gmat.GenotypeMatrix import GenotypeMatrix
from pybrops.popgen.gmat.PhasedGenotypeMatrix import PhasedGenotypeMatrix


[docs] class RealSelectionProtocol( SelectionProtocol, metaclass = ABCMeta, ): """ Semi-abstract class for creating real selection protocols. """ ########################## Special Object Methods ########################## def __init__( self, ncross: Integral, nparent: Integral, nmating: Union[Integral,numpy.ndarray], nprogeny: Union[Integral,numpy.ndarray], nobj: Integral, obj_wt: Optional[Union[numpy.ndarray,Real]] = None, obj_trans: Optional[Callable[[numpy.ndarray,numpy.ndarray,dict],numpy.ndarray]] = None, obj_trans_kwargs: Optional[dict] = None, nineqcv: Optional[Integral] = None, ineqcv_wt: Optional[Union[numpy.ndarray,Real]] = None, ineqcv_trans: Optional[Callable[[numpy.ndarray,numpy.ndarray,dict],numpy.ndarray]] = None, ineqcv_trans_kwargs: Optional[dict] = None, neqcv: Optional[Integral] = None, eqcv_wt: Optional[Union[numpy.ndarray,Real]] = None, eqcv_trans: Optional[Callable[[numpy.ndarray,numpy.ndarray,dict],numpy.ndarray]] = None, eqcv_trans_kwargs: Optional[dict] = None, ndset_wt: Optional[Real] = None, ndset_trans: Optional[Callable[[numpy.ndarray,dict],numpy.ndarray]] = None, ndset_trans_kwargs: Optional[dict] = None, rng: Optional[Union[Generator,RandomState]] = None, soalgo: Optional[RealOptimizationAlgorithm] = None, moalgo: Optional[RealOptimizationAlgorithm] = None, **kwargs: dict ) -> None: """ Constructor for the semi-abstract class RealSelectionProtocol. Parameters ---------- ncross : Integral Number of cross configurations to consider. nparent : Integral Number of parents per cross configuration. nmating : Integral, numpy.ndarray Number of matings per configuration. If ``nmating`` is ``Integral``, then broadcast to a ``numpy.ndarray`` of shape ``(ncross,)``. If ``nmating`` is ``numpy.ndarray``, then the array must be of type ``Integral`` and of shape ``(ncross,)``. nprogeny : Integral, numpy.ndarray Number of progeny to derive from each mating event. If ``nprogeny`` is ``Integral``, then broadcast to a ``numpy.ndarray`` of shape ``(ncross,)``. If ``nprogeny`` is ``numpy.ndarray``, then the array must be of type ``Integral`` and of shape ``(ncross,)``. nobj : Integral Number of optimization objectives when constructing a ``SelectionProblem``. This is equivalent to the vector length returned by the ``obj_trans`` function. Must be ``Integral`` greater than 0. obj_wt : numpy.ndarray, Real, None Objective function weights. Weights from this vector are applied to objective function values via the Hadamard product. If values are ``1.0`` or ``-1.0``, this can be used to specify minimizing and maximizing objectives, respectively. If ``obj_wt`` is ``numpy.ndarray``, then the array must be of shape ``(nobj,)``. If ``obj_wt`` is ``Real``, then the value is broadcast to a ``numpy.ndarray`` of shape ``(nobj,)``. If ``obj_wt`` is ``None``, then the value ``1.0`` is broadcast to a ``numpy.ndarray`` of shape ``(nobj,)``. This assumes that all objectives are to be minimized. obj_trans : Callable, None A function which transforms values from a latent objective space to the objective space. This transformation function must have the following signature:: def obj_trans( decnvec: numpy.ndarray, latentvec: numpy.ndarray, **kwargs: dict ) -> numpy.ndarray: # do stuff return output Where: - ``decnvec`` is a ``numpy.ndarray`` containing the decision vector. - ``latentvec`` is a ``numpy.ndarray`` containing the latent space objective function values which are to be transformed. - ``kwargs`` is a ``dict`` containing additional keyword arguments. If ``obj_trans`` is ``None``, then default to an identity objective transformation function. obj_trans_kwargs : dict Keyword arguments for the latent space to objective space transformation function. If `obj_trans_kwargs`` is ``None``, then default to an empty dictionary. nineqcv : Integral, None Number of inequality constraint violation functions. This is equivalent to the vector length returned by the ``ineqcv_trans`` function. Must be ``Integral`` greater than or equal to zero. If ``nineqcv`` is ``None``, then set to zero. ineqcv_wt : numpy.ndarray, None Inequality constraint violation function weights. Weights from this vector are applied to inequality constraint violation function values via the Hadamard product. If values are ``1.0`` or ``-1.0``, this can be used to specify minimizing and maximizing constraints, respectively. If ``ineqcv_wt`` is ``numpy.ndarray``, then the array must be of shape ``(nineqcv,)``. If ``ineqcv_wt`` is ``Real``, then the value is broadcast to a ``numpy.ndarray`` of shape ``(nineqcv,)``. If ``ineqcv_wt`` is ``None``, then the value ``1.0`` is broadcast to a ``numpy.ndarray`` of shape ``(nineqcv,)``. This assumes that all constraints are to be minimized. ineqcv_trans : Callable, None A function which transforms values from a latent objective space to the inequality constraint violation space. This transformation function must have the following signature:: def ineqcv_trans( decnvec: numpy.ndarray, latentvec: numpy.ndarray, **kwargs: dict ) -> numpy.ndarray: # do stuff return output Where: - ``decnvec`` is a ``numpy.ndarray`` containing the decision vector. - ``latentvec`` is a ``numpy.ndarray`` containing the latent space objective function values which are to be transformed. - ``kwargs`` is a ``dict`` containing additional keyword arguments. If ``ineqcv_trans`` is ``None``, then default to a transformation function returning an empty vector. ineqcv_trans_kwargs : dict, None Keyword arguments for the latent space to inequality constraint violation transformation function. If `ineqcv_trans_kwargs`` is ``None``, then default to an empty dictionary. neqcv : Integral, None Number of equality constraint violations. This is equivalent to the vector length returned by the ``eqcv_trans`` function. Must be ``Integral`` greater than or equal to zero. If ``neqcv`` is ``None``, then set to zero. eqcv_wt : numpy.ndarray, None Equality constraint violation function weights. Weights from this vector are applied to equality constraint violation function values via the Hadamard product. If values are ``1.0`` or ``-1.0``, this can be used to specify minimizing and maximizing constraints, respectively. If ``eqcv_wt`` is ``numpy.ndarray``, then the array must be of shape ``(neqcv,)``. If ``eqcv_wt`` is ``Real``, then the value is broadcast to a ``numpy.ndarray`` of shape ``(neqcv,)``. If ``eqcv_wt`` is ``None``, then the value ``1.0`` is broadcast to a ``numpy.ndarray`` of shape ``(neqcv,)``. This assumes that all constraints are to be minimized. eqcv_trans : Callable, None A function which transforms values from a latent objective space to the equality constraint violation space. This transformation function must have the following signature:: def eqcv_trans( decnvec: numpy.ndarray, latentvec: numpy.ndarray, **kwargs: dict ) -> numpy.ndarray: # do stuff return output Where: - ``decnvec`` is a ``numpy.ndarray`` containing the decision vector. - ``latentvec`` is a ``numpy.ndarray`` containing the latent space objective function values which are to be transformed. - ``kwargs`` is a ``dict`` containing additional keyword arguments. If ``eqcv_trans`` is ``None``, then default to a transformation function returning an empty vector. eqcv_trans_kwargs : dict, None Keyword arguments for the latent space to equality constraint violation transformation function. If `eqcv_trans_kwargs`` is ``None``, then default to an empty dictionary. ndset_wt : Real, None Nondominated set weight. The weight from this function is applied to outputs from ``ndset_trans``. If values are ``1.0`` or ``-1.0``, this can be used to specify minimizing and maximizing objectives, respectively. If ``ndset_wt`` is ``None``, then it is set to the default value of ``1.0``. This assumes that the objective is to be minimized. ndset_trans : Callable, None A function which transforms values from the non-dominated set objective space to the single-objective space. This transformation function must have the following signature:: def ndset_trans( mat: numpy.ndarray, **kwargs: dict ) -> numpy.ndarray: # do stuff return output Where: - ``mat`` is a ``numpy.ndarray`` containing a point coordinate array of shape ``(npt, nobj)`` where ``npt`` is the number of points and ``nobj`` is the number of objectives (dimensions). This array contains input points for calculating the distance between a point to the vector ``vec_wt``. - ``kwargs`` is a ``dict`` containing additional keyword arguments. If ``ndset_trans`` is ``None``, then default to a transformation function calculating the distance between a weight vector and provided points ndset_trans_kwargs : dict, None Nondominated set transformation function keyword arguments. If ``ndset_trans_kwargs`` is ``None``, then default to defaults for the default ``ndset_trans`` function:: ndset_trans_kwargs = { "obj_wt": numpy.repeat(1.0, nobj), "vec_wt": numpy.repeat(1.0, nobj) } rng : numpy.random.Generator, numpy.random.RandomState, None Random number source. If ``rng`` is ``None``, default to the global random number generator. soalgo : RealOptimizationAlgorithm, None Single-objective optimization algorithm. If ``soalgo`` is ``None``, then use a default single-objective optimization algorithm. moalgo : RealOptimizationAlgorithm, None Multi-objective opimization algorithm. If ``moalgo`` is ``None``, then use a default multi-objective optimization algorithm. kwargs : dict Additional keyword arguments. """ # call super constructor super(RealSelectionProtocol, self).__init__( ncross = ncross, nparent = nparent, nmating = nmating, nprogeny = nprogeny, nobj = nobj, obj_wt = obj_wt, obj_trans = obj_trans, obj_trans_kwargs = obj_trans_kwargs, nineqcv = nineqcv, ineqcv_wt = ineqcv_wt, ineqcv_trans = ineqcv_trans, ineqcv_trans_kwargs = ineqcv_trans_kwargs, neqcv = neqcv, eqcv_wt = eqcv_wt, eqcv_trans = eqcv_trans, eqcv_trans_kwargs = eqcv_trans_kwargs, ndset_wt = ndset_wt, ndset_trans = ndset_trans, ndset_trans_kwargs = ndset_trans_kwargs, rng = rng, soalgo = soalgo, moalgo = moalgo ) ############################ Object Properties ############################# @property def soalgo(self) -> RealOptimizationAlgorithm: """Single-objective optimization algorithm.""" return self._soalgo @soalgo.setter def soalgo(self, value: Union[RealOptimizationAlgorithm,None]) -> None: """Set single-objective optimization algorithm.""" if value is None: # construct default multi-objective algorithm value = RealGeneticAlgorithm( ngen = 250, # number of generations to evolve pop_size = 100 # number of parents in population ) # construct default hillclimber # value = SteepestDescentRealHillClimber(self.rng) check_is_RealOptimizationAlgorithm(value, "soalgo") self._soalgo = value @property def moalgo(self) -> RealOptimizationAlgorithm: """Multi-objective opimization algorithm.""" return self._moalgo @moalgo.setter def moalgo(self, value: Union[RealOptimizationAlgorithm,None]) -> None: """Set multi-objective opimization algorithm.""" if value is None: # construct default multi-objective algorithm value = NSGA2RealGeneticAlgorithm( ngen = 250, # number of generations to evolve pop_size = 100 # number of parents in population ) check_is_RealOptimizationAlgorithm(value, "moalgo") self._moalgo = value ############################## Object Methods ############################## ########## Optimization Problem Construction ###########
[docs] @abstractmethod def problem( self, pgmat: PhasedGenotypeMatrix, gmat: GenotypeMatrix, ptdf: pandas.DataFrame, bvmat: BreedingValueMatrix, gpmod: GenomicModel, t_cur: Integral, t_max: Integral, **kwargs: dict ) -> RealSelectionProblem: """ Create an optimization problem definition using provided inputs. Parameters ---------- pgmat : PhasedGenotypeMatrix Genomes gmat : GenotypeMatrix Genotypes ptdf : pandas.DataFrame Phenotype dataframe bvmat : BreedingValueMatrix Breeding value matrix gpmod : GenomicModel Genomic prediction model t_cur : int Current generation number. t_max : int Maximum (deadline) generation number. kwargs : dict Additional keyword arguments. Returns ------- out : RealSelectionProblem An optimization problem definition. """ raise NotImplementedError("method is abstract")
################ Single Objective Solve ################
[docs] def sosolve( self, pgmat: PhasedGenotypeMatrix, gmat: GenotypeMatrix, ptdf: pandas.DataFrame, bvmat: BreedingValueMatrix, gpmod: GenomicModel, t_cur: Integral, t_max: Integral, miscout: Optional[dict] = None, **kwargs: dict ) -> RealSelectionSolution: """ Solve the selection problem using a single-objective optimization algorithm. Parameters ---------- pgmat : PhasedGenotypeMatrix Genomes gmat : GenotypeMatrix Genotypes ptdf : pandas.DataFrame Phenotype dataframe bvmat : BreedingValueMatrix Breeding value matrix gpmod : GenomicModel Genomic prediction model t_cur : int Current generation number. t_max : int Maximum (deadline) generation number. miscout : dict, None Pointer to a dictionary for miscellaneous user defined output. If ``dict``, write to dict (may overwrite previously defined fields). If ``None``, user defined output is not calculated or stored. kwargs : dict Additional keyword arguments. Returns ------- out : Solution A single-objective solution to the posed selection problem. """ # check the number of objectives and raise error if needed if self.nobj != 1: raise RuntimeError("{0} instance is not single-objective in nature: expected nobj == 1 but received nobj == {1}".format(type(self).__name__,self.nobj)) # construct the problem prob = self.problem( pgmat = pgmat, gmat = gmat, ptdf = ptdf, bvmat = bvmat, gpmod = gpmod, t_cur = t_cur, t_max = t_max, **kwargs ) # optimize the problem soln = self.soalgo.minimize( prob = prob, miscout = miscout ) # convert real solution to real selection solution # this has the exact same metadata as a RealSolution out = RealSelectionSolution( ndecn = soln.ndecn, decn_space = soln.decn_space, decn_space_lower = soln.decn_space_lower, decn_space_upper = soln.decn_space_upper, nobj = soln.nobj, obj_wt = soln.obj_wt, nineqcv = soln.nineqcv, ineqcv_wt = soln.ineqcv_wt, neqcv = soln.neqcv, eqcv_wt = soln.eqcv_wt, nsoln = soln.nsoln, soln_decn = soln.soln_decn, soln_obj = soln.soln_obj, soln_ineqcv = soln.soln_ineqcv, soln_eqcv = soln.soln_eqcv ) return out
############## Pareto Frontier Functions ###############
[docs] def mosolve( self, pgmat: PhasedGenotypeMatrix, gmat: GenotypeMatrix, ptdf: pandas.DataFrame, bvmat: BreedingValueMatrix, gpmod: GenomicModel, t_cur: Integral, t_max: Integral, miscout: Optional[dict] = None, **kwargs: dict ) -> RealSelectionSolution: """ Calculate a Pareto frontier for objectives. Parameters ---------- pgmat : PhasedGenotypeMatrix Genomes gmat : GenotypeMatrix Genotypes ptdf : pandas.DataFrame Phenotype dataframe bvmat : BreedingValueMatrix Breeding value matrix gpmod : GenomicModel Genomic prediction model t_cur : int Current generation number. t_max : int Maximum (deadline) generation number. miscout : dict, None Pointer to a dictionary for miscellaneous user defined output. If ``dict``, write to dict (may overwrite previously defined fields). If ``None``, user defined output is not calculated or stored. kwargs : dict Additional keyword arguments. Returns ------- out : tuple A tuple containing two objects ``(frontier, sel_config)``. Where: - ``frontier`` is a ``numpy.ndarray`` of shape ``(q,v)`` containing Pareto frontier points. - ``sel_config`` is a ``numpy.ndarray`` of shape ``(q,k)`` containing parent selection decisions for each corresponding point in the Pareto frontier. Where: - ``q`` is the number of points in the frontier. - ``v`` is the number of objectives for the frontier. - ``k`` is the number of search space decision variables. """ # check the number of objectives and raise error if needed if self.nobj <= 1: raise RuntimeError("{0} instance is not multi-objective in nature: expected nobj > 1 but received nobj == {1}".format(type(self).__name__,self.nobj)) # construct the problem prob = self.problem( pgmat = pgmat, gmat = gmat, ptdf = ptdf, bvmat = bvmat, gpmod = gpmod, t_cur = t_cur, t_max = t_max, **kwargs ) # optimize the problem soln = self.moalgo.minimize( prob = prob, miscout = miscout ) # convert real solution to real selection solution # this has the exact same metadata as a RealSolution out = RealSelectionSolution( ndecn = soln.ndecn, decn_space = soln.decn_space, decn_space_lower = soln.decn_space_lower, decn_space_upper = soln.decn_space_upper, nobj = soln.nobj, obj_wt = soln.obj_wt, nineqcv = soln.nineqcv, ineqcv_wt = soln.ineqcv_wt, neqcv = soln.neqcv, eqcv_wt = soln.eqcv_wt, nsoln = soln.nsoln, soln_decn = soln.soln_decn, soln_obj = soln.soln_obj, soln_ineqcv = soln.soln_ineqcv, soln_eqcv = soln.soln_eqcv ) return out
################# Selection Functions ##################
[docs] def select( self, pgmat: PhasedGenotypeMatrix, gmat: GenotypeMatrix, ptdf: pandas.DataFrame, bvmat: BreedingValueMatrix, gpmod: GenomicModel, t_cur: Integral, t_max: Integral, miscout: Optional[dict] = None, **kwargs: dict ) -> RealSelectionConfiguration: """ Select individuals for breeding. Parameters ---------- pgmat : PhasedGenotypeMatrix Genomes gmat : GenotypeMatrix Genotypes ptdf : pandas.DataFrame Phenotype dataframe bvmat : BreedingValueMatrix Breeding value matrix gpmod : GenomicModel Genomic prediction model t_cur : int Current generation number. t_max : int Maximum (deadline) generation number. miscout : dict, None Pointer to a dictionary for miscellaneous user defined output. If ``dict``, write to dict (may overwrite previously defined fields). If ``None``, user defined output is not calculated or stored. kwargs : dict Additional keyword arguments. Returns ------- out : RealSelectionConfiguration A selection configuration object, requiring all necessary information to mate individuals. """ # if the number of objectives is 1, then we use a single objective algorithm if self.nobj == 1: # solve the single-objective problem sosoln = self.sosolve( pgmat = pgmat, gmat = gmat, ptdf = ptdf, bvmat = bvmat, gpmod = gpmod, t_cur = t_cur, t_max = t_max, miscout = miscout, **kwargs ) # add solution to miscout if provided. if miscout is not None: miscout["sosoln"] = sosoln # construct a RealSelectionConfiguration selcfg = RealSelectionConfiguration( ncross = self.ncross, nparent = self.nparent, nmating = self.nmating, nprogeny = self.nprogeny, pgmat = pgmat, xconfig_decn = sosoln.soln_decn[0], rng = None ) return selcfg # else, we use a multi-objective algorithm and apply a transformation on the non-dominated points to identify a # multi-objective method: objfn_trans returns a multiple values for each # selection configuration elif self.nobj > 1: # solve the multi-objective problem mosoln = self.mosolve( pgmat = pgmat, gmat = gmat, ptdf = ptdf, bvmat = bvmat, gpmod = gpmod, t_cur = t_cur, t_max = t_max, miscout = miscout, **kwargs ) # get scores for each of the points along the non-dominated set # (nndpts,) score = self.ndset_wt * self.ndset_trans( mosoln.soln_obj, **self.ndset_trans_kwargs ) # get index of maximum score ix = score.argmax() # add solution to miscout if provided. if miscout is not None: miscout["mosoln"] = mosoln # construct a RealSelectionConfiguration selcfg = RealSelectionConfiguration( ncross = self.ncross, nparent = self.nparent, nmating = self.nmating, nprogeny = self.nprogeny, pgmat = pgmat, xconfig_decn = mosoln.soln_decn[ix], rng = None ) return selcfg # else raise an error as the number of objectives is an illegal value else: raise ValueError("number of objectives must be greater than zero")
################################## Utilities ###################################
[docs] def check_is_RealSelectionProtocol(v: object, vname: str) -> None: """ Check if object is of type RealSelectionProtocol, otherwise raise TypeError. Parameters ---------- v : object Any Python object to test. vname : str Name of variable to print in TypeError message. """ if not isinstance(v, RealSelectionProtocol): raise TypeError("variable '{0}' must be of type '{1}' but received type '{2}'".format(vname,RealSelectionProtocol.__name__,type(v).__name__))