# -*- coding: utf-8 -*-
""" core_model.py - define coure model API"""
import os
import abc
import yaml
import pandas as pd
import numpy as np
import logging
import subprocess
import warnings
from contextlib import contextmanager
from typing import Union, Mapping, Collection, Iterable
from ..workbench.em_framework.model import AbstractModel as AbstractWorkbenchModel
from ..workbench.em_framework.evaluators import BaseEvaluator
from ..database.database import Database
from ..scope.scope import Scope
from ..optimization.optimization_result import OptimizationResult
from ..optimization import EpsilonProgress, ConvergenceMetrics, SolutionCount
from ..util.evaluators import prepare_evaluator
from ..exceptions import MissingArchivePathError, ReadOnlyDatabaseError, MissingIdWarning
from .._pkg_constants import *
from ..util.loggers import get_module_logger
_logger = get_module_logger(__name__)
class AbstractCoreModel(abc.ABC, AbstractWorkbenchModel):
    """
    An interface for using a model with EMAT.
    Individual models should be instantiated using derived
    subclasses of this abstract base class, and not using
    this class directly.
    Args:
        configuration (str or Mapping or None):
            The configuration for this core model. This can be given
            explicitly as a `dict`, or as a `str` which gives the
            filename of a YAML file that will be loaded. If there is
            no configuration, giving `None` is also acceptable.
        scope (Scope or str):
            The exploration scope, as a `Scope` object or as
            a `str` which gives the filename of a YAML file that will be
            loaded.
        safe (bool):
            Load the configuration YAML file in 'safe' mode.
            This can be disabled if the configuration requires
            custom Python types or is otherwise not compatible with
            safe mode. Loading configuration files with safe mode
            off is not secure and should not be done with files from
            untrusted sources.
        db (Database, optional):
            An optional Database to store experiments and results.
        name (str, default "EMAT"):
            A name for this model, given as an alphanumeric string.
            The name is required by workbench operations.
        metamodel_id (int, optional):
            An identifier for this model, if it is a meta-model.
            Defaults to 0 (i.e., not a meta-model).
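    Example:
        A minimal, illustrative sketch of constructing a core model.  The
        `MyTravelModel` subclass and file names are hypothetical
        placeholders, and the `emat` import path assumes this module is
        installed as part of the TMIP-EMAT package::

            import emat

            scope = emat.Scope('my-scope.yaml')  # exploration scope definition
            model = MyTravelModel(               # user-defined subclass of AbstractCoreModel
                configuration='my-config.yaml',  # core model configuration (YAML)
                scope=scope,
                db=None,                         # optionally, an emat Database instance
                name='MyTravelModel',
            )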
    """
    def __init__(self,
                 configuration:Union[str,Mapping,None],
                 scope,
                 safe=True,
                 db=None,
                 name='EMAT',
                 metamodel_id=0,
                 ):
        if isinstance(configuration, str):
            with open(configuration, 'r') as stream:
                if safe:
                    configuration = yaml.safe_load(stream)
                else:
                    configuration = yaml.load(stream, Loader=yaml.FullLoader)
            if configuration is None:
                configuration = {}
        self.config = configuration if configuration is not None else {}
        self.db = db
        if isinstance(scope, Scope):
            self.scope = scope
        else:
            self.scope = Scope(scope)
        AbstractWorkbenchModel.__init__(self, name=name.replace('_','').replace(' ',''))
        self.uncertainties = self.scope._x_list
        self.levers = self.scope._l_list
        self.constants = self.scope._c_list
        self.outcomes = self.scope._m_list
        self.metamodel_id = metamodel_id
    def __getstate__(self):
        # don't pickle the db connection
        return {k: v for (k, v) in self.__dict__.items() if k != 'db'}
    @abc.abstractmethod
    def setup(self, params):
        """
        Configure the core model with the experiment variable values.
        This method is the place where the core model set up takes place,
        including creating or modifying files as necessary to prepare
        for a core model run.  When running experiments, this method
        is called once for each core model experiment, where each experiment
        is defined by a set of particular values for both the exogenous
        uncertainties and the policy levers.  These values are passed to
        the experiment only here, and not in the `run` method itself.
        This facilitates debugging, as the `setup` method can potentially
        be used without the `run` method, allowing the user to manually
        inspect the prepared files and ensure they are correct before
        actually running a potentially expensive model.
        Each input exogenous uncertainty or policy lever can potentially
        be used to manipulate multiple different aspects of the underlying
        core model.  For example, a policy lever that includes a number of
        discrete future network "build" options might trigger the replacement
        of multiple related network definition files.  Or, a single uncertainty
        relating to the cost of fuel might scale both a parameter linked to
        the modeled per-mile cost of operating an automobile, as well as the
        modeled total cost of fuel used by transit services.
        At the end of the `setup` method, a core model experiment should be
        ready to run using the `run` method.
        Args:
            params (dict):
                experiment variables including both exogenous
                uncertainty and policy levers
                
        Raises:
            KeyError:
                if a defined experiment variable is not supported
                by the core model        
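        Example:
            A minimal sketch of an overloaded `setup` for a hypothetical
            file-based subclass: it validates the experiment variables
            against the scope and writes them to a YAML input file read by
            the core model.  The file name and the scope's
            `get_parameter_names` accessor are assumptions for illustration::

                import os
                import yaml

                def setup(self, params):
                    bad = set(params) - set(self.scope.get_parameter_names())
                    if bad:
                        raise KeyError(f"unsupported experiment variables: {bad}")
                    with open(os.path.join(self.resolved_model_path, 'inputs.yaml'), 'wt') as f:
                        yaml.safe_dump(dict(params), f)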
        """      
 
    @abc.abstractmethod
    def get_experiment_archive_path(
            self,
            experiment_id=None,
            makedirs=False,
            parameters=None,
            run_id=None,
    ):
        """
        Returns a file system location to store model run outputs.
        For core models with long model run times, it is recommended
        to store the complete model run results in an archive.  This
        will facilitate adding additional performance measures to the
        scope at a later time.
        Both the scope name and experiment id can be used to create the 
        folder path. 
        
        Args:
            experiment_id (int):
                The experiment id, which is also the row id of the
                experiment in the database. If this is omitted, an
                experiment id is read or created using the parameters.
            makedirs (bool, default False):
                If this archive directory does not yet exist, create it.
            parameters (dict, optional):
                The parameters for this experiment, used to create or
                lookup an experiment id. The parameters are ignored
                if `experiment_id` is given.
            run_id (UUID, optional):
                The run_id of this model run.  If not given but a
                run_id attribute is stored in this FilesCoreModel
                instance, that value is used.
        Returns:
            str: Experiment archive path (no trailing backslashes).
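        Example:
            A minimal sketch of a possible implementation, assuming an
            `archive_path` key in the model configuration (the key name is
            illustrative, not required by EMAT)::

                import os
                # import path assumes the TMIP-EMAT package layout
                from emat.exceptions import MissingArchivePathError

                def get_experiment_archive_path(self, experiment_id=None, makedirs=False,
                                                parameters=None, run_id=None):
                    archive = self.config.get('archive_path', None)
                    if archive is None:
                        raise MissingArchivePathError('no archive_path in configuration')
                    if experiment_id is None:
                        raise ValueError('this sketch requires an explicit experiment_id')
                    path = os.path.join(archive, self.scope.name, f"exp_{experiment_id:03d}")
                    if makedirs:
                        os.makedirs(path, exist_ok=True)
                    return path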
        """      
    
    @abc.abstractmethod
    def run(self):
        """
        Run the core model.
        This method is the place where the core model run takes place.
        Note that this method takes no arguments; all the input
        exogenous uncertainties and policy levers are delivered to the
        core model in the `setup` method, which will be executed prior
        to calling this method. This facilitates debugging, as the `setup`
        method can potentially be used without the `run` method, allowing
        the user to manually inspect the prepared files and ensure they
        are correct before actually running a potentially expensive model.
        When running experiments, this method is called once for each core
        model experiment, after the `setup` method completes.
        If the core model requires post-processing by the `post_process`
        method defined in this API, then when this method terminates
        the model directory should be in a state that is ready to run the
        `post_process` step next.
        Raises:
            UserWarning: If the model is not properly set up
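        Example:
            A minimal sketch of an overloaded `run` that launches an external
            executable with `subprocess`; the command line shown is a
            placeholder::

                import subprocess

                def run(self):
                    subprocess.run(
                        ['my-core-model', '--run'],   # hypothetical command line
                        cwd=self.resolved_model_path,
                        check=True,                   # raise CalledProcessError on failure
                        capture_output=True,          # captured output is attached to the error
                    )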
        """      
    
    def post_process(self, params, measure_names, output_path=None):
        """
        Runs post processors associated with particular performance measures.
        This method is the place to conduct automatic post-processing
        of core model run results, in particular any post-processing that
        is expensive or that will write new output files into the core model's
        output directory.  The core model run should already have
        been completed using `setup` and `run`.  If the relevant performance
        measures do not require any post-processing to create (i.e. they
        can all be read directly from output files created during the core
        model run itself) then this method does not need to be overloaded
        for a particular core model implementation.
        Args:
            params (dict):
                Dictionary of experiment variables, with keys as variable names
                and values as the experiment settings. Most post-processing
                scripts will not need to know the particular values of the
                inputs (exogenous uncertainties and policy levers), but this
                method receives the experiment input parameters as an argument
                in case one or more of these parameter values needs to be known
                in order to complete the post-processing.
            measure_names (List[str]):
                List of measures to be processed.  Normally for the first pass
                of core model run experiments, post-processing will be completed
                for all performance measures.  However, it is possible to use
                this argument to give only a subset of performance measures to
                post-process, which may be desirable if the post-processing
                of some performance measures is expensive.  Additionally, this
                method may also be called on archived model results, allowing
                it to run to generate only a subset of (probably new) performance
                measures based on these archived runs.
            output_path (str, optional):
                Path to model outputs.  If this is not given (typical for the
                initial run of core model experiments) then the local/default
                model directory is used.  This argument is provided primarily
                to facilitate post-processing archived model runs to make new
                performance measures (i.e. measures that were not in-scope when
                the core model was actually run).
        Raises:
            KeyError:
                If post process is not available for specified measure
        """ 
    
    @abc.abstractmethod
    def load_measures(
            self,
            measure_names: Collection[str]=None,
            *,
            rel_output_path=None,
            abs_output_path=None,
    ) -> dict:
        """
        Import selected measures from the core model.
        
        This method is the place to put code that can actually reach into
        files in the core model's run results and extract performance
        measures. It is expected that it should not do any post-processing
        of results (i.e. it should read from but not write to the model
        outputs directory).
        Measures are imported from the active scenario.
        
        Args:
            measure_names (Collection[str]):
                Collection of measures to be loaded.
            rel_output_path, abs_output_path (str, optional):
                Path to model output locations, either relative
                to the `model_path` directory (when a subclass
                is a type that has a model path) or as an absolute
                directory.  If neither is given, the default
                value is equivalent to setting `rel_output_path` to
                'Outputs'.
        Returns:
            dict of measure name and values from active scenario
        
        Raises:
            KeyError: If load_measures is not available for specified
                measure
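        Example:
            A minimal sketch that reads measures from a single CSV file of
            model outputs; the file name and layout are illustrative::

                import os
                import pandas as pd

                def load_measures(self, measure_names=None, *,
                                  rel_output_path=None, abs_output_path=None):
                    out_dir = abs_output_path or os.path.join(
                        self.resolved_model_path, rel_output_path or 'Outputs')
                    results = pd.read_csv(
                        os.path.join(out_dir, 'results.csv'), index_col=0)
                    if measure_names is None:
                        measure_names = self.scope.get_measure_names()
                    # raises KeyError if a requested measure is not in the file
                    return {m: results.loc[m, 'value'] for m in measure_names}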
        """            
        
    @abc.abstractmethod
    def archive(self, params, model_results_path, experiment_id:int=0):
        """
        Copies model outputs to archive location.
        
        Args:
            params (dict): Dictionary of experiment variables
            model_results_path (str): archive path
            experiment_id (int, optional): The id number for this experiment.
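        Example:
            A minimal sketch that copies the core model's output directory
            into the archive location; the 'Outputs' folder name is
            illustrative::

                import os
                import shutil

                def archive(self, params, model_results_path, experiment_id=0):
                    src = os.path.join(self.resolved_model_path, 'Outputs')
                    # dirs_exist_ok requires Python 3.8+
                    shutil.copytree(src, model_results_path, dirs_exist_ok=True)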
        
        """ 
    @property
    def allow_short_circuit(self):
        """
        Bool: Allow model runs to be skipped if measures already appear in the database.
        """
        return self.config.get('allow_short_circuit', True)
    @allow_short_circuit.setter
    def allow_short_circuit(self, value):
        self.config['allow_short_circuit'] = bool(value)
    @property
    def ignore_crash(self):
        """
        Bool: Allow model runs to `post_process` and `archive` even after an apparent crash in `run`.
        """
        return self.config.get('ignore_crash', False)
    @ignore_crash.setter
    def ignore_crash(self, value):
        self.config['ignore_crash'] = bool(value)
    @property
    def success_indicator(self):
        """
        str: The name of a file that indicates the model has run successfully.
        The flag is the mere existence of a file with this name, not any particular
        file content. This file is deleted automatically when the model `run` is
        initiated, so that it can be recreated to indicate a success.
        """
        return self.config.get('success_indicator', None)
    @success_indicator.setter
    def success_indicator(self, value):
        self.config['success_indicator'] = value
    @property
    def killed_indicator(self):
        """
        str: The name of a file that indicates the model was killed due to an unrecoverable error.
        The flag is the mere existence of a file with this name, not any particular
        file content. This file is deleted automatically when the model `run` is
        initiated, so that it can be recreated to indicate an unrecoverable error.
        """
        return self.config.get('killed_indicator', None)
    @killed_indicator.setter
    def killed_indicator(self, value):
        self.config['killed_indicator'] = value
    @property
    def local_directory(self):
        """Path: The current local working directory for this model."""
        return self.config.get("local_directory", os.getcwd())
    @local_directory.setter
    def local_directory(self, value):
        self.config["local_directory"] = value
    @property
    def resolved_model_path(self):
        """
        Path: The resolved model path.
        For core models that don't rely on the file system, this
        is set to the current working directory and is generally
        irrelevant. Overload this property for models that do
        rely on the file system.
        """
        return self.local_directory
    @property
    def is_db_locked(self):
        if self.db:
            return self.db.is_locked
        return False
    @contextmanager
    def lock_db(self, x=True):
        if x and self.db:
            with self.db.lock:
                yield
        else:
            yield
    def enter_run_model(self):
        """A hook for actions at the very beginning of the run_model step."""
    def exit_run_model(self):
        """A hook for actions at the very end of the run_model step."""
    def run_model(self, scenario, policy):
        """
        Runs an experiment through core model.
        This method overloads the `run_model` method given in
        the EMA Workbench, and provides the correct execution
        of a core model within the workbench framework.  This
        function assembles and executes the steps laid out in
        other methods of this class, adding some useful logic
        to optimize the process (e.g. optionally short-
        circuiting runs that already have results stored
        in the database).
        For each experiment, the core model is called to:
            1.  `setup` experiment variables, copy files
                as needed, and otherwise prepare to run the
                core model for a particular experiment,
            2.  `run` the experiment,
            3.  `post_process` the result if needed to
                produce all relevant performance measures,
            4.  `archive` model outputs from this experiment
                (optional), and
            5.  `load_measures` from the experiment and
                store those measures in the associated database.
        Note that this method does *not* return any outcomes.
        Outcomes are instead written into self.outcomes_output,
        and can be retrieved from there, or from the database at
        a later time.
        In general, it should not be necessary to overload this
        method in derived classes built for particular core models.
        Instead, write overloaded methods for `setup`, `run`,
        `post_process` , `archive`, and `load_measures`.  Moreover,
        in typical usage a modeler will generally not want to rely
        on this method directly, but instead use `run_experiments`
        to automatically run multiple experiments with one command.
        Args:
            scenario (Scenario): A dict-like object that
                has key-value pairs for each uncertainty.
            policy (Policy): A dict-like object that
                has key-value pairs for each lever.
        Raises:
            UserWarning: If there are no experiments associated with
                this type.
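        Example:
            The same protocol can be stepped through manually for debugging,
            outside the workbench machinery.  In this sketch, `model` is an
            instantiated core model, and `scenario` and `policy` are
            dict-like experiment inputs::

                xl = dict(scenario)
                xl.update(policy)
                measure_names = model.scope.get_measure_names()
                model.setup(xl)
                model.run()
                model.post_process(xl, measure_names)
                measures = model.load_measures(measure_names)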
        """
        self.enter_run_model()
        try:
            self.comment_on_run = None
            _logger.debug("run_core_model read_experiment_parameters")
            experiment_id = policy.get("_experiment_id_", None)
            if experiment_id is None:
                experiment_id = scenario.get("_experiment_id_", None)
            if not hasattr(self, 'db') and hasattr(self, '_db'):
                self.db = self._db
            # If running a core files model using the DistributedEvaluator,
            # the workers won't have access to the DB directly, so we'll only
            # run the short-circuit test and the ad-hoc write-to-database
            # section of this code if the `db` attribute is available.
            if hasattr(self, 'db') and self.db is not None:
                assert isinstance(self.db, Database)
                if experiment_id is None:
                    with warnings.catch_warnings():
                        if self.is_db_locked:
                            warnings.simplefilter("ignore", category=MissingIdWarning)
                        experiment_id = self.db.read_experiment_id(self.scope.name, scenario, policy)
                if experiment_id and self.allow_short_circuit:
                    # opportunity to short-circuit run by loading pre-computed values.
                    precomputed = self.db.read_experiment_measures(
                        self.scope,
                        design_name=None,
                        experiment_id=experiment_id,
                    )
                    if not precomputed.empty:
                        self.outcomes_output = dict(precomputed.iloc[0])
                        self.log(f"short circuit experiment_id {experiment_id} / {getattr(self, 'uid', 'no uid')}")
                        return
                if experiment_id is None and not self.is_db_locked:
                    experiment_id = self.db.write_experiment_parameters_1(
                        self.scope.name, 'ad hoc', scenario, policy
                    )
                self.log(f"YES DATABASE experiment_id {experiment_id}", level=logging.DEBUG)
            else:
                _logger.debug(f"NO DATABASE experiment_id {experiment_id}")
            xl = {}
            xl.update(scenario)
            xl.update(policy)
            m_names = self.scope.get_measure_names()
            _logger.debug(f"run_core_model setup {experiment_id}")
            self.setup(xl)
            if self.success_indicator is not None:
                success_indicator = os.path.join(self.resolved_model_path, self.success_indicator)
                if os.path.exists(success_indicator):
                    os.remove(success_indicator)
            else:
                success_indicator = None
            if self.killed_indicator is not None:
                killed_indicator = os.path.join(self.resolved_model_path, self.killed_indicator)
                if os.path.exists(killed_indicator):
                    os.remove(killed_indicator)
            else:
                killed_indicator = None
            _logger.debug(f"run_core_model run {experiment_id}")
            try:
                self.run()
            except subprocess.CalledProcessError as err:
                _logger.error(f"ERROR in run_core_model run {experiment_id}: {str(err)}")
                try:
                    ex_archive_path = self.get_experiment_archive_path(experiment_id, makedirs=True)
                except MissingArchivePathError:
                    pass
                else:
                    if isinstance(err, subprocess.CalledProcessError):
                        if err.stdout:
                            with open(os.path.join(ex_archive_path, 'error.stdout.log'), 'ab') as stdout:
                                stdout.write(err.stdout)
                        if err.stderr:
                            with open(os.path.join(ex_archive_path, 'error.stderr.log'), 'ab') as stderr:
                                stderr.write(err.stderr)
                    with open(os.path.join(ex_archive_path, 'error.log'), 'a') as errlog:
                        errlog.write(str(err))
                measures_dictionary = {name: np.nan for name in m_names}
                # Assign to outcomes_output, for ema_workbench compatibility
                self.outcomes_output = measures_dictionary
                if not self.ignore_crash:
                    # If 'ignore_crash' is False (the default), then abort now and skip
                    # any post-processing and other archiving steps, which will
                    # probably fail anyway.
                    self.log(f"run_core_model ABORT {experiment_id}", level=logging.ERROR)
                    self.comment_on_run = f"FAILED EXPERIMENT {experiment_id}: {str(err)}"
                    return
                else:
                    _logger.error(f"run_core_model CONTINUE AFTER ERROR {experiment_id}")
            try:
                if success_indicator and not os.path.exists(success_indicator):
                    # The absence of the `success_indicator` file means that the model
                    # did not actually terminate correctly, so we do not want to
                    # post-process or store these results in the database.
                    self.comment_on_run = f"NON-SUCCESSFUL EXPERIMENT {experiment_id}: success_indicator missing"
                    raise ValueError(f"success_indicator missing: {success_indicator}")
                if killed_indicator and os.path.exists(killed_indicator):
                    self.comment_on_run = f"KILLED EXPERIMENT {experiment_id}: killed_indicator present"
                    raise ValueError(f"killed_indicator present: {killed_indicator}")
                _logger.debug(f"run_core_model post_process {experiment_id}")
                self.post_process(xl, m_names)
                _logger.debug(f"run_core_model wrap up {experiment_id}")
                measures_dictionary = self.load_measures(m_names)
                m_df = pd.DataFrame(measures_dictionary, index=[experiment_id])
            except KeyboardInterrupt:
                _logger.exception(
                    f"KeyboardInterrupt in post_process, load_measures or outcome processing {experiment_id}")
                raise
            except Exception as err:
                _logger.exception(f"error in post_process, load_measures or outcome processing {experiment_id}")
                _logger.error(f"proceeding directly to archive attempt {experiment_id}")
                if not self.comment_on_run:
                    self.comment_on_run = f"PROBLEM IN EXPERIMENT {experiment_id}: {str(err)}"
            else:
                # only write to database if there was no error in post_process, load_measures or outcome processing
                if experiment_id and hasattr(self, 'db') and self.db is not None and not self.db.readonly:
                    _logger.debug(f"run_core_model write db {experiment_id}")
                    run_id = getattr(self, 'run_id', None)
                    if run_id is None:
                        run_id, _ = self.db.new_run_id(
                            scope_name=self.scope.name,
                            experiment_id=experiment_id,
                            source=self.metamodel_id or 0,
                        )
                    try:
                        self.db.write_experiment_measures(self.scope.name, self.metamodel_id, m_df, [run_id])
                    except ReadOnlyDatabaseError:
                        warnings.warn("database is read-only, not storing model outcomes")
                    except Exception as err:
                        _logger.exception(f"error in writing results to database: {str(err)}")
                    else:
                        _logger.debug(f"run_core_model OK write db {experiment_id} {self.metamodel_id} {run_id}\n{m_df}")
                else:
                    _logger.debug(f"run_core_model no db to write to {experiment_id}")
            if experiment_id:
                try:
                    ex_archive_path = self.get_experiment_archive_path(experiment_id)
                except MissingArchivePathError:
                    pass
                else:
                    _logger.debug(f"run_core_model archive {experiment_id}")
                    self.archive(xl, ex_archive_path, experiment_id)
            else:
                _logger.debug(f"run_core_model no archive because no experiment_id")
        finally:
            self.exit_run_model()
    def read_experiments(
            self,
            design_name,
            db=None,
            only_pending=False,
            only_complete=False,
            only_with_measures=False,
    ):
        """
        Reads results from a design of experiments from the database.
        Args:
            design_name (str): The name of the design to load.
            db (Database, optional): The Database from which to read experiments.
                If no db is given, the default `db` for this model is used.
            only_pending (bool, default False): If True, only pending
                experiments (which have no performance measure results
                stored in the database) are returned.
            only_complete (bool, default False): If True, only complete
                experiments (which have no performance measure
                results missing in the database) are returned.
            only_with_measures (bool, default False): If True, only
                experiments with at least one stored performance measure
                are returned.
        Returns:
            pandas.DataFrame:
                A DataFrame that contains all uncertainties, levers, and measures
                for the experiments.
        Raises:
            ValueError:
                If there is no Database connection `db` set.
        """
        db = db if db is not None else self.db
        if db is None:
            raise ValueError('no database to read from')
        return self.ensure_dtypes(
            db.read_experiment_all(
                self.scope.name,
                design_name,
                only_pending=only_pending,
                only_complete=only_complete,
                only_with_measures=only_with_measures,
            )
        ) 
    def read_experiment_parameters(
            self,
            design_name=None,
            db=None,
            only_pending=False,
            *,
            experiment_ids=None,
    ):
        """
        Reads uncertainties and levers from a design of experiments from the database.
        Args:
            design_name (str, optional): If given, only experiments
                associated with both the scope and the named design
                are returned, otherwise all experiments associated
                with the scope are returned.
            db (Database, optional): The Database from which to read experiments.
                If no db is given, the default `db` for this model is used.
            only_pending (bool, default False): If True, only pending
                experiments (which have no performance measure results
                stored in the database) are returned.
            experiment_ids (Collection, optional):
                A collection of experiment id's to load.  If given,
                both `design_name` and `only_pending` are ignored.
        Returns:
            pandas.DataFrame:
                A DataFrame that contains all uncertainties, levers, and measures
                for the experiments.
        Raises:
            ValueError:
                If `db` is not given and there is no default
                Database connection set.
        """
        db = db if db is not None else self.db
        if db is None:
            raise ValueError('no database to read from')
        return self.ensure_dtypes(
            db.read_experiment_parameters(
                self.scope.name,
                design_name,
                only_pending=only_pending,
                experiment_ids=experiment_ids,
            )
        ) 
    def read_experiment_measures(
            self,
            design_name,
            experiment_id=None,
            db=None,
    ):
        """
        Reads performance measures from a design of experiments from the database.
        Args:
            design_name (str): The name of the design to load.
            experiment_id (int, optional): The id of the experiment to load.
            db (Database, optional): The Database from which to read experiment(s).
                If no db is given, the default `db` for this model is used.
        Returns:
            pandas.DataFrame:
                A DataFrame that contains all uncertainties, levers, and measures
                for the experiments.
        Raises:
            ValueError:
                If `db` is not given and there is no default
                Database connection set.
        """
        db = db if db is not None else self.db
        if db is None:
            raise ValueError('no database to read from')
        measures = self.ensure_dtypes(
            db.read_experiment_measures(
                self.scope.name,
                design_name,
                experiment_id,
                source=self.metamodel_id,
            )
        )
        
        # only return measures within scope
        measures = measures[[i for i in self.scope.get_measure_names()
                             if i in measures.columns]]
        
        return measures 
        
    def ensure_dtypes(self, df: pd.DataFrame):
        """
        Convert columns of dataframe to correct dtype as needed.
        Args:
            df (pandas.DataFrame): A dataframe with column names
                that are uncertainties, levers, or measures.
        Returns:
            pandas.DataFrame:
                The same data as input, but with dtypes as appropriate.
        """
        return self.scope.ensure_dtypes(df) 
    def design_experiments(self, *args, **kwargs):
        """
        Create a design of experiments based on this model.
        Args:
            n_samples_per_factor (int, default 10): The number of samples in the
                design per random factor.
            n_samples (int or tuple, optional): The total number of samples in the
                design.  If `jointly` is False, this is the number of samples in each
                of the uncertainties and the levers, the total number of samples will
                be the square of this value.  Give a 2-tuple to set values for
                uncertainties and levers respectively, to set them independently.
                If this argument is given, it overrides `n_samples_per_factor`.
            random_seed (int or None, default 1234): A random seed for reproducibility.
            db (Database, optional): If provided, this design will be stored in the
                database indicated.  If not provided, the `db` for this model will
                be used, if one is set.
            design_name (str, optional): A name for this design, to identify it in the
                database. If not given, a unique name will be generated based on the
                selected sampler.
            sampler (str or AbstractSampler, default 'lhs'): The sampler to use for this
                design.  Available pre-defined samplers include:
                    - 'lhs': Latin Hypercube sampling
                    - 'ulhs': Uniform Latin Hypercube sampling, which ignores defined
                        distribution shapes from the scope and samples everything
                        as if it was from a uniform distribution
                    - 'mc': Monte Carlo sampling
                    - 'uni': Univariate sensitivity testing, whereby experiments are
                        generated setting each parameter individually to minimum and
                        maximum values (for numeric dtypes) or all possible values
                        (for boolean and categorical dtypes).  Note that designs for
                        univariate sensitivity testing are deterministic and the number
                        of samples given is ignored.
            sample_from ('all', 'uncertainties', or 'levers'): Which scope components
                from which to sample.  Components not sampled are set at their default
                values in the design.
            jointly (bool, default True): Whether to sample jointly all uncertainties
                and levers in a single design, or, if False, to generate separate samples
                for levers and uncertainties, and then combine the two in a full-factorial
                manner.  This argument has no effect unless `sample_from` is 'all'.
                Note that setting `jointly` to False may produce a very large design,
                as the total number of experiments will be the product of the number of
                experiments for the levers and the number of experiments for the
                uncertainties, which are set separately (i.e. if `n_samples` is given,
                the total number of experiments is the square of that value).
        Returns:
            pandas.DataFrame: The resulting design.
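        Example:
            A short usage sketch; the sample size and design name are
            illustrative::

                design = model.design_experiments(
                    n_samples=100,
                    sampler='lhs',
                    design_name='lhs_100',
                )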
        """
        if 'scope' in kwargs:
            kwargs.pop('scope')
        if 'db' not in kwargs:
            kwargs['db'] = self.db
        from ..experiment import experimental_design
        return experimental_design.design_experiments(self.scope, *args, **kwargs) 
    def async_experiments(
            self,
            design:pd.DataFrame=None,
            db=None,
            *,
            design_name=None,
            evaluator=None,
            max_n_workers=None,
            stagger_start=None,
            batch_size=None,
    ):
        """
        Asynchronously runs a design of combined experiments using this model.
        A combined experiment includes a complete set of input values for
        all exogenous uncertainties (a Scenario) and all policy levers
        (a Policy). Unlike the perform_experiments function in the EMA Workbench,
        this method pairs each Scenario and Policy in sequence, instead
        of running all possible combinations of Scenario and Policy.
        This change ensures compatibility with the EMAT database modules, which
        preserve the complete set of input information (both uncertainties
        and levers) for each experiment.  To conduct a full cross-factorial set
        of experiments similar to the default settings for EMA Workbench,
        use a factorial design, by setting the `jointly` argument for the
        `design_experiments` to False, or by designing experiments outside
        of EMAT with your own approach.
        Args:
            design (pandas.DataFrame, optional): experiment definitions
                given as a DataFrame, where each exogenous uncertainty and
                policy lever is given as a column, and each row is an experiment.
            db (Database, required): The database to use for loading and saving experiments.
                If none is given, the default database for this model is used.
                If there is no default db, and none is given here,
                these experiments will be aborted.
            design_name (str, optional): The name of a design of experiments to
                load from the database.  This design is only used if
                `design` is None.
            evaluator (emat.workbench.Evaluator, optional): Optionally give an
                evaluator instance.  If not given, a default DistributedEvaluator
                will be instantiated.  Passing any other kind of evaluator will
                currently cause an error, although in the future other async
                compatible evaluators may be provided.
            max_n_workers (int, optional):
                The maximum number of workers that will be created for a default
                dask.distributed LocalCluster.  If the number of cores available is
                smaller than this number, fewer workers will be spawned.  This value
                is only used if a default LocalCluster has not yet been created.
            stagger_start (int, optional):
                If provided, wait this number of seconds between initial dispatch
                of experiments to the evaluator.  For models that do a lot of
                file copying up front, this can prevent over-saturating the file
                storage system.
            batch_size (int, optional):
                For fast-running core models, the overhead from multi-processing
                can represent a big chunk of overall runtime.  Grouping experiments
                into batches that are sent to workers as a group can mitigate this.
                Setting batch_size to 1 will process every experiment separately.
                If no batch size is given, a guess is made as to an efficient
                batch_size based on the number of experiments and the number of
                workers.
        Raises:
            ValueError:
                If there are no experiments defined.  This includes
                the situation where `design` is given but no database is
                available.
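        Example:
            A short usage sketch; the design name is illustrative.  In
            asynchronous mode, results are written into the model's database
            as individual model runs complete::

                background = model.async_experiments(design_name='lhs_100')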
        """
        # catch the case where the user passes only a design name (str), not a design DataFrame
        if isinstance(design, str) and design_name is None:
            design_name, design = design, None
        if design_name is None and design is None:
            raise ValueError(f"must give design_name or design")
        if db is None:
            db = self.db
        if design_name is not None and design is None:
            if not db:
                raise ValueError(f'cannot load design "{design_name}", there is no db')
            design = db.read_experiment_parameters(self.scope.name, design_name)
        if design.empty:
            raise ValueError(f"no experiments available")
        from .asynchronous import asynchronous_experiments
        if self.db is None:
            if db is not None:
                self.db = db
            else:
                raise ValueError("cannot run async_experiments without a `db` defined")
        return asynchronous_experiments(
            self,
            design,
            evaluator=evaluator,
            max_n_workers=max_n_workers,
            stagger_start=stagger_start,
            batch_size=batch_size,
        ) 
    def run_experiments(
            self,
            design:pd.DataFrame=None,
            evaluator=None,
            *,
            design_name=None,
            db=None,
            allow_short_circuit=None,
    ):
        """
        Runs a design of combined experiments using this model.
        A combined experiment includes a complete set of input values for
        all exogenous uncertainties (a Scenario) and all policy levers
        (a Policy). Unlike the perform_experiments function in the EMA Workbench,
        this method pairs each Scenario and Policy in sequence, instead
        of running all possible combinations of Scenario and Policy.
        This change ensures compatibility with the EMAT database modules, which
        preserve the complete set of input information (both uncertainties
        and levers) for each experiment.  To conduct a full cross-factorial set
        of experiments similar to the default settings for EMA Workbench,
        use a factorial design, by setting the `jointly` argument for the
        `design_experiments` to False, or by designing experiments outside
        of EMAT with your own approach.
        Args:
            design (pandas.DataFrame, optional): experiment definitions
                given as a DataFrame, where each exogenous uncertainty and
                policy lever is given as a column, and each row is an experiment.
            evaluator (emat.workbench.Evaluator, optional): Optionally give an
                evaluator instance.  If not given, a default SequentialEvaluator
                will be instantiated.
            design_name (str, optional): The name of a design of experiments to
                load from the database.  This design is only used if
                `design` is None.
            db (Database, optional): The database to use for loading and saving experiments.
                If none is given, the default database for this model is used.
                If there is no default db, and none is given here,
                the results are not stored in a database. Set to False to explicitly
                not use the default database, even if it exists.
        Returns:
            pandas.DataFrame:
                A DataFrame that contains all uncertainties, levers, and measures
                for the experiments.
        Raises:
            ValueError:
                If there are no experiments defined.  This includes
                the situation where `design` is given but no database is
                available.
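        Example:
            A typical round trip, designing and then running experiments;
            the sample size is illustrative::

                design = model.design_experiments(n_samples=100, sampler='lhs')
                results = model.run_experiments(design)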
        """
        from ..workbench import Scenario, Policy, perform_experiments
        # catch the case where the user passes only a design name (str), not a design DataFrame
        if isinstance(design, str) and design_name is None:
            design_name, design = design, None
        if design_name is None and design is None:
            raise ValueError(f"must give design_name or design")
        if db is None:
            db = self.db
        if design_name is not None and design is None:
            if not db:
                raise ValueError(f'cannot load design "{design_name}", there is no db')
            design = db.read_experiment_parameters(self.scope.name, design_name)
        if design.empty:
            raise ValueError(f"no experiments available")
        # catch metamodels here and run them as a batch, which is much faster
        function = getattr(self, 'function', None)
        from .meta_model import MetaModel
        if isinstance(function, MetaModel):
            outcomes = function.predict(design)
            result = self.ensure_dtypes(pd.concat([
                design,
                outcomes
            ], axis=1, sort=False))
            from ..experiment.experimental_design import ExperimentalDesign
            result = ExperimentalDesign(result)
            result.scope = self.scope
            result.name = getattr(design, 'name', None)
            result.sampler_name = getattr(design, 'sampler_name', None)
            return result
        scenarios = []
        scenario_cols = self.scope._get_uncertainty_and_constant_names()
        design_scenarios = design[scenario_cols]
        for rownum in range(len(design)):
            if design.index.name == 'experiment':
                s = Scenario(
                    _experiment_id_=design.index[rownum],
                    **design_scenarios.iloc[rownum],
                )
            else:
                s = Scenario(
                    _experiment_id_=False,
                    **design_scenarios.iloc[rownum],
                )
            scenarios.append(s)
        lever_names = self.scope.get_lever_names()
        policies = [
            Policy(f"Incognito{n}", **dict(zip(lever_names, i)))
            for n,i in enumerate(design[lever_names].itertuples(index=False, name='ExperimentL'))
        ]
        evaluator = prepare_evaluator(evaluator, self)
        if getattr(evaluator, 'asynchronous', False):
            # When the evaluator is in asynchronous mode, the core model runs will be
            # dispatched here but the function will not block waiting on the result, and
            # instead depend on the model execution process to write the results into
            # the database when complete.
            with evaluator:
                if allow_short_circuit is not None:
                    _stored_allow_short_circuit = self.allow_short_circuit
                    self.allow_short_circuit = allow_short_circuit
                else:
                    _stored_allow_short_circuit = None
                try:
                    perform_experiments(
                        self,
                        scenarios=scenarios,
                        policies=policies,
                        zip_over={'scenarios', 'policies'},
                        evaluator=evaluator,
                    )
                finally:
                    if _stored_allow_short_circuit is not None:
                        self.allow_short_circuit = _stored_allow_short_circuit
            return
        else:
            with evaluator:
                if db is False:
                    _stored_db = self.db
                    self.db = None
                else:
                    _stored_db = None
                if allow_short_circuit is not None:
                    _stored_allow_short_circuit = self.allow_short_circuit
                    self.allow_short_circuit = allow_short_circuit
                else:
                    _stored_allow_short_circuit = None
                try:
                    experiments, outcomes = perform_experiments(
                        self,
                        scenarios=scenarios,
                        policies=policies,
                        zip_over={'scenarios', 'policies'},
                        evaluator=evaluator,
                    )
                finally:
                    if _stored_db:
                        self.db = _stored_db
                    if _stored_allow_short_circuit is not None:
                        self.allow_short_circuit = _stored_allow_short_circuit
            experiments.index = design.index
            outcomes = pd.DataFrame.from_dict(outcomes)
            outcomes.index = design.index
            # if db:
            #     metamodel_id = self.metamodel_id
            #     if metamodel_id is None:
            #         metamodel_id = 0
            #     db.write_experiment_measures(self.scope.name, metamodel_id, outcomes)
            # Put constants back into experiments
            experiments_ = experiments.drop(
                columns=['scenario', 'policy', 'model', '_experiment_id_'],
                errors='ignore',
            )
            for i in self.scope.get_constants():
                experiments_[i.name] = i.value
            result = self.ensure_dtypes(pd.concat([
                experiments_,
                outcomes
            ], axis=1, sort=False))
            from ..experiment.experimental_design import ExperimentalDesign
            result = ExperimentalDesign(result)
            result.scope = self.scope
            result.name = getattr(design, 'name', None)
            result.sampler_name = getattr(design, 'sampler_name', None)
            return result 
    def run_reference_experiment(
            self,
            evaluator=None,
            *,
            db=None,
    ):
        """
        Runs a reference experiment using this model.
        This single experiment includes a complete set of input values for
        all exogenous uncertainties (a Scenario) and all policy levers
        (a Policy). Each is set to the default value indicated by the scope.
        Args:
            evaluator (emat.workbench.Evaluator, optional): Optionally give an
                evaluator instance.  If not given, a default SequentialEvaluator
                will be instantiated.
            db (Database, optional): The database to use for loading and saving experiments.
                If none is given, the default database for this model is used.
                If there is no default db, and none is given here,
                the results are not stored in a database. Set to False to explicitly
                not use the default database, even if it exists.
        Returns:
            pandas.DataFrame:
                A DataFrame that contains all uncertainties, levers, and measures
                for the experiments.
        """
        if db is None:
            db = self.db
        ref = self.design_experiments(sampler='ref', db=db)
        return self.run_experiments(ref, evaluator=evaluator, db=db) 
    def feature_scores(
            self,
            design,
            return_type='styled',
            random_state=None,
            cmap='viridis',
            measures=None,
            shortnames=None,
    ):
        """
        Calculate feature scores based on a design of experiments.
        This method is provided as a convenient pass-through to the
        `feature_scores` function in the `analysis` sub-package, using
        the scope and database attached to this model.
        Args:
            design (str or pandas.DataFrame): The name of the design
                of experiments to use for feature scoring, or a single
                pandas.DataFrame containing the experimental design and
                results.
            return_type ({'styled', 'figure', 'dataframe'}):
                The format to return, either a heatmap figure as an SVG
                rendered in an xmle.Elem, or a plain pandas.DataFrame,
                or a styled dataframe.
            random_state (int or numpy.RandomState, optional):
                Random state to use.
            cmap (string or colormap, default 'viridis'): matplotlib
                colormap to use for rendering.
            measures (Collection, optional): The performance measures
                on which feature scores are to be generated.  By default,
                all measures are included.
        Returns:
            xmle.Elem or pandas.DataFrame:
                Returns a rendered SVG as xml, or a DataFrame,
                depending on the `return_type` argument.
        This function internally uses feature_scoring from the EMA Workbench, which in turn
        scores features using the "extra trees" regression approach.
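        Example:
            A short usage sketch; the design name is illustrative::

                fs = model.feature_scores('lhs_100', return_type='dataframe')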
        """
        from ..analysis.feature_scoring import feature_scores
        if shortnames is True:
            shortnames = self.scope
        return feature_scores(
            self.scope,
            design=design,
            return_type=return_type,
            db=self.db,
            random_state=random_state,
            cmap=cmap,
            measures=measures,
            shortnames=shortnames,
        ) 
    def get_feature_scores(self, *args, **kwargs):
        """
        Deprecated, use `Model.feature_scores`.
        """
        # for compatibility with prior versions of TMIP-EMAT
        return self.feature_scores(*args, **kwargs)
    def _common_optimization_setup(
            self,
            epsilons=0.1,
            convergence='default',
            display_convergence=True,
            evaluator=None,
    ):
        import numbers
        if isinstance(epsilons, numbers.Number):
            epsilons = [epsilons]*len(self.outcomes)
        if convergence == 'default':
            convergence = ConvergenceMetrics(
                EpsilonProgress(),
                SolutionCount(),
            )
        if display_convergence and isinstance(convergence, ConvergenceMetrics):
            from IPython.display import display
            display(convergence)
        evaluator = prepare_evaluator(evaluator, self)
        return epsilons, convergence, display_convergence, evaluator
    def optimize(
            self,
            searchover='levers',
            evaluator=None,
            nfe=10000,
            convergence='default',
            display_convergence=True,
            convergence_freq=100,
            constraints=None,
            reference=None,
            reverse_targets=False,
            algorithm=None,
            epsilons='auto',
            min_epsilon=0.1,
            cache_dir=None,
            cache_file=None,
            check_extremes=False,
            **kwargs,
    ):
        """
        Perform multi-objective optimization over levers or uncertainties.
        The targets for the multi-objective optimization (i.e. whether each
        individual performance measure is to be maximized or minimized) are
        read from the model's scope.
        Args:
            searchover ({'levers', 'uncertainties'}):
                Which group of inputs to search over.  The other group
                will be set at their default values, unless other values
                are provided in the `reference` argument.
            evaluator (Evaluator, optional): The evaluator to use to
                run the model. If not given, a SequentialEvaluator will
                be created.
            nfe (int, default 10_000): Number of function evaluations.
                This generally needs to be fairly large to achieve stable
                results in all but the most trivial applications.
            convergence ('default', None, or emat.optimization.ConvergenceMetrics):
                A convergence display during optimization.  The default
                value is to report the epsilon-progress (the number of
                solutions that ever enter the candidate pool of non-dominated
                solutions) and the number of solutions remaining in that candidate
                pool.  Pass `None` explicitly to disable convergence tracking.
            display_convergence (bool, default True): Whether to automatically
                display figures that dynamically track convergence.  Set to
                `False` if you are not using this method within a Jupyter
                interactive environment.
            convergence_freq (int, default 100): How frequently to update the
                convergence measures.  There is some computational overhead to
                these convergence updates, so setting a value too small may
                noticeably slow down the process.
            constraints (Collection[Constraint], optional):
                Solutions will be constrained to only include values that
                satisfy these constraints. The constraints can be based on
                the search parameters (levers or uncertainties, depending on the
                value given for `searchover`), or performance measures, or
                some combination thereof.
            reference (Mapping): A set of values for the non-active inputs,
                i.e. the uncertainties if `searchover` is 'levers', or the
                levers if `searchover` is 'uncertainties'.  Any values not
                set here revert to the default values identified in the scope.
            reverse_targets (bool, default False): Whether to reverse the
                optimization targets given in the scope (i.e., changing
                minimize to maximize, or vice versa).  This will result in
                the optimization searching for the worst outcomes, instead of
                the best outcomes.
            algorithm (platypus.Algorithm, optional): Select an
                algorithm for multi-objective optimization.  The default
                algorithm is EpsNSGAII. See `platypus` documentation for details.
            epsilons (float or array-like, default 'auto'): Used to limit the
                number of distinct solutions generated.  Set to a larger value
                to get fewer distinct solutions.  When left as 'auto', the
                epsilon for each measure is estimated from the variability
                observed in a small batch of trial runs at the reference point.
            min_epsilon (float, default 0.1): The lower bound applied to each
                epsilon value that is estimated automatically when `epsilons`
                is 'auto'.
            cache_dir (path-like, optional): A directory in which to
                cache results.  Most of the arguments will be hashed
                to develop a unique filename for these results, making this
                generally safer than `cache_file`.
            cache_file (path-like, optional): A file into which to
                cache results.  If this file exists, the contents of the
                file will be loaded and all other arguments are ignored.
                Use with great caution.
            check_extremes (bool or int, default False): Conduct additional
                evaluations, individually setting each of the searched-over
                inputs to its extreme values, for each candidate Pareto
                optimal solution.
            kwargs: Any additional arguments will be passed on to the
                platypus algorithm.
        Returns:
            emat.OptimizationResult:
                The set of non-dominated solutions found.
                When `convergence` is given, the convergence measures are
                included, as a pandas.DataFrame in the `convergence` attribute.
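        Example:
            A minimal usage sketch, assuming `model` is an instantiated
            subclass of AbstractCoreModel; the `nfe` shown is deliberately
            small for illustration::

                result = model.optimize(
                    searchover='levers',
                    nfe=5000,
                )
                result.convergence   # pandas.DataFrame of convergence measures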
        """
        from ..util.disk_cache import load_cache_if_available, save_cache
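        # Represent the algorithm by its name (a plain string) when building
        # the cache key below, rather than by the algorithm object itself.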
        if isinstance(algorithm, str) or algorithm is None:
            alg = algorithm
        else:
            alg = algorithm.__name__
        if reference is not None:
            from ..workbench import Policy, Scenario
            if searchover == 'levers' and not isinstance(reference, Scenario):
                reference = Scenario("ReferenceScenario", **reference)
            elif searchover == 'uncertainties' and not isinstance(reference, Policy):
                reference = Policy("ReferencePolicy", **reference)
        else:
            if searchover == 'levers':
                reference = self.scope.default_scenario()
            elif searchover == 'uncertainties':
                reference = self.scope.default_policy()
        x, cache_file = load_cache_if_available(
            cache_file=cache_file,
            cache_dir=cache_dir,
            searchover=searchover,
            nfe=nfe,
            convergence=convergence,
            convergence_freq=convergence_freq,
            constraints=constraints,
            reference=reference,
            reverse_targets=reverse_targets,
            algorithm=alg,
            epsilons=epsilons,
        )
        if x is None:
            epsilons, convergence, display_convergence, evaluator = self._common_optimization_setup(
                epsilons, convergence, display_convergence, evaluator
            )
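            # When reverse_targets is set, temporarily invert each measure's
            # optimization direction so the search finds the worst outcomes;
            # the original directions are restored in the `finally` block.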
            if reverse_targets:
                for k in self.scope.get_measures():
                    k.kind_original = k.kind
                    k.kind = k.kind * -1
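            # Temporarily detach the database so the many candidate runs
            # generated during the search are not written to it individually;
            # it is re-attached in the `finally` block.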
            _db_pause = self.db
            try:
                self.db = None
                with evaluator:
                    if epsilons == 'auto':
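                        # Estimate epsilons from a pilot batch of 30 experiments
                        # (sampling the searched-over inputs while holding the
                        # reference values fixed): one-twentieth of each measure's
                        # standard deviation, floored at `min_epsilon`.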
                        from ..workbench import perform_experiments
                        if searchover == 'levers':
                            _, trial_outcomes = perform_experiments(
                                self,
                                scenarios=reference,
                                policies=30,
                                evaluator=evaluator,
                            )
                        else:
                            _, trial_outcomes = perform_experiments(
                                self,
                                scenarios=30,
                                policies=reference,
                                evaluator=evaluator,
                            )
                        epsilons = [
                            max(min_epsilon, np.std(trial_outcomes[mn]) / 20)
                            for mn in self.scope.get_measure_names()
                        ]
                    results = evaluator.optimize(
                        searchover=searchover,
                        reference=reference,
                        nfe=nfe,
                        constraints=constraints,
                        convergence=convergence,
                        convergence_freq=convergence_freq,
                        epsilons=epsilons,
                        **kwargs,
                    )
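                    # When convergence metrics were requested, the evaluator
                    # returns a (results, convergence) tuple instead of the
                    # results alone.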
                    if isinstance(results, tuple) and len(results) == 2:
                        results, result_convergence = results
                    else:
                        result_convergence = None
                    # Put the constants back into the results
                    for i in self.scope.get_constants():
                        results[i.name] = i.value
                    results = self.ensure_dtypes(results)
                    x = OptimizationResult(results, result_convergence, scope=self.scope)
                    if searchover == 'levers':
                        x.scenarios = reference
                    elif searchover == 'uncertainties':
                        x.policies = reference
                    if check_extremes:
                        x.check_extremes(
                            self,
                            1 if check_extremes is True else check_extremes,
                            evaluator=evaluator,
                            searchover=searchover,
                            robust=False,
                        )
            finally:
                if reverse_targets:
                    for k in self.scope.get_measures():
                        k.kind = k.kind_original
                        del k.kind_original
                self.db = _db_pause
        elif display_convergence:
            _, convergence, display_convergence, _ = self._common_optimization_setup(
                None, convergence, display_convergence, False
            )
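            # Results were loaded from cache; rebuild the convergence display
            # figures from the cached convergence data where possible.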
            for c in convergence:
                try:
                    c.rebuild(x.convergence)
                except KeyboardInterrupt:
                    raise
                except:
                    pass
        x.cache_file = cache_file
        save_cache(x, cache_file)
        return x 
    def robust_optimize(
            self,
            robustness_functions,
            scenarios,
            evaluator=None,
            nfe=10000,
            convergence='default',
            display_convergence=True,
            convergence_freq=100,
            constraints=None,
            epsilons=0.1,
            cache_dir=None,
            cache_file=None,
            algorithm=None,
            check_extremes=False,
            **kwargs,
    ):
        """
        Perform robust optimization.
        Robust optimization is generally a multi-objective optimization task.
        It is undertaken using statistical measures of outcomes evaluated across
        a number of scenarios, instead of using the individual outcomes themselves.
        For each candidate policy, the model is evaluated against all of the considered
        scenarios, and then the robustness measures are evaluated using the
        set of outcomes from the original runs.  The robustness measures
        are aggregate measures that are computed from a set of outcomes.
        For example, this may be expected value, median, n-th percentile,
        minimum, or maximum value of any individual outcome.  It is also
        possible to have joint measures, e.g. expected value of the larger
        of outcome 1 or outcome 2.
        Each robustness function is indicated as a maximization or minimization
        target, where higher or lower values are better, respectively.
        The optimization process then tries to identify one or more
        non-dominated solutions for the possible policy levers.
        Args:
            robustness_functions (Collection[Measure]): A collection of
                aggregate statistical performance measures.
            scenarios (int or Collection): A collection of scenarios to
                use in the evaluation(s), or give an integer to generate
                that number of random scenarios.
            evaluator (Evaluator, optional): The evaluator to use to
                run the model. If not given, a SequentialEvaluator will
                be created.
            nfe (int, default 10_000): Number of function evaluations.
                This generally needs to be fairly large to achieve stable
                results in all but the most trivial applications.
            convergence ('default', None, or emat.optimization.ConvergenceMetrics):
                A convergence display during optimization.
            display_convergence (bool, default True): Automatically display
                the convergence metric figures when optimizing.
            convergence_freq (int, default 100): The frequency at which
                convergence metric figures are updated.
            constraints (Collection[Constraint], optional):
                Solutions will be constrained to only include values that
                satisfy these constraints. The constraints can be based on
                the policy levers, or on the computed values of the robustness
                functions, or some combination thereof.
            epsilons (float or array-like): Used to limit the number of
                distinct solutions generated.  Set to a larger value to get
                fewer distinct solutions.
            cache_dir (path-like, optional): A directory in which to
                cache results.  Most of the arguments will be hashed
                to develop a unique filename for these results, making this
                generally safer than `cache_file`.
            cache_file (path-like, optional): A file into which to
                cache results.  If this file exists, the contents of the
                file will be loaded and all other arguments are ignored.
                Use with great caution.
            algorithm (platypus.Algorithm or str, optional): Select an
                algorithm for multi-objective optimization.  The algorithm can
                be given directly, or named in a string. See `platypus`
                documentation for details.
            check_extremes (bool or int, default False): Conduct additional
                evaluations, setting individual policy levers to their
                extreme values, for each candidate Pareto optimal solution.
            kwargs: Any additional arguments will be passed on to the
                platypus algorithm.
        Returns:
            emat.OptimizationResult:
                The set of non-dominated solutions found.
                When `convergence` is given, the convergence measures are
                included, as a pandas.DataFrame in the `convergence` attribute.
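        Example:
            A minimal sketch, assuming `model` is an instantiated subclass
            of AbstractCoreModel and that its scope defines a performance
            measure named 'net_benefits' (a hypothetical name)::

                import numpy
                from emat import Measure

                expected_net_benefits = Measure(
                    'Expected Net Benefits',
                    kind=Measure.MAXIMIZE,
                    variable_name='net_benefits',
                    function=numpy.mean,
                )
                result = model.robust_optimize(
                    robustness_functions=[expected_net_benefits],
                    scenarios=50,
                    nfe=5000,
                )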
        """
        from ..optimization.optimize import robust_optimize
        from ..util.disk_cache import load_cache_if_available, save_cache
        if isinstance(algorithm, str) or algorithm is None:
            alg = algorithm
        else:
            alg = algorithm.__name__
        result, cache_file = load_cache_if_available(
            cache_file=cache_file,
            cache_dir=cache_dir,
            scenarios=scenarios,
            convergence=convergence,
            convergence_freq=convergence_freq,
            constraints=constraints,
            epsilons=epsilons,
            nfe=nfe,
            robustness_functions=robustness_functions,
            alg=alg,
            check_extremes=check_extremes,
        )
        if result is None:
            _db_pause = self.db
            try:
                self.db = None
                result = robust_optimize(
                    self,
                    robustness_functions,
                    scenarios,
                    evaluator=evaluator,
                    nfe=nfe,
                    convergence=convergence,
                    display_convergence=display_convergence,
                    convergence_freq=convergence_freq,
                    constraints=constraints,
                    epsilons=epsilons,
                    check_extremes=check_extremes,
                    **kwargs,
                )
            finally:
                self.db = _db_pause
        elif display_convergence:
            _, convergence, display_convergence, _ = self._common_optimization_setup(
                None, convergence, display_convergence, False
            )
            for c in convergence:
                try:
                    c.rebuild(result.convergence)
                except KeyboardInterrupt:
                    raise
                except:
                    pass
        result.cache_file = cache_file
        save_cache(result, cache_file)
        return result 
    def robust_evaluate(
            self,
            robustness_functions,
            scenarios,
            policies,
            evaluator=None,
            cache_dir=None,
            suspend_db=True,
    ):
        """
        Perform robust evaluation(s).
        The robust evaluation is used to generate statistical measures
        of outcomes, instead of generating the individual outcomes themselves.
        For each policy, the model is evaluated against all of the considered
        scenarios, and then the robustness measures are evaluated using the
        set of outcomes from the original runs.  The robustness measures
        are aggregate measures that are computed from a set of outcomes.
        For example, this may be expected value, median, n-th percentile,
        minimum, or maximum value of any individual outcome.  It is also
        possible to have joint measures, e.g. expected value of the larger
        of outcome 1 or outcome 2.
        Args:
            robustness_functions (Collection[Measure]): A collection of
                aggregate statistical performance measures.
            scenarios (int or Collection): A collection of scenarios to
                use in the evaluation(s), or give an integer to generate
                that number of random scenarios.
            policies (int or Collection): A collection of policies to
                use in the evaluation(s), or give an integer to generate
                that number of random policies.
            evaluator (Evaluator, optional): The evaluator to use to
                run the model. If not given, a SequentialEvaluator will
                be created.
            cache_dir (path-like, optional): A directory in which to
                cache results.
            suspend_db (bool, default True):
                Suspend writing the results of individual model runs to
                the database.  Robust evaluation potentially generates a
                large number of model executions, and storing all these
                individual results may not be useful.
        Returns:
            pandas.DataFrame: The computed value of each item
                in `robustness_functions`, for each policy in `policies`.
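        Example:
            A minimal sketch, assuming `model` is an instantiated subclass
            of AbstractCoreModel and `robustness_functions` is a collection
            of Measure objects as described above::

                robust_results = model.robust_evaluate(
                    robustness_functions,
                    scenarios=100,
                    policies=20,
                )
                robust_results.head()   # one row per policy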
        """
        robust_results = None
        cache_file = None
        if cache_dir is not None:
            try:
                from ..util.hasher import hash_it
                hh = hash_it(
                    scenarios,
                    policies,
                    robustness_functions,
                )
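                # Shard cached results into nested subdirectories keyed by
                # slices of the hash, so no single directory accumulates too
                # many files.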
                os.makedirs(os.path.join(cache_dir,hh[2:4],hh[4:6]), exist_ok=True)
                cache_file = os.path.join(cache_dir,hh[2:4],hh[4:6],hh[6:]+".gz")
                if os.path.exists(cache_file):
                    _logger.debug(f"loading from cache_file={cache_file}")
                    from ..util.filez import load
                    robust_results = load(cache_file)
                    cache_file = None
            except KeyboardInterrupt:
                raise
            except:
                import traceback
                warnings.warn('unable to manage cache')
                traceback.print_exc()
        if robust_results is None:
            with self.lock_db(suspend_db):
                if evaluator is None:
                    from ..workbench.em_framework import SequentialEvaluator
                    evaluator = SequentialEvaluator(self)
                if not isinstance(evaluator, BaseEvaluator):
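                    # A dask.distributed Client may be passed in place of an
                    # evaluator; wrap it in a DistributedEvaluator so it can
                    # run the experiments.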
                    from dask.distributed import Client
                    if isinstance(evaluator, Client):
                        from ..workbench.em_framework.ema_distributed import DistributedEvaluator
                        evaluator = DistributedEvaluator(self, client=evaluator)
                from ..workbench.em_framework.samplers import sample_uncertainties, sample_levers
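                # An integer `scenarios` argument means "sample this many
                # scenarios from the uncertainty space" rather than an
                # explicit collection of scenarios.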
                if isinstance(scenarios, int):
                    n_scenarios = scenarios
                    scenarios = sample_uncertainties(self, n_scenarios)
                with evaluator:
                    robust_results = evaluator.robust_evaluate(
                        robustness_functions,
                        scenarios,
                        policies,
                    )
                robust_results = self.ensure_dtypes(robust_results)
        if cache_file is not None:
            from ..util.filez import save
            save(robust_results, cache_file, overwrite=True)
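            # Also write a human-readable sidecar file describing what was cached.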
            with open(cache_file.replace('.gz','.info.txt'), 'wt') as notes:
                print("scenarios=", scenarios, file=notes)
                print("robustness_functions=", robustness_functions, file=notes)
                print("policies=", policies, file=notes)
        return robust_results 
    def io_experiment(self, params):
        """
        Run an experiment, and return a dictionary of inputs and outputs together.
        Args:
            params (dict): The input parameter values (levers and
                uncertainties) for the experiment, keyed by parameter name.
        Returns:
            dict: The experiment inputs together with the computed
                performance measure outputs.
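        Example:
            A minimal sketch; the parameter names shown are hypothetical
            and must match the parameters defined in the model's scope::

                io = model.io_experiment({'lever_1': 2.0, 'uncertainty_1': 0.5})
                # `io` now contains both the inputs given above and the
                # computed performance measures.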
        """
        out = self.run_experiment(params).copy()
        out.update(params)
        return out 
    def log(self, message, level=logging.INFO):
        """
        Log a message.
        This facility will attempt to send log messages to
        the attached database, falling back to the regular
        module logger in case that fails.
        Args:
            message (str): Message to send to log.
            level (int, default logging.INFO): Log level.
        """
        db = getattr(self, 'db', None)
        try:
            db.log(message, level=level)
        except Exception:
            _logger.log(level, message)