# -*- coding: utf-8 -*-
""" core_model.py - define coure model API"""
import os
import abc
import yaml
import pandas as pd
import numpy as np
import logging
import subprocess
import warnings
from contextlib import contextmanager
from typing import Union, Mapping
from ..workbench.em_framework.model import AbstractModel as AbstractWorkbenchModel
from ..workbench.em_framework.evaluators import BaseEvaluator
from typing import Collection
from typing import Iterable
from ..database.database import Database
from ..scope.scope import Scope
from ..optimization.optimization_result import OptimizationResult
from ..optimization import EpsilonProgress, ConvergenceMetrics, SolutionCount
from ..util.evaluators import prepare_evaluator
from ..exceptions import MissingArchivePathError, ReadOnlyDatabaseError, MissingIdWarning
from .._pkg_constants import *
from ..util.loggers import get_module_logger
_logger = get_module_logger(__name__)
class AbstractCoreModel(abc.ABC, AbstractWorkbenchModel):
"""
An interface for using a model with EMAT.
Individual models should be instantiated using derived
subclasses of this abstract base class, and not using
this class directly.
Args:
configuration (str or Mapping or None):
The configuration for this core model. This can be given
explicitly as a `dict`, or as a `str` which gives the
filename of a YAML file that will be loaded. If there is
no configuration, giving `None` is also acceptable.
scope (Scope or str):
The exploration scope, as a `Scope` object or as
a `str` which gives the filename of a YAML file that will be
loaded.
safe (bool):
Load the configuration YAML file in 'safe' mode.
This can be disabled if the configuration requires
custom Python types or is otherwise not compatible with
safe mode. Loading configuration files with safe mode
off is not secure and should not be done with files from
untrusted sources.
db (Database, optional):
An optional Database to store experiments and results.
name (str, default "EMAT"):
A name for this model, given as an alphanumeric string.
The name is required by workbench operations.
metamodel_id (int, optional):
An identifier for this model, if it is a meta-model.
Defaults to 0 (i.e., not a meta-model).
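Example:
A minimal sketch of a derived class showing the abstract methods a
subclass must provide (the subclass name, the import path, and the
method bodies are illustrative only, not part of this API)::

    import emat

    class MyCoreModel(emat.model.AbstractCoreModel):

        def setup(self, params):
            ...  # write `params` into the core model's input files

        def run(self):
            ...  # launch the core model

        def load_measures(self, measure_names=None, *, rel_output_path=None, abs_output_path=None):
            ...  # read performance measures from the model outputs

        def get_experiment_archive_path(self, experiment_id=None, makedirs=False, parameters=None, run_id=None):
            ...  # return a directory for archiving this experiment

        def archive(self, params, model_results_path, experiment_id=0):
            ...  # copy model outputs to `model_results_path`

    m = MyCoreModel(configuration="config.yaml", scope="scope.yaml")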
"""
def __init__(self,
configuration:Union[str,Mapping,None],
scope,
safe=True,
db=None,
name='EMAT',
metamodel_id=0,
):
if isinstance(configuration, str):
with open(configuration, 'r') as stream:
if safe:
configuration = yaml.safe_load(stream)
else:
configuration = yaml.load(stream, Loader=yaml.FullLoader)
if configuration is None:
configuration = {}
self.config = configuration
self.db = db
if isinstance(scope, Scope):
self.scope = scope
else:
self.scope = Scope(scope)
AbstractWorkbenchModel.__init__(self, name=name.replace('_','').replace(' ',''))
self.uncertainties = self.scope._x_list
self.levers = self.scope._l_list
self.constants = self.scope._c_list
self.outcomes = self.scope._m_list
self.metamodel_id = metamodel_id
def __getstate__(self):
# don't pickle the db connection
return dict((k, v) for (k, v) in self.__dict__.items() if (k != 'db'))
@abc.abstractmethod
def setup(self, params):
"""
Configure the core model with the experiment variable values.
This method is the place where the core model set up takes place,
including creating or modifying files as necessary to prepare
for a core model run. When running experiments, this method
is called once for each core model experiment, where each experiment
is defined by a set of particular values for both the exogenous
uncertainties and the policy levers. These values are passed to
the experiment only here, and not in the `run` method itself.
This facilitates debugging, as the `setup` method can potentially
be used without the `run` method, allowing the user to manually
inspect the prepared files and ensure they are correct before
actually running a potentially expensive model.
Each input exogenous uncertainty or policy lever can potentially
be used to manipulate multiple different aspects of the underlying
core model. For example, a policy lever that includes a number of
discrete future network "build" options might trigger the replacement
of multiple related network definition files. Or, a single uncertainty
relating to the cost of fuel might scale both a parameter linked to
the modeled per-mile cost of operating an automobile, as well as the
modeled total cost of fuel used by transit services.
At the end of the `setup` method, a core model experiment should be
ready to run using the `run` method.
Args:
params (dict):
experiment variables including both exogenous
uncertainty and policy levers
Raises:
KeyError:
if a defined experiment variable is not supported
by the core model
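Example:
A minimal sketch of a possible override, assuming the core model
reads a single "inputs.yaml" file (the filename is illustrative,
and the scope's `get_parameter_names` helper is assumed)::

    import os
    import yaml

    def setup(self, params):
        known = set(self.scope.get_parameter_names())
        unknown = set(params) - known
        if unknown:
            raise KeyError(f"unsupported experiment variables: {unknown}")
        with open(os.path.join(self.resolved_model_path, "inputs.yaml"), "wt") as f:
            yaml.safe_dump(dict(params), f)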
"""
@abc.abstractmethod
def get_experiment_archive_path(
self,
experiment_id=None,
makedirs=False,
parameters=None,
run_id=None,
):
"""
Returns a file system location to store model run outputs.
For core models with long model run times, it is recommended
to store the complete model run results in an archive. This
will facilitate adding additional performance measures to the
scope at a later time.
Both the scope name and experiment id can be used to create the
folder path.
Args:
experiment_id (int):
The experiment id, which is also the row id of the
experiment in the database. If this is omitted, an
experiment id is read or created using the parameters.
makedirs (bool, default False):
If this archive directory does not yet exist, create it.
parameters (dict, optional):
The parameters for this experiment, used to create or
lookup an experiment id. The parameters are ignored
if `experiment_id` is given.
run_id (UUID, optional):
The run_id of this model run. If not given but a
run_id attribute is stored in this FilesCoreModel
instance, that value is used.
Returns:
str: Experiment archive path (no trailing backslashes).
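Example:
A minimal sketch, assuming an 'archive_path' key in this model's
`config` (the key name and folder layout are illustrative)::

    import os

    def get_experiment_archive_path(self, experiment_id=None, makedirs=False, parameters=None, run_id=None):
        # lookup of an experiment_id from `parameters` is omitted in this sketch
        path = os.path.join(self.config['archive_path'], self.scope.name, f"exp_{experiment_id}")
        if run_id is not None:
            path = os.path.join(path, str(run_id))
        if makedirs:
            os.makedirs(path, exist_ok=True)
        return path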
"""
@abc.abstractmethod
def run(self):
"""
Run the core model.
This method is the place where the core model run takes place.
Note that this method takes no arguments; all the input
exogenous uncertainties and policy levers are delivered to the
core model in the `setup` method, which will be executed prior
to calling this method. This facilitates debugging, as the `setup`
method can potentially be used without the `run` method, allowing
the user to manually inspect the prepared files and ensure they
are correct before actually running a potentially expensive model.
When running experiments, this method is called once for each core
model experiment, after the `setup` method completes.
If the core model requires some post-processing by `post_process`
method defined in this API, then when this function terminates
the model directory should be in a state that is ready to run the
`post_process` command next.
Raises:
UserWarning: If the model is not properly set up
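Example:
A minimal sketch, assuming the core model is driven by an external
command-line tool (the command shown is illustrative)::

    import subprocess

    def run(self):
        subprocess.run(
            ["run-my-model", "--inputs", "inputs.yaml"],
            cwd=self.resolved_model_path,
            capture_output=True,
            check=True,  # a CalledProcessError raised here is handled by run_model
        )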
"""
def post_process(self, params, measure_names, output_path=None):
"""
Runs post processors associated with particular performance measures.
This method is the place to conduct automatic post-processing
of core model run results, in particular any post-processing that
is expensive or that will write new output files into the core model's
output directory. The core model run should already have
been completed using `setup` and `run`. If the relevant performance
measures do not require any post-processing to create (i.e. they
can all be read directly from output files created during the core
model run itself) then this method does not need to be overloaded
for a particular core model implementation.
Args:
params (dict):
Dictionary of experiment variables, with keys as variable names
and values as the experiment settings. Most post-processing
scripts will not need to know the particular values of the
inputs (exogenous uncertainties and policy levers), but this
method receives the experiment input parameters as an argument
in case one or more of these parameter values needs to be known
in order to complete the post-processing.
measure_names (List[str]):
List of measures to be processed. Normally for the first pass
of core model run experiments, post-processing will be completed
for all performance measures. However, it is possible to use
this argument to give only a subset of performance measures to
post-process, which may be desirable if the post-processing
of some performance measures is expensive. Additionally, this
method may also be called on archived model results, allowing
it to generate only a subset of (probably new) performance
measures from those archived runs.
output_path (str, optional):
Path to model outputs. If this is not given (typical for the
initial run of core model experiments) then the local/default
model directory is used. This argument is provided primarily
to facilitate post-processing archived model runs to make new
performance measures (i.e. measures that were not in-scope when
the core model was actually run).
Raises:
KeyError:
If post process is not available for specified measure
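Example:
A minimal sketch, assuming one measure needs an extra summarization
step after the core model run (the file and measure names are
illustrative)::

    import os
    import pandas as pd

    def post_process(self, params, measure_names, output_path=None):
        if output_path is None:
            output_path = os.path.join(self.resolved_model_path, "Outputs")
        if "mean_travel_time" in measure_names:
            trips = pd.read_csv(os.path.join(output_path, "trips.csv"))
            summary = pd.DataFrame({"mean_travel_time": [trips["travel_time"].mean()]})
            summary.to_csv(os.path.join(output_path, "summary.csv"), index=False)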
"""
@abc.abstractmethod
def load_measures(
self,
measure_names: Collection[str]=None,
*,
rel_output_path=None,
abs_output_path=None,
) -> dict:
"""
Import selected measures from the core model.
This method is the place to put code that can actually reach into
files in the core model's run results and extract performance
measures. It is expected that it should not do any post-processing
of results (i.e. it should read from but not write to the model
outputs directory).
Measures are imported from the active scenario.
Args:
measure_names (Collection[str]):
Collection of measures to be loaded.
rel_output_path, abs_output_path (str, optional):
Path to model output locations, either relative
to the `model_path` directory (when a subclass
is a type that has a model path) or as an absolute
directory. If neither is given, the default
value is equivalent to setting `rel_output_path` to
'Outputs'.
Returns:
dict: Measure names and values from the active scenario.
Raises:
KeyError: If load_measures is not available for specified
measure
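Example:
A minimal sketch, assuming the measures are available in a
"summary.csv" file in the outputs directory (the filename is
illustrative)::

    import os
    import pandas as pd

    def load_measures(self, measure_names=None, *, rel_output_path=None, abs_output_path=None):
        if abs_output_path is None:
            abs_output_path = os.path.join(self.resolved_model_path, rel_output_path or "Outputs")
        summary = pd.read_csv(os.path.join(abs_output_path, "summary.csv"))
        names = measure_names if measure_names is not None else self.scope.get_measure_names()
        return {name: summary[name].iloc[0] for name in names}  # KeyError if a measure is unavailable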
"""
@abc.abstractmethod
def archive(self, params, model_results_path, experiment_id:int=0):
"""
Copies model outputs to archive location.
Args:
params (dict): Dictionary of experiment variables
model_results_path (str): archive path
experiment_id (int, optional): The id number for this experiment.
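Example:
A minimal sketch, copying the whole outputs directory to the
archive location (the directory name is illustrative)::

    import os
    import shutil

    def archive(self, params, model_results_path, experiment_id=0):
        shutil.copytree(
            os.path.join(self.resolved_model_path, "Outputs"),
            os.path.join(model_results_path, "Outputs"),
            dirs_exist_ok=True,  # requires Python 3.8+
        )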
"""
@property
def allow_short_circuit(self):
"""
Bool: Allow model runs to be skipped if measures already appear in the database.
"""
return self.config.get('allow_short_circuit', True)
@allow_short_circuit.setter
def allow_short_circuit(self, value):
self.config['allow_short_circuit'] = bool(value)
@property
def ignore_crash(self):
"""
Bool: Allow model runs to `post_process` and `archive` even after an apparent crash in `run`.
"""
return self.config.get('ignore_crash', False)
@ignore_crash.setter
def ignore_crash(self, value):
self.config['ignore_crash'] = bool(value)
@property
def success_indicator(self):
"""
str: The name of a file that indicates the model has run successfully.
The flag is the mere existence of a file with this name, not any particular
file content. This file is deleted automatically when the model `run` is
initiated, so that it can be recreated to indicate a success.
"""
return self.config.get('success_indicator', None)
@success_indicator.setter
def success_indicator(self, value):
self.config['success_indicator'] = value
@property
def killed_indicator(self):
"""
str: The name of a file that indicates the model was killed due to an unrecoverable error.
The flag is the mere existence of a file with this name, not any particular
file content. This file is deleted automatically when the model `run` is
initiated, so that it can be recreated to indicate an unrecoverable error.
"""
return self.config.get('killed_indicator', None)
@killed_indicator.setter
def killed_indicator(self, value):
self.config['killed_indicator'] = value
@property
def local_directory(self):
"""Path: The current local working directory for this model."""
return self.config.get("local_directory", os.getcwd())
@local_directory.setter
def local_directory(self, value):
self.config["local_directory"] = value
@property
def resolved_model_path(self):
"""
Path: The resolved model path.
For core models that don't rely on the file system, this
is set to the current working directory and is generally
irrelevant. Overload this property for models that do
rely on the file system.
"""
return self.local_directory
@property
def is_db_locked(self):
if self.db:
return self.db.is_locked
return False
@contextmanager
def lock_db(self, x=True):
if x and self.db:
with self.db.lock:
yield
else:
yield
def enter_run_model(self):
"""A hook for actions at the very beginning of the run_model step."""
def exit_run_model(self):
"""A hook for actions at the very end of the run_model step."""
def run_model(self, scenario, policy):
"""
Runs an experiment through core model.
This method overloads the `run_model` method given in
the EMA Workbench, and provides the correct execution
of a core model within the workbench framework. This
function assembles and executes the steps laid out in
other methods of this class, adding some useful logic
to optimize the process (e.g. optionally short-circuiting
runs that already have results stored in the database).
For each experiment, the core model is called to:
1. `setup` experiment variables, copy files
as needed, and otherwise prepare to run the
core model for a particular experiment,
2. `run` the experiment,
3. `post_process` the result if needed to
produce all relevant performance measures,
4. `archive` model outputs from this experiment
(optional), and
5. `load_measures` from the experiment and
store those measures in the associated database.
Note that this method does *not* return any outcomes.
Outcomes are instead written into self.outcomes_output,
and can be retrieved from there, or from the database at
a later time.
In general, it should not be necessary to overload this
method in derived classes built for particular core models.
Instead, write overloaded methods for `setup`, `run`,
`post_process`, `archive`, and `load_measures`. Moreover,
in typical usage a modeler will generally not want to rely
on this method directly, but instead use `run_experiments`
to automatically run multiple experiments with one command.
Args:
scenario (Scenario): A dict-like object that
has key-value pairs for each uncertainty.
policy (Policy): A dict-like object that
has key-value pairs for each lever.
Raises:
UserWarning: If there are no experiments associated with
this type.
"""
self.enter_run_model()
try:
self.comment_on_run = None
_logger.debug("run_core_model read_experiment_parameters")
experiment_id = policy.get("_experiment_id_", None)
if experiment_id is None:
experiment_id = scenario.get("_experiment_id_", None)
if not hasattr(self, 'db') and hasattr(self, '_db'):
self.db = self._db
# If running a core files model using the DistributedEvaluator,
# the workers won't have access to the DB directly, so we'll only
# run the short-circuit test and the ad-hoc write-to-database
# section of this code if the `db` attribute is available.
if hasattr(self, 'db') and self.db is not None:
assert isinstance(self.db, Database)
if experiment_id is None:
with warnings.catch_warnings():
if self.is_db_locked:
warnings.simplefilter("ignore", category=MissingIdWarning)
experiment_id = self.db.read_experiment_id(self.scope.name, scenario, policy)
if experiment_id and self.allow_short_circuit:
# opportunity to short-circuit run by loading pre-computed values.
precomputed = self.db.read_experiment_measures(
self.scope,
design_name=None,
experiment_id=experiment_id,
)
if not precomputed.empty:
self.outcomes_output = dict(precomputed.iloc[0])
self.log(f"short circuit experiment_id {experiment_id} / {getattr(self, 'uid', 'no uid')}")
return
if experiment_id is None and not self.is_db_locked:
experiment_id = self.db.write_experiment_parameters_1(
self.scope.name, 'ad hoc', scenario, policy
)
self.log(f"YES DATABASE experiment_id {experiment_id}", level=logging.DEBUG)
else:
_logger.debug(f"NO DATABASE experiment_id {experiment_id}")
xl = {}
xl.update(scenario)
xl.update(policy)
m_names = self.scope.get_measure_names()
_logger.debug(f"run_core_model setup {experiment_id}")
self.setup(xl)
if self.success_indicator is not None:
success_indicator = os.path.join(self.resolved_model_path, self.success_indicator)
if os.path.exists(success_indicator):
os.remove(success_indicator)
else:
success_indicator = None
if self.killed_indicator is not None:
killed_indicator = os.path.join(self.resolved_model_path, self.killed_indicator)
if os.path.exists(killed_indicator):
os.remove(killed_indicator)
else:
killed_indicator = None
_logger.debug(f"run_core_model run {experiment_id}")
try:
self.run()
except subprocess.CalledProcessError as err:
_logger.error(f"ERROR in run_core_model run {experiment_id}: {str(err)}")
try:
ex_archive_path = self.get_experiment_archive_path(experiment_id, makedirs=True)
except MissingArchivePathError:
pass
else:
if isinstance(err, subprocess.CalledProcessError):
if err.stdout:
with open(os.path.join(ex_archive_path, 'error.stdout.log'), 'ab') as stdout:
stdout.write(err.stdout)
if err.stderr:
with open(os.path.join(ex_archive_path, 'error.stderr.log'), 'ab') as stderr:
stderr.write(err.stderr)
with open(os.path.join(ex_archive_path, 'error.log'), 'a') as errlog:
errlog.write(str(err))
measures_dictionary = {name: np.nan for name in m_names}
# Assign to outcomes_output, for ema_workbench compatibility
self.outcomes_output = measures_dictionary
if not self.ignore_crash:
# If 'ignore_crash' is False (the default), then abort now and skip
# any post-processing and other archiving steps, which will
# probably fail anyway.
self.log(f"run_core_model ABORT {experiment_id}", level=logging.ERROR)
self.comment_on_run = f"FAILED EXPERIMENT {experiment_id}: {str(err)}"
return
else:
_logger.error(f"run_core_model CONTINUE AFTER ERROR {experiment_id}")
try:
if success_indicator and not os.path.exists(success_indicator):
# The absence of the `success_indicator` file means that the model
# did not actually terminate correctly, so we do not want to
# post-process or store these results in the database.
self.comment_on_run = f"NON-SUCCESSFUL EXPERIMENT {experiment_id}: success_indicator missing"
raise ValueError(f"success_indicator missing: {success_indicator}")
if killed_indicator and os.path.exists(killed_indicator):
self.comment_on_run = f"KILLED EXPERIMENT {experiment_id}: killed_indicator present"
raise ValueError(f"killed_indicator present: {killed_indicator}")
_logger.debug(f"run_core_model post_process {experiment_id}")
self.post_process(xl, m_names)
_logger.debug(f"run_core_model wrap up {experiment_id}")
measures_dictionary = self.load_measures(m_names)
m_df = pd.DataFrame(measures_dictionary, index=[experiment_id])
except KeyboardInterrupt:
_logger.exception(
f"KeyboardInterrupt in post_process, load_measures or outcome processing {experiment_id}")
raise
except Exception as err:
_logger.exception(f"error in post_process, load_measures or outcome processing {experiment_id}")
_logger.error(f"proceeding directly to archive attempt {experiment_id}")
if not self.comment_on_run:
self.comment_on_run = f"PROBLEM IN EXPERIMENT {experiment_id}: {str(err)}"
else:
# only write to database if there was no error in post_process, load_measures or outcome processing
if experiment_id and hasattr(self, 'db') and self.db is not None and not self.db.readonly:
_logger.debug(f"run_core_model write db {experiment_id}")
run_id = getattr(self, 'run_id', None)
if run_id is None:
run_id, _ = self.db.new_run_id(
scope_name=self.scope.name,
experiment_id=experiment_id,
source=self.metamodel_id or 0,
)
try:
self.db.write_experiment_measures(self.scope.name, self.metamodel_id, m_df, [run_id])
except ReadOnlyDatabaseError:
warnings.warn("database is read-only, not storing model outcomes")
except Exception as err:
_logger.exception(f"error in writing results to database: {str(err)}")
else:
_logger.debug(f"run_core_model OK write db {experiment_id} {self.metamodel_id} {run_id}\n{m_df}")
else:
_logger.debug(f"run_core_model no db to write to {experiment_id}")
if experiment_id:
try:
ex_archive_path = self.get_experiment_archive_path(experiment_id)
except MissingArchivePathError:
pass
else:
_logger.debug(f"run_core_model archive {experiment_id}")
self.archive(xl, ex_archive_path, experiment_id)
else:
_logger.debug(f"run_core_model no archive because no experiment_id")
finally:
self.exit_run_model()
def read_experiments(
self,
design_name,
db=None,
only_pending=False,
only_complete=False,
only_with_measures=False,
):
"""
Reads results from a design of experiments from the database.
Args:
design_name (str): The name of the design to load.
db (Database, optional): The Database from which to read experiments.
If no db is given, the default `db` for this model is used.
only_pending (bool, default False): If True, only pending
experiments (which have no performance measure results
stored in the database) are returned.
only_complete (bool, default False): If True, only complete
experiments (which have no performance measure
results missing in the database) are returned.
only_with_measures (bool, default False): If True, only
experiments with at least one stored performance measure
are returned.
Returns:
pandas.DataFrame:
A DataFrame that contains all uncertainties, levers, and measures
for the experiments.
Raises:
ValueError:
If there is no Database connection `db` set.
"""
db = db if db is not None else self.db
if db is None:
raise ValueError('no database to read from')
return self.ensure_dtypes(
db.read_experiment_all(
self.scope.name,
design_name,
only_pending=only_pending,
only_complete=only_complete,
only_with_measures=only_with_measures,
)
)
def read_experiment_parameters(
self,
design_name=None,
db=None,
only_pending=False,
*,
experiment_ids=None,
):
"""
Reads uncertainties and levers from a design of experiments from the database.
Args:
design_name (str, optional): If given, only experiments
associated with both the scope and the named design
are returned, otherwise all experiments associated
with the scope are returned.
db (Database, optional): The Database from which to read experiments.
If no db is given, the default `db` for this model is used.
only_pending (bool, default False): If True, only pending
experiments (which have no performance measure results
stored in the database) are returned.
experiment_ids (Collection, optional):
A collection of experiment id's to load. If given,
both `design_name` and `only_pending` are ignored.
Returns:
pandas.DataFrame:
A DataFrame that contains all uncertainties, levers, and measures
for the experiments.
Raises:
ValueError:
If `db` is not given and there is no default
Database connection set.
"""
db = db if db is not None else self.db
if db is None:
raise ValueError('no database to read from')
return self.ensure_dtypes(
db.read_experiment_parameters(
self.scope.name,
design_name,
only_pending=only_pending,
experiment_ids=experiment_ids,
)
)
def read_experiment_measures(
self,
design_name,
experiment_id=None,
db=None,
):
"""
Reads performance measures from a design of experiments from the database.
Args:
design_name (str): The name of the design to load.
experiment_id (int, optional): The id of the experiment to load.
db (Database, optional): The Database from which to read experiment(s).
If no db is given, the default `db` for this model is used.
Returns:
pandas.DataFrame:
A DataFrame that contains all uncertainties, levers, and measures
for the experiments.
Raises:
ValueError:
If `db` is not given and there is no default
Database connection set.
"""
db = db if db is not None else self.db
if db is None:
raise ValueError('no database to read from')
measures = self.ensure_dtypes(
db.read_experiment_measures(
self.scope.name,
design_name,
experiment_id,
source=self.metamodel_id,
)
)
# only return measures within scope
measures = measures[[i for i in self.scope.get_measure_names()
if i in measures.columns]]
return measures
def ensure_dtypes(self, df:pd.DataFrame):
"""
Convert columns of dataframe to correct dtype as needed.
Args:
df (pandas.DataFrame): A dataframe with column names
that are uncertainties, levers, or measures.
Returns:
pandas.DataFrame:
The same data as input, but with dtypes as appropriate.
"""
return self.scope.ensure_dtypes(df)
def design_experiments(self, *args, **kwargs):
"""
Create a design of experiments based on this model.
Args:
n_samples_per_factor (int, default 10): The number of samples in the
design per random factor.
n_samples (int or tuple, optional): The total number of samples in the
design. If `jointly` is False, this is the number of samples in each
of the uncertainties and the levers, the total number of samples will
be the square of this value. Give a 2-tuple to set values for
uncertainties and levers respectively, to set them independently.
If this argument is given, it overrides `n_samples_per_factor`.
random_seed (int or None, default 1234): A random seed for reproducibility.
db (Database, optional): If provided, this design will be stored in the
database indicated. If not provided, the `db` for this model will
be used, if one is set.
design_name (str, optional): A name for this design, to identify it in the
database. If not given, a unique name will be generated based on the
selected sampler.
sampler (str or AbstractSampler, default 'lhs'): The sampler to use for this
design. Available pre-defined samplers include:
- 'lhs': Latin Hypercube sampling
- 'ulhs': Uniform Latin Hypercube sampling, which ignores defined
distribution shapes from the scope and samples everything
as if it was from a uniform distribution
- 'mc': Monte Carlo sampling
- 'uni': Univariate sensitivity testing, whereby experiments are
generated setting each parameter individually to minimum and
maximum values (for numeric dtypes) or all possible values
(for boolean and categorical dtypes). Note that designs for
univariate sensitivity testing are deterministic and the number
of samples given is ignored.
sample_from ('all', 'uncertainties', or 'levers'): Which scope components
from which to sample. Components not sampled are set at their default
values in the design.
jointly (bool, default True): Whether to sample jointly all uncertainties
and levers in a single design, or, if False, to generate separate samples
for levers and uncertainties, and then combine the two in a full-factorial
manner. This argument has no effect unless `sample_from` is 'all'.
Note that setting `jointly` to False may produce a very large design,
as the total number of experiments will be the product of the number of
experiments for the levers and the number of experiments for the
uncertainties, which are set separately (i.e. if `n_samples` is given,
the total number of experiments is the square of that value).
Returns:
pandas.DataFrame: The resulting design.
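Example:
A typical call, assuming `model` is an instance of a derived core
model (the argument values are illustrative)::

    design = model.design_experiments(
        n_samples=100,
        sampler='lhs',
        random_seed=42,
        design_name='lhs_100',
    )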
"""
if 'scope' in kwargs:
kwargs.pop('scope')
if 'db' not in kwargs:
kwargs['db'] = self.db
from ..experiment import experimental_design
return experimental_design.design_experiments(self.scope, *args, **kwargs)
def async_experiments(
self,
design:pd.DataFrame=None,
db=None,
*,
design_name=None,
evaluator=None,
max_n_workers=None,
stagger_start=None,
batch_size=None,
):
"""
Asynchronously runs a design of combined experiments using this model.
A combined experiment includes a complete set of input values for
all exogenous uncertainties (a Scenario) and all policy levers
(a Policy). Unlike the perform_experiments function in the EMA Workbench,
this method pairs each Scenario and Policy in sequence, instead
of running all possible combinations of Scenario and Policy.
This change ensures compatibility with the EMAT database modules, which
preserve the complete set of input information (both uncertainties
and levers) for each experiment. To conduct a full cross-factorial set
of experiments similar to the default settings for EMA Workbench,
use a factorial design, by setting the `jointly` argument for the
`design_experiments` to False, or by designing experiments outside
of EMAT with your own approach.
Args:
design (pandas.DataFrame, optional): experiment definitions
given as a DataFrame, where each exogenous uncertainty and
policy lever is given as a column, and each row is an experiment.
db (Database, required): The database to use for loading and saving experiments.
If none is given, the default database for this model is used.
If there is no default db, and none is given here,
these experiments will be aborted.
design_name (str, optional): The name of a design of experiments to
load from the database. This design is only used if
`design` is None.
evaluator (emat.workbench.Evaluator, optional): Optionally give an
evaluator instance. If not given, a default DistributedEvaluator
will be instantiated. Passing any other kind of evaluator will
currently cause an error, although in the future other async
compatible evaluators may be provided.
max_n_workers (int, optional):
The maximum number of workers that will be created for a default
dask.distributed LocalCluster. If the number of cores available is
smaller than this number, fewer workers will be spawned. This value
is only used if a default LocalCluster has not yet been created.
stagger_start (int, optional):
If provided, wait this number of seconds between initial dispatch
of experiments to the evaluator. For models that do a lot of
file copying up front, this can prevent over-saturating the file
storage system.
batch_size (int, optional):
For fast-running core models, the overhead from multi-processing
can represent a big chunk of overall runtime. Grouping experiments
into batches that are sent to workers as a group can mitigate this.
Setting batch_size to 1 will process every experiment separately.
If no batch size is given, a guess is made as to an efficient
batch_size based on the number of experiments and the number of
workers.
Raises:
ValueError:
If there are no experiments defined. This includes
the situation where `design` is given but no database is
available.
"""
# catch user gives only a design, not experiment_parameters
if isinstance(design, str) and design_name is None:
design_name, design = design, None
if design_name is None and design is None:
raise ValueError(f"must give design_name or design")
if db is None:
db = self.db
if design_name is not None and design is None:
if not db:
raise ValueError(f'cannot load design "{design_name}", there is no db')
design = db.read_experiment_parameters(self.scope.name, design_name)
if design.empty:
raise ValueError(f"no experiments available")
from .asynchronous import asynchronous_experiments
if self.db is None:
if db is not None:
self.db = db
else:
raise ValueError("cannot run async_experiments without a `db` defined")
return asynchronous_experiments(
self,
design,
evaluator=evaluator,
max_n_workers=max_n_workers,
stagger_start=stagger_start,
batch_size=batch_size,
)
def run_experiments(
self,
design:pd.DataFrame=None,
evaluator=None,
*,
design_name=None,
db=None,
allow_short_circuit=None,
):
"""
Runs a design of combined experiments using this model.
A combined experiment includes a complete set of input values for
all exogenous uncertainties (a Scenario) and all policy levers
(a Policy). Unlike the perform_experiments function in the EMA Workbench,
this method pairs each Scenario and Policy in sequence, instead
of running all possible combinations of Scenario and Policy.
This change ensures compatibility with the EMAT database modules, which
preserve the complete set of input information (both uncertainties
and levers) for each experiment. To conduct a full cross-factorial set
of experiments similar to the default settings for EMA Workbench,
use a factorial design, by setting the `jointly` argument for the
`design_experiments` to False, or by designing experiments outside
of EMAT with your own approach.
Args:
design (pandas.DataFrame, optional): experiment definitions
given as a DataFrame, where each exogenous uncertainty and
policy lever is given as a column, and each row is an experiment.
evaluator (emat.workbench.Evaluator, optional): Optionally give an
evaluator instance. If not given, a default SequentialEvaluator
will be instantiated.
design_name (str, optional): The name of a design of experiments to
load from the database. This design is only used if
`design` is None.
db (Database, optional): The database to use for loading and saving experiments.
If none is given, the default database for this model is used.
If there is no default db, and none is given here,
the results are not stored in a database. Set to False to explicitly
not use the default database, even if it exists.
Returns:
pandas.DataFrame:
A DataFrame that contains all uncertainties, levers, and measures
for the experiments.
Raises:
ValueError:
If there are no experiments defined. This includes
the situation where `design` is given but no database is
available.
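Example:
Typical usage, assuming `model` is an instance of a derived core
model (the design name is illustrative)::

    design = model.design_experiments(n_samples=100, sampler='lhs')
    results = model.run_experiments(design)

    # or run a design previously stored in the database by name
    results = model.run_experiments(design_name='lhs_100')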
"""
from ..workbench import Scenario, Policy, perform_experiments
# catch user gives only a design, not experiment_parameters
if isinstance(design, str) and design_name is None:
design_name, design = design, None
if design_name is None and design is None:
raise ValueError(f"must give design_name or design")
if db is None:
db = self.db
if design_name is not None and design is None:
if not db:
raise ValueError(f'cannot load design "{design_name}", there is no db')
design = db.read_experiment_parameters(self.scope.name, design_name)
if design.empty:
raise ValueError(f"no experiments available")
# catch metamodels here and run them as a batch, which is much faster
function = getattr(self, 'function', None)
from .meta_model import MetaModel
if isinstance(function, MetaModel):
outcomes = function.predict(design)
result = self.ensure_dtypes(pd.concat([
design,
outcomes
], axis=1, sort=False))
from ..experiment.experimental_design import ExperimentalDesign
result = ExperimentalDesign(result)
result.scope = self.scope
result.name = getattr(design, 'name', None)
result.sampler_name = getattr(design, 'sampler_name', None)
return result
scenarios = []
scenario_cols = self.scope._get_uncertainty_and_constant_names()
design_scenarios = design[scenario_cols]
for rownum in range(len(design)):
if design.index.name == 'experiment':
s = Scenario(
_experiment_id_=design.index[rownum],
**design_scenarios.iloc[rownum],
)
else:
s = Scenario(
_experiment_id_=False,
**design_scenarios.iloc[rownum],
)
scenarios.append(s)
lever_names = self.scope.get_lever_names()
policies = [
Policy(f"Incognito{n}", **dict(zip(lever_names, i)))
for n,i in enumerate(design[lever_names].itertuples(index=False, name='ExperimentL'))
]
evaluator = prepare_evaluator(evaluator, self)
if getattr(evaluator, 'asynchronous', False):
# When the evaluator is in asynchronous mode, the core model runs will be
# dispatched here but the function will not block waiting on the result, and
# instead depend on the model execution process to write the results into
# the database when complete.
with evaluator:
if allow_short_circuit is not None:
_stored_allow_short_circuit = self.allow_short_circuit
self.allow_short_circuit = allow_short_circuit
else:
_stored_allow_short_circuit = None
try:
perform_experiments(
self,
scenarios=scenarios,
policies=policies,
zip_over={'scenarios', 'policies'},
evaluator=evaluator,
)
finally:
if _stored_allow_short_circuit is not None:
self.allow_short_circuit = _stored_allow_short_circuit
return
else:
with evaluator:
if db is False:
_stored_db = self.db
self.db = None
else:
_stored_db = None
if allow_short_circuit is not None:
_stored_allow_short_circuit = self.allow_short_circuit
self.allow_short_circuit = allow_short_circuit
else:
_stored_allow_short_circuit = None
try:
experiments, outcomes = perform_experiments(
self,
scenarios=scenarios,
policies=policies,
zip_over={'scenarios', 'policies'},
evaluator=evaluator,
)
finally:
if _stored_db:
self.db = _stored_db
if _stored_allow_short_circuit is not None:
self.allow_short_circuit = _stored_allow_short_circuit
experiments.index = design.index
outcomes = pd.DataFrame.from_dict(outcomes)
outcomes.index = design.index
# if db:
# metamodel_id = self.metamodel_id
# if metamodel_id is None:
# metamodel_id = 0
# db.write_experiment_measures(self.scope.name, metamodel_id, outcomes)
# Put constants back into experiments
experiments_ = experiments.drop(
columns=['scenario', 'policy', 'model', '_experiment_id_'],
errors='ignore',
)
for i in self.scope.get_constants():
experiments_[i.name] = i.value
result = self.ensure_dtypes(pd.concat([
experiments_,
outcomes
], axis=1, sort=False))
from ..experiment.experimental_design import ExperimentalDesign
result = ExperimentalDesign(result)
result.scope = self.scope
result.name = getattr(design, 'name', None)
result.sampler_name = getattr(design, 'sampler_name', None)
return result
def run_reference_experiment(
self,
evaluator=None,
*,
db=None,
):
"""
Runs a reference experiment using this model.
This single experiment includes a complete set of input values for
all exogenous uncertainties (a Scenario) and all policy levers
(a Policy). Each is set to the default value indicated by the scope.
Args:
evaluator (emat.workbench.Evaluator, optional): Optionally give an
evaluator instance. If not given, a default SequentialEvaluator
will be instantiated.
db (Database, optional): The database to use for loading and saving experiments.
If none is given, the default database for this model is used.
If there is no default db, and none is given here,
the results are not stored in a database. Set to False to explicitly
not use the default database, even if it exists.
Returns:
pandas.DataFrame:
A DataFrame that contains all uncertainties, levers, and measures
for the experiments.
"""
if db is None:
db = self.db
ref = self.design_experiments(sampler='ref', db=db)
return self.run_experiments(ref, evaluator=evaluator, db=db)
def feature_scores(
self,
design,
return_type='styled',
random_state=None,
cmap='viridis',
measures=None,
shortnames=None,
):
"""
Calculate feature scores based on a design of experiments.
This method is provided as a convenient pass-through to the
`feature_scores` function in the `analysis` sub-package, using
the scope and database attached to this model.
Args:
design (str or pandas.DataFrame): The name of the design
of experiments to use for feature scoring, or a single
pandas.DataFrame containing the experimental design and
results.
return_type ({'styled', 'figure', 'dataframe'}):
The format to return, either a heatmap figure as an SVG
rendered in an xmle.Elem, or a plain pandas.DataFrame,
or a styled DataFrame.
random_state (int or numpy.RandomState, optional):
Random state to use.
cmap (string or colormap, default 'viridis'): matplotlib
colormap to use for rendering.
measures (Collection, optional): The performance measures
on which feature scores are to be generated. By default,
all measures are included.
shortnames (bool or Scope, optional): If True, the short names
defined in this model's scope are used for labeling; a Scope
instance can also be given to draw short names from it.
Returns:
xmle.Elem or pandas.DataFrame:
Returns a rendered SVG as xml, or a DataFrame,
depending on the `return_type` argument.
This function internally uses feature_scoring from the EMA Workbench, which in turn
scores features using the "extra trees" regression approach.
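Example:
A typical call, assuming experiments for a design named 'lhs' have
already been run and stored in this model's database (the design
name is illustrative)::

    scores = model.feature_scores('lhs', return_type='dataframe')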
"""
from ..analysis.feature_scoring import feature_scores
if shortnames is True:
shortnames = self.scope
return feature_scores(
self.scope,
design=design,
return_type=return_type,
db=self.db,
random_state=random_state,
cmap=cmap,
measures=measures,
shortnames=shortnames,
)
def get_feature_scores(self, *args, **kwargs):
"""
Deprecated, use `Model.feature_scores`.
"""
# for compatibility with prior versions of TMIP-EMAT
return self.feature_scores(*args, **kwargs)
def _common_optimization_setup(
self,
epsilons=0.1,
convergence='default',
display_convergence=True,
evaluator=None,
):
import numbers
if isinstance(epsilons, numbers.Number):
epsilons = [epsilons]*len(self.outcomes)
if convergence == 'default':
convergence = ConvergenceMetrics(
EpsilonProgress(),
SolutionCount(),
)
if display_convergence and isinstance(convergence, ConvergenceMetrics):
from IPython.display import display
display(convergence)
evaluator = prepare_evaluator(evaluator, self)
return epsilons, convergence, display_convergence, evaluator
def optimize(
self,
searchover='levers',
evaluator=None,
nfe=10000,
convergence='default',
display_convergence=True,
convergence_freq=100,
constraints=None,
reference=None,
reverse_targets=False,
algorithm=None,
epsilons='auto',
min_epsilon=0.1,
cache_dir=None,
cache_file=None,
check_extremes=False,
**kwargs,
):
"""
Perform multi-objective optimization over levers or uncertainties.
The targets for the multi-objective optimization (i.e., whether each
individual performance measure is to be maximized or minimized) are
read from the model's scope.
Args:
searchover ({'levers', 'uncertainties'}):
Which group of inputs to search over. The other group
will be set at their default values, unless other values
are provided in the `reference` argument.
evaluator (Evaluator, optional): The evaluator to use to
run the model. If not given, a SequentialEvaluator will
be created.
nfe (int, default 10_000): Number of function evaluations.
This generally needs to be fairly large to achieve stable
results in all but the most trivial applications.
convergence ('default', None, or emat.optimization.ConvergenceMetrics):
A convergence display during optimization. The default
value is to report the epsilon-progress (the number of
solutions that ever enter the candidate pool of non-dominated
solutions) and the number of solutions remaining in that candidate
pool. Pass `None` explicitly to disable convergence tracking.
display_convergence (bool, default True): Whether to automatically
display figures that dynamically track convergence. Set to
`False` if you are not using this method within a Jupyter
interactive environment.
convergence_freq (int, default 100): How frequently to update the
convergence measures. There is some computational overhead to
these convergence updates, so setting a value too small may
noticeably slow down the process.
constraints (Collection[Constraint], optional):
Solutions will be constrained to only include values that
satisfy these constraints. The constraints can be based on
the search parameters (levers or uncertainties, depending on the
value given for `searchover`), or performance measures, or
some combination thereof.
reference (Mapping): A set of values for the non-active inputs,
i.e. the uncertainties if `searchover` is 'levers', or the
levers if `searchover` is 'uncertainties'. Any values not
set here revert to the default values identified in the scope.
reverse_targets (bool, default False): Whether to reverse the
optimization targets given in the scope (i.e., changing
minimize to maximize, or vice versa). This will result in
the optimization searching for the worst outcomes, instead of
the best outcomes.
algorithm (platypus.Algorithm, optional): Select an
algorithm for multi-objective optimization. The default
algorithm is EpsNSGAII. See `platypus` documentation for details.
epsilons (float or array-like, default 'auto'): Used to limit the
number of distinct solutions generated. Set to a larger value
to get fewer distinct solutions. When left as 'auto', epsilons
are estimated from the variability of a small set of trial runs.
min_epsilon (float, default 0.1): A lower bound applied to each
automatically estimated epsilon when `epsilons` is 'auto'.
cache_dir (path-like, optional): A directory in which to
cache results. Most of the arguments will be hashed
to develop a unique filename for these results, making this
generally safer than `cache_file`.
cache_file (path-like, optional): A file into which to
cache results. If this file exists, the contents of the
file will be loaded and all other arguments are ignored.
Use with great caution.
check_extremes (bool or int, default False): Conduct additional
evaluations, setting individual search parameters (levers or
uncertainties, per `searchover`) to their extreme values,
for each candidate Pareto optimal solution.
kwargs: Any additional arguments will be passed on to the
platypus algorithm.
Returns:
emat.OptimizationResult:
The set of non-dominated solutions found.
When `convergence` is given, the convergence measures are
included, as a pandas.DataFrame in the `convergence` attribute.
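Example:
A typical call, searching over the policy levers with the
uncertainties held at their scope default values (the argument
values are illustrative)::

    result = model.optimize(
        searchover='levers',
        nfe=5000,
        epsilons=0.05,
    )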
"""
from ..util.disk_cache import load_cache_if_available, save_cache
if isinstance(algorithm, str) or algorithm is None:
alg = algorithm
else:
alg = algorithm.__name__
if reference is not None:
from ..workbench import Policy, Scenario
if searchover == 'levers' and not isinstance(reference, Scenario):
reference = Scenario("ReferenceScenario", **reference)
elif searchover == 'uncertainties' and not isinstance(reference, Policy):
reference = Policy("ReferencePolicy", **reference)
else:
if searchover == 'levers':
reference = self.scope.default_scenario()
elif searchover == 'uncertainties':
reference = self.scope.default_policy()
x, cache_file = load_cache_if_available(
cache_file=cache_file,
cache_dir=cache_dir,
searchover=searchover,
nfe=nfe,
convergence=convergence,
convergence_freq=convergence_freq,
constraints=constraints,
reference=reference,
reverse_targets=reverse_targets,
algorithm=alg,
epsilons=epsilons,
)
if x is None:
epsilons, convergence, display_convergence, evaluator = self._common_optimization_setup(
epsilons, convergence, display_convergence, evaluator
)
if reverse_targets:
for k in self.scope.get_measures():
k.kind_original = k.kind
k.kind = k.kind * -1
_db_pause = self.db
try:
self.db = None
with evaluator:
if epsilons == 'auto':
from ..workbench import perform_experiments
if searchover == 'levers':
_, trial_outcomes = perform_experiments(
self,
scenarios=reference,
policies=30,
evaluator=evaluator,
)
else:
_, trial_outcomes = perform_experiments(
self,
scenarios=30,
policies=reference,
evaluator=evaluator,
)
epsilons = [max(min_epsilon, np.std(trial_outcomes[mn]) / 20) for mn in self.scope.get_measure_names()]
results = evaluator.optimize(
searchover=searchover,
reference=reference,
nfe=nfe,
constraints=constraints,
convergence=convergence,
convergence_freq=convergence_freq,
epsilons=epsilons,
**kwargs,
)
if isinstance(results, tuple) and len(results) == 2:
results, result_convergence = results
else:
result_convergence = None
# Put constants back in to results
for i in self.scope.get_constants():
results[i.name] = i.value
results = self.ensure_dtypes(results)
x = OptimizationResult(results, result_convergence, scope=self.scope)
if searchover == 'levers':
x.scenarios = reference
elif searchover == 'uncertainties':
x.policies = reference
if check_extremes:
x.check_extremes(
self,
1 if check_extremes is True else check_extremes,
evaluator=evaluator,
searchover=searchover,
robust=False,
)
finally:
if reverse_targets:
for k in self.scope.get_measures():
k.kind = k.kind_original
del k.kind_original
self.db = _db_pause
elif display_convergence:
_, convergence, display_convergence, _ = self._common_optimization_setup(
None, convergence, display_convergence, False
)
for c in convergence:
try:
c.rebuild(x.convergence)
except KeyboardInterrupt:
raise
except:
pass
x.cache_file = cache_file
save_cache(x, cache_file)
return x
def robust_optimize(
self,
robustness_functions,
scenarios,
evaluator=None,
nfe=10000,
convergence='default',
display_convergence=True,
convergence_freq=100,
constraints=None,
epsilons=0.1,
cache_dir=None,
cache_file=None,
algorithm=None,
check_extremes=False,
**kwargs,
):
"""
Perform robust optimization.
Robust optimization is generally a multi-objective optimization task.
It is undertaken using statistical measures of outcomes evaluated across
a number of scenarios, instead of using the individual outcomes themselves.
For each candidate policy, the model is evaluated against all of the considered
scenarios, and then the robustness measures are evaluated using the
set of outcomes from the original runs. The robustness measures
are aggregate measures that are computed from a set of outcomes.
For example, this may be expected value, median, n-th percentile,
minimum, or maximum value of any individual outcome. It is also
possible to have joint measures, e.g. expected value of the larger
of outcome 1 or outcome 2.
Each robustness function is indicated as a maximization or minimization
target, where higher or lower values are better, respectively.
The optimization process then tries to identify one or more
non-dominated solutions for the possible policy levers.
Args:
robustness_functions (Collection[Measure]): A collection of
aggregate statistical performance measures.
scenarios (int or Collection): A collection of scenarios to
use in the evaluation(s), or give an integer to generate
that number of random scenarios.
evaluator (Evaluator, optional): The evaluator to use to
run the model. If not given, a SequentialEvaluator will
be created.
nfe (int, default 10_000): Number of function evaluations.
This generally needs to be fairly large to achieve stable
results in all but the most trivial applications.
convergence ('default', None, or emat.optimization.ConvergenceMetrics):
A convergence display during optimization.
display_convergence (bool, default True): Automatically display
the convergence metric figures when optimizing.
convergence_freq (int, default 100): The frequency at which
convergence metric figures are updated.
constraints (Collection[Constraint], optional):
Solutions will be constrained to only include values that
satisfy these constraints. The constraints can be based on
the policy levers, or on the computed values of the robustness
functions, or some combination thereof.
epsilons (float or array-like): Used to limit the number of
distinct solutions generated. Set to a larger value to get
fewer distinct solutions.
cache_dir (path-like, optional): A directory in which to
cache results. Most of the arguments will be hashed
to develop a unique filename for these results, making this
generally safer than `cache_file`.
cache_file (path-like, optional): A file into which to
cache results. If this file exists, the contents of the
file will be loaded and all other arguments are ignored.
Use with great caution.
algorithm (platypus.Algorithm or str, optional): Select an
algorithm for multi-objective optimization. The algorithm can
be given directly, or named in a string. See `platypus`
documentation for details.
check_extremes (bool or int, default False): Conduct additional
evaluations, setting individual policy levers to their
extreme values, for each candidate Pareto optimal solution.
kwargs: any additional arguments will be passed on to the
platypus algorithm.
Returns:
emat.OptimizationResult:
The set of non-dominated solutions found.
When `convergence` is given, the convergence measures are
included, as a pandas.DataFrame in the `convergence` attribute.
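Example:
A sketch of a typical call, with a robustness measure defined using
`emat.Measure` (the measure and variable names are illustrative and
must match the measures defined in your own scope)::

    import numpy
    from emat import Measure

    expected_net_benefit = Measure(
        'Expected Net Benefit',
        kind=Measure.MAXIMIZE,
        variable_name='net_benefits',
        function=numpy.mean,
    )
    result = model.robust_optimize(
        robustness_functions=[expected_net_benefit],
        scenarios=50,
        nfe=5000,
        epsilons=0.05,
    )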
"""
from ..optimization.optimize import robust_optimize
from ..util.disk_cache import load_cache_if_available, save_cache
if isinstance(algorithm, str) or algorithm is None:
alg = algorithm
else:
alg = algorithm.__name__
result, cache_file = load_cache_if_available(
cache_file=cache_file,
cache_dir=cache_dir,
scenarios=scenarios,
convergence=convergence,
convergence_freq=convergence_freq,
constraints=constraints,
epsilons=epsilons,
nfe=nfe,
robustness_functions=robustness_functions,
alg=alg,
check_extremes=check_extremes,
)
if result is None:
_db_pause = self.db
try:
self.db = None
result = robust_optimize(
self,
robustness_functions,
scenarios,
evaluator=evaluator,
nfe=nfe,
convergence=convergence,
display_convergence=display_convergence,
convergence_freq=convergence_freq,
constraints=constraints,
epsilons=epsilons,
check_extremes=check_extremes,
**kwargs,
)
finally:
self.db = _db_pause
elif display_convergence:
_, convergence, display_convergence, _ = self._common_optimization_setup(
None, convergence, display_convergence, False
)
for c in convergence:
try:
c.rebuild(result.convergence)
except KeyboardInterrupt:
raise
except:
pass
result.cache_file = cache_file
save_cache(result, cache_file)
return result
def robust_evaluate(
self,
robustness_functions,
scenarios,
policies,
evaluator=None,
cache_dir=None,
suspend_db=True,
):
"""
Perform robust evaluation(s).
The robust evaluation is used to generate statistical measures
of outcomes, instead of generating the individual outcomes themselves.
For each policy, the model is evaluated against all of the considered
scenarios, and then the robustness measures are evaluated using the
set of outcomes from the original runs. The robustness measures
are aggregate measures that are computed from a set of outcomes.
For example, this may be expected value, median, n-th percentile,
minimum, or maximum value of any individual outcome. It is also
possible to have joint measures, e.g. expected value of the larger
of outcome 1 or outcome 2.
Args:
robustness_functions (Collection[Measure]): A collection of
aggregate statistical performance measures.
scenarios (int or Collection): A collection of scenarios to
use in the evaluation(s), or give an integer to generate
that number of random scenarios.
policies (int, or collection): A collection of policies to
use in the evaluation(s), or give an integer to generate
that number of random policies.
evaluator (Evaluator, optional): The evaluator to use to
run the model. If not given, a SequentialEvaluator will
be created.
cache_dir (path-like, optional): A directory in which to
cache results.
suspend_db (bool, default True):
Suspend writing the results of individual model runs to
the database. Robust evaluation potentially generates a
large number of model executions, and storing all these
individual results may not be useful.
Returns:
pandas.DataFrame: The computed value of each item
in `robustness_functions`, for each policy in `policies`.
"""
robust_results = None
cache_file = None
if cache_dir is not None:
try:
from ..util.hasher import hash_it
hh = hash_it(
scenarios,
policies,
robustness_functions,
)
os.makedirs(os.path.join(cache_dir,hh[2:4],hh[4:6]), exist_ok=True)
cache_file = os.path.join(cache_dir,hh[2:4],hh[4:6],hh[6:]+".gz")
if os.path.exists(cache_file):
_logger.debug(f"loading from cache_file={cache_file}")
from ..util.filez import load
robust_results = load(cache_file)
cache_file = None
except KeyboardInterrupt:
raise
except:
import traceback
warnings.warn('unable to manage cache')
traceback.print_exc()
if robust_results is None:
with self.lock_db(suspend_db):
if evaluator is None:
from ..workbench.em_framework import SequentialEvaluator
evaluator = SequentialEvaluator(self)
if not isinstance(evaluator, BaseEvaluator):
from dask.distributed import Client
if isinstance(evaluator, Client):
from ..workbench.em_framework.ema_distributed import DistributedEvaluator
evaluator = DistributedEvaluator(self, client=evaluator)
from ..workbench.em_framework.samplers import sample_uncertainties, sample_levers
if isinstance(scenarios, int):
n_scenarios = scenarios
scenarios = sample_uncertainties(self, n_scenarios)
with evaluator:
robust_results = evaluator.robust_evaluate(
robustness_functions,
scenarios,
policies,
)
robust_results = self.ensure_dtypes(robust_results)
if cache_file is not None:
from ..util.filez import save
save(robust_results, cache_file, overwrite=True)
with open(cache_file.replace('.gz','.info.txt'), 'wt') as notes:
print("scenarios=", scenarios, file=notes)
print("robustness_functions=", robustness_functions, file=notes)
print("policies=", policies, file=notes)
return robust_results
def io_experiment(self, params):
"""
Run an experiment, and return a dictionary of inputs and outputs together.
Args:
params: dict
Returns:
dict
"""
out = self.run_experiment(params).copy()
out.update(params)
return out
def log(self, message, level=logging.INFO):
"""
Log a message.
This facility will attempt to send log messages to
the attached database, falling back to the regular
module logger in case that fails.
Args:
message (str): Message to send to log.
level (int, default logging.INFO): Log level.
"""
db = getattr(self, 'db', None)
try:
db.log(message, level=level)
except:
_logger.log(level, message)