Source code for emat.analysis.prim


import numpy
import pandas
import operator

from ..workbench.analysis import prim
from ..workbench.analysis.scenario_discovery_util import RuleInductionType
from ..workbench.analysis.prim_util import PrimException
from ..scope.box import Box, Bounds, Boxes
from ..scope.scope import Scope
from .discovery import ScenarioDiscoveryMixin


from plotly import graph_objects as go

class Prim(prim.Prim, ScenarioDiscoveryMixin):
    """
    Patient rule induction method

    This implementation of Prim is derived from the EMA workbench, and
    is enhanced to work interactively within the TMIP-EMAT framework.

    Args:
        x (pandas.DataFrame):
            The independent variables, generally the scoped input
            parameters (uncertainties and/or policy levers) from a
            design of experiments.
        y (array-like):
            The dependent variable, generally a pandas.Series or other
            one dimensional array with size equal to the number of rows
            in `x`.
        threshold (float):
            The density threshold that a box must meet.
        obj_function ({LENIENT1, LENIENT2, ORIGINAL}):
            The objective function used by PRIM. This defaults to a
            lenient objective function based on the gain of mean
            divided by the loss of mass.
        peel_alpha (float, default 0.05):
            The parameter controlling the peeling stage.
        paste_alpha (float, default 0.05):
            The parameter controlling the pasting stage.
        mass_min (float, default 0.05):
            The minimum mass of a box.
        threshold_type ({ABOVE, BELOW}):
            Whether to look above or below the threshold value.
        mode (RuleInductionType):
            This PRIM implementation defaults to binary classification,
            but this argument can be used to switch to regression mode.
            In binary mode, the `y` array should only have binary
            values, while in regression mode it can contain other
            floating point values.
        update_function ({'default', 'guivarch'}):
            The default behavior for this algorithm after finding the
            first box is to remove all points in the first box, so that
            subsequent boxes do not include them. An alternate procedure
            suggested by Guivarch et al (2016)
            doi:10.1016/j.envsoft.2016.03.006 is to simply set all
            points in the first box to be no longer of interest. This
            alternate procedure is only valid in binary mode.
        scope (optional):
            Keyword only. Used to build a `Visualizer` over `x` when no
            `explorer` is given. If omitted, `x.scope` is used when `x`
            has such an attribute.
        explorer (optional):
            Keyword only. An existing explorer object to attach to this
            Prim and to the boxes it finds. Pass `False` to disable the
            explorer entirely, even if a scope is available.
    """

    def __init__(self, x, y, threshold=0.05, *args, scope=None, explorer=None, **kwargs):
        super().__init__(x, y, threshold, *args, **kwargs)
        # Remember the name of the target (if `y` is a named Series) so
        # that explorer selections can be labelled with it.
        self._target_name = getattr(y, 'name', None)
        if explorer is False:
            # This test must come before the `is not None` test: False is
            # also "not None", and storing it as the explorer would make
            # `find_box` try to read `False.data`.
            self._explorer = None
        elif explorer is not None:
            self._explorer = explorer
        else:
            self._explorer = None
            if scope is None and hasattr(x, 'scope'):
                scope = x.scope
            if scope is not None:
                from .explore_2.explore_visualizer import Visualizer
                self._explorer = Visualizer(x, scope=scope)

    def _get_or_find_box(self, n=-1):
        """
        Return box `n`, finding a first box if none exists yet.

        Args:
            n (int, default -1):
                Index into the list of boxes already found.

        Returns:
            The box at index `n`, or the result of `find_box` when
            `n` is -1 and no box has been found yet.

        Raises:
            IndexError: If `n` is out of range and is not -1.
        """
        try:
            return self._boxes[n]
        except IndexError:
            if n != -1:
                raise
        return self.find_box()

    def find_box(self):
        """
        Execute one iteration of the PRIM algorithm.

        This method will find one box, starting from the current state
        of Prim (i.e., over all observations that are not already within
        a previously selected PrimBox). All existing boxes will be
        "frozen" by this command, so that those prior boxes can no
        longer be modified by their `select` methods or from within
        their `tradeoff_selector`.

        Returns:
            emat.analysis.PrimBox:
                The new box, or None if the parent implementation did
                not return one.
        """
        primbox = super().find_box()
        if primbox is None:
            return None
        # Upgrade the workbench box in place to the interactive subclass.
        primbox.__class__ = PrimBox
        prim_explorer = getattr(self, '_explorer', None)
        if prim_explorer is not None:
            from .explore_2.explore_visualizer import Visualizer
            subdata = prim_explorer.data.iloc[primbox.yi_initial]
            if len(subdata) != len(prim_explorer.data):
                # Generally, second and later boxes get new subvisualizers
                primbox._explorer = Visualizer(
                    subdata.copy(),
                    scope=prim_explorer.scope,
                )
            else:
                primbox._explorer = prim_explorer
        primbox._target_name = getattr(self, '_target_name', None)
        # Re-select the current peel so any attached explorer is updated.
        primbox.select(primbox._cur_box)
        return primbox

    def tradeoff_selector(self, n=-1, colorscale='viridis', figure_class=None):
        """
        Visualize the trade off between coverage and density.

        This visualization plots all of the points along the peeling
        trajectory for the selected `PrimBox`, plotting coverage along
        the x axis, density along the y axis, and showing the number of
        restricted dimensions by color.

        Coverage is percentage of the cases of interest that are in the
        box (i.e., number of cases of interest in the box divided by
        total number of cases of interest). The starting point of the
        PRIM algorithm is the unrestricted full set of cases, which
        includes all outcomes of interest, and therefore, the coverage
        starts at 1.0 and drops as the algorithm progresses.

        Density is the share of cases in the box that are case of
        interest (i.e., number of cases of interest in the box divided
        by the total number of cases in the box). As the box is reduced,
        the density will increase (as that is the objective of the PRIM
        algorithm).

        Args:
            n (int, optional):
                The index number of the PrimBox to use. If not given,
                the last found box is used. If no boxes have been found
                yet, an initial box is found using the `find_box`
                method, and in this case giving any value other than -1
                will raise an error.
            colorscale (str, default 'viridis'):
                A valid color scale name, as compatible with the
                color_palette method in seaborn.
            figure_class (optional):
                Passed through unchanged to `PrimBox.tradeoff_selector`.

        Returns:
            plotly.FigureWidget

        Raises:
            IndexError: If `n` is out of range and is not -1.
        """
        box = self._get_or_find_box(n)
        return box.tradeoff_selector(colorscale=colorscale, figure_class=figure_class)

    def to_json(self, n=-1):
        """
        Serialize one box to JSON.

        Args:
            n (int, default -1):
                The index number of the PrimBox to use. If no boxes
                have been found yet and `n` is -1, an initial box is
                found using the `find_box` method.

        Returns:
            The result of calling `to_json` on the selected box.

        Raises:
            IndexError: If `n` is out of range and is not -1.
        """
        return self._get_or_find_box(n).to_json()
def _discrete_color_scale(name='viridis', n=8):
    """
    Build a stepped (discrete) color scale from a seaborn palette.

    Each of the `n` palette colors contributes two entries, one at the
    start and one at the end of its `1/n`-wide interval, so that the
    color is constant across that interval.

    Args:
        name (str, default 'viridis'):
            Palette name passed to `seaborn.color_palette`.
        n (int, default 8):
            Number of discrete colors.

    Returns:
        list: `[position, "rgb(r, g, b)"]` pairs, two per color.
    """
    import seaborn as sns

    palette = sns.color_palette(name, n)
    scale = []
    for step in range(n):
        rgb = palette[step]
        # Channels come in as 0-1 floats; truncate to 0-255 integers.
        red, green, blue = (int(rgb[k] * 255) for k in range(3))
        css_color = f"rgb({red}, {green}, {blue})"
        scale.extend((
            [step / n, css_color],
            [(step + 1) / n, css_color],
        ))
    return scale
class PrimBox(prim.PrimBox):
    """
    Information for a specific Prim box.

    By default, the currently selected box is the last box on the
    peeling trajectory, unless this is changed via
    :meth:`PrimBox.select`.
    """

    def box_number(self):
        """
        Find the position of this box among its parent Prim's boxes.

        Returns:
            int: The 1-based position, or -1 if this box is not found.
        """
        for position, candidate in enumerate(self.prim._boxes, start=1):
            if candidate is self:
                return position
        return -1

    def to_emat_box(self, i=None, name=None, src_name=None):
        """
        Convert one point on the peeling trajectory to an emat `Box`.

        Only limits that differ from the full extent of the data in
        `self.prim.x` are written to the result.

        Args:
            i (int, optional):
                Index on the peeling trajectory. Defaults to the
                currently selected point.
            name (str, optional):
                Name for the box. Defaults to a name built from the box
                number and `i`.
            src_name (str, optional):
                If given, appended to the name in square brackets.

        Returns:
            Box: A box with `coverage`, `density` and `mass` attributes
            set from the peeling trajectory.
        """
        if i is None:
            i = self._cur_box
        if name is None:
            name = f'PRIM Box {self.box_number()} Peel {i}'
        if src_name is not None:
            name = name + f" [{src_name}]"
        limits = self.box_lims[i]
        data = self.prim.x
        box = Box(name)
        for col in limits.columns:
            first = limits[col].iloc[0]
            if isinstance(data.dtypes[col], pandas.CategoricalDtype):
                # For categorical columns the first row is compared with
                # (and stored as) the set of allowed categories.
                if set(data[col].cat.categories) != first:
                    box.replace_allowed_set(col, first)
            else:
                if first != data[col].min():
                    box.set_lower_bound(col, first)
                second = limits[col].iloc[1]
                if second != data[col].max():
                    box.set_upper_bound(col, second)
        box.coverage = self.peeling_trajectory['coverage'][i]
        box.density = self.peeling_trajectory['density'][i]
        box.mass = self.peeling_trajectory['mass'][i]
        return box

    def __repr__(self):
        i = self._cur_box
        head = f"<{self.__class__.__name__} peel {i+1} of {len(self.peeling_trajectory)}>"

        # make the box definition, listing dimensions in order of their
        # values in `self.qp`
        ranked = sorted(self.qp[i].items(), key=operator.itemgetter(1))
        uncs = [unc_name for unc_name, _ in ranked]
        box_lim = pandas.DataFrame(index=uncs, columns=['min', 'max'])
        for unc in uncs:
            values = self.box_lims[i][unc]
            # Positional access, consistent with `to_emat_box`.
            box_lim.loc[unc] = [values.iloc[0], values.iloc[1]]

        head += f'\n   coverage: {self.coverage:.5f}'
        head += f'\n   density:  {self.density:.5f}'
        head += f'\n   mean: {self.mean:.5f}'
        head += f'\n   mass: {self.mass:.5f}'
        head += f'\n   restricted dims: {self.res_dim}'
        if not box_lim.empty:
            head += "\n     " + str(box_lim).replace("\n", "\n     ")
        return head

    def to_json(self):
        """
        Serialize every point on the peeling trajectory.

        Returns:
            str: A JSON object keyed by trajectory index, where each
            value is the `to_json` output of the matching emat box.
        """
        import json
        state = {
            i: self.to_emat_box(i, name=str(i)).to_json()
            for i in range(len(self.peeling_trajectory))
        }
        return json.dumps(state)

    def _marker_styles(self, selected):
        """
        Compute marker symbols and sizes that highlight one point.

        Args:
            selected (int): Trajectory index of the point to highlight.

        Returns:
            tuple: `(symbols, sizes)` integer arrays, one entry per
            point on the peeling trajectory.
        """
        n_points = len(self.peeling_trajectory)
        symbols = numpy.zeros(n_points, dtype=int)
        symbols[selected] = 4  # cross
        sizes = numpy.full(n_points, 6, dtype=int)
        sizes[selected] = 9
        return symbols, sizes

    def _make_tradeoff_selector(self, colorscale='cividis', figure_class=None):
        """
        Visualize the trade off between coverage and density.

        Color is used to denote the number of restricted dimensions.
        Clicking a point selects that point via :meth:`PrimBox.select`.

        Args:
            colorscale (str):
                A valid seaborn color scale name.
            figure_class (optional):
                Callable used to create the figure. Defaults to
                `plotly.graph_objects.FigureWidget`.

        Returns:
            An instance of `figure_class`.
        """
        peeling_trajectory = self.peeling_trajectory
        if figure_class is None:
            figure_class = go.FigureWidget
        fig = figure_class()

        # Hover text is the string form of each emat box, in monospace.
        hovertext = []
        for i in range(len(peeling_trajectory)):
            t = str(self.to_emat_box(i, name=str(i))).replace("\n", "<br>")
            hovertext.append(
                f'<span style="font-family:Consolas,monospace">{t}</span>'
            )

        # One discrete color per possible count of restricted dimensions.
        n_colors = int(max(peeling_trajectory['res_dim'])) + 1
        color_scale_ = _discrete_color_scale(colorscale, n_colors)
        # Place one tick at the middle of each color band.
        colortickvals = numpy.arange(0.5, n_colors, 1) * (n_colors-1)/n_colors
        colorticktext = [str(k) for k in range(n_colors)]

        symbols, sizes = self._marker_styles(self._cur_box)

        scatter = fig.add_scatter(
            x=peeling_trajectory['coverage'],
            y=peeling_trajectory['density'],
            mode='markers',
            marker=dict(
                color=peeling_trajectory['res_dim'],
                colorscale=color_scale_,
                showscale=True,
                colorbar=dict(
                    # Nested `title.side` replaces the deprecated
                    # `titleside` attribute, which newer plotly rejects.
                    title=dict(
                        text="Number of Restricted Dimensions",
                        side="right",
                    ),
                    tickmode="array",
                    tickvals=colortickvals,
                    ticktext=colorticktext,
                    ticks="outside",
                ),
                symbol=symbols,
                size=sizes,
            ),
            text=hovertext,
            hoverinfo="text",
        ).data[-1]

        fig.update_layout(
            margin=dict(l=10, r=10, t=10, b=10),
            width=600,
            height=400,
            xaxis_title_text='Coverage',
            yaxis_title_text='Density',
        )

        # create callback function
        def select_point(trace, points, selector):
            for point_index in points.point_inds:
                self.select(point_index)

        scatter.on_click(select_point)
        return fig

    def tradeoff_selector(self, colorscale='viridis', figure_class=None):
        """
        Visualize the trade off between coverage and density.

        This visualization plots all of the points along the peeling
        trajectory for this `PrimBox`, plotting coverage along the x
        axis, density along the y axis, and showing the number of
        restricted dimensions by color.

        Coverage is percentage of the cases of interest that are in the
        box (i.e., number of cases of interest in the box divided by
        total number of cases of interest). The starting point of the
        PRIM algorithm is the unrestricted full set of cases, which
        includes all outcomes of interest, and therefore, the coverage
        starts at 1.0 and drops as the algorithm progresses.

        Density is the share of cases in the box that are case of
        interest (i.e., number of cases of interest in the box divided
        by the total number of cases in the box). As the box is reduced,
        the density will increase (as that is the objective of the PRIM
        algorithm).

        The figure is created on the first call and cached; later calls
        return the cached figure and ignore their arguments.

        Args:
            colorscale (str, default 'viridis'):
                A valid color scale name, as compatible with the
                color_palette method in seaborn.
            figure_class (optional):
                Callable used to create the figure. Defaults to
                `plotly.graph_objects.FigureWidget`.

        Returns:
            FigureWidget
        """
        if getattr(self, '_tradeoff_widget', None) is None:
            self._tradeoff_widget = self._make_tradeoff_selector(
                colorscale=colorscale,
                figure_class=figure_class,
            )
        return self._tradeoff_widget

    def select(self, i):
        """
        Select an entry from the peeling and pasting trajectory.

        This will update the PRIM box to this selected box, as well as
        update the `tradeoff_selector` and `explorer`, if either is
        attached to this PrimBox.

        If the parent implementation raises `PrimException`, the
        exception is suppressed and nothing is updated.

        Args:
            i (int): The index of the box to select.
        """
        try:
            super(PrimBox, self).select(i)
        except PrimException:
            # Deliberately best-effort: this is also called from the
            # click callback of the tradeoff figure.
            return

        widget = getattr(self, '_tradeoff_widget', None)
        if widget is not None:
            symbols, sizes = self._marker_styles(i)
            widget['data'][0]['marker']['symbol'] = symbols
            widget['data'][0]['marker']['size'] = sizes

        explorer = getattr(self, '_explorer', None)
        if explorer is None:
            return

        from .explore_2.explore_base import DataFrameExplorerBase
        if not isinstance(explorer, DataFrameExplorerBase):
            # for old explorer interface
            explorer.set_box(self.to_emat_box())
            return

        name_t = f"PRIM Box {self.box_number()} Target [{self._target_name}]"
        name_s = f"PRIM Box {self.box_number()} Solution [{self._target_name}]"
        explorer.new_selection(
            self,
            name=name_t,
            activate=False,
        )
        explorer.new_selection(
            self.to_emat_box(),
            name=name_s,
            activate=False,
        )
        if explorer.active_selection_name() not in (name_t, name_s):
            explorer.set_active_selection_name(name_s)
        else:
            # Keep the active selection, but force it to refresh.
            explorer.set_active_selection_name(
                explorer.active_selection_name(),
                force_update=True,
            )

    def explore(self, scope=None, data=None):
        """
        Return the explorer attached to this box, creating it if needed.

        The current emat box is stored in the explorer under the key
        "PRIM Target" on every call.

        Args:
            scope (optional):
                Scope for a new `Visualizer`. Defaults to `data.scope`
                when that attribute exists. Ignored if an explorer is
                already attached.
            data (optional):
                Data for a new `Visualizer`. Defaults to `self.prim.x`.
                Ignored if an explorer is already attached.

        Returns:
            The attached explorer.

        Raises:
            ValueError: If a new explorer is needed and no scope can be
                found.
        """
        if getattr(self, '_explorer', None) is None:
            from .explore_2.explore_visualizer import Visualizer
            if data is None:
                data = self.prim.x
            if scope is None:
                scope = getattr(data, 'scope', None)
            if scope is None:
                raise ValueError("failed to initialize visualizer, cannot find scope")
            self._explorer = Visualizer(scope=scope, data=data)
        self._explorer["PRIM Target"] = self.to_emat_box()
        return self._explorer

    def _default_features(self):
        """Return the sorted `demanded_features` of the current emat box."""
        return sorted(self.to_emat_box().demanded_features)

    def splom(self, rows=None, cols=None):
        """
        Call the explorer's `splom` method for this box.

        Args:
            rows (optional):
                Defaults to the sorted `demanded_features` of the
                current emat box.
            cols (optional):
                Defaults to the sorted `demanded_features` of the
                current emat box.

        Returns:
            The result of the explorer's `splom` method.
        """
        if rows is None:
            rows = self._default_features()
        if cols is None:
            cols = self._default_features()
        return self.explore().splom(
            f"{rows}|{cols}",
            rows=rows,
            cols=cols,
        )

    def hmm(self, rows=None, cols=None):
        """
        Call the explorer's `hmm` method for this box.

        Args:
            rows (optional):
                Defaults to the sorted `demanded_features` of the
                current emat box.
            cols (optional):
                Defaults to the sorted `demanded_features` of the
                current emat box.

        Returns:
            The result of the explorer's `hmm` method.
        """
        if rows is None:
            rows = self._default_features()
        if cols is None:
            cols = self._default_features()
        return self.explore().hmm(
            f"{rows}|{cols}",
            rows=rows,
            cols=cols,
        )