# -*- coding: utf-8 -*-
import pandas
import numpy
import yaml
import warnings
import itertools
from ..workbench import ScalarOutcome
from ..workbench.em_framework.parameters import Category
from typing import Mapping
from scipy.stats._distn_infrastructure import rv_frozen
from ..database.database import Database
from .parameter import Parameter, standardize_parameter_type, make_parameter
from .measure import Measure
from ..util.docstrings import copydoc
from ..util import rv_frozen_as_dict
from ..util.loggers import get_module_logger
_logger = get_module_logger(__name__)
from ..exceptions import *
def _name_or_dict(x):
if isinstance(x, rv_frozen):
x = rv_frozen_as_dict(x)
if x is None or not isinstance(x, Mapping):
return x
if len(x)>1:
return x
if list(x.keys()) == ['name']:
return x['name']
return x
def _as_float(x):
if x is None:
return None
try:
return float(x)
except:
return x
[docs]class Scope:
'''Definitions for the relevant inputs and outputs for a model.
A Scope provides a structure to define the nature of the inputs
and outputs for exploratory modeling.
Args:
scope_file (str): path to scope file
scope_def (str, optional): The content of the scope file, if it has
already been read into a string. If this value is given, it is
assumed to be the contents of the file and the file is not actually
read again.
'''
scope_file = ''
name = ''
random_seed = 1234
desc = ''
xl_di = {}
m_di = {}
def __init__(self, scope_file, scope_def=None):
self.scope_file = scope_file
if scope_file is None and scope_def is None:
scope_def = """
scope:
name: Empty-Scope
inputs:
outputs:
"""
self._m_list = []
"""list of Measure: A list of performance measures that are output by the model."""
self._x_list = []
self._l_list = []
self._c_list = []
self.__parse_scope(scope_def=scope_def)
def __parse_scope(self, scope_def=None):
'''parser to read scope yaml file'''
if scope_def is None:
with open(self.scope_file, 'r') as stream:
scope = yaml.load(stream, Loader=yaml.FullLoader)
else:
scope = yaml.load(scope_def, Loader=yaml.FullLoader)
for k in ('scope', 'inputs', 'outputs'):
if k not in scope:
raise ScopeFormatError(f'scope file must include "{k}" as a top level key')
self.name = str(scope['scope']['name'])
self.desc = scope['scope'].get('desc', '')
self.xl_di = scope['inputs']
self.m_di = scope['outputs']
if self.m_di is not None:
if not isinstance(self.m_di, dict):
raise ScopeFormatError(
'outputs must be a dictionary with (name: attributes) key:value pairs'
)
for m_name, m_attr in self.m_di.items():
if isinstance(m_attr, dict):
self._m_list.append(Measure(m_name, **m_attr))
else:
warnings.warn(f'for {m_name} cannot process list {m_attr}')
self._m_list.append(Measure(m_name))
if 'random_seed' in scope:
self.random_seed = scope['random_seed']
if self.xl_di is not None:
if not isinstance(self.xl_di, dict):
raise ScopeFormatError(
'inputs must be a dictionary with (name: attributes) key:value pairs'
)
for x_name, x_attr in self.xl_di.items():
if not isinstance(x_attr, dict):
warnings.warn(f'for {x_name} cannot process list {x_attr}')
else:
x_attr_type = x_attr.get('ptype', 'missing')
if x_attr_type == 'missing':
raise ScopeFormatError(f'inputs:{x_name} is missing ptype, must be uncertainty, lever, or constant')
if not isinstance(x_attr_type, str):
raise ScopeFormatError(f'inputs:{x_name} has invalid ptype {x_attr_type}, it must be uncertainty, lever, or constant')
try:
x_attr_type = standardize_parameter_type(x_attr_type)
except ValueError:
raise ScopeFormatError(f'inputs:{x_name} has invalid ptype {x_attr.get("ptype")}, it must be uncertainty, lever, or constant')
try:
p = make_parameter(x_name, **x_attr)
except Exception as err:
raise ScopeFormatError(f"in making parameter '{x_name}': {str(err)}")
else:
if x_attr_type == 'uncertainty':
self._x_list.append(p)
elif x_attr_type == 'lever':
self._l_list.append(p)
elif x_attr_type == 'constant':
self._c_list.append(p)
else:
raise ScopeFormatError(f'inputs:{x_name} has invalid ptype {x_attr.get("ptype")}')
def __eq__(self, other):
if type(other) != type(self):
return False
for k in ('_x_list', '_l_list', '_c_list', ):
if len(getattr(self,k)) != len(getattr(other,k)):
return False
for i,j in zip(getattr(self,k), getattr(other,k)):
if isinstance(i, rv_frozen):
if rv_frozen_as_dict(i) != rv_frozen_as_dict(j):
return False
else:
if i != j:
return False
for k in ('_m_list', 'name', 'desc'):
if getattr(self,k) != getattr(other,k):
return False
return True
def _assert_equal(self, other):
if type(other) != type(self):
raise AssertionError(f"not same type: {type(other)} != {type(self)}")
for k in ('_x_list', '_l_list', '_c_list', ):
if len(getattr(self,k)) != len(getattr(other,k)):
raise AssertionError(f"mismatch length {k}: {len(getattr(self,k))} != {len(getattr(other,k))}")
for i,j in zip(getattr(self,k), getattr(other,k)):
if isinstance(i, rv_frozen):
if rv_frozen_as_dict(i) != rv_frozen_as_dict(j):
raise AssertionError(f"mismatch rv_frozen: {rv_frozen_as_dict(i)} != {rv_frozen_as_dict(j)}")
else:
if i != j:
raise AssertionError(f"mismatch item: {i} != {j}")
for k in ('_m_list', 'name', 'desc'):
if getattr(self,k) != getattr(other,k):
raise AssertionError(f"mismatch {k}: {getattr(self,k)} != {getattr(other,k)}")
[docs] def store_scope(self, db: Database):
'''
Write variables and scope definition to database.
Writing the scope to the database is required
prior to running a experiments that will be
stored.
Args:
db (Database): database object
'''
# write experiment variables and performance measures
db.init_xlm(
[(xl, self.xl_di[xl]['ptype']) for xl in self.xl_di],
[(m.name, m.transform) for m in self._m_list],
)
# write scope definitions
db._write_scope(
self.name,
self.scope_file,
[xl for xl in self.xl_di],
[m.name for m in self._m_list],
content=self,
)
[docs] def delete_scope(self, db: Database):
'''Deletes scope from database.
Args:
db (Database): The database from which to delete this Scope.
Note:
Only the `name` attribute is used to identify the scope
to delete. If some other different scope is stored in
the database with the same name as this scope, it will
be deleted.
'''
db.delete_scope(self.name)
[docs] def n_factors(self):
'''Number of input factors defined in this scope.'''
return len(self._c_list) + len(self._x_list) + len(self._l_list)
[docs] def n_sample_factors(self):
'''Number of non-constant input factors defined in this scope.'''
return len(self._x_list) + len(self._l_list)
@property
def xl_list(self):
return self._x_list + self._l_list
@property
def xlc_list(self):
return self._x_list + self._l_list + self._c_list
def __repr__(self):
content = []
if len(self._c_list):
content.append(f"{len(self._c_list)} constants")
if len(self._x_list):
content.append(f"{len(self._x_list)} uncertainties")
if len(self._l_list):
content.append(f"{len(self._l_list)} levers")
if len(self._m_list):
content.append(f"{len(self._m_list)} measures")
if not content:
content.append("no content")
return f"<emat.Scope with " + ", ".join(content) + ">"
[docs] def duplicate(
self,
strip_measure_transforms=False,
include_measures=None,
exclude_measures=None,
):
"""Create a duplicate scope, optionally stripping some features.
Args:
strip_measure_transforms (bool, default False):
Remove the 'transform' values from all measures.
include_measures (Collection[str], optional): If provided, only
output performance measures with names in this set will be included.
exclude_measures (Collection[str], optional): If provided, only
output performance measures with names not in this set will be included.
Returns:
Scope
"""
y = self.dump(strip_measure_transforms=strip_measure_transforms,
include_measures=include_measures,
exclude_measures=exclude_measures,)
try:
return type(self)(self.scope_file, scope_def=y)
except:
_logger.error(f"scope dump\n{str(y)}")
raise
[docs] def dump(
self,
stream=None,
filename=None,
strip_measure_transforms=False,
include_measures=None,
exclude_measures=None,
default_flow_style=False,
**kwargs,
):
"""
Serialize this scope into a YAML stream.
Args:
stream (file-like or None): Serialize into this stream. If None,
return the produced string instead, unless `filename` is given.
filename (path-like or None): If given and `stream` is None,
then write the serialized result into this file.
strip_measure_transforms (bool, default False): Remove the
'transform' values from all measures in the output.
include_measures (Collection[str], optional): If provided, only
output performance measures with names in this set will be included.
exclude_measures (Collection[str], optional): If provided, only
output performance measures with names not in this set will be included.
default_flow_style (bool, default False): Use the default_flow_style,
see yaml.dump for details.
**kwargs:
All other keyword arguments are forwarded as-is to `yaml.dump`
Returns:
str:
If both `stream` and `filename` are None, the serialized YAML
content is returned as a string.
Raises:
FileExistsError: If `filename` already exists.
ValueError: If both `stream` and `filename` are given.
"""
if stream and filename:
raise ValueError('only one of stream or filename can be given.')
from collections import OrderedDict
s = dict()
s['scope'] = dict()
s['scope']['name'] = self.name
s['scope']['desc'] = self.desc
s['inputs'] = {}
s['outputs'] = {}
const_keys = ['ptype','desc','dtype','default']
parameter_keys = OrderedDict([
# ('shortname', lambda x: x or None), # processed separately
('ptype', lambda x: x),
('desc', lambda x: x),
('dtype', lambda x: x),
('default', lambda x: x),
('min', lambda x: x),
('max', lambda x: x),
# ('dist', lambda x: _name_or_dict(x) or None), # processed separately
('corr', lambda x: x or None),
('values', lambda x: x or None),
('abbrev', lambda x: x or None),
('tags', lambda x: list(x) if x else None),
])
measure_keys = {
# 'shortname': lambda x: x or None, # processed separately
'kind': lambda x: {-1:'minimize', 0:'info', 1:'maximize'}.get(x,x),
'desc': lambda x: x,
'transform': lambda x: x,
'metamodeltype': lambda x: 'linear' if x is None else x,
'tags': lambda x: list(x) if x else None,
'formula': lambda x: x if x else None,
'parser': lambda x: x,
}
if strip_measure_transforms:
measure_keys.pop('transform', None)
for i in self._c_list:
s['inputs'][i.name] = {}
for k in const_keys:
if hasattr(i, k):
v = getattr(i,k)
if v is not None:
s['inputs'][i.name][k] = getattr(i,k)
for i in self._x_list + self._l_list:
s['inputs'][i.name] = {}
if i.shortname_if_any:
s['inputs'][i.name]['shortname'] = i.shortname_if_any
for k in parameter_keys:
if hasattr(i, k):
v = parameter_keys[k](getattr(i,k))
if v is not None:
s['inputs'][i.name][k] = v
v = i.distdef
if v is not None:
s['inputs'][i.name]['dist'] = _name_or_dict(v)
for i in self._m_list:
if include_measures is not None and i.name not in include_measures:
continue
if exclude_measures is not None and i.name in exclude_measures:
continue
s['outputs'][i.name] = {}
if i.shortname_if_any:
s['outputs'][i.name]['shortname'] = i.shortname_if_any
for k in measure_keys:
if hasattr(i, k):
v = measure_keys[k](getattr(i,k))
if v is not None:
s['outputs'][i.name][k] = v
import yaml.representer
yaml.add_representer(dict,
lambda self, data: yaml.representer.SafeRepresenter.represent_dict(self, data.items()))
if filename is not None:
import os
if os.path.exists(filename):
raise FileExistsError(filename)
with open(filename, 'w') as stream:
yaml.dump(s, stream=stream, default_flow_style=default_flow_style, **kwargs)
else:
return yaml.dump(s, stream=stream, default_flow_style=default_flow_style, **kwargs)
def subscope(
self,
name,
include_measures=None,
exclude_measures=None,
add_measure_formulas=None,
):
"""
Create a limited version of an existing scope.
This method can limit the number of performance measures
included in the scope, or create a curated set of formulaic
performance measures.
Args:
name (str, optional): A new name for this sub-scope.
If not provided, the subscope uses the same name
as the original scope.
include_measures (Collection[str], optional):
If provided, only performance measures with names in
this set will be included.
exclude_measures (Collection[str], optional):
If provided, only performance measures with names not
in this set will be included.
add_measure_formulas (Mapping[str,str], optional):
If provided, these formulaic performance measures
will be added to the subscope (but not to the original
scope).
Returns:
Scope
"""
subscope = type(self)(
"modified.yaml",
self.dump(
include_measures=include_measures,
exclude_measures=exclude_measures,
),
)
subscope.name = name or self.name
if add_measure_formulas is not None:
for mname, formula in add_measure_formulas.items():
subscope.add_measure(Measure(mname, formula=formula))
return subscope
[docs] def info(self, return_string=False):
"""Print a summary of this Scope.
Args:
return_string (bool): Defaults False (print to stdout) but if given as True
then this function returns the string instead of printing it.
"""
if return_string:
import io
f = io.StringIO
else:
f = None
print(f'name: {self.name}', file=f)
print(f'desc: {self.desc}', file=f)
if self._c_list:
print('constants:', file=f)
for i in self._c_list:
print(f' {i.name} = {i.default}', file=f)
if self._x_list:
print('uncertainties:', file=f)
for i in self._x_list:
if i.dtype in ('int','real'):
print(f' {i.name} = {i.min} to {i.max}', file=f)
elif i.dtype in ('bool',):
print(f' {i.name} = boolean', file=f)
elif i.dtype in ('cat',):
print(f' {i.name} = categorical', file=f)
if self._l_list:
print('levers:', file=f)
for i in self._l_list:
if i.dtype in ('int','real'):
print(f' {i.name} = {i.min} to {i.max}', file=f)
elif i.dtype in ('bool',):
print(f' {i.name} = boolean', file=f)
elif i.dtype in ('cat',):
print(f' {i.name} = categorical', file=f)
if self._m_list:
print('measures:', file=f)
for i in self._m_list:
print(f' {i.name}', file=f)
if return_string:
return f.getvalue()
[docs] def get_uncertainty_names(self):
"""Get a list of exogenous uncertainty names."""
return [i.name for i in self._x_list]
def _get_uncertainty_and_constant_names(self):
"""Get a list of exogenous uncertainty and constant names."""
return self.get_uncertainty_names() + self.get_constant_names()
[docs] def get_lever_names(self):
"""Get a list of policy lever names."""
return [i.name for i in self._l_list]
[docs] def get_constant_names(self):
"""Get a list of model constant names."""
return [i.name for i in self._c_list]
[docs] def get_parameter_names(self, include_constants=True):
"""
Get a list of model parameter (uncertainty+lever+constant) names.
Args:
include_constants (bool, default True): Include constants.
Returns:
list
"""
if include_constants:
return self.get_constant_names()+self.get_uncertainty_names()+self.get_lever_names()
return self.get_uncertainty_names() + self.get_lever_names()
[docs] def get_all_names(self):
"""Get a list of all (uncertainty+lever+constant+measure) model names."""
return self.get_parameter_names()+self.get_measure_names()
[docs] def get_measure_names(self):
"""Get a list of performance measure names."""
return [i.name for i in self._m_list]
[docs] def get_uncertainties(self):
"""Get a list of exogenous uncertainties."""
return [i for i in self._x_list]
[docs] def get_levers(self):
"""Get a list of policy levers."""
return [i for i in self._l_list]
[docs] def get_constants(self):
"""Get a list of model constants."""
return [i for i in self._c_list]
[docs] def get_parameters(self):
"""Get a list of model parameters (uncertainties+levers+constants)."""
return self.get_constants()+self.get_uncertainties()+self.get_levers()
def get_parameter_defaults(self):
"""Get a dict of default values of model parameters (uncertainties+levers+constants)."""
return {p.name:p.default for p in self.get_parameters()}
[docs] def get_measures(self):
"""Get a list of performance measures."""
return [i for i in self._m_list]
def get_measure_tags(self):
"""
Get the set of all performance measure tags.
Returns:
set
"""
tags = set()
for i in self._m_list:
tags |= i.tags
return tags
def __getitem__(self, item):
"""Get a parameter or measure by name."""
for i in itertools.chain(self._x_list, self._l_list, self._c_list, self._m_list):
if i.name == item:
return i
raise KeyError(item)
def __contains__(self, item):
for i in itertools.chain(self._x_list, self._l_list, self._c_list, self._m_list):
if i.name == item:
return True
return False
[docs] def ensure_dtypes(self, df):
"""
Convert columns of dataframe to correct dtype as needed.
Args:
df (pandas.DataFrame): A dataframe with column names
that are uncertainties, levers, or measures.
Returns:
pandas.DataFrame:
The same data as input, but with dtypes as appropriate.
"""
correct_dtypes = { }
correct_dtypes.update({i.name: (i.dtype, getattr(i,'values',None)) for i in self.get_parameters()})
correct_dtypes.update({i.name: (i.dtype, getattr(i,'values',None)) for i in self.get_measures()})
copy_made = False
for col in df.columns:
if col in correct_dtypes:
correct_dtype, cat_values = correct_dtypes[col]
if correct_dtype == 'real':
if not pandas.api.types.is_float_dtype(df[col]):
if not copy_made: df = df.copy()
df[col] = df[col].astype(float)
elif correct_dtype == 'int':
if not pandas.api.types.is_integer_dtype(df[col]):
if not copy_made: df = df.copy()
df[col] = df[col].astype(int)
elif correct_dtype == 'bool':
if not pandas.api.types.is_bool_dtype(df[col]):
if not copy_made: df = df.copy()
t = df[col].apply(lambda z: z.value if isinstance(z,Category) else z)
df[col] = t.astype(bool)
elif correct_dtype == 'cat':
if not pandas.api.types.is_categorical_dtype(df[col]):
if not copy_made: df = df.copy()
t = df[col].apply(lambda z: z.value if isinstance(z,Category) else z)
df[col] = pandas.Categorical(t, categories=cat_values, ordered=True)
elif correct_dtype is None and df[col].dtype is numpy.dtype('O'):
if not copy_made: df = df.copy()
df[col] = df[col].astype(float)
return df
def formula_components(self):
"""
The set of all measure names used as inputs to other measure formulas.
Returns:
set
"""
from tokenize import tokenize
import token
math = {
"sin", "cos", "exp", "log", "expm1", "log1p", "sqrt", "sinh", "cosh",
"tanh", "arcsin", "arccos", "arctan", "arccosh", "arcsinh", "arctanh",
"abs", "arctan2", "log10",
}
names = set()
for measure in self.get_measures():
formula = getattr(measure, 'formula', None)
if formula:
import io
stream = io.BytesIO(formula)
stream.seek(0)
for tokentype, k, _, _, _ in tokenize(stream.readline):
if tokentype == token.NAME:
if k not in math:
names.add(k)
return names
def apply_formulas(self, df, overwrite=False):
"""
Compute formulaic measures as needed.
Args:
df (pandas.DataFrame): A dataframe with column names
that are uncertainties, levers, or measures.
Returns:
pandas.DataFrame:
The same data as input, but with added results.
"""
queue = {}
for measure in self.get_measures():
formula = getattr(measure, 'formula', None)
if formula:
if measure.name in df.columns and not overwrite:
if df[measure.name].isna().sum():
dataseries = df.eval(formula).rename(measure.name)
queue[measure.name] = df[measure.name].fillna(dataseries)
else:
dataseries = df.eval(formula).rename(measure.name)
queue[measure.name] = dataseries
if queue:
df = df.assign(**queue)
return df
[docs] def get_dtype(self, name):
"""
Get the dtype for a parameter or measure.
Args:
name (str):
The name of the parameter or measure
Returns:
str:
{'real', 'int', 'bool', 'cat'}
"""
correct_dtypes = { }
correct_dtypes.update({i.name: i.dtype for i in self.get_parameters()})
correct_dtypes.update({i.name: i.dtype for i in self.get_measures()})
if name not in correct_dtypes:
raise KeyError(name)
return correct_dtypes[name]
def get_ptype(self, name):
"""
Get the ptype for a parameter or measure.
Args:
name (str):
The name of the parameter or measure
Returns:
str:
{'X', 'L', 'C', 'M', ''}
"""
if name in self.get_measure_names(): return 'M'
if name in self.get_uncertainty_names(): return 'X'
if name in self.get_lever_names(): return 'L'
if name in self.get_constant_names(): return 'C'
return ''
[docs] def get_cat_values(self, name):
"""
Get the category values for a parameter or measure.
Args:
name (str):
The name of the parameter or measure
Returns:
list or None
"""
correct_dtypes = {}
correct_dtypes.update({i.name: getattr(i,'values',None) for i in self.get_parameters()})
correct_dtypes.update({i.name: getattr(i,'values',None) for i in self.get_measures()})
if name not in correct_dtypes:
raise KeyError(name)
return correct_dtypes[name]
def ensure_cat_ordering(self, data, inplace=True):
"""
Ensure that all categorical columns have correctly ordered values.
Args:
data (pandas.DataFrame or pandas.Series):
Returns:
pandas.DataFrame or pandas.Series
"""
if isinstance(data, pandas.Series):
data = pandas.DataFrame(data)
base_was_series = True
else:
base_was_series = False
if not inplace:
data = data.copy(deep=True)
categorical_columns = data.select_dtypes('category').columns
for c in categorical_columns:
ordering = getattr(self[c],'values',None)
if ordering:
data[c].cat.reorder_categories(ordering, inplace=True)
if not inplace:
if base_was_series:
return data.iloc[:,0]
else:
return data
[docs] def design_experiments(self, *args, **kwargs):
"""
Create a design of experiments based on this Scope.
Args:
n_samples_per_factor (int, default 10): The number of samples in the
design per random factor.
n_samples (int or tuple, optional): The total number of samples in the
design. If `jointly` is False, this is the number of samples in each
of the uncertainties and the levers, the total number of samples will
be the square of this value. Give a 2-tuple to set values for
uncertainties and levers respectively, to set them independently.
If this argument is given, it overrides `n_samples_per_factor`.
random_seed (int or None, default 1234): A random seed for reproducibility.
db (Database, optional): If provided, this design will be stored in the
database indicated.
design_name (str, optional): A name for this design, to identify it in the
database. If not given, a unique name will be generated based on the
selected sampler. Has no effect if no `db` is given.
sampler (str or AbstractSampler, default 'lhs'): The sampler to use for this
design. Available pre-defined samplers include:
- 'lhs': Latin Hypercube sampling
- 'ulhs': Uniform Latin Hypercube sampling, which ignores defined
distribution shapes from the scope and samples everything
as if it was from a uniform distribution
- 'mc': Monte carlo sampling
- 'uni': Univariate sensitivity testing, whereby experiments are
generated setting each parameter individually to minimum and
maximum values (for numeric dtypes) or all possible values
(for boolean and categorical dtypes). Note that designs for
univariate sensitivity testing are deterministic and the number
of samples given is ignored.
sample_from ('all', 'uncertainties', or 'levers'): Which scope components
from which to sample. Components not sampled are set at their default
values in the design.
jointly (bool, default True): Whether to sample jointly all uncertainties
and levers in a single design, or, if False, to generate separate samples
for levers and uncertainties, and then combine the two in a full-factorial
manner. This argument has no effect unless `sample_from` is 'all'.
Note that setting `jointly` to False may produce a very large design,
as the total number of experiments will be the product of the number of
experiments for the levers and the number of experiments for the
uncertainties, which are set separately (i.e. if `n_samples` is given,
the total number of experiments is the square of that value).
Returns:
pandas.DataFrame: The resulting design.
"""
if 'scope' in kwargs:
kwargs.pop('scope')
from ..experiment import experimental_design
return experimental_design.design_experiments(self, *args, **kwargs)
def _any_correlated_parameters(self):
for p in self.get_parameters():
if len(p.corr):
return True
return False
def get_density(self, *args, **kwargs):
"""
Compute the parametric density at any point.
"""
if self._any_correlated_parameters():
raise NotImplementedError("density with correlated parameters is coming soon")
if args:
for arg in args:
kwargs.update(arg)
density = 1.0
for p in self.get_parameters():
value = kwargs.get(p.name, p.default)
density *= p.dist.pdf(value)
return density
def shortname(self, name):
"""
Get a shortname, if available, for any named parameter or measure.
Args:
name: str
Returns:
str
"""
try:
x = self[name]
except KeyError:
return name
else:
try:
return x.shortname
except:
return name
def tagged_shortname(self, name, wrap_width=None, line_sep="<br>"):
"""
Get a label, for any named parameter or measure.
The label is the shortname, if available, and a circled letter
symbol indicating the ptype of the parameter or measure.
Args:
name: str
Returns:
str
"""
tags = dict(
L="Ⓛ ",
X="Ⓧ ",
M="Ⓜ ",
C="Ⓒ ",
)
result = tags.get(self.get_ptype(name),"")+self.shortname(name)
if wrap_width is not None:
import textwrap
result = line_sep.join(textwrap.wrap(result, width=wrap_width))
return result
def get_description(self, name):
"""
Get a description, if available, for any named parameter or measure.
Args:
name: str
Returns:
str
"""
try:
x = self[name]
except KeyError:
return ''
else:
try:
return x.desc
except:
return ''
def default_policy(self, **kwargs):
"""
The default settings for policy levers.
Args:
**kwargs:
Override the defaults given in the scope
with these values.
Returns:
emat.workbench.Policy
"""
from ..workbench import Policy
values = {l.name: l.default for l in self.get_levers()}
values.update(kwargs)
return Policy('default', **values)
def default_scenario(self, **kwargs):
"""
The default settings for exogenous uncertainties.
Args:
**kwargs:
Override the defaults given in the scope
with these values.
Returns:
emat.workbench.Scenario
"""
from ..workbench import Scenario
values = {u.name: u.default for u in self.get_uncertainties()}
values.update(kwargs)
return Scenario('default', **values)
def add_measure(self, measure, db=None, precompute=False, **kwargs):
"""
Add a performance measure to this scope.
Args:
measure (str or emat.Measure):
A measure to add. If given as a string, a measure
is created with this name and the default settings, and
other keyword arguments given will be forwarded to the
`Measure` constructor.
db (emat.Database, optional):
If given, the scope in this database is updated to include
the new measure (and any other prior changes to this
scope).
precompute (bool, default False):
Whether to pre-compute formula-based measures and store
them in the database. This will result in a larger file
size, but faster reading, especially for complex measures.
"""
if isinstance(measure, str):
measure = Measure(measure, **kwargs)
if not isinstance(measure, Measure):
raise TypeError("must add `Measure`, or give a name to create one")
for m in self._m_list:
if m.name == measure.name:
raise ValueError(f"duplicate measure name '{measure.name}'")
self._m_list.append(measure)
if db is not None:
if not isinstance(db, Database):
raise TypeError("db must be an emat.Database")
db.update_scope(self)
if precompute and measure.formula is not None and db is not None:
df_m = db.read_experiment_measures(self.name, runs='all', source=0)
df_n = pandas.DataFrame(df_m.eval(measure.formula).rename(measure.name))
db.write_experiment_measures(self.name, 0, df_n)