"""
Module defining a general class for real selection protocols.
"""
__all__ = [
"RealSelectionProtocol",
"check_is_RealSelectionProtocol",
]
from abc import ABCMeta
from abc import abstractmethod
from numbers import Integral
from numbers import Real
from typing import Callable
from typing import Optional
from typing import Union
import numpy
from numpy.random import Generator
from numpy.random import RandomState
import pandas
from pybrops.breed.prot.sel.SelectionProtocol import SelectionProtocol
from pybrops.breed.prot.sel.cfg.RealSelectionConfiguration import RealSelectionConfiguration
from pybrops.breed.prot.sel.prob.RealSelectionProblem import RealSelectionProblem
from pybrops.breed.prot.sel.soln.RealSelectionSolution import RealSelectionSolution
from pybrops.model.gmod.GenomicModel import GenomicModel
from pybrops.opt.algo.NSGA2RealGeneticAlgorithm import NSGA2RealGeneticAlgorithm
from pybrops.opt.algo.RealGeneticAlgorithm import RealGeneticAlgorithm
from pybrops.opt.algo.RealOptimizationAlgorithm import RealOptimizationAlgorithm
from pybrops.opt.algo.RealOptimizationAlgorithm import check_is_RealOptimizationAlgorithm
from pybrops.popgen.bvmat.BreedingValueMatrix import BreedingValueMatrix
from pybrops.popgen.gmat.GenotypeMatrix import GenotypeMatrix
from pybrops.popgen.gmat.PhasedGenotypeMatrix import PhasedGenotypeMatrix
[docs]
class RealSelectionProtocol(
SelectionProtocol,
metaclass = ABCMeta,
):
"""
Semi-abstract class for creating real selection protocols.
"""
########################## Special Object Methods ##########################
def __init__(
self,
ncross: Integral,
nparent: Integral,
nmating: Union[Integral,numpy.ndarray],
nprogeny: Union[Integral,numpy.ndarray],
nobj: Integral,
obj_wt: Optional[Union[numpy.ndarray,Real]] = None,
obj_trans: Optional[Callable[[numpy.ndarray,numpy.ndarray,dict],numpy.ndarray]] = None,
obj_trans_kwargs: Optional[dict] = None,
nineqcv: Optional[Integral] = None,
ineqcv_wt: Optional[Union[numpy.ndarray,Real]] = None,
ineqcv_trans: Optional[Callable[[numpy.ndarray,numpy.ndarray,dict],numpy.ndarray]] = None,
ineqcv_trans_kwargs: Optional[dict] = None,
neqcv: Optional[Integral] = None,
eqcv_wt: Optional[Union[numpy.ndarray,Real]] = None,
eqcv_trans: Optional[Callable[[numpy.ndarray,numpy.ndarray,dict],numpy.ndarray]] = None,
eqcv_trans_kwargs: Optional[dict] = None,
ndset_wt: Optional[Real] = None,
ndset_trans: Optional[Callable[[numpy.ndarray,dict],numpy.ndarray]] = None,
ndset_trans_kwargs: Optional[dict] = None,
rng: Optional[Union[Generator,RandomState]] = None,
soalgo: Optional[RealOptimizationAlgorithm] = None,
moalgo: Optional[RealOptimizationAlgorithm] = None,
**kwargs: dict
) -> None:
"""
Constructor for the semi-abstract class RealSelectionProtocol.
Parameters
----------
ncross : Integral
Number of cross configurations to consider.
nparent : Integral
Number of parents per cross configuration.
nmating : Integral, numpy.ndarray
Number of matings per configuration.
If ``nmating`` is ``Integral``, then broadcast to a ``numpy.ndarray``
of shape ``(ncross,)``.
If ``nmating`` is ``numpy.ndarray``, then the array must be of type
``Integral`` and of shape ``(ncross,)``.
nprogeny : Integral, numpy.ndarray
Number of progeny to derive from each mating event.
If ``nprogeny`` is ``Integral``, then broadcast to a ``numpy.ndarray``
of shape ``(ncross,)``.
If ``nprogeny`` is ``numpy.ndarray``, then the array must be of type
``Integral`` and of shape ``(ncross,)``.
nobj : Integral
Number of optimization objectives when constructing a
``SelectionProblem``. This is equivalent to the vector length
returned by the ``obj_trans`` function. Must be ``Integral`` greater
than 0.
obj_wt : numpy.ndarray, Real, None
Objective function weights. Weights from this vector are applied
to objective function values via the Hadamard product. If values
are ``1.0`` or ``-1.0``, this can be used to specify minimizing
and maximizing objectives, respectively.
If ``obj_wt`` is ``numpy.ndarray``, then the array must be of shape
``(nobj,)``.
If ``obj_wt`` is ``Real``, then the value is broadcast to a
``numpy.ndarray`` of shape ``(nobj,)``.
If ``obj_wt`` is ``None``, then the value ``1.0`` is broadcast to a
``numpy.ndarray`` of shape ``(nobj,)``. This assumes that all
objectives are to be minimized.
obj_trans : Callable, None
A function which transforms values from a latent objective space to
the objective space. This transformation function must have the
following signature::
def obj_trans(
decnvec: numpy.ndarray,
latentvec: numpy.ndarray,
**kwargs: dict
) -> numpy.ndarray:
# do stuff
return output
Where:
- ``decnvec`` is a ``numpy.ndarray`` containing the decision vector.
- ``latentvec`` is a ``numpy.ndarray`` containing the latent space
objective function values which are to be transformed.
- ``kwargs`` is a ``dict`` containing additional keyword arguments.
If ``obj_trans`` is ``None``, then default to an identity objective
transformation function.
obj_trans_kwargs : dict
Keyword arguments for the latent space to objective space
transformation function.
If `obj_trans_kwargs`` is ``None``, then default to an empty
dictionary.
nineqcv : Integral, None
Number of inequality constraint violation functions. This is
equivalent to the vector length returned by the ``ineqcv_trans``
function. Must be ``Integral`` greater than or equal to zero.
If ``nineqcv`` is ``None``, then set to zero.
ineqcv_wt : numpy.ndarray, None
Inequality constraint violation function weights. Weights from this
vector are applied to inequality constraint violation function
values via the Hadamard product. If values are ``1.0`` or ``-1.0``,
this can be used to specify minimizing and maximizing constraints,
respectively.
If ``ineqcv_wt`` is ``numpy.ndarray``, then the array must be of
shape ``(nineqcv,)``.
If ``ineqcv_wt`` is ``Real``, then the value is broadcast to a
``numpy.ndarray`` of shape ``(nineqcv,)``.
If ``ineqcv_wt`` is ``None``, then the value ``1.0`` is broadcast
to a ``numpy.ndarray`` of shape ``(nineqcv,)``. This assumes that
all constraints are to be minimized.
ineqcv_trans : Callable, None
A function which transforms values from a latent objective space to
the inequality constraint violation space. This transformation
function must have the following signature::
def ineqcv_trans(
decnvec: numpy.ndarray,
latentvec: numpy.ndarray,
**kwargs: dict
) -> numpy.ndarray:
# do stuff
return output
Where:
- ``decnvec`` is a ``numpy.ndarray`` containing the decision vector.
- ``latentvec`` is a ``numpy.ndarray`` containing the latent space
objective function values which are to be transformed.
- ``kwargs`` is a ``dict`` containing additional keyword arguments.
If ``ineqcv_trans`` is ``None``, then default to a transformation
function returning an empty vector.
ineqcv_trans_kwargs : dict, None
Keyword arguments for the latent space to inequality constraint
violation transformation function.
If `ineqcv_trans_kwargs`` is ``None``, then default to an empty
dictionary.
neqcv : Integral, None
Number of equality constraint violations. This is equivalent to the
vector length returned by the ``eqcv_trans`` function. Must be
``Integral`` greater than or equal to zero.
If ``neqcv`` is ``None``, then set to zero.
eqcv_wt : numpy.ndarray, None
Equality constraint violation function weights. Weights from this
vector are applied to equality constraint violation function
values via the Hadamard product. If values are ``1.0`` or ``-1.0``,
this can be used to specify minimizing and maximizing constraints,
respectively.
If ``eqcv_wt`` is ``numpy.ndarray``, then the array must be of
shape ``(neqcv,)``.
If ``eqcv_wt`` is ``Real``, then the value is broadcast to a
``numpy.ndarray`` of shape ``(neqcv,)``.
If ``eqcv_wt`` is ``None``, then the value ``1.0`` is broadcast
to a ``numpy.ndarray`` of shape ``(neqcv,)``. This assumes that
all constraints are to be minimized.
eqcv_trans : Callable, None
A function which transforms values from a latent objective space to
the equality constraint violation space. This transformation
function must have the following signature::
def eqcv_trans(
decnvec: numpy.ndarray,
latentvec: numpy.ndarray,
**kwargs: dict
) -> numpy.ndarray:
# do stuff
return output
Where:
- ``decnvec`` is a ``numpy.ndarray`` containing the decision vector.
- ``latentvec`` is a ``numpy.ndarray`` containing the latent space
objective function values which are to be transformed.
- ``kwargs`` is a ``dict`` containing additional keyword arguments.
If ``eqcv_trans`` is ``None``, then default to a transformation
function returning an empty vector.
eqcv_trans_kwargs : dict, None
Keyword arguments for the latent space to equality constraint
violation transformation function.
If `eqcv_trans_kwargs`` is ``None``, then default to an empty
dictionary.
ndset_wt : Real, None
Nondominated set weight. The weight from this function is applied
to outputs from ``ndset_trans``. If values are ``1.0`` or ``-1.0``,
this can be used to specify minimizing and maximizing objectives,
respectively.
If ``ndset_wt`` is ``None``, then it is set to the default value of ``1.0``.
This assumes that the objective is to be minimized.
ndset_trans : Callable, None
A function which transforms values from the non-dominated set
objective space to the single-objective space. This transformation
function must have the following signature::
def ndset_trans(
mat: numpy.ndarray,
**kwargs: dict
) -> numpy.ndarray:
# do stuff
return output
Where:
- ``mat`` is a ``numpy.ndarray`` containing a point coordinate array
of shape ``(npt, nobj)`` where ``npt`` is the number of points
and ``nobj`` is the number of objectives (dimensions). This
array contains input points for calculating the distance between
a point to the vector ``vec_wt``.
- ``kwargs`` is a ``dict`` containing additional keyword arguments.
If ``ndset_trans`` is ``None``, then default to a transformation
function calculating the distance between a weight vector and
provided points
ndset_trans_kwargs : dict, None
Nondominated set transformation function keyword arguments.
If ``ndset_trans_kwargs`` is ``None``, then default to defaults for
the default ``ndset_trans`` function::
ndset_trans_kwargs = {
"obj_wt": numpy.repeat(1.0, nobj),
"vec_wt": numpy.repeat(1.0, nobj)
}
rng : numpy.random.Generator, numpy.random.RandomState, None
Random number source.
If ``rng`` is ``None``, default to the global random number
generator.
soalgo : RealOptimizationAlgorithm, None
Single-objective optimization algorithm.
If ``soalgo`` is ``None``, then use a default single-objective
optimization algorithm.
moalgo : RealOptimizationAlgorithm, None
Multi-objective opimization algorithm.
If ``moalgo`` is ``None``, then use a default multi-objective
optimization algorithm.
kwargs : dict
Additional keyword arguments.
"""
# call super constructor
super(RealSelectionProtocol, self).__init__(
ncross = ncross,
nparent = nparent,
nmating = nmating,
nprogeny = nprogeny,
nobj = nobj,
obj_wt = obj_wt,
obj_trans = obj_trans,
obj_trans_kwargs = obj_trans_kwargs,
nineqcv = nineqcv,
ineqcv_wt = ineqcv_wt,
ineqcv_trans = ineqcv_trans,
ineqcv_trans_kwargs = ineqcv_trans_kwargs,
neqcv = neqcv,
eqcv_wt = eqcv_wt,
eqcv_trans = eqcv_trans,
eqcv_trans_kwargs = eqcv_trans_kwargs,
ndset_wt = ndset_wt,
ndset_trans = ndset_trans,
ndset_trans_kwargs = ndset_trans_kwargs,
rng = rng,
soalgo = soalgo,
moalgo = moalgo
)
############################ Object Properties #############################
@property
def soalgo(self) -> RealOptimizationAlgorithm:
"""Single-objective optimization algorithm."""
return self._soalgo
@soalgo.setter
def soalgo(self, value: Union[RealOptimizationAlgorithm,None]) -> None:
"""Set single-objective optimization algorithm."""
if value is None:
# construct default multi-objective algorithm
value = RealGeneticAlgorithm(
ngen = 250, # number of generations to evolve
pop_size = 100 # number of parents in population
)
# construct default hillclimber
# value = SteepestDescentRealHillClimber(self.rng)
check_is_RealOptimizationAlgorithm(value, "soalgo")
self._soalgo = value
@property
def moalgo(self) -> RealOptimizationAlgorithm:
"""Multi-objective opimization algorithm."""
return self._moalgo
@moalgo.setter
def moalgo(self, value: Union[RealOptimizationAlgorithm,None]) -> None:
"""Set multi-objective opimization algorithm."""
if value is None:
# construct default multi-objective algorithm
value = NSGA2RealGeneticAlgorithm(
ngen = 250, # number of generations to evolve
pop_size = 100 # number of parents in population
)
check_is_RealOptimizationAlgorithm(value, "moalgo")
self._moalgo = value
############################## Object Methods ##############################
########## Optimization Problem Construction ###########
[docs]
@abstractmethod
def problem(
self,
pgmat: PhasedGenotypeMatrix,
gmat: GenotypeMatrix,
ptdf: pandas.DataFrame,
bvmat: BreedingValueMatrix,
gpmod: GenomicModel,
t_cur: Integral,
t_max: Integral,
**kwargs: dict
) -> RealSelectionProblem:
"""
Create an optimization problem definition using provided inputs.
Parameters
----------
pgmat : PhasedGenotypeMatrix
Genomes
gmat : GenotypeMatrix
Genotypes
ptdf : pandas.DataFrame
Phenotype dataframe
bvmat : BreedingValueMatrix
Breeding value matrix
gpmod : GenomicModel
Genomic prediction model
t_cur : int
Current generation number.
t_max : int
Maximum (deadline) generation number.
kwargs : dict
Additional keyword arguments.
Returns
-------
out : RealSelectionProblem
An optimization problem definition.
"""
raise NotImplementedError("method is abstract")
################ Single Objective Solve ################
[docs]
def sosolve(
self,
pgmat: PhasedGenotypeMatrix,
gmat: GenotypeMatrix,
ptdf: pandas.DataFrame,
bvmat: BreedingValueMatrix,
gpmod: GenomicModel,
t_cur: Integral,
t_max: Integral,
miscout: Optional[dict] = None,
**kwargs: dict
) -> RealSelectionSolution:
"""
Solve the selection problem using a single-objective optimization algorithm.
Parameters
----------
pgmat : PhasedGenotypeMatrix
Genomes
gmat : GenotypeMatrix
Genotypes
ptdf : pandas.DataFrame
Phenotype dataframe
bvmat : BreedingValueMatrix
Breeding value matrix
gpmod : GenomicModel
Genomic prediction model
t_cur : int
Current generation number.
t_max : int
Maximum (deadline) generation number.
miscout : dict, None
Pointer to a dictionary for miscellaneous user defined output.
If ``dict``, write to dict (may overwrite previously defined fields).
If ``None``, user defined output is not calculated or stored.
kwargs : dict
Additional keyword arguments.
Returns
-------
out : Solution
A single-objective solution to the posed selection problem.
"""
# check the number of objectives and raise error if needed
if self.nobj != 1:
raise RuntimeError("{0} instance is not single-objective in nature: expected nobj == 1 but received nobj == {1}".format(type(self).__name__,self.nobj))
# construct the problem
prob = self.problem(
pgmat = pgmat,
gmat = gmat,
ptdf = ptdf,
bvmat = bvmat,
gpmod = gpmod,
t_cur = t_cur,
t_max = t_max,
**kwargs
)
# optimize the problem
soln = self.soalgo.minimize(
prob = prob,
miscout = miscout
)
# convert real solution to real selection solution
# this has the exact same metadata as a RealSolution
out = RealSelectionSolution(
ndecn = soln.ndecn,
decn_space = soln.decn_space,
decn_space_lower = soln.decn_space_lower,
decn_space_upper = soln.decn_space_upper,
nobj = soln.nobj,
obj_wt = soln.obj_wt,
nineqcv = soln.nineqcv,
ineqcv_wt = soln.ineqcv_wt,
neqcv = soln.neqcv,
eqcv_wt = soln.eqcv_wt,
nsoln = soln.nsoln,
soln_decn = soln.soln_decn,
soln_obj = soln.soln_obj,
soln_ineqcv = soln.soln_ineqcv,
soln_eqcv = soln.soln_eqcv
)
return out
############## Pareto Frontier Functions ###############
[docs]
def mosolve(
self,
pgmat: PhasedGenotypeMatrix,
gmat: GenotypeMatrix,
ptdf: pandas.DataFrame,
bvmat: BreedingValueMatrix,
gpmod: GenomicModel,
t_cur: Integral,
t_max: Integral,
miscout: Optional[dict] = None,
**kwargs: dict
) -> RealSelectionSolution:
"""
Calculate a Pareto frontier for objectives.
Parameters
----------
pgmat : PhasedGenotypeMatrix
Genomes
gmat : GenotypeMatrix
Genotypes
ptdf : pandas.DataFrame
Phenotype dataframe
bvmat : BreedingValueMatrix
Breeding value matrix
gpmod : GenomicModel
Genomic prediction model
t_cur : int
Current generation number.
t_max : int
Maximum (deadline) generation number.
miscout : dict, None
Pointer to a dictionary for miscellaneous user defined output.
If ``dict``, write to dict (may overwrite previously defined fields).
If ``None``, user defined output is not calculated or stored.
kwargs : dict
Additional keyword arguments.
Returns
-------
out : tuple
A tuple containing two objects ``(frontier, sel_config)``.
Where:
- ``frontier`` is a ``numpy.ndarray`` of shape ``(q,v)`` containing
Pareto frontier points.
- ``sel_config`` is a ``numpy.ndarray`` of shape ``(q,k)`` containing
parent selection decisions for each corresponding point in the
Pareto frontier.
Where:
- ``q`` is the number of points in the frontier.
- ``v`` is the number of objectives for the frontier.
- ``k`` is the number of search space decision variables.
"""
# check the number of objectives and raise error if needed
if self.nobj <= 1:
raise RuntimeError("{0} instance is not multi-objective in nature: expected nobj > 1 but received nobj == {1}".format(type(self).__name__,self.nobj))
# construct the problem
prob = self.problem(
pgmat = pgmat,
gmat = gmat,
ptdf = ptdf,
bvmat = bvmat,
gpmod = gpmod,
t_cur = t_cur,
t_max = t_max,
**kwargs
)
# optimize the problem
soln = self.moalgo.minimize(
prob = prob,
miscout = miscout
)
# convert real solution to real selection solution
# this has the exact same metadata as a RealSolution
out = RealSelectionSolution(
ndecn = soln.ndecn,
decn_space = soln.decn_space,
decn_space_lower = soln.decn_space_lower,
decn_space_upper = soln.decn_space_upper,
nobj = soln.nobj,
obj_wt = soln.obj_wt,
nineqcv = soln.nineqcv,
ineqcv_wt = soln.ineqcv_wt,
neqcv = soln.neqcv,
eqcv_wt = soln.eqcv_wt,
nsoln = soln.nsoln,
soln_decn = soln.soln_decn,
soln_obj = soln.soln_obj,
soln_ineqcv = soln.soln_ineqcv,
soln_eqcv = soln.soln_eqcv
)
return out
################# Selection Functions ##################
[docs]
def select(
self,
pgmat: PhasedGenotypeMatrix,
gmat: GenotypeMatrix,
ptdf: pandas.DataFrame,
bvmat: BreedingValueMatrix,
gpmod: GenomicModel,
t_cur: Integral,
t_max: Integral,
miscout: Optional[dict] = None,
**kwargs: dict
) -> RealSelectionConfiguration:
"""
Select individuals for breeding.
Parameters
----------
pgmat : PhasedGenotypeMatrix
Genomes
gmat : GenotypeMatrix
Genotypes
ptdf : pandas.DataFrame
Phenotype dataframe
bvmat : BreedingValueMatrix
Breeding value matrix
gpmod : GenomicModel
Genomic prediction model
t_cur : int
Current generation number.
t_max : int
Maximum (deadline) generation number.
miscout : dict, None
Pointer to a dictionary for miscellaneous user defined output.
If ``dict``, write to dict (may overwrite previously defined fields).
If ``None``, user defined output is not calculated or stored.
kwargs : dict
Additional keyword arguments.
Returns
-------
out : RealSelectionConfiguration
A selection configuration object, requiring all necessary information to mate individuals.
"""
# if the number of objectives is 1, then we use a single objective algorithm
if self.nobj == 1:
# solve the single-objective problem
sosoln = self.sosolve(
pgmat = pgmat,
gmat = gmat,
ptdf = ptdf,
bvmat = bvmat,
gpmod = gpmod,
t_cur = t_cur,
t_max = t_max,
miscout = miscout,
**kwargs
)
# add solution to miscout if provided.
if miscout is not None:
miscout["sosoln"] = sosoln
# construct a RealSelectionConfiguration
selcfg = RealSelectionConfiguration(
ncross = self.ncross,
nparent = self.nparent,
nmating = self.nmating,
nprogeny = self.nprogeny,
pgmat = pgmat,
xconfig_decn = sosoln.soln_decn[0],
rng = None
)
return selcfg
# else, we use a multi-objective algorithm and apply a transformation on the non-dominated points to identify a
# multi-objective method: objfn_trans returns a multiple values for each
# selection configuration
elif self.nobj > 1:
# solve the multi-objective problem
mosoln = self.mosolve(
pgmat = pgmat,
gmat = gmat,
ptdf = ptdf,
bvmat = bvmat,
gpmod = gpmod,
t_cur = t_cur,
t_max = t_max,
miscout = miscout,
**kwargs
)
# get scores for each of the points along the non-dominated set
# (nndpts,)
score = self.ndset_wt * self.ndset_trans(
mosoln.soln_obj,
**self.ndset_trans_kwargs
)
# get index of maximum score
ix = score.argmax()
# add solution to miscout if provided.
if miscout is not None:
miscout["mosoln"] = mosoln
# construct a RealSelectionConfiguration
selcfg = RealSelectionConfiguration(
ncross = self.ncross,
nparent = self.nparent,
nmating = self.nmating,
nprogeny = self.nprogeny,
pgmat = pgmat,
xconfig_decn = mosoln.soln_decn[ix],
rng = None
)
return selcfg
# else raise an error as the number of objectives is an illegal value
else:
raise ValueError("number of objectives must be greater than zero")
################################## Utilities ###################################
[docs]
def check_is_RealSelectionProtocol(v: object, vname: str) -> None:
"""
Check if object is of type RealSelectionProtocol, otherwise raise TypeError.
Parameters
----------
v : object
Any Python object to test.
vname : str
Name of variable to print in TypeError message.
"""
if not isinstance(v, RealSelectionProtocol):
raise TypeError("variable '{0}' must be of type '{1}' but received type '{2}'".format(vname,RealSelectionProtocol.__name__,type(v).__name__))