import numpy
import pandas
import warnings
import functools
from ...viz import colors
from ...scope.box import GenericBox
from ...database import Database
from traitlets import TraitError
from plotly import graph_objs as go
from ipywidgets import Dropdown
import ipywidgets as widget
import logging
_logger = logging.getLogger('EMAT.widget')
from .explore_base import DataFrameExplorer
from ..prim import PrimBox
from ...exceptions import ScopeError
def _deselect_all_points(trace):
trace.selectedpoints = None
# def _debugprint(s):
# print(s.replace("rgb(255, 127, 14)", "<ORANGE>").replace("rgb(255, 46, 241)","<PINK>"))
from .components import *
range_caption_css = (
"<style> "
".emat-rangecaption > input "
"{ border: solid 1px #eeeeee !important; text-align: center;} "
".emat-rangecaption > input::placeholder "
"{color:#dddddd}</style>"
)
[docs]class Visualizer(DataFrameExplorer):
"""
A data visualization framework.
Args:
data (pandas.DataFrame or str):
The base data to visualize. Give the data directly as a
DataFrame, or give the name of a design that can be loaded
from the `db` Database.
selections (Mapping or pandas.DataFrame, optional):
Any pre-existing selections. Each selection should be a
boolean pandas.Series indexed the same as the data.
scope (emat.Scope, optional):
The scope that describes the data.
active_selection_name (str, optional):
The name of the selection to activate.
reference_point (Mapping or pandas.DataFrame):
An optional reference point to visualize. Give as a simple
mapping, or as a one-row DataFrame with the same columns as
`data`, or give the name of a one-row design that can be loaded
from the `db` Database.
db (emat.Database, optional): A database from which to read content.
"""
def __init__(
self,
data,
selections=None,
scope=None,
active_selection_name=None,
reference_point=None,
*,
db=None,
):
if db is not None:
if scope is None:
scope = db.read_scope()
elif isinstance(scope, str):
scope = db.read_scope(scope)
if isinstance(data, str):
data = db.read_experiment_all(
scope_name=scope.name,
design_name=data,
ensure_dtypes=True,
)
if isinstance(reference_point, str):
reference_point = db.read_experiment_all(
scope_name=scope.name,
design_name=reference_point,
ensure_dtypes=True,
)
if selections is None:
from ...scope.box import Box
selections = {'Explore': Box(name='Explore', scope=scope)}
if active_selection_name is None:
active_selection_name = 'Explore'
super().__init__(
data,
selections=selections,
active_selection_name=active_selection_name,
reference_point=reference_point,
)
self.scope = scope
self._figures_hist = {}
self._figures_freq = {}
self._base_histogram = {}
self._categorical_data = {}
self._freeze = False
self._two_way = {}
self._three_way = {}
self._splom = {}
self._hmm = {}
self._parcoords = {}
self._selection_feature_score_fig = None
self._status_txt = widget.HTML(
value="<i>Explore Status Not Set</i>",
)
self._status_pie = go.FigureWidget(
go.Pie(
values=[75, 250],
labels=['Inside', 'Outside'],
hoverinfo='label+value',
textinfo='percent',
textfont_size=10,
marker=dict(
colors=[
self.active_selection_color(),
colors.DEFAULT_BASE_COLOR,
],
line=dict(color='#FFF', width=0.25),
)
),
layout=dict(
width=100,
height=100,
showlegend=False,
margin=dict(l=10, r=10, t=10, b=10),
)
)
self._status = widget.HBox(
[
widget.VBox([self._active_selection_chooser, self._status_txt]),
self._status_pie
],
layout=dict(
justify_content = 'space-between',
align_items = 'center',
)
)
self._update_status()
def get_histogram_figure(self, col, bins=20, marker_line_width=None):
try:
this_type = self.scope.get_dtype(col)
except:
this_type = 'float'
if this_type in ('cat','bool'):
return self.get_frequency_figure(col)
if this_type in ('int',):
param = self.scope[col]
if param.max - param.min + 1 <= bins * configuration.config.get("integer_bin_ratio", 4):
print("OVERLOAD BINS",bins, configuration.config.get("integer_bin_ratio", 4), param.max - param.min + 1)
bins = param.max - param.min + 1
if marker_line_width is None:
marker_line_width = 0
self._create_histogram_figure(col, bins=bins, marker_line_width=marker_line_width)
return self._figures_hist[col]
def get_frequency_figure(self, col):
if self.scope.get_dtype(col) == 'cat':
labels = self.scope.get_cat_values(col)
else:
labels = [False, True]
self._create_frequencies_figure(col, labels=labels)
return self._figures_freq[col]
def __get_plain_box(self):
if self.active_selection_deftype() == 'box':
box = self._selection_defs[self.active_selection_name()]
elif self.active_selection_deftype() == 'primbox':
box = self._selection_defs[self.active_selection_name()].to_emat_box()
else:
box = None
return box
def _create_histogram_figure(self, col, bins=20, *, marker_line_width=None):
if col in self._figures_hist:
self._update_histogram_figure(col)
else:
selection = self.active_selection()
box = self.__get_plain_box()
fig = new_histogram_figure(
selection, self.data[col], bins,
marker_line_width=marker_line_width,
on_deselect=lambda *a: self._on_deselect_from_histogram(*a,name=col),
on_select=lambda *a: self._on_select_from_histogram(*a,name=col),
box=box,
title_text=self.scope.shortname(col),
ref_point=self.reference_point(col),
selected_color=self.active_selection_color(),
)
fig_rangecaption = widget.Text(
value="",
placeholder="any value",
continuous_update=False,
layout={'padding': '0px 25px 15px', },
).add_class("emat-rangecaption")
fig_rangecaption.observe(
lambda payload: self._on_select_from_rangestring(payload, name=col),
names='value',
)
self._figures_hist[col] = widget.VBox([fig, fig_rangecaption, widget.HTML(range_caption_css)])
def _create_frequencies_figure(self, col, labels=None, *, marker_line_width=None):
if col in self._figures_freq:
self._update_frequencies_figure(col)
else:
selection = self.active_selection()
box = self.__get_plain_box()
fig = new_frequencies_figure(
selection, self.data[col], labels,
marker_line_width=marker_line_width,
on_deselect=functools.partial(self._on_deselect_from_histogram, name=col),
on_select=functools.partial(self._on_select_from_freq, name=col),
#on_click=functools.partial(self._on_click_from_frequencies, name=col), # not always stable
box=box,
title_text=self.scope.shortname(col),
ref_point=self.reference_point(col),
label_name_map=self.scope[col].abbrev,
selected_color=self.active_selection_color(),
)
fig_rangecaption = widget.Text(
value="",
placeholder="any value",
continuous_update=False,
layout={'padding': '0px 25px 15px', },
).add_class("emat-rangecaption")
fig_rangecaption.observe(
lambda payload: self._on_select_from_setstring(payload, name=col),
names='value',
)
self._figures_freq[col] = widget.VBox([fig, fig_rangecaption, widget.HTML(range_caption_css)])
def _update_histogram_figure(self, col):
if col in self._figures_hist:
fig = self._figures_hist[col].children[0]
box = self.__get_plain_box()
with fig.batch_update():
update_histogram_figure(
fig,
self.active_selection(),
self.data[col],
box=box,
ref_point=self.reference_point(col),
)
rangestring_input = self._figures_hist[col].children[1]
if box is not None:
bounds = box.thresholds.get(col, None)
else:
bounds = None
rangestring_input.value = convert_bounds_to_rangestring(bounds)
def _update_frequencies_figure(self, col):
if col in self._figures_freq:
fig = self._figures_freq[col].children[0]
box = self.__get_plain_box()
with fig.batch_update():
update_frequencies_figure(
fig,
self.active_selection(),
self.data[col],
box=box,
ref_point=self.reference_point(col),
)
rangestring_input = self._figures_freq[col].children[1]
if box is not None:
allowedset = box.thresholds.get(col, None)
else:
allowedset = None
rangestring_input.value = convert_set_to_rangestring(allowedset)
def _compute_histogram(self, col, selection, bins=None):
if col not in self._base_histogram:
if bins is None:
bins = 20
bar_heights, bar_x = numpy.histogram(self.data[col], bins=bins)
self._base_histogram[col] = bar_heights, bar_x
else:
bar_heights, bar_x = self._base_histogram[col]
bins_left = bar_x[:-1]
bins_width = bar_x[1:] - bar_x[:-1]
bar_heights_select, bar_x = numpy.histogram(self.data[col][selection], bins=bar_x)
return bar_heights, bar_heights_select, bins_left, bins_width
def _compute_frequencies(self, col, selection, labels):
if col in self._categorical_data:
v = self._categorical_data[col]
else:
self._categorical_data[col] = v = self.data[col].astype(
pandas.CategoricalDtype(categories=labels, ordered=False)
).cat.codes
if col not in self._base_histogram:
bar_heights, bar_x = numpy.histogram(v, bins=numpy.arange(0, len(labels) + 1))
self._base_histogram[col] = bar_heights, bar_x
else:
bar_heights, bar_x = self._base_histogram[col]
bar_heights_select, _ = numpy.histogram(v[selection], bins=numpy.arange(0, len(labels) + 1))
return bar_heights, bar_heights_select, labels
def _on_select_from_histogram(self, *args, name=None):
if self._freeze:
return
try:
self._freeze = True
select_min, select_max = args[2].xrange
_logger.debug("name: %s range: %f - %f", name, select_min, select_max)
self._figures_hist[name].children[0].for_each_trace(_deselect_all_points)
if self.active_selection_deftype() == 'box':
box = self._selection_defs[self.active_selection_name()]
box = interpret_histogram_selection(name, args[2].xrange, box, self.data, self.scope)
self.new_selection(box, name=self.active_selection_name())
self._active_selection_changed()
except:
_logger.exception("error in _on_select_from_histogram")
raise
finally:
self._freeze = False
def _on_select_from_rangestring(self, payload, name=None):
if self._freeze:
return
try:
self._freeze = True
from .components import convert_rangestring_to_tuple
select_min, select_max = convert_rangestring_to_tuple(payload.get('new', None))
_logger.debug("name: %s range: %f - %f", name, select_min, select_max)
if self.active_selection_deftype() == 'box':
box = self._selection_defs[self.active_selection_name()]
box = interpret_histogram_selection(name, (select_min, select_max), box, self.data, self.scope)
self.new_selection(box, name=self.active_selection_name())
self._active_selection_changed()
except:
_logger.exception("error in _on_select_from_histogram")
raise
finally:
self._freeze = False
def _on_deselect_from_histogram(self, *args, name=None):
_logger.debug("deselect %s", name)
if self.active_selection_deftype() == 'box':
box = self._selection_defs[self.active_selection_name()]
if name in box:
del box[name]
self.new_selection(box, name=self.active_selection_name())
self._active_selection_changed()
def _on_select_from_freq(self, *args, name=None):
select_min, select_max = args[2].xrange
select_min = int(numpy.ceil(select_min))
select_max = int(numpy.ceil(select_max))
fig = self.get_figure(name).children[0]
toggles = fig.layout['meta']['x_tick_values'][select_min:select_max]
fig.for_each_trace(_deselect_all_points)
if self.active_selection_deftype() == 'box':
box = self._selection_defs[self.active_selection_name()]
box.scope = self.scope
if name not in box:
for x in toggles:
box.add_to_allowed_set(name, x)
else:
for x in toggles:
if name not in box or x in box[name]:
box.remove_from_allowed_set(name, x)
if name in box and len(box[name]) == 0:
del box[name]
else:
box.add_to_allowed_set(name, x)
if toggles:
self.new_selection(box, name=self.active_selection_name())
self._active_selection_changed()
def _on_select_from_setstring(self, payload, name=None):
if self._freeze:
return
try:
self._freeze = True
from .components import convert_rangestring_to_set
allowed_set = convert_rangestring_to_set(payload.get('new', None))
if self.active_selection_deftype() == 'box':
box = self._selection_defs[self.active_selection_name()]
try:
if allowed_set is None:
if name in box._thresholds:
del box._thresholds[name]
else:
box.replace_allowed_set(name, allowed_set)
except ScopeError:
pass
else:
self.new_selection(box, name=self.active_selection_name())
self._active_selection_changed()
except:
_logger.exception("error in _on_select_from_setstring")
raise
finally:
self._freeze = False
def _on_click_from_frequencies(self, *args, name=None):
x = None
if len(args) >= 2:
xs = getattr(args[1],'xs',None)
if xs:
x = xs[0]
if x is not None:
if self.active_selection_deftype() == 'box':
box = self._selection_defs[self.active_selection_name()]
box.scope = self.scope
if name not in box or x in box[name]:
box.remove_from_allowed_set(name, x)
if name in box and len(box[name]) == 0:
del box[name]
else:
box.add_to_allowed_set(name, x)
self.new_selection(box, name=self.active_selection_name())
self._active_selection_changed()
def _active_selection_changed(self):
if hasattr(self, '_active_selection_changing_'):
return # prevent recursive looping
try:
self._active_selection_changing_ = True
with self._status_pie.batch_update():
super()._active_selection_changed()
self._pre_update_selection_feature_score_figure()
self._update_status()
for col in self._figures_hist:
self._update_histogram_figure(col)
for col in self._figures_freq:
self._update_frequencies_figure(col)
for key in self._two_way:
self._two_way[key].refresh_selection_names()
self._two_way[key]._on_change_selection_choose(payload={
'new':self.active_selection_name(),
})
for key in self._three_way:
self._three_way[key].change_selection(
self.active_selection(),
self.active_selection_color(),
)
self._update_sploms()
self._update_hmms()
self._update_selection_feature_score_figure()
finally:
del self._active_selection_changing_
[docs] def status(self):
"""Display the status widget."""
return self._status
def _update_status(self):
text = '<span style="font-weight:bold;font-size:150%">{:,d} Cases Selected out of {:,d} Total Cases</span>'
selection = self.active_selection()
values = (int(numpy.sum(selection)), int(selection.size))
self._status_txt.value = text.format(*values)
self._status_pie.data[0].values = [values[0], values[1]-values[0]]
def get_figure(self, col):
if col in self._figures_hist:
return self._figures_hist[col]
if col in self._figures_freq:
return self._figures_freq[col]
return None
def _clear_boxes_on_figure(self, col):
fig = self.get_figure(col).children[0]
if fig is None: return
foreground_shapes = []
refpoint = self.reference_point(col)
if refpoint is not None:
if refpoint in (True, False):
refpoint = str(refpoint).lower()
_y_max = sum(t.y for t in fig.select_traces()).max()
y_range = (
-_y_max * 0.02,
_y_max * 1.04,
)
foreground_shapes.append(
go.layout.Shape(
type="line",
xref="x1",
yref="y1",
x0=refpoint,
y0=y_range[0],
x1=refpoint,
y1=y_range[1],
**colors.DEFAULT_REF_LINE_STYLE,
)
)
fig.layout.shapes= foreground_shapes
fig.layout.title.font.color = 'black'
fig.layout.title.text = col
# def _draw_boxes_on_figure(self, col):
#
# if self.active_selection_deftype() != 'box':
# self._clear_boxes_on_figure(col)
# return
#
# fig = self.get_figure(col)
# if fig is None: return
# box = self._selection_defs[self.active_selection_name()]
# if box is None:
# self._clear_boxes_on_figure(col)
# return
#
# from ...scope.box import Bounds
#
# if col in box.thresholds:
# x_lo, x_hi = None, None
# thresh = box.thresholds.get(col)
# if isinstance(thresh, Bounds):
# x_lo, x_hi = thresh
# if isinstance(thresh, set):
# x_lo, x_hi = [], []
# for tickval, ticktext in enumerate(fig.data[0].x):
# if ticktext in thresh:
# x_lo.append(tickval-0.45)
# x_hi.append(tickval+0.45)
#
# try:
# x_range = (
# fig.data[0].x[0] - (fig.data[0].width[0] / 2),
# fig.data[0].x[-1] + (fig.data[0].width[-1] / 2),
# )
# except TypeError:
# x_range = (
# -0.5,
# len(fig.data[0].x)+0.5
# )
# x_width = x_range[1] - x_range[0]
# if x_lo is None:
# x_lo = x_range[0]-x_width * 0.02
# if x_hi is None:
# x_hi = x_range[1]+x_width * 0.02
# if not isinstance(x_lo, list):
# x_lo = [x_lo]
# if not isinstance(x_hi, list):
# x_hi = [x_hi]
#
# y_lo, y_hi = None, None
# _y_max = sum(t.y for t in fig.select_traces()).max()
# y_range = (
# -_y_max * 0.02,
# _y_max * 1.04,
# )
# y_width = y_range[1] - y_range[0]
# if y_lo is None:
# y_lo = y_range[0]-y_width * 0
# if y_hi is None:
# y_hi = y_range[1]+y_width * 0
# if not isinstance(y_lo, list):
# y_lo = [y_lo]
# if not isinstance(y_hi, list):
# y_hi = [y_hi]
#
# x_pairs = list(zip(x_lo, x_hi))
# y_pairs = list(zip(y_lo, y_hi))
#
# background_shapes = [
# # Rectangle background color
# go.layout.Shape(
# type="rect",
# xref="x1",
# yref="y1",
# x0=x_pair[0],
# y0=y_pair[0],
# x1=x_pair[1],
# y1=y_pair[1],
# line=dict(
# width=0,
# ),
# fillcolor=colors.DEFAULT_BOX_BG_COLOR,
# opacity=0.2,
# layer="below",
# )
# for x_pair in x_pairs
# for y_pair in y_pairs
# ]
#
# foreground_shapes = [
# # Rectangle reference to the axes
# go.layout.Shape(
# type="rect",
# xref="x1",
# yref="y1",
# x0=x_pair[0],
# y0=y_pair[0],
# x1=x_pair[1],
# y1=y_pair[1],
# line=dict(
# width=2,
# color=colors.DEFAULT_BOX_LINE_COLOR,
# ),
# fillcolor='rgba(0,0,0,0)',
# opacity=1.0,
# )
# for x_pair in x_pairs
# for y_pair in y_pairs
# ]
#
# refpoint = self.reference_point(col)
# if refpoint is not None:
# if refpoint in (True, False):
# refpoint = str(refpoint).lower()
# foreground_shapes.append(
# go.layout.Shape(
# type="line",
# xref="x1",
# yref="y1",
# x0=refpoint,
# y0=y_range[0],
# x1=refpoint,
# y1=y_range[1],
# **colors.DEFAULT_REF_LINE_STYLE,
# )
# )
#
# fig.layout.shapes=background_shapes+foreground_shapes
# fig.layout.title.font.color = colors.DEFAULT_BOX_LINE_COLOR
# fig.layout.title.text = f'<b>{col}</b>'
# else:
# self._clear_boxes_on_figure(col)
def _get_widgets(self, *include):
if self.scope is None:
raise ValueError('cannot create visualization with no scope')
viz_widgets = []
for i in include:
if i not in self.scope:
warnings.warn(f'{i} not in scope')
elif i not in self.data.columns:
warnings.warn(f'{i} not in data')
else:
fig = self.get_histogram_figure(i)
if fig is not None:
viz_widgets.append(fig)
return widget.Box(viz_widgets, layout=widget.Layout(flex_flow='row wrap'))
[docs] def selectors(self, names):
"""
Display selector widgets for certain dimensions.
This method returns an ipywidgets Box containing
the selector widgets.
Args:
names (Collection[str]):
These names will included in this set of
widgets. If the name is not found in the
scope or this visualizer's data, a warning
is issued but the remaining valid widgets
are still returned.
Returns:
ipywidgets.Box
"""
return self._get_widgets(*names)
[docs] def uncertainty_selectors(self):
"""
Display selector widgets for all uncertainties.
Returns:
ipywidgets.Box
"""
return self._get_widgets(*self.scope.get_uncertainty_names())
[docs] def lever_selectors(self):
"""
Display selector widgets for all policy levers.
Returns:
ipywidgets.Box
"""
return self._get_widgets(*self.scope.get_lever_names())
[docs] def measure_selectors(self):
"""
Display selector widgets for all performance measures.
Returns:
ipywidgets.Box
"""
return self._get_widgets(*self.scope.get_measure_names())
[docs] def complete(self, measures=None):
"""
Display status and selector widgets for all dimensions.
Returns:
ipywidgets.Box
"""
content = [self.status()]
levers = self.lever_selectors()
if levers.children:
content += [
widget.HTML("<h3>Policy Levers</h3>"),
levers,
]
uncs = self.uncertainty_selectors()
if uncs.children:
content += [
widget.HTML("<h3>Exogenous Uncertainties</h3>"),
uncs,
]
if measures is None:
meas = self.measure_selectors()
else:
meas = self.selectors(measures)
if meas.children:
content += [
widget.HTML("<h3>Performance Measures</h3>"),
meas,
]
return widget.VBox(content)
[docs] def set_active_selection_color(self, color):
super().set_active_selection_color(color)
for col, fig in self._figures_freq.items():
fig.children[0].data[0].marker.color = color
for col, fig in self._figures_hist.items():
fig.children[0].data[0].marker.color = color
c = self._status_pie.data[0].marker.colors
self._status_pie.data[0].marker.colors = [color, c[1]]
for k, twoway in self._two_way.items():
#_debugprint(f"twoway[{self._active_selection_name}][{k}] to {color}")
twoway.change_selection_color(color)
def refresh_selection_names(self):
super().refresh_selection_names()
try:
_two_way = self._two_way
except AttributeError:
pass
else:
for k, twoway in _two_way.items():
twoway.refresh_selection_names()
[docs] def two_way(
self,
key=None,
reset=False,
*,
x=None,
y=None,
use_gl=True,
):
"""
Create or display a two-way widget.
Args:
key (hashable, optional):
A hashable key value (e.g. `str`) to identify
this two_way widget. Subsequent calls to
this command with he same key will return
references to the same widget, instead of
creating new widgets.
reset (bool, default False):
Whether to reset the two_way widget for the
given key. Doing so will create a new two_way
widget, and will break any other existing references
to the same keyed widget (they will no longer live
update with this visualizer).
x, y (str, optional):
The names of the initial x- and y-axis dimensions to
display. Because the resulting figure widget is
interactive, these dimensions may be changed later.
use_gl (bool, default True):
Use Plotly's `Scattergl` instead of `Scatter`, which may
provide some performance benefit for large data sets.
Returns:
TwoWayFigure
"""
if key is None and (x is not None or y is not None):
key = (x,y)
if key in self._two_way and not reset:
return self._two_way[key]
from .twoway import TwoWayFigure
self._two_way[key] = TwoWayFigure(self, use_gl=use_gl)
self._two_way[key].selection_choose.value = self.active_selection_name()
def _try_set_value(where, value, describe):
if value is not None:
try:
where.value = value
except TraitError:
warnings.warn(f'"{value}" is not a valid value for {describe}')
_try_set_value(self._two_way[key].x_axis_choose, x, 'the x axis dimension')
_try_set_value(self._two_way[key].y_axis_choose, y, 'the y axis dimension')
return self._two_way[key]
[docs] def three_way(
self,
key=None,
reset=False,
*,
x=None,
y=None,
z=None,
s=None,
):
"""
Create or display a three-way widget.
"""
if key is None and (x is not None or y is not None or z is not None or s is not None):
key = (x,y,z,s)
if key in self._three_way and not reset:
return self._three_way[key]
from .threeway import ThreeWayFigure
self._three_way[key] = ThreeWayFigure(self, x=x,y=y,z=z,s=s)
return self._three_way[key]
[docs] def splom(
self,
key=None,
reset=False,
*,
cols='M',
rows='L',
use_gl=True,
):
"""
Create or display a scatter plot matrix widget.
Args:
key (hashable, optional):
A hashable key value (e.g. `str`) to identify
this splom widget. Subsequent calls to
this command with he same key will return
references to the same widget, instead of
creating new widgets.
reset (bool, default False):
Whether to reset the two_way widget for the
given key. Doing so will create a new splom
widget, and will break any other existing references
to the same keyed widget (they will no longer live
update with this visualizer).
cols, rows (str or Collection[str]):
The dimensions to display across each of the
columns (rows) of the scatter plot matrix.
Can be given as a list of dimension names, or
a single string that is some subset of 'XLM' to
include all uncertainties, policy levers, and/or
performance measures respectively.
use_gl (bool, default True):
Use Plotly's `Scattergl` instead of `Scatter`, which may
provide some performance benefit for large data sets.
Returns:
plotly.FigureWidget
"""
if not isinstance(rows, str):
rows = tuple(rows)
if not isinstance(cols, str):
cols = tuple(cols)
if key is None and (cols is not None or rows is not None):
key = (cols,rows)
if key in self._splom and not reset:
return self._splom[key]
box = None
if self.active_selection_deftype() == 'box':
name = self.active_selection_name()
box = self._selection_defs[name]
elif self.active_selection_deftype() == 'primbox':
name = self.active_selection_name()
box = self._selection_defs[name].to_emat_box()
self._splom[key] = new_splom_figure(
self.scope,
self.data,
rows=rows,
cols=cols,
use_gl=use_gl,
mass=250,
row_titles='side',
size=150,
selection=self.active_selection(),
box=box,
refpoint=self._reference_point,
figure_class=go.FigureWidget,
on_select=functools.partial(self._on_select_from_splom, name=key),
selected_color=self.active_selection_color(),
)
return self._splom[key]
def _on_select_from_splom(self, row, col, trace, points, selection, name=None):
# if len(points.point_inds)==0:
# return
# print("name=",name)
# print(row, col, "->", selection)
# print( "->", selection.xrange)
# print( "->", selection.yrange)
# print( "->", type(selection.yrange))
# trace.selectedpoints = None
pass
def _update_sploms(self):
box = None
if self.active_selection_deftype() == 'box':
name = self.active_selection_name()
box = self._selection_defs[name]
elif self.active_selection_deftype() == 'primbox':
name = self.active_selection_name()
box = self._selection_defs[name].to_emat_box()
for fig in self._splom.values():
with fig.batch_update():
update_splom_figure(
self.scope,
self.data,
fig,
self.active_selection(),
box,
mass=None,
selected_color=self.active_selection_color(),
)
[docs] def hmm(
self,
key=None,
reset=False,
*,
cols='M',
rows='L',
emph_selected=True,
show_points=30,
size=150,
with_hover=True,
):
"""
Create or display a heat map matrix widget.
Args:
key (hashable, optional):
A hashable key value (e.g. `str`) to identify
this hmm widget. Subsequent calls to
this command with he same key will return
references to the same widget, instead of
creating new widgets.
reset (bool, default False):
Whether to reset the two_way widget for the
given key. Doing so will create a new hmm
widget, and will break any other existing references
to the same keyed widget (they will no longer live
update with this visualizer).
cols, rows (str or Collection[str]):
The dimensions to display across each of the
columns (rows) of the heat map matrix.
Can be given as a list of dimension names, or
a single string that is some subset of 'XLM' to
include all uncertainties, policy levers, and/or
performance measures respectively.
emph_selected (bool, default True):
Emphasize selected points, using a variety of
techniques to ensure that small sized selections
remain visible. If disabled, when small sized
selections are shown from large visualization
datasets, the selected points will typically
become washed out and undetectable.
show_points (int, default 30):
If `emph_selected` is true and the number of
selected points is less than this threshold,
the selection will be overlaid on the heatmap
as a scatter plot instead of a heatmap colorization.
size (int, default 150):
The plot size for each heatmap.
Returns:
plotly.FigureWidget
"""
if not isinstance(rows, str):
rows = tuple(rows)
if not isinstance(cols, str):
cols = tuple(cols)
if key is None and (cols is not None or rows is not None):
key = (cols,rows)
if key in self._hmm and not reset:
return self._hmm[key]
box = None
if self.active_selection_deftype() == 'box':
name = self.active_selection_name()
box = self._selection_defs[name]
elif self.active_selection_deftype() == 'primbox':
name = self.active_selection_name()
box = self._selection_defs[name].to_emat_box()
self._hmm[key] = new_hmm_figure(
self.scope,
self.data,
rows=rows,
cols=cols,
row_titles='side',
size=size,
selection=self.active_selection(),
box=box,
refpoint=self._reference_point,
figure_class=go.FigureWidget,
emph_selected=emph_selected,
show_points=show_points,
selected_color=self.active_selection_color(),
with_hover=with_hover,
)
return self._hmm[key]
def _update_hmms(self):
box = None
if self.active_selection_deftype() == 'box':
name = self.active_selection_name()
box = self._selection_defs[name]
elif self.active_selection_deftype() == 'primbox':
name = self.active_selection_name()
box = self._selection_defs[name].to_emat_box()
for fig in self._hmm.values():
with fig.batch_update():
update_hmm_figure(
self.scope,
self.data,
fig,
self.active_selection(),
box,
selected_color=self.active_selection_color(),
)
[docs] def parcoords(
self,
key=None,
reset=False,
*,
coords='XLM',
):
"""
Args:
key (hashable, optional):
A hashable key value (e.g. `str`) to identify
this parcoords widget. Subsequent calls to
this command with he same key will return
references to the same widget, instead of
creating new widgets.
reset (bool, default False):
Whether to reset the parcoords widget for the
given key. Doing so will create a new parcoords
widget, and will break any other existing references
to the same keyed widget (they will no longer live
update with this visualizer).
coords (str or Collection[str]):
Names of the visualizer dimensions to display
in this parcoords widget. Give a list-like set
of named dimensions, or a string that is some
subset of 'XLM' to include all uncertainties,
policy levers, and/or performance measures
respectively.
Returns:
plotly.FigureWidget: A parallel coordinates plot.
"""
if not isinstance(coords, str):
coords = tuple(coords)
if key is None and coords is not None:
key = coords
if key in self._parcoords and not reset:
return self._parcoords[key]
self._parcoords[key] = new_parcoords_figure(
self.scope,
self.data,
coords=coords,
selection=self.active_selection(),
figure_class=go.FigureWidget,
selected_color=self.active_selection_color(),
# on_select=functools.partial(self._on_select_from_splom, name=key),
)
return self._parcoords[key]
[docs] def new_selection(self, value, name=None, color=None, activate=True):
"""
Add a new selection set to the Visualizer.
Args:
value (Box, PrimBox, str, or array-like):
The new selection. If given as an `emat.Box`,
the selection is defined entirely by the boundaries of the
box, as applied to the visualizer data.
If given as a `PrimBox`, the box boundaries are defined
by the selected point on the peeling trajectory (and
are immutable within the Visualizer interface), but the
selection is taken from the Prim target.
If given as a `str`, a new immutable selection array is created
by evaluating the string in the context of the visualizer data.
If given as an array-like, the array is used to explicitly
define an immutable selection.
name (str, optional):
A name for this selection. If not given, the name is inferred
from the `name` attribute of the `value` argument, if possible.
color (str, optional):
A color to use for this selection, in "rgb(n,n,n)" format.
If not provided, a default color is selected based on the
type of `value`.
activate (bool, default True):
Whether to immediately make this new selection as the "active"
selection for this visualizer.
Raises:
TypeError: If `name` is not a string or cannot be inferred.
"""
if name is None and hasattr(value, 'name'):
name = value.name
if not isinstance(name, str):
raise TypeError(f'selection names must be str not {type(name)}')
color = None
if value is None:
from ...scope.box import Box
value = Box(name=name, scope=self.scope)
if isinstance(value, GenericBox):
color = colors.DEFAULT_HIGHLIGHT_COLOR
elif isinstance(value, str):
color = colors.DEFAULT_EXPRESSION_COLOR
elif isinstance(value, pandas.Series):
color = colors.DEFAULT_LASSO_COLOR
elif isinstance(value, PrimBox):
color = colors.DEFAULT_PRIMTARGET_COLOR
super().new_selection(value, name=name, color=color, activate=activate)
def __setitem__(self, key, value):
self.new_selection(value, name=key)
def __getitem__(self, item):
if item not in self.selection_names():
return KeyError(item)
return self._selection_defs.get(item, None)
[docs] def prim(self, data='parameters', target=None, **kwargs):
"""
Create a new Prim search for this Visualizer.
Args:
data ({'parameters', 'levers', 'uncertainties', 'measures', 'all'}):
Limit the restricted dimensions to only be drawn
from this subset of possible dimensions from the scope.
Defaults to 'parameters` (i.e. levers and uncertainties).
target (str, optional):
If not given, the current active selection is used as the
target for Prim. Otherwise, give the name of an existing
selection, or an expression to be evaluated on the visualizer
data to create a new target.
**kwargs:
All other keyword arguments are forwarded to the
`emat.analysis.Prim` constructor.
Returns:
emat.analysis.Prim
"""
from ..prim import Prim
if target is None:
of_interest = self.active_selection()
elif isinstance(target, str):
try:
of_interest = self._selections[target]
except KeyError:
self.new_selection(target, name=f"PRIM Target: {target}")
of_interest = self.active_selection()
else:
self.new_selection(target, name="PRIM Target")
of_interest = self.active_selection()
if data == 'parameters':
data_ = self.data[self.scope.get_parameter_names()]
elif data == 'levers':
data_ = self.data[self.scope.get_lever_names()]
elif data == 'uncertainties':
data_ = self.data[self.scope.get_uncertainty_names()]
elif data == 'measures':
data_ = self.data[self.scope.get_measure_names()]
elif data == 'all':
data_ = self.data
else:
data_ = self.data[data]
self._prim_target = of_interest
if (of_interest).all():
raise ValueError("all points are in the target, cannot run PRIM")
if (~of_interest).all():
raise ValueError("no points are in the target, cannot run PRIM")
result = Prim(
data_,
of_interest,
**kwargs,
)
result._explorer = self
return result
[docs] def clear_box(self, name=None):
"""
Clear the contents of an editable selection box.
If the selection to be cleared is not editable
(i.e. if it is not based on an :class:`emat.Box`)
this method does nothing.
Args:
name (str, optional):
The name of the box to clear. If not
specified, the currently active selection
is cleared.
"""
if name is None:
name = self.active_selection_name()
if self.selection_deftype(name) == 'box':
box = self._selection_defs[name]
if box.thresholds:
box.clear()
self[name] = box
self._active_selection_changed()
[docs] def new_box(self, name, **kwargs):
"""
Create a new Box and add it to this Visualizer.
Args:
name (str):
The name of the selection box to add.
If this name already exists in this
Visualizer, it will be overwritten.
activate (bool, default True):
Immediately make this new box the active
selection in this Visualizer.
**kwargs:
All other keyword arguments are
forwarded to the :class:`emat.Box`
constructor.
Returns:
emat.Box: The newly created box.
"""
from ...scope.box import Box
scope = kwargs.pop('scope', self.scope)
activate = kwargs.pop('activate', True)
self.new_selection(
Box(name, scope=scope, **kwargs),
name=name,
color=colors.DEFAULT_HIGHLIGHT_COLOR,
activate=activate,
)
return self[name]
[docs] def add_box(self, box, activate=True):
"""
Add an existing Box to this Visualizer.
Args:
box (emat.Box): The box to add.
"""
self.new_selection(
box,
name=box.name,
activate=activate,
)
def _compute_selection_feature_scores(self, name=None):
if name is None:
name = self.active_selection_name()
if self.selection_deftype(name) == 'box':
box = self._selection_defs[name]
from ..feature_scoring import box_feature_scores
try:
return box_feature_scores(
self.scope,
box,
self.data,
return_type='styled',
db=None,
random_state=None,
cmap='viridis',
exclude_measures=True,
)
except ValueError:
return pandas.DataFrame(
index=['target'],
columns=[],
data=None,
)
else:
from ..feature_scoring import target_feature_scores
target = self._selections[name]
return target_feature_scores(
self.scope,
target,
self.data,
return_type='styled',
db=None,
random_state=None,
cmap='viridis',
exclude_measures=True,
)
[docs] def selection_feature_scores(self):
try:
scores = self._compute_selection_feature_scores().data.iloc[0]
except KeyboardInterrupt:
raise
except:
scores = {}
y = self.scope.get_parameter_names(False)
x = [scores.get(yi, numpy.nan) for yi in y]
fmt = lambda x: x if isinstance(x, str) else "{:.3f}".format(x)
t = [fmt(scores.get(yi, "N/A")) for yi in y]
fig = go.FigureWidget(
go.Bar(
x=x,
y=y,
text=t,
orientation='h',
textposition='outside',
texttemplate='%{text}',
marker_color=colors.DEFAULT_HIGHLIGHT_COLOR,
),
layout=dict(
margin=dict(t=0, b=0, l=0, r=0),
height = len(x) * 22,
yaxis_autorange="reversed",
)
)
self._selection_feature_score_fig = fig
return fig
def _pre_update_selection_feature_score_figure(self):
if self._selection_feature_score_fig is None:
return
fig = self._selection_feature_score_fig
fig.data[0].marker.color = 'yellow'
def _update_selection_feature_score_figure(self):
if self._selection_feature_score_fig is None:
return
fig = self._selection_feature_score_fig
try:
scores = self._compute_selection_feature_scores().data.iloc[0]
except KeyboardInterrupt:
raise
except:
scores = {}
y = self.scope.get_parameter_names(False)
x = [scores.get(yi, numpy.nan) for yi in y]
fmt = lambda x: x if isinstance(x, str) else "{:.3f}".format(x)
t = [fmt(scores.get(yi, "N/A")) for yi in y]
with fig.batch_update():
fig.data[0].x = x
fig.data[0].text = t
fig.data[0].marker.color = colors.DEFAULT_HIGHLIGHT_COLOR
def subvisualize(self, query=None, iloc=None, copy=True):
kwargs = dict(
reference_point=self._reference_point,
scope=self.scope,
)
if isinstance(query, str):
kwargs['data'] = self.data.query(query)
elif iloc is not None:
kwargs['data'] = self.data.iloc[query]
else:
kwargs['data'] = self.data[query]
if copy:
kwargs['data'] = kwargs['data'].copy()
return type(self)(**kwargs)