Source code for emat.model.meta_model

# -*- coding: utf-8 -*-

import pandas
import numpy
import warnings
from collections.abc import Mapping
from ..learn.base import clone_or_construct
from ..learn.boosting import LinearAndGaussian
from ..util.one_hot import OneHotCatEncoder
from ..util.variance_threshold import VarianceThreshold
from ..experiment.experimental_design import batch_pick_new_experiments, minimum_weighted_distance
from ..database.database import Database
from ..scope.scope import Scope
from ..exceptions import ReadOnlyDatabaseError

from ..util.loggers import get_module_logger
_logger = get_module_logger(__name__)


def create_metamodel(
        scope,
        experiments=None,
        metamodel_id=None,
        db=None,
        include_measures=None,
        exclude_measures=None,
        random_state=None,
        experiment_stratification=None,
        suppress_converge_warnings=False,
        regressor=None,
        name=None,
        design_name=None,
        find_best_metamodeltype=False,
):
    """
    Create a MetaModel from a set of input and output observations.

    Args:
        scope (emat.Scope): The scope for this model.
        experiments (pandas.DataFrame): This dataframe should contain
            all of the experimental inputs and outputs, including values
            for each uncertainty, lever, constant, and performance measure.
        metamodel_id (int, optional): An identifier for this meta-model.
            If not given, a unique id number will be created randomly
            (if no `db` is given) or sequentially based on any existing
            metamodels already stored in the database.
        db (Database, optional): The database to use for loading and saving
            metamodels.  If none is given here, the metamodel will not be
            stored in a database; otherwise, the metamodel is automatically
            saved to the database after it is created.
        include_measures (Collection[str], optional): If provided, only
            output performance measures with names in this set will be included.
        exclude_measures (Collection[str], optional): If provided, only
            output performance measures with names not in this set will be included.
        random_state (int, optional): A random state to use in the metamodel
            regression fitting.
        experiment_stratification (pandas.Series, optional): A stratification
            of experiments, used in cross-validation.
        suppress_converge_warnings (bool, default False): Suppress convergence
            warnings during metamodel fitting.
        regressor (Estimator, optional): A scikit-learn estimator implementing
            a multi-target regression.  If not given, a detrended simple
            Gaussian process regression is used.
        name (str, optional): A descriptive name for this metamodel.
        design_name (str, optional): The name of the design of experiments
            from `db` to use to create the metamodel.  Only used if
            `experiments` is not given explicitly.
        find_best_metamodeltype (int or bool, default False): Run a search to
            find the best metamodeltype for each performance measure, repeating
            each cross-validation step this many times.  For more stable results,
            choose 3 or more, although larger numbers will be slow.  If domain
            knowledge about the normal expected range and behavior of each
            performance measure is available, it is better to give the
            metamodeltype explicitly in the Scope.

    Returns:
        PythonCoreModel: a callable object that, when called as a function,
            accepts keyword arguments as inputs and returns a dictionary
            of (measure name: value) pairs.
    """
    _logger.info("creating metamodel from data")
    from .core_python import PythonCoreModel

    if experiments is None:
        if design_name is None or db is None:
            raise ValueError('must give `experiments` as a DataFrame or both `db` and `design_name`')
        experiments = db.read_experiment_all(scope.name, design_name, only_with_measures=True)

    experiments = scope.ensure_dtypes(experiments)

    _actually_excluded_measures = []
    meas = []
    for j in scope.get_measure_names():
        if include_measures is not None and j not in include_measures:
            _actually_excluded_measures.append(j)
            continue
        if exclude_measures is not None and j in exclude_measures:
            _actually_excluded_measures.append(j)
            continue
        if j not in experiments:
            _actually_excluded_measures.append(j)
            continue
        j_na_count = experiments[j].isna().sum()
        if j_na_count == len(experiments[j]):
            warnings.warn(f"measure '{j}' is all missing data, excluding it from the metamodel")
            _actually_excluded_measures.append(j)
            continue
        if j_na_count:
            warnings.warn(f"measure '{j}' has some missing data, excluding it from the metamodel")
            _actually_excluded_measures.append(j)
            continue  # TODO: allow for development of meta-models with the non-missing parts
        meas.append(j)
    experiment_outputs = experiments[meas]

    params = []
    for j in scope.get_parameter_names():
        if j not in experiments:
            continue
        params.append(j)
    experiment_inputs = experiments[params]

    if metamodel_id is None:
        if db is not None:
            metamodel_id = db.get_new_metamodel_id(scope.name)
    if metamodel_id is None:
        metamodel_id = numpy.random.randint(1 << 32, 1 << 63, dtype='int64')

    if find_best_metamodeltype:
        output_transforms, metamodeltype_tabulation = select_best_metamodeltype(
            experiment_inputs,
            experiment_outputs,
            return_tabulation=True,
        )
        output_transforms = dict(output_transforms)
    else:
        output_transforms = {
            i.name: i.metamodeltype
            for i in scope.get_measures()
            if i.name in meas
        }
        metamodeltype_tabulation = None

    # change log transforms to log1p when the experimental minimum is
    # non-positive but not less than -1
    for i, i_transform in output_transforms.items():
        if i_transform == 'log' and i in experiment_outputs:
            i_min = experiment_outputs[i].min()
            if -1 < i_min <= 0:
                output_transforms[i] = 'log1p'

    disabled_outputs = [i for i in scope.get_measure_names()
                        if i not in experiment_outputs.columns]

    func = MetaModel(
        experiment_inputs,
        experiment_outputs,
        metamodel_types=output_transforms,
        disabled_outputs=disabled_outputs,
        random_state=random_state,
        sample_stratification=experiment_stratification,
        suppress_converge_warnings=suppress_converge_warnings,
        regressor=regressor,
    )

    if metamodeltype_tabulation is not None:
        func.metamodeltype_tabulation = metamodeltype_tabulation

    scope_ = scope.duplicate(
        strip_measure_transforms=True,
        include_measures=include_measures,
        exclude_measures=_actually_excluded_measures,
    )

    result = PythonCoreModel(
        func,
        configuration=None,
        scope=scope_,
        safe=True,
        db=db,
        name=name or f"MetaModel{metamodel_id}",
        metamodel_id=metamodel_id,
    )

    if db is not None:
        try:
            db.write_metamodel(result)
        except ReadOnlyDatabaseError:
            pass  # read only database, don't store
        except Exception as err:
            _logger.exception("exception in storing metamodel in database")

    return result
class MetaModel:
    """
    A Gaussian process regression-based meta-model.

    The MetaModel is a callable object that provides an EMA Workbench standard
    Python interface, taking keyword arguments for parameters and returning a
    Python dictionary of named outcomes.

    Args:
        input_sample (pandas.DataFrame): A set of experimental parameters, where
            each row in the dataframe is an experiment that has already been
            evaluated using the core model.  Each column will be a required
            keyword parameter when calling this meta-model.
        output_sample (pandas.DataFrame): A set of experimental performance
            measures, where each row in the dataframe is the results of the
            experiment evaluated using the core model.  Each column of this
            dataframe will be a named output value in the returned dictionary
            from calling this meta-model.
        metamodel_types (Mapping, optional): If given, the keys of this mapping
            should include a subset of the columns in `output_sample`, and the
            values indicate the metamodel type for each performance measure,
            given as `str`.  Available metamodel types include:

            + *log*: The natural log of the performance measure is taken before
              fitting the regression model.  This is appropriate only when the
              performance measure will always give a strictly positive outcome.
              If the performance measure can take on non-positive values, this
              may result in errors.

            + *log1p*: The natural log of 1 plus the performance measure is
              taken before fitting the regression model.  This is preferred to
              log-linear when the performance measure is only guaranteed to be
              non-negative, rather than strictly positive.

            + *logxp(X)*: The natural log of X plus the performance measure is
              taken before fitting the regression model.  This allows shifting
              the position of the regression intercept to a point other than 0.

            + *clip(LO,HI)*: A linear model is used, but results are truncated
              to the range (LO,HI).  Set either value as None to have a
              one-sided truncation range.

            + *linear*: No transforms are made.  This is the default when a
              performance measure is not included in `metamodel_types`.

        disabled_outputs (Collection, optional): A collection of disabled
            outputs.  All names included in this collection will be returned in
            the resulting outcomes dictionary when this meta-model is evaluated,
            but with a value of `None`.  It is valid to include names in
            `disabled_outputs` that are included in the columns of
            `output_sample`, although the principal use of this argument is to
            include names that are *not* included, as disabling outputs that are
            included will not prevent these values from being included in the
            computational process.
        random_state (int, optional): A random state, passed to the created
            regression (but only if that regressor includes a 'random_state'
            parameter).
        regressor (Estimator, optional): A scikit-learn estimator implementing
            a multi-target regression.  If not given, a detrended simple
            Gaussian process regression is used.
    """

    _metamodel_types = {
        'log': (numpy.log, numpy.exp),
        'log-linear': (numpy.log, numpy.exp),
        'ln': (numpy.log, numpy.exp),
        'log1p': (numpy.log1p, numpy.expm1),
        'log1p-linear': (numpy.log1p, numpy.expm1),
        'exp': (numpy.exp, numpy.log),
        'logit': (lambda x: numpy.log(x / (1 - x)), lambda x: numpy.exp(x) / (numpy.exp(x) + 1)),
        # x is applied immediately from arguments in the metamodeltype string,
        # y is the eventual data
        'logxp': (lambda x: (lambda y: numpy.log(y + x)), lambda x: (lambda y: numpy.exp(y) - x)),
        'logxp-linear': (lambda x: (lambda y: numpy.log(y + x)), lambda x: (lambda y: numpy.exp(y) - x)),
        'clip': (lambda x: (lambda y: y), lambda x: (lambda y: numpy.clip(y, *x))),
    }

    def __init__(
            self,
            input_sample,
            output_sample,
            metamodel_types=None,
            disabled_outputs=None,
            random_state=None,
            sample_stratification=None,
            suppress_converge_warnings=False,
            regressor=None,
            use_best_cv=True,
    ):
        if not isinstance(input_sample, pandas.DataFrame):
            raise TypeError('input_sample must be DataFrame')
        if not isinstance(output_sample, pandas.DataFrame):
            raise TypeError('output_sample must be DataFrame')

        self.raw_input_columns = input_sample.columns
        self.disabled_outputs = disabled_outputs

        # One-hot encode here and save the mapping
        self.cat_encoder = OneHotCatEncoder().fit(input_sample)
        input_sample = self.cat_encoder.transform(input_sample)
        input_sample = input_sample.astype(numpy.float64)

        self.var_thresh = VarianceThreshold().fit(input_sample)
        input_sample = self.var_thresh.transform(input_sample)

        self.input_sample = input_sample
        self.output_sample = output_sample.copy(deep=(metamodel_types is not None)).astype(float)
        self.sample_stratification = sample_stratification

        self.output_transforms = {}
        if metamodel_types is not None:
            self.metamodel_types = metamodel_types
            for k, t in metamodel_types.items():
                if t is None:
                    continue
                if "(" in t:
                    t, t_args = t.split("(", 1)
                    import ast
                    t_args = ast.literal_eval(t_args.strip("()"))
                    if isinstance(t, str):
                        t = t.lower()
                    if t == 'linear':
                        continue
                    if t not in self._metamodel_types:
                        raise ValueError(f'unknown metamodeltype "{t}" for output "{k}"')
                    self.output_transforms[k] = (
                        self._metamodel_types[t][0](t_args),
                        self._metamodel_types[t][1](t_args),
                    )
                else:
                    if isinstance(t, str):
                        t = t.lower()
                    if t == 'linear':
                        continue
                    if t not in self._metamodel_types:
                        raise ValueError(f'unknown metamodeltype "{t}" for output "{k}"')
                    self.output_transforms[k] = self._metamodel_types[t]

        for k, (v_func, _) in self.output_transforms.items():
            self.output_sample[k] = v_func(self.output_sample[k])

        if regressor is None:
            regressor = LinearAndGaussian()

        self.regression = clone_or_construct(regressor)

        if random_state is not None and 'random_state' in self.regression.get_params():
            self.regression.set_params(random_state=random_state)

        if suppress_converge_warnings:
            from sklearn.exceptions import ConvergenceWarning
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=ConvergenceWarning)
                self.regression.fit(self.input_sample, self.output_sample)
        else:
            self.regression.fit(self.input_sample, self.output_sample)

        if use_best_cv:
            from ..learn.model_selection import take_best
            self.regression = take_best(self.regression)
    def preprocess_raw_input(self, df, to_type=None):
        """
        Preprocess raw data input.

        This convenience method provides batch-processing of a raw data
        input DataFrame into the format used for regression.

        Args:
            df (pandas.DataFrame): The raw input data to process, which can
                include input values for multiple experiments.
            to_type (dtype, optional): If given, the entire resulting
                DataFrame is cast to this data type.

        Returns:
            pandas.DataFrame
        """
        result = self.cat_encoder.transform(df[self.raw_input_columns])
        if to_type is not None:
            result = result.astype(to_type)
        result = self.var_thresh.transform(result)
        return result
    def __call__(self, *args, **kwargs):
        """
        Evaluate the meta-model.

        Args:
            **kwargs: All defined (meta)model parameters are passed as keyword
                arguments, including both uncertainties and levers.

        Returns:
            dict: A single dictionary containing all performance measure outcomes.
        """
        if len(args) == 1:
            if isinstance(args[0], pandas.DataFrame):
                return args[0].apply(
                    lambda x: pandas.Series(self.__call__(**x)),
                    axis=1,
                )
            else:
                raise TypeError(f'mm(...) optionally takes a DataFrame as a '
                                f'positional argument, not {type(args[0])}')
        elif len(args) > 1:
            raise TypeError(f'mm(...) takes at most one '
                            f'positional argument, not {len(args)}')

        input_row = pandas.DataFrame.from_dict(kwargs, orient='index').T[self.raw_input_columns]
        input_row = self.preprocess_raw_input(input_row, to_type=numpy.float64)

        output_row = self.regression.predict(input_row)

        result = dict(output_row.iloc[0])

        # undo the output transforms
        for k, (_, v_func) in self.output_transforms.items():
            result[k] = v_func(result[k])

        for i in self.disabled_outputs:
            result[i] = None

        return result
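
    # Illustrative call sketch (hypothetical parameter and frame names): a fitted
    # MetaModel can be evaluated one experiment at a time with keyword arguments,
    # or over many experiments at once by passing a DataFrame of inputs.
    #
    #     one = mm(expand_capacity=40, input_flow=1200)   # -> dict of measure values
    #     many = mm(design_df)                            # -> DataFrame, one row per input row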
    def compute_std(self, *args, **kwargs):
        """
        Evaluate standard deviations of estimates generated by the meta-model.

        Args:
            df (pandas.DataFrame, optional)
            **kwargs: All defined (meta)model parameters are passed as keyword
                arguments, including both uncertainties and levers.

        Returns:
            dict: A single dictionary containing the standard deviation of the
                estimate of all performance measure outcomes.
        """
        if len(args) == 1:
            if isinstance(args[0], pandas.DataFrame):
                return args[0].apply(
                    lambda x: pandas.Series(self.compute_std(**x)),
                    axis=1,
                )
            else:
                raise TypeError(f'compute_std() optionally takes a DataFrame as a '
                                f'positional argument, not {type(args[0])}')
        elif len(args) > 1:
            raise TypeError(f'compute_std() takes at most one '
                            f'positional argument, not {len(args)}')

        input_row = pandas.DataFrame.from_dict(kwargs, orient='index').T[self.raw_input_columns]
        input_row = self.preprocess_raw_input(input_row, to_type=numpy.float64)

        output_row, output_std = self.regression.predict(input_row, return_std=True)

        result = dict(output_std.iloc[0])

        # DO NOT undo the output transforms
        # for k, (_, v_func) in self.output_transforms.items():
        #     result[k] = v_func(result[k])

        for i in self.disabled_outputs:
            result[i] = None

        return result
    def predict(self, *args, trend_only=False, residual_only=False, **kwargs):
        """
        Generate predictions using the meta-model.

        Args:
            df (pandas.DataFrame, optional)
            trend_only, residual_only (bool)
            **kwargs: All defined (meta)model parameters are passed as keyword
                arguments, including both uncertainties and levers.

        Returns:
            dict: A single dictionary containing predictions for all
                performance measure outcomes.
        """
        if len(args) == 1:
            if isinstance(args[0], pandas.DataFrame):
                input_row = args[0][self.raw_input_columns]
                input_row = self.preprocess_raw_input(input_row, to_type=numpy.float64)
                if trend_only:
                    result = self.regression.lr.predict(input_row)
                elif residual_only:
                    result = self.regression.gpr.predict(input_row)
                else:
                    result = self.regression.predict(input_row)
                # undo the output transforms
                for k, (_, v_func) in self.output_transforms.items():
                    result[k] = v_func(result[k])
                if self.disabled_outputs:
                    drop_cols = [i for i in self.disabled_outputs if i in result.columns]
                    result = result.drop(drop_cols, axis=1)
                return result
            else:
                raise TypeError(f'predict() optionally takes a DataFrame as a '
                                f'positional argument, not {type(args[0])}')
        elif len(args) > 1:
            raise TypeError(f'predict() takes at most one '
                            f'positional argument, not {len(args)}')

        input_row = pandas.DataFrame.from_dict(kwargs, orient='index').T[self.raw_input_columns]
        input_row = self.preprocess_raw_input(input_row, to_type=numpy.float64)

        if trend_only:
            output_row = self.regression.detrend_predict(input_row)
        elif residual_only:
            output_row = self.regression.residual_predict(input_row)
        else:
            output_row = self.regression.predict(input_row)

        result = dict(output_row.iloc[0])

        # undo the output transforms
        for k, (_, v_func) in self.output_transforms.items():
            result[k] = v_func(result[k])

        for i in self.disabled_outputs:
            result[i] = None

        return result
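
    # Illustrative prediction sketch (hypothetical frame name): `predict` can
    # separate the linear-trend and GPR-residual components of the detrended
    # regression, which is useful for inspecting how much of each prediction
    # comes from each step.
    #
    #     full = mm.predict(design_df)
    #     trend = mm.predict(design_df, trend_only=True)
    #     resid = mm.predict(design_df, residual_only=True)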
    def cross_val_scores(
            self,
            cv=5,
            gpr_only=False,
            use_cache=True,
            return_type='styled',
            shortnames=None,
            **kwargs,
    ):
        """
        Calculate the cross validation scores for this meta-model.

        Args:
            cv (int, default 5): The number of folds to use in cross-validation.
            gpr_only (bool, default False): Whether to limit the cross-validation
                analysis to only the GPR step (i.e., to measure the improvement
                in meta-model fit from using the GPR-based meta-model, over and
                above using the linear regression meta-model alone).
            use_cache (bool, default True): Use cached cross validation results
                if available.  All other arguments are ignored if a cached
                result is available.
            return_type ({'styled', 'raw'}): How to return the results.
            shortnames (Scope or callable): If given, convert the measure names
                into more readable `shortname` values from the scope, or use a
                function that maps measure names to something else.

        Returns:
            pandas.Series: The cross-validation scores, by output.
        """
        result = None
        if use_cache:
            cached_value = getattr(self, '_cv_cache', None)
            if cached_value is not None:
                result = cached_value
        if result is None:
            if self.sample_stratification is not None:
                from ..learn.splits import ExogenouslyStratifiedKFold
                cv = ExogenouslyStratifiedKFold(exo_data=self.sample_stratification, n_splits=cv)
            if gpr_only:
                raise NotImplementedError
                # residuals = self.regression.residual_predict(self.input_sample)
                # regression = multitarget.MultipleTargetRegression()
                # return regression.cross_val_scores(self.input_sample, residuals, cv=cv)
            result = self.regression.cross_val_scores(self.input_sample, self.output_sample, cv=cv, **kwargs)
            result.name = "Cross Validation Score"
            if use_cache:
                self._cv_cache = result
        if shortnames is not None:
            if isinstance(shortnames, Scope):
                result.index = result.index.map(shortnames.shortname)
            else:
                result.index = result.index.map(shortnames)
        if return_type == 'styled':
            from ..util.styling import cross_validation_styling
            return cross_validation_styling(result)
        return result
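
    # Illustrative cross-validation sketch: the default styled output is convenient
    # in a notebook, while `return_type='raw'` gives a plain pandas.Series keyed by
    # measure name (the measure name shown here is hypothetical).
    #
    #     raw_scores = mm.cross_val_scores(cv=5, return_type='raw')
    #     raw_scores['net_benefits']   # -> a score, typically near the 0-1 range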
    def cross_val_predicts(self, cv=5):
        """
        Generate cross validated predictions using this meta-model.

        Args:
            cv (int, default 5): The number of folds to use in cross-validation.
                Set to zero for leave-one-out (i.e., the maximum number of folds),
                which may be quite slow.

        Returns:
            pandas.DataFrame: The cross-validated predictions.
        """
        if cv == 0:
            cv = len(self.input_sample)
        return self.regression.cross_val_predict(self.input_sample, self.output_sample, cv=cv)
    def __repr__(self):
        in_dim = len(self.raw_input_columns)
        out_dim = len(self.output_sample.columns)
        if self.disabled_outputs:
            out_dims = f"{out_dim} active and {out_dim + len(self.disabled_outputs)} total outputs"
        else:
            out_dims = f"{out_dim} outputs"
        return f"<emat.MetaModel {in_dim} inputs -> {out_dims}>"
    def get_length_scales(self):
        """
        Get the length scales from the GPR kernels of this metamodel.

        This MetaModel must already be `fit` to use this method, although
        the fit process is generally completed when the MetaModel is
        instantiated.

        Returns:
            pandas.DataFrame: The rows correspond to the columns of
                pre-processed input (not raw input) and the columns
                correspond to the outputs.
        """
        return pandas.DataFrame(
            [
                est.kernel_.length_scale
                for est in self.regression.step1.estimators_
            ],
            index=self.regression.Y_columns,
            columns=self.input_sample.columns,
        ).T
    def mix_length_scales(self, balance=None, inv=True):
        """
        Mix the length scales from the GPR kernels of this metamodel.

        This MetaModel must already be `fit` to use this method, although
        the fit process is generally completed when the MetaModel is
        instantiated.

        Args:
            balance (Mapping or Collection, optional): When given as a mapping,
                the keys are the output measures that are included in the mix,
                and the values are the relative weights to use for mixing.
                When given as a collection, the items are the output measures
                that are included in the mix, all with equal weight.
            inv (bool, default True): Take the inverse of the length scales
                before mixing.

        Returns:
            numpy.ndarray: A one-dimensional array of mixed length scales,
                with one value for each column of pre-processed input
                (not raw input).
        """
        s = self.get_length_scales()
        if inv:
            s = s.rtruediv(1, fill_value=1)  # s = 1/s
        if balance is None:
            w = numpy.full(len(s.columns), 1.0 / len(s.columns))
        elif isinstance(balance, Mapping):
            w = numpy.zeros(len(s.columns))
            for i, col in enumerate(s.columns):
                w[i] = balance.get(col, 0)
        else:
            w = numpy.zeros(len(s.columns))
            balance = set(balance)
            each_w = 1 / len(balance)
            for i, col in enumerate(s.columns):
                w[i] = each_w if col in balance else 0
        return numpy.dot(s, w)
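
    # Illustrative sketch for mixing length scales (hypothetical measure names):
    # the resulting one-dimensional weights emphasize the inputs that matter most
    # for a chosen subset of outputs.
    #
    #     w_equal = mm.mix_length_scales()                 # all outputs, equal weight
    #     w_focus = mm.mix_length_scales({'net_benefits': 2.0, 'time_savings': 1.0})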
    def pick_new_experiments(
            self,
            possible_experiments,
            batch_size,
            output_focus=None,
            scope: Scope = None,
            db: Database = None,
            design_name: str = None,
            debug=None,
            future_experiments=None,
            future_experiments_std=None,
    ):
        """
        Select a set of new experiments to perform from a pool of candidates.

        This method implements the "maximin" approach described by Johnson et al
        (1990), as proposed for batch-sequential augmentation of designs by
        Loeppky et al (2010).  New experiments are selected from a pool of
        possible new experiments by maximizing the minimum distance between the
        set of selected experiments, with distances between experiments scaled
        by the correlation parameters from a GP regression fitted to the initial
        experimental results.  Note that the "binning" aspect of Loeppky is not
        presently implemented here, instead favoring the analyst's capability to
        manually focus the new experiments by manipulating the input
        `possible_experiments`.  We also extend Loeppky et al by allowing for
        multiple output models, mixing the results from a selected set of
        outputs, to potentially focus the information from the new experiments
        on a subset of output measures.

        Args:
            possible_experiments: A pool of possible experiments.  All selected
                experiments will be selected from this pool, so the pool should
                be sufficiently large and diverse to provide required support
                for this process.
            batch_size (int): How many experiments to select from
                `possible_experiments`.
            output_focus (Mapping or Collection, optional): A subset of output
                measures that will be the focus of these new experiments.  The
                length scales of these measures will be mixed when developing
                relative weights.
            scope (Scope, optional): The exploratory scope to use for writing
                the design to a database.  Ignored unless `db` is also given.
            db (Database, optional): If provided, this design will be stored in
                the database indicated.  Ignored unless `scope` is also given.
            design_name (str, optional): A name for this design, to identify it
                in the database.  If not given, a unique name will be generated.
                Has no effect if no `db` or `scope` is given.
            debug (Tuple[str,str], optional): The names of x and y axis to plot
                for debugging.

        Returns:
            pandas.DataFrame: A subset of rows from `possible_experiments`.

        References:
            - Johnson, M.E., Moore, L.M., and Ylvisaker, D., 1990. "Minimax and
              maximin distance designs." Journal of Statistical Planning and
              Inference 26, 131–148.
            - Loeppky, J., Moore, L., and Williams, B.J., 2010. "Batch sequential
              designs for computer experiments." Journal of Statistical Planning
              and Inference 140, 1452–1464.
        """
        dimension_weights = self.mix_length_scales(output_focus, inv=True)
        if debug:
            _logger.info(f"output_focus = {output_focus}")
            _logger.info(f"length_scales =\n{self.get_length_scales()}")
            _logger.info(f"dimension_weights = {dimension_weights}")

        possible_experiments_processed = self.preprocess_raw_input(possible_experiments, float)

        picks = batch_pick_new_experiments(
            self.input_sample,
            possible_experiments_processed,
            batch_size,
            dimension_weights,
            future_experiments,
            future_experiments_std,
            debug=debug,
        )

        design = possible_experiments.loc[picks.index]

        # If using the default design_name, append the design_name with a number
        # until a new unused name is found.
        if db is not None and scope is not None and design_name is None:
            proposed_design_name = 'augment'
            existing_design_names = set(db.read_design_names(scope.name))
            if proposed_design_name not in existing_design_names:
                design_name = proposed_design_name
            else:
                n = 2
                while f'{proposed_design_name}_{n}' in existing_design_names:
                    n += 1
                design_name = f'{proposed_design_name}_{n}'

        if db is not None and scope is not None:
            experiment_ids = db.write_experiment_parameters(scope.name, design_name, design)
            design.index = experiment_ids
            design.index.name = 'experiment'

        if debug:
            debug_x, debug_y = debug
            mwd = minimum_weighted_distance(
                self.input_sample,
                possible_experiments,
                dimension_weights,
            )
            from matplotlib import pyplot as plt
            plt.clf()
            plt.scatter(possible_experiments[debug_x], possible_experiments[debug_y], c=mwd)
            plt.scatter(self.input_sample[debug_x], self.input_sample[debug_y], color='red')
            plt.scatter(design[debug_x], design[debug_y], color="red", marker='x')
            plt.show()

        return design
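
    # Illustrative augmentation sketch (hypothetical names, assuming the scope can
    # generate a large candidate design): candidate experiments are sampled from the
    # scope and then filtered down to a batch that maximizes the minimum weighted
    # distance from the existing design.
    #
    #     candidates = scope.design_experiments(n_samples=10000)   # or any candidate pool
    #     new_runs = mm.pick_new_experiments(
    #         candidates, batch_size=20, output_focus=['net_benefits'],
    #     )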
    def heuristic_pick_experiment(
            self,
            candidate_experiments,
            poorness_of_fit,
            candidate_density,
            plot=True,
    ):
        candidate_std = self.compute_std(candidate_experiments)
        candidate_raw_value = (poorness_of_fit * candidate_std).sum(axis=1)
        candidate_wgt_value = candidate_raw_value * candidate_density
        proposed_experiment = candidate_wgt_value.idxmax()
        if plot:
            from matplotlib import pyplot as plt
            fig, axs = plt.subplots(1, 1, figsize=(4, 4))
            axs.scatter(
                candidate_experiments.iloc[:, 0],
                candidate_experiments.iloc[:, 1],
                c=candidate_wgt_value,
            )
            axs.scatter(
                candidate_experiments.iloc[:, 0].loc[proposed_experiment],
                candidate_experiments.iloc[:, 1].loc[proposed_experiment],
                color="red",
                marker='x',
            )
            plt.show()
            plt.close(fig)
        return proposed_experiment

    def heuristic_batch_pick_experiment(
            self,
            batch_size,
            candidate_experiments,
            scope,
            poorness_of_fit=None,
            plot=True,
    ):
        _logger.info("computing density")
        candidate_density = candidate_experiments.apply(lambda x: scope.get_density(x), axis=1)
        if poorness_of_fit is None:
            _logger.info("computing poorness of fit")
            crossval = self.cross_val_scores()
            poorness_of_fit = dict(1 - crossval)

        proposed_candidate_ids = set()
        proposed_candidates = None
        _logger.info("populating initial batch")
        for i in range(batch_size):
            self.regression.set_hypothetical_training_points(proposed_candidates)
            proposed_id = self.heuristic_pick_experiment(
                candidate_experiments,
                poorness_of_fit,
                candidate_density,
                plot=plot,
            )
            proposed_candidate_ids.add(proposed_id)
            proposed_candidates = candidate_experiments.loc[list(proposed_candidate_ids)]
        proposed_candidate_ids = list(proposed_candidate_ids)

        _logger.info("initial batch complete, checking for exchanges")
        # Exchanges
        n_exchanges = 1
        while n_exchanges > 0:
            n_exchanges = 0
            for i in range(batch_size):
                provisionally_dropping = proposed_candidate_ids[i]
                self.regression.set_hypothetical_training_points(
                    candidate_experiments.loc[list(set(proposed_candidate_ids) - {provisionally_dropping})]
                )
                provisional_replacement = self.heuristic_pick_experiment(
                    candidate_experiments,
                    poorness_of_fit,
                    candidate_density,
                    plot=plot,
                )
                if provisional_replacement not in proposed_candidate_ids:
                    n_exchanges += 1
                    proposed_candidate_ids[i] = provisional_replacement
                    _logger.info(f"replacing {provisionally_dropping} with {provisional_replacement}")
            _logger.info(f"{n_exchanges} exchanges completed.")

        self.regression.clear_hypothetical_training_points()
        # refresh the selected rows so the result reflects any exchanges
        proposed_candidates = candidate_experiments.loc[proposed_candidate_ids]
        return proposed_candidates
def select_best_metamodeltype(
        params,
        measures,
        random_state=0,
        suppress_converge_warnings=True,
        possible_types=None,
        n_repeats=3,
        regressor=None,
        return_tabulation=False,
):
    if possible_types is None:
        possible_types = {'linear', 'log', 'log1p', 'logit', 'exp'}

    if regressor is None:
        from ..learn.boosting import LinearAndGaussian
        regressor = LinearAndGaussian

    def _metamodel_scores(t, filter_cols):
        if t in possible_types and len(filter_cols):
            result = MetaModel(
                params,
                measures[filter_cols],
                {i: t for i in filter_cols},
                disabled_outputs=None,
                random_state=random_state,
                suppress_converge_warnings=suppress_converge_warnings,
                regressor=regressor,
            ).cross_val_scores(
                random_state=random_state,
                n_repeats=n_repeats,
            ).data
            result.columns = [t]
            return result
        else:
            return pandas.Series(-2.0, index=filter_cols, name=t)

    scores = [pandas.Series(-1.0, index=measures.columns, name='linear')]

    scores.append(_metamodel_scores('linear', measures.columns))

    check_log = measures.columns[(measures.min() > 0)]
    scores.append(_metamodel_scores('log', check_log))

    check_log1p = measures.columns[(measures.min() > -1)]
    scores.append(_metamodel_scores('log1p', check_log1p))

    check_logit = measures.columns[(measures.min() > 0) & (measures.max() < 1)]
    scores.append(_metamodel_scores('logit', check_logit))

    check_exp = measures.columns[(measures.max() < 10)]
    scores.append(_metamodel_scores('exp', check_exp))

    tabulation = pandas.concat(scores, axis=1)

    if return_tabulation:
        return tabulation.idxmax(axis=1), tabulation
    else:
        return tabulation.idxmax(axis=1)
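
# Illustrative sketch for metamodel-type selection (hypothetical DataFrames and
# measure name): given experimental inputs and outputs, this scans the candidate
# transforms and returns the best-scoring type for each measure, optionally with
# the full score tabulation.
#
#     best_types, tabulation = select_best_metamodeltype(
#         experiment_inputs, experiment_outputs,
#         n_repeats=3, return_tabulation=True,
#     )
#     best_types['net_benefits']   # e.g. 'log1p'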