from collections import namedtuple
from collections.abc import MutableMapping, Mapping
import itertools
from typing import Collection
import pandas
import numpy
import copy
from abc import ABC, abstractmethod
from ..util.distributions import truncated, get_distribution_bounds
from math import isclose
from .scope import Scope, ScopeError
from .parameter import IntegerParameter, CategoricalParameter, BooleanParameter
from .. import styles
from ..viz import colors
Bounds = namedtuple('Bounds', ['lowerbound', 'upperbound'])
Bounds.__doc__ = """
A lower and upper bound as a 2-tuple.
Args:
lowerbound (numeric or None):
The lower bound to set, or None
if there is no lower bound.
upperbound (numeric or None):
The upper bound to set, or None
if there is no upper bound.
"""
class GenericBox(MutableMapping, ABC):
# Generic methods applicable to both Box and ChainedBox
def inside(self, df):
"""
For each row of a DataFrame, identify if it is inside the box.
Args:
df (pandas.DataFrame): Must include a column matching every
thresholded feature.
Returns:
pandas.Series
With dtype bool.
"""
within = pandas.Series(True, index=df.index)
for label, bounds in self.thresholds.items():
if isinstance(bounds, set):
within &= numpy.in1d(df[label], list(bounds))
else:
if bounds.lowerbound is not None:
within &= (df[label] >= bounds.lowerbound)
if bounds.upperbound is not None:
within &= (df[label] <= bounds.upperbound)
return within
def __init__(self):
self._scope = None
@property
@abstractmethod
def thresholds(self):
"""
Dict[str,Union[Bounds,Set]]:
The restricted dimensions in this Box, with feature names as
keys and :class:`Bounds` or a :class:`Set` of available discrete
values as the dictionary values.
"""
raise NotImplementedError
@property
@abstractmethod
def demanded_features(self):
"""
Set[str]: A set of features upon which thresholds are set.
"""
raise NotImplementedError
@property
@abstractmethod
def relevant_features(self):
"""
Set[str]:
A :class:`Set` of features that are relevant for this Box.
These are features, which are not themselves constrained,
but should be considered in any analytical report developed
based on this Box.
"""
raise NotImplementedError
@property
def relevant_and_demanded_features(self):
"""
Set[str]: The union of relevant and demanded features.
"""
return self.relevant_features | self.demanded_features
@property
def scope(self):
"""Scope: A scope associated with this Box."""
return self._scope
@scope.setter
def scope(self, x):
if x is None or isinstance(x, Scope):
self._scope = x
else:
raise TypeError('scope must be Scope or None')
@scope.deleter
def scope(self):
self._scope = None
@abstractmethod
def set_bounds(self, key, lowerbound, upperbound=None):
"""
Set both lower and upper bounds.
Args:
key (str):
The feature name to which these bounds
will be attached.
lowerbound (numeric, None, or Bounds):
The lower bound, or a Bounds object that gives
upper and lower bounds (in which case the `upperbound`
argument is ignored). Set explicitly to 'None' to
leave unbounded from below.
upperbound (numeric or None, default None):
The upper bound. Set to 'None' to
leave unbounded from above.
Raises:
ScopeError:
If a scope is attached to this box but the `key` cannot
be found in the scope.
"""
raise NotImplementedError
def set_lower_bound(self, key, value):
"""
Set a lower bound, retaining existing upper bound.
Args:
key (str):
The feature name to which this lower bound
will be attached.
value (numeric or None):
The lower bound. Set explicitly to 'None' to
leave unbounded from below.
Raises:
ScopeError:
If a scope is attached to this box but the `key` cannot
be found in the scope.
"""
current = self.thresholds.get(key, Bounds(None,None))
if isinstance(current, set):
raise ValueError("cannot set lowerbound on a set")
self.set_bounds(key, value, current.upperbound)
def set_upper_bound(self, key, value):
"""
Set an upper bound, retaining existing lower bound.
Args:
key (str):
The feature name to which this upper bound
will be attached.
value (numeric or None):
The upper bound. Set explicitly to 'None' to
leave unbounded from above.
Raises:
ScopeError:
If a scope is attached to this box but the `key` cannot
be found in the scope.
"""
current = self.thresholds.get(key, Bounds(None,None))
if isinstance(current, set):
raise ValueError("cannot set upperbound on a set")
self.set_bounds(key, current.lowerbound, value)
def add_to_allowed_set(self, key, value):
"""
Add a value to the allowed set
Args:
key (str):
The feature name to which these allowed values
will be attached.
value (Any):
A value to add to the allowed set.
Raises:
ValueError:
If there is already a directional Bounds set for `key`.
ScopeError:
If a scope is attached to this box but the `key` cannot
be found in the scope.
"""
current = self.thresholds.get(key, set())
if isinstance(current, Bounds):
raise ValueError("cannot add to Bounds")
current.add(value)
self.replace_allowed_set(key, current)
def remove_from_allowed_set(self, key, value):
"""
Remove a value from the allowed set
Args:
key (str):
The feature name to which these allowed values
will be attached.
value (Any):
A value to remove from the allowed set.
Raises:
ValueError:
If the threshold set for `key` is a directional Bounds
instead of a set.
ScopeError:
If a scope is attached to this box but the `key` cannot
be found in the scope.
KeyError:
If the value to be removed was not already in the
allowed set.
"""
current = None
if key not in self.thresholds and self.scope is not None:
v = self.scope.get_cat_values(key)
if v is None:
raise ValueError("cannot use allowed_set for float or int values, use Bounds instead")
current = set(v)
else:
current = self.thresholds.get(key, set())
if isinstance(current, Bounds):
raise ValueError("cannot remove from Bounds")
current.remove(value)
self.replace_allowed_set(key, current)
@abstractmethod
def replace_allowed_set(self, key, values):
"""
Replace the allowed set.
Args:
key (str):
The feature name to which these bounds
will be attached.
values (set):
A set of values to use as the allowed set.
"""
raise NotImplementedError
# def _compute_histogram(self, col, selection, bins=20):
# if self._viz_data is None:
# return
# bar_heights, bar_x = numpy.histogram(self._viz_data[col], bins=bins)
# bar_heights_select, bar_x = numpy.histogram(self._viz_data[col][selection], bins=bar_x)
# return pandas.DataFrame({
# 'Total Freq': bar_heights,
# 'Inside Freq': bar_heights_select,
# 'Bins_Left': bar_x[:-1],
# 'Bins_Width': bar_x[1:] - bar_x[:-1],
# })
#
# def _compute_frequencies(self, col, selection, labels):
# if self._viz_data is None:
# return
# v = self._viz_data[col].astype(
# pandas.CategoricalDtype(categories=labels, ordered=False)
# ).cat.codes
# bar_heights, _ = numpy.histogram(v, bins=numpy.arange(0, len(labels) + 1))
# bar_heights_select, _ = numpy.histogram(v[selection], bins=numpy.arange(0, len(labels) + 1))
#
# return pandas.DataFrame({
# 'Total Freq': bar_heights,
# 'Inside Freq': bar_heights_select,
# 'Label': labels,
# })
#
# def _update_histogram_figure(self, col, *, selection=None):
# if col in self._figures and self._viz_data is not None:
# fig = self._figures[col]
# bins = fig._bins
# if selection is None:
# selection = self.inside(self._viz_data)
# h_data = self._compute_histogram(col, selection, bins=bins)
# with fig.batch_update():
# fig.data[0].y = h_data['Inside Freq']
# fig.data[1].y = h_data['Total Freq'] - h_data['Inside Freq']
#
# def _update_frequencies_figure(self, col, *, selection=None):
# if col in self._figures and self._viz_data is not None:
# fig = self._figures[col]
# labels = fig._labels
# if selection is None:
# selection = self.inside(self._viz_data)
# h_data = self._compute_frequencies(col, selection, labels=labels)
# with fig.batch_update():
# fig.data[0].y = h_data['Inside Freq']
# fig.data[1].y = h_data['Total Freq'] - h_data['Inside Freq']
#
# def _update_all_histogram_figures(self):
# selection = self.inside(self._viz_data)
# for col in self._figures:
# if self._figures[col]._figure_kind == 'histogram':
# self._update_histogram_figure(col, selection=selection)
# elif self._figures[col]._figure_kind == 'frequency':
# self._update_frequencies_figure(col, selection=selection)
#
# def _create_histogram_figure(self, col, bins=20):
# if self._viz_data is None:
# return
# if col in self._figures:
# self._update_histogram_figure(col)
# else:
# selection = self.inside(self._viz_data)
# h_data = self._compute_histogram(col, selection, bins=bins)
# fig = go.FigureWidget(
# data=[
# go.Bar(
# x=h_data['Bins_Left'],
# y=h_data['Inside Freq'],
# width=h_data['Bins_Width'],
# name='Inside',
# marker_color=colors.DEFAULT_HIGHLIGHT_COLOR,
# ),
# go.Bar(
# x=h_data['Bins_Left'],
# y=h_data['Total Freq'] - h_data['Inside Freq'],
# width=h_data['Bins_Width'],
# name='Outside',
# marker_color=colors.DEFAULT_BASE_COLOR,
# ),
# ],
# layout=dict(
# barmode='stack',
# showlegend=False,
# margin=dict(l=10, r=10, t=10, b=10),
# **styles.figure_dims,
# ),
# )
# fig._bins = bins
# fig._figure_kind = 'histogram'
# self._figures[col] = fig
#
# def _create_frequencies_figure(self, col, labels=None):
# if self._viz_data is None:
# return
# if col in self._figures:
# self._update_frequencies_figure(col)
# else:
# selection = self.inside(self._viz_data)
# h_data = self._compute_frequencies(col, selection, labels=labels)
# fig = go.FigureWidget(
# data=[
# go.Bar(
# x=h_data['Label'],
# y=h_data['Inside Freq'],
# name='Inside',
# marker_color=colors.DEFAULT_HIGHLIGHT_COLOR,
# ),
# go.Bar(
# x=h_data['Label'],
# y=h_data['Total Freq'] - h_data['Inside Freq'],
# name='Outside',
# marker_color=colors.DEFAULT_BASE_COLOR,
# ),
# ],
# layout=dict(
# barmode='stack',
# showlegend=False,
# margin=dict(l=10, r=10, t=10, b=10),
# width=250,
# height=150,
# ),
# )
# fig._labels = labels
# fig._figure_kind = 'frequency'
# self._figures[col] = fig
#
#
# def set_viz_data(self, df):
# self._viz_data = df
#
# def get_histogram_figure(self, col, bins=20):
# try:
# this_type = self.scope.get_dtype(col)
# except:
# this_type = 'float'
# if this_type in ('cat','bool'):
# return self.get_frequency_figure(col)
# self._create_histogram_figure(col, bins=bins)
# return self._figures[col]
#
# def get_frequency_figure(self, col):
# if self.scope.get_dtype(col) == 'cat':
# labels = self.scope.get_cat_values(col)
# else:
# labels = [False, True]
# self._create_frequencies_figure(col, labels=labels)
# return self._figures[col]
#
# def _make_range_widget(
# self,
# i,
# min_value=None,
# max_value=None,
# readout_format=None,
# integer=False,
# steps=20,
# ):
# """Construct a RangeSlider to manipulate a Box threshold."""
#
# current_setting = self.get(i, (None, None))
#
# # Use current setting as min and max if still unknown
# if current_setting[0] is not None and min_value is None:
# min_value = current_setting[0]
# if current_setting[1] is not None and max_value is None:
# max_value = current_setting[1]
#
# if min_value is None:
# raise ValueError("min_value cannot be None if there is no current setting")
# if max_value is None:
# raise ValueError("max_value cannot be None if there is no current setting")
#
# current_min = min_value if current_setting[0] is None else current_setting[0]
# current_max = max_value if current_setting[1] is None else current_setting[1]
#
# slider_type = widget.IntRangeSlider if integer else widget.FloatRangeSlider
#
# controller = slider_type(
# value=[current_min, current_max],
# min=min_value,
# max=max_value,
# step=((max_value - min_value) / steps) if not integer else 1,
# disabled=False,
# continuous_update=False,
# orientation='horizontal',
# readout=True,
# readout_format=readout_format,
# description='',
# style=styles.slider_style,
# layout=styles.slider_layout,
# )
#
# def on_value_change(change):
# from ..util.loggers import get_logger
# get_logger().critical("VALUE CHANGE")
# new_setting = change['new']
# if new_setting[0] <= min_value or isclose(new_setting[0], min_value):
# new_setting = (None, new_setting[1])
# if new_setting[1] >= max_value or isclose(new_setting[1], max_value):
# new_setting = (new_setting[0], None)
# self.set_bounds(i, *new_setting)
# self._update_all_histogram_figures()
#
# controller.observe(on_value_change, names='value')
#
# return controller
#
# def _make_togglebutton_widget(
# self,
# i,
# cats=None,
# *,
# df=None,
# ):
# """Construct a MultiToggleButtons to manipulate a Box categorical set."""
#
# if cats is None and df is not None:
# if isinstance(df[i].dtype, pandas.CategoricalDtype):
# cats = df[i].cat.categories
#
# current_setting = self.get(i, set())
#
# from ..analysis.widgets import MultiToggleButtons_AllOrSome
# controller = MultiToggleButtons_AllOrSome(
# description='',
# style=styles.slider_style,
# options=list(cats),
# disabled=False,
# button_style='', # 'success', 'info', 'warning', 'danger' or ''
# layout=styles.slider_layout,
# )
# controller.values = current_setting
#
# def on_value_change(change):
# new_setting = change['new']
# self.replace_allowed_set(i, new_setting)
# self._update_all_histogram_figures()
#
# controller.observe(on_value_change, names='value')
#
# return controller
#
#
# def get_widget(
# self,
# i,
# min_value=None,
# max_value=None,
# readout_format=None,
# steps=20,
# *,
# df=None,
# histogram=None,
# tall=True,
# ):
# """Get a control widget for a Box threshold."""
#
# if self.scope is None:
# raise ValueError('cannot get_widget with no scope')
#
# if not hasattr(self, '_widgets'):
# self._widgets = {}
#
# if i not in self._widgets:
# # Extract min and max from scope if not given explicitly
# if i not in self.scope.get_measure_names():
# if min_value is None:
# min_value = self.scope[i].min
# if max_value is None:
# max_value = self.scope[i].max
#
# # Extract min and max from `df` if still missing (i.e. for Measures)
# if df is not None:
# if min_value is None:
# min_value = df[i].min()
# if max_value is None:
# max_value = df[i].max()
#
# # Extract min and max from `_viz_data` if still missing
# if self._viz_data is not None:
# if min_value is None:
# min_value = self._viz_data[i].min()
# if max_value is None:
# max_value = self._viz_data[i].max()
#
# if isinstance(self.scope[i], BooleanParameter):
# self._widgets[i] = self._make_togglebutton_widget(
# i,
# cats=[False, True],
# )
# elif isinstance(self.scope[i], CategoricalParameter):
# cats = self.scope.get_cat_values(i)
# self._widgets[i] = self._make_togglebutton_widget(
# i,
# cats=cats,
# )
# elif isinstance(self.scope[i], IntegerParameter):
# readout_format = readout_format or 'd'
# self._widgets[i] = self._make_range_widget(
# i,
# min_value=min_value,
# max_value=max_value,
# readout_format=readout_format,
# integer=True,
# steps=steps,
# )
# else:
# readout_format = readout_format or '.3g'
# self._widgets[i] = self._make_range_widget(
# i,
# min_value=min_value,
# max_value=max_value,
# readout_format=readout_format,
# integer=False,
# steps=steps,
# )
#
# if tall:
# if not isinstance(histogram, Mapping):
# histogram = {}
# return widget.VBox(
# [
# widget.Label(i),
# self.get_histogram_figure(i, **histogram),
# self._widgets[i],
# ],
# layout=styles.widget_frame,
# )
#
# if histogram is not None:
# if not isinstance(histogram, Mapping):
# histogram = {}
# return widget.HBox(
# [self._widgets[i], self.get_histogram_figure(i, **histogram)],
# layout=dict(align_items = 'center'),
# )
# else:
# return self._widgets[i]
#
# def visualization(self, include=None, data=None):
#
# if self.scope is None:
# raise ValueError('cannot create visualization with no scope')
#
# if data is not None:
# self.set_viz_data(data)
# self._figures.clear()
#
# if include is None:
# include = []
#
# viz_widgets = []
# include = set(include)
# include = include | self.relevant_and_demanded_features
# for i in self.scope.get_parameter_names() + self.scope.get_measure_names():
# if i in include:
# viz_widgets.append(self.get_widget(i))
#
# return widget.Box(viz_widgets, layout=widget.Layout(flex_flow='row wrap'))
[docs]class Box(GenericBox):
"""
A Box defines a set of restricted dimensions for a Scope.
Args:
name (str): The name for this Box.
parent (str, optional):
The name of the parent for this Box. When extracted
as a :class:`ChainedBox` from a collection of :class:`Boxes`,
the thresholds will also include any thresholds inherited
from this box's ancestor(s).
scope (Scope, optional):
A scope to associate with this box.
upper_bounds (Mapping[str, numeric], optional):
If given, a mapping with keys giving feature names
and values giving an upper bound for each feature.
lower_bounds (Mapping[str, numeric], optional):
If given, a mapping with keys giving feature names
and values giving a lower bound for each feature.
bounds (Mapping[str, Bounds], optional):
If given, a mapping with keys giving feature names
and values giving :class:`Bounds` for each feature.
allowed (Mapping[str, Set], optional):
If given, a mapping with keys giving feature names
and values giving the available :class:`Set` for
each feature.
relevant (Iterable, optional):
If given, a set of names of relevant features.
Attributes:
thresholds (Dict[str,Union[Bounds,Set]]):
The restricted dimensions in this Box, with feature names as
keys and :class:`Bounds` or a :class:`Set` of available discrete
values as the dictionary values.
relevant_features (Set[str]):
A :class:`Set` of features that are relevant for this Box.
These are features, which are not themselves constrained,
but should be considered in any analytical report developed
based on this Box.
"""
def __init__(
self,
name,
parent=None,
scope=None,
upper_bounds=None,
lower_bounds=None,
bounds=None,
allowed=None,
relevant=None,
):
super().__init__()
self._thresholds = {}
if relevant is None:
self.relevant_features = set()
else:
self.relevant_features = set(relevant)
self.parent_box_name = parent
self.scope = scope
self.name = name
if upper_bounds:
for k,v in upper_bounds.items():
self.set_upper_bound(k,v)
if lower_bounds:
for k,v in lower_bounds.items():
self.set_lower_bound(k,v)
if bounds:
for k,v in bounds.items():
self.set_bounds(k,v)
if allowed:
for k,v in allowed.items():
self.replace_allowed_set(k,v)
@property
def scope(self):
"""Scope: A scope associated with this Box."""
return self._scope
@scope.setter
def scope(self, x):
if x is None or isinstance(x, Scope):
self._scope = x
else:
raise TypeError('scope must be Scope or None')
@property
def thresholds(self):
"""
Dict[str,Union[Bounds,Set]]:
The restricted dimensions in this Box, with feature names as
keys and :class:`Bounds` or a :class:`Set` of available discrete
values as the dictionary values.
"""
return self._thresholds
@thresholds.setter
def thresholds(self, value):
if not isinstance(value, MutableMapping):
raise TypeError(f'thresholds must be MutableMapping not {type(value)}')
self._thresholds = value
@thresholds.deleter
def thresholds(self):
self._thresholds = {}
@property
def relevant_features(self):
"""
Dict[str,Union[Bounds,Set]]:
A :class:`Set` of features that are relevant for this Box.
These are features, which are not themselves constrained,
but should be considered in any analytical report developed
based on this Box.
"""
return self._relevant_features
@relevant_features.setter
def relevant_features(self, value):
if not isinstance(value, set):
raise TypeError(f'thresholds must be MutableMapping not {type(value)}')
self._relevant_features = value
@relevant_features.deleter
def relevant_features(self):
self._relevant_features = set()
@property
def measure_thresholds(self):
"""
Dict[str,Union[Bounds,Set]]:
The thresholds in this Box associated with performance measures.
A Scope must be associated with this Box to access this property.
"""
if self.scope is None:
raise ValueError("need scope")
names = self.scope.get_measure_names()
return {k:v for k,v in self._thresholds.items() if k in names}
@property
def uncertainty_thresholds(self):
"""
Dict[str,Union[Bounds,Set]]:
The thresholds in this Box associated with exogenous uncertainties.
A Scope must be associated with this Box to access this property.
"""
if self.scope is None:
raise ValueError("need scope")
names = self.scope.get_uncertainty_names()
return {k:v for k,v in self._thresholds.items() if k in names}
@property
def lever_thresholds(self):
"""
Dict[str,Union[Bounds,Set]]:
The thresholds in this Box associated with policy levers.
A Scope must be associated with this Box to access this property.
"""
if self.scope is None:
raise ValueError("need scope")
names = self.scope.get_lever_names()
return {k:v for k,v in self._thresholds.items() if k in names}
def to_json(self):
"""
Dump the thresholds for this box to a json string.
Only the thresholds are saved, not the scope.
Returns:
str
"""
temp = {'_name_':self.name}
for k,v in self.thresholds.items():
if isinstance(v, Bounds):
temp[k] = {'lowerbound':v.lowerbound, 'upperbound':v.upperbound}
else:
temp[k] = list(v)
from ..util.json_encoder import dumps
return dumps(temp)
@classmethod
def from_json(cls, j):
import json
temp = json.loads(j)
self = cls(temp.get('_name_', None))
for k, v in temp.items():
if k == '_name_':
pass
elif isinstance(v, dict):
self.set_bounds(k, v.get('lowerbound', None), v.get('upperbound', None))
else:
self.replace_allowed_set(k, v)
return self
def __getitem__(self, key):
return self._thresholds[key]
def __setitem__(self, key, value):
if not isinstance(value, (Bounds, set)):
raise TypeError('thresholds must be Bounds or a set')
if self.scope:
if key in self.scope.get_all_names():
self._thresholds[key] = value
else:
raise ScopeError("cannot set threshold on '{key}'")
else:
self._thresholds[key] = value
def __delitem__(self, key):
del self._thresholds[key]
def clear(self):
"""
Clear thresholds and relevant_features.
"""
self._thresholds = {}
self._relevant_features = set()
@property
def demanded_features(self):
"""
Set[str]: A set of features upon which thresholds are set.
"""
t = set(self._thresholds.keys())
return t
[docs] def set_bounds(self, key, lowerbound, upperbound=None):
"""
Set both lower and upper bounds.
If values are both set to None and this key becomes
unbounded but it was not previously unbounded, it is moved from
thresholds to relevant_features. Conversely, if no bounds were
previously set but the key appears in relevant_features, it is
removed from that set.
Args:
key (str):
The feature name to which these bounds
will be attached.
lowerbound (numeric, None, or Bounds):
The lower bound, or a Bounds object that gives
upper and lower bounds (in which case the `upperbound`
argument is ignored). Set explicitly to 'None' to
leave unbounded from below.
upperbound (numeric or None, default None):
The upper bound. Set to 'None' to
leave unbounded from above.
Raises:
ScopeError:
If a scope is attached to this box but the `key` cannot
be found in the scope.
"""
if isinstance(lowerbound, Bounds):
b = lowerbound
lowerbound, upperbound = b.lowerbound, b.upperbound
if self.scope is not None:
if key not in self.scope.get_all_names():
raise ScopeError(f"cannot set bounds on '{key}'")
if lowerbound is None and upperbound is None:
if key in self._thresholds:
del self._thresholds[key]
self._relevant_features.add(key)
else:
self._thresholds[key] = Bounds(lowerbound, upperbound)
if key in self._relevant_features:
self._relevant_features.remove(key)
[docs] def replace_allowed_set(self, key, values):
"""
Replace the allowed set.
If the new allowed set is the same as the complete set of
possible values defined in the scope, this key becomes
unbounded and it is moved from thresholds to
relevant_features. Conversely, if no bounds were previously
set but the key appears in relevant_features, it is
removed from that set.
Args:
key (str):
The feature name to which these bounds
will be attached.
values (set):
A set of values to use as the allowed set.
If an empty set or None is given, then any
values are allowed.
"""
action = True
if self.scope is not None:
if key not in self.scope.get_all_names():
raise ScopeError(f"cannot set allowed_set on '{key}'")
cat_values = set(self.scope.get_cat_values(key))
if values is None or len(values)==0:
values = cat_values
# handle True and False
values = set(values)
_t = True in cat_values
_f = False in cat_values
if _t or _f:
_values = set()
for v in values:
if _t and str(v).lower() == 'true':
_values.add(True)
elif _f and str(v).lower() == 'false':
_values.add(False)
else:
_values.add(v)
values = _values
if not cat_values.issuperset(values):
raise ScopeError(f"allowed_set is not a subset of scope defined values for '{key}'")
if len(cat_values) == len(values):
action = False
if action:
self._thresholds[key] = set(values)
if key in self._relevant_features:
self._relevant_features.remove(key)
else:
if key in self._thresholds:
del self._thresholds[key]
self._relevant_features.add(key)
def __iter__(self):
return itertools.chain(
iter(self._thresholds),
)
def __len__(self):
return (
len(self._thresholds)
)
def __repr__(self):
if self.keys() or self.relevant_features or self.name=='0':
demands = list(self.keys()) or [" "]
relevent = list(self.relevant_features) or [" "]
m = max(
max(map(len, demands)) + 1,
max(map(len, relevent)) + 1
)
members = []
for k, v in self.items():
if isinstance(v, Bounds):
if v.lowerbound is None:
if v.upperbound is None:
v_ = ': unbounded'
else:
v_ = f' <= {v.upperbound}'
else:
if v.upperbound is None:
v_ = f' >= {v.lowerbound}'
else:
v_ = f': {v.lowerbound} to {v.upperbound}'
else:
v_ = ': ' + repr(v)
members.append("● "+k.rjust(m) + v_)
for k in self.relevant_features:
members.append("◌ "+k.rjust(m))
head = f"{self.__class__.__name__}: {self.name}"
if hasattr(self, 'coverage'):
head += f'\n coverage: {self.coverage:.5f}'
if hasattr(self, 'density'):
head += f'\n density: {self.density:.5f}'
if hasattr(self, 'mass'):
head += f'\n mass: {self.mass:.5f}'
if members:
return head+"\n " + '\n '.join(members)
else:
return head
else:
return "<empty "+ self.__class__.__name__ + ">"
def __get_truncated_parameters(self, source):
"""Get a list of truncate parameters.
This method requires a scope to be set, and will adjust
the uncertainty distributions in the scope to be
appropriately truncated.
Args:
source (Collection): The list of parameters to possibly truncate.
"""
result = []
for i in source:
i = copy.deepcopy(i)
if i.name in self._thresholds:
bounds = self._thresholds[i.name]
if isinstance(bounds, Bounds):
lowerbound, upperbound = bounds
if lowerbound is None:
lowerbound = -numpy.inf
if upperbound is None:
upperbound = numpy.inf
i.dist = truncated(i.dist, lowerbound, upperbound)
i.lower_bound, i.upper_bound = get_distribution_bounds(i.dist)
else:
from .parameter import CategoricalParameter
i = CategoricalParameter(i.name, bounds, singleton_ok=True)
result.append(i)
return result
def get_uncertainties(self):
"""Get a list of exogenous uncertainties.
This method requires a scope to be set, and will adjust
the uncertainty distributions in the scope to be
appropriately truncated.
"""
return self.__get_truncated_parameters(self.scope.get_uncertainties())
def get_levers(self):
"""Get a list of policy levers.
This method requires a scope to be set, and will adjust
the policy lever distributions in the scope to be
appropriately truncated.
"""
return self.__get_truncated_parameters(self.scope.get_levers())
def get_constants(self):
"""Get a list of model constants.
This method requires a scope to be set, and will pass through
constants in the scope unaltered."""
return self.scope.get_constants()
def get_parameters(self):
"""Get a list of model parameters (uncertainties+levers+constants)."""
return self.get_constants() + self.get_uncertainties() + self.get_levers()
def get_measures(self):
"""Get a list of performance measures."""
return self.scope.get_measures()
[docs]class ChainedBox(GenericBox):
"""
A Box defines a set of restricted dimensions for a Scope.
Args:
boxes (Boxes):
A collection of Boxes from which to assemble a chain.
name (str):
The name for this ChainedBox. This must be the name of
a Box in `boxes`, which serves as the seed for the chain.
Ancestors are added recursively by finding the parent box
of each box in the chain, until a box is found with no parent.
"""
def __init__(self, boxes, name):
"""
Parameters
----------
boxes : Mapping
Dictionary of {str:Box} pairs
name : str
Name of this chained box
"""
GenericBox.__init__(self)
c = boxes[name]
self.chain = [c]
self.names = [name]
while c.parent_box_name is not None:
self.names.insert(0, c.parent_box_name)
c = boxes[c.parent_box_name]
self.chain.insert(0, c)
def __getitem__(self, key):
return self.thresholds[key]
def __setitem__(self, key, value):
self.chain[-1][key] = value
def __delitem__(self, key):
del self.chain[-1][key]
def clear(self):
"""
Clear thresholds and relevant_features on the last Box in the chain.
"""
self.chain[-1]._thresholds.clear()
self.chain[-1]._relevant_features.clear()
def __iter__(self):
return itertools.chain(
iter(self.thresholds),
)
def __len__(self):
return len(self.thresholds)
@property
def name(self):
"""str: The name of the last (defining) Box in this chain."""
return self.names[-1]
@property
def thresholds(self):
"""
Dict[str,Union[Bounds,Set]]:
The restricted dimensions in this ChainedBox, with feature names as
keys and the Bounds or available set as the values.
"""
t = {}
for single in self.chain:
t.update(single.thresholds)
return t
@thresholds.setter
def thresholds(self, value):
self.chain[-1].thresholds = value
@thresholds.deleter
def thresholds(self):
self.chain[-1].thresholds = {}
[docs] def measure_thresholds(self):
"""
The thresholds in this Box or its ancestor(s) associated with performance measures.
A Scope must be associated with each Box in the chain to access this property.
Returns:
Dict[str,Union[Bounds,Set]]
"""
t = {}
for single in self.chain:
t.update(single.measure_thresholds)
return t
[docs] def uncertainty_thresholds(self):
"""
The thresholds in this Box or its ancestor(s) associated with exogenous uncertainties.
A Scope must be associated with each Box in the chain to access this property.
Returns:
Dict[str,Union[Bounds,Set]]
"""
t = {}
for single in self.chain:
t.update(single.uncertainty_thresholds)
return t
[docs] def lever_thresholds(self):
"""
The thresholds in this Box or its ancestor(s) associated with policy levers.
A Scope must be associated with each Box in the chain to access this property.
Returns:
Dict[str,Union[Bounds,Set]]
"""
t = {}
for single in self.chain:
t.update(single.lever_thresholds)
return t
@property
def relevant_features(self):
"""
Set[str]: A set of features that are relevant at any step of the chain.
"""
t = set()
for single in self.chain:
t |= single.relevant_features
return t
@property
def demanded_features(self):
"""
Set[str]: A set of features upon which thresholds are set at any step of the chain.
"""
t = set()
for single in self.chain:
t |= set(single.thresholds.keys())
return t
def __repr__(self):
if self.keys() or self.relevant_features or True:
demands = list(self.keys()) or [" "]
relevent = list(self.relevant_features) or [" "]
m = max(
max(map(len, demands)) + 1,
max(map(len, relevent)) + 1
)
members = []
for k, v in self.items():
if isinstance(v, Bounds):
if v.lowerbound is None:
if v.upperbound is None:
v_ = ': unbounded'
else:
v_ = f' <= {v.upperbound}'
else:
if v.upperbound is None:
v_ = f' >= {v.lowerbound}'
else:
v_ = f': {v.lowerbound} to {v.upperbound}'
else:
v_ = ': ' + repr(v)
members.append("● "+k.rjust(m) + v_)
for k in self.relevant_features:
members.append("◌ "+k.rjust(m))
head = f"{self.__class__.__name__}: {self.name}"
if hasattr(self, 'coverage'):
head += f'\n coverage: {self.coverage:.5f}'
if hasattr(self, 'density'):
head += f'\n density: {self.density:.5f}'
if hasattr(self, 'mass'):
head += f'\n mass: {self.mass:.5f}'
return head+"\n " + '\n '.join(members)
else:
return "<empty "+ self.__class__.__name__ + ">"
def chain_repr(self):
return "\n".join(f"{repr(c)}" for n,c in zip(self.names,self.chain))
@property
def scope(self):
"""Scope: A scope associated with this Box."""
return self.chain[-1]._scope
@scope.setter
def scope(self, x):
if x is None or isinstance(x, Scope):
self.chain[-1]._scope = x
else:
raise TypeError('scope must be Scope or None')
def replace_allowed_set(self, key, values):
"""
Replace the allowed set in the last Box on the chain.
Args:
key (str):
The feature name to which these bounds
will be attached.
values (set):
A set of values to use as the allowed set.
"""
self.chain[-1].replace_allowed_set(key, values)
def set_bounds(self, key, lowerbound, upperbound=None):
"""
Set both lower and upper bounds in the last Box on the chain.
Args:
key (str):
The feature name to which these bounds
will be attached.
lowerbound (numeric, None, or Bounds):
The lower bound, or a Bounds object that gives
upper and lower bounds (in which case the `upperbound`
argument is ignored). Set explicitly to 'None' to
leave unbounded from below.
upperbound (numeric or None, default None):
The upper bound. Set to 'None' to
leave unbounded from above.
Raises:
ScopeError:
If a scope is attached to this box but the `key` cannot
be found in the scope.
"""
self.chain[-1].set_bounds(key, lowerbound, upperbound)
def find_all_boxes_with_parent(universe:dict, parent=None):
result = []
for name, clusterdef in universe.items():
if clusterdef.parent_box_name == parent:
result.append(name)
return result
def pseudoname_boxes(boxes, root=None):
if root is None:
try:
fancy = [f"Scope: {boxes.scope.name}"]
except AttributeError:
fancy = [f"Boxes Universe"]
plain = [None]
else:
fancy = []
plain = []
tops = sorted(find_all_boxes_with_parent(boxes, parent=root))
for t in tops:
fancy.append("▷ "+t if t[0] in "▶▷" else "▶ "+t)
plain.append(t)
f_, p_ = pseudoname_boxes(boxes, root=t)
for f1, p1 in zip(f_, p_):
fancy.append("▷ "+f1 if f1[0] in "▶▷" else "▶ "+f1)
plain.append(p1)
return fancy, plain
[docs]class Boxes(MutableMapping):
def __init__(self, *args, scope=None, **kw):
self._storage = dict()
self._scope = scope
if len(args) == 1 and isinstance(args[0], (list, tuple, set)):
args = args[0]
for a in args:
self.add(a)
for k,v in kw.items():
self[k] = v
if scope is not None:
for i in self._storage:
self._storage[i].scope = scope
@property
def scope(self):
return self._scope
@scope.setter
def scope(self, s):
self._scope = s
for i in self._storage:
self._storage[i].scope = s
def __getitem__(self, key):
return self._storage[key]
def __setitem__(self, key, value):
if not isinstance(value, Box):
raise TypeError(f"values must be Box not {type(value)}")
if key != value.name:
raise ValueError('key must match name, use Boxes.add(box)')
self._storage[key] = value
def add(self, value):
if not isinstance(value, Box):
raise TypeError(f"values must be Box not {type(value)}")
self[value.name] = value
def __delitem__(self, key):
del self._storage[key]
def __iter__(self):
return iter(self._storage)
def __len__(self):
return len(self._storage)
def plain_names(self):
return pseudoname_boxes(self, root=None)[1]
def fancy_names(self):
return pseudoname_boxes(self, root=None)[0]
def both_names(self):
return pseudoname_boxes(self, root=None)
def get_chain(self, name):
return ChainedBox(self, name)