Source code for emat.workbench.analysis.prim

'''

A scenario discovery oriented implementation of PRIM.

The implementation of prim provided here is data type aware, so
categorical variables will be handled appropriately. It also uses a
non-standard objective function in the peeling and pasting phase of the
algorithm. This algorithm looks at the increase in the mean divided
by the amount of data removed. So essentially, it uses something akin
to the first order derivative of the original objective function.

The implementation is designed for interactive use in combination with
the jupyter notebook.

'''
import copy
import itertools
import matplotlib as mpl
from operator import itemgetter
import warnings

import numpy as np
import pandas as pd
import seaborn as sns

try:
    import altair as alt
except ImportError:
    alt = None
    warnings.warn(("altair based interactive "
                   "inspection not available"), ImportWarning)

from ..util import (EMAError, temporary_filter, INFO, get_module_logger)
from . import scenario_discovery_util as sdutil
from .prim_util import (PrimException, CurEntry, PRIMObjectiveFunctions,
                        NotSeen, is_pareto_efficient,
                        get_quantile, rotate_subset, calculate_qp,
                        determine_dimres, is_significant)

# Created on 22 feb. 2013
#
# .. codeauthor:: jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>


__all__ = ['ABOVE', 'BELOW', 'setup_prim', 'Prim', 'PrimBox',
           "pca_preprocess", "run_constrained_prim", 'PRIMObjectiveFunctions']
_logger = get_module_logger(__name__)


ABOVE = 1
BELOW = -1
PRECISION = '.2f'


def pca_preprocess(experiments, y, subsets=None, exclude=set()):
    '''perform PCA to preprocess experiments before running PRIM

    Pre-process the data by performing a pca based rotation on it.
    This effectively turns the algorithm into PCA-PRIM as described
    in `Dalal et al (2013) <http://www.sciencedirect.com/science/article/pii/S1364815213001345>`_

    Parameters
    ----------
    experiments : DataFrame
    y : ndarray
        one dimensional binary array
    subsets : dict, optional
              expects a dictionary with group name as key and a list of
              uncertainty names as values. If this is used, a constrained
              PCA-PRIM is executed
    exclude : list of str, optional
              the uncertainties that should be excluded from the rotation

    Returns
    -------
    rotated_experiments
        DataFrame
    rotation_matrix
        DataFrame

    Raises
    ------
    RuntimeError
        if mode is not binary (i.e. y is not a binary classification).
        if X contains non numeric columns


    '''
    # experiments to rotate
    x = experiments.drop(exclude, axis=1)

    #
    if not x.select_dtypes(exclude=np.number).empty:
        raise RuntimeError("X includes non numeric columns")
    if not set(np.unique(y)) == {0, 1}:
        raise RuntimeError('y should only contain 0s and 1s')

    # if no subsets are provided all uncertainties with non dtype object
    # are in the same subset, the name of this is r, for rotation
    if not subsets:
        subsets = {"r": x.columns.values.tolist()}
    else:
        # TODO:: should we check on double counting in subsets?
        #        should we check all uncertainties are in x?
        pass

    # prepare the dtypes for the new rotated experiments dataframe
    new_columns = []
    new_dtypes = []
    for key, value in subsets.items():
        # the names of the rotated columns are based on the group name
        # and an index
        subset_cols = ["{}_{}".format(key, i) for i in range(len(value))]
        new_columns.extend(subset_cols)
        new_dtypes.extend((float,) * len(value))

    # make a new empty experiments dataframe
    rotated_experiments = pd.DataFrame(index=experiments.index.values)

    for name, dtype in zip(new_columns, new_dtypes):
        rotated_experiments[name] = pd.Series(dtype=dtype)

    # put the uncertainties with object dtypes already into the new
    for entry in exclude:
        rotated_experiments[name] = experiments[entry]

    # iterate over the subsets, rotate them, and put them into the new
    # experiments dataframe
    rotation_matrix = np.zeros((x.shape[1], ) * 2)
    column_names = []
    row_names = []

    j = 0
    for key, value in subsets.items():
        x_subset = x[value]
        subset_rotmat, subset_experiments = rotate_subset(x_subset, y)
        rotation_matrix[j:j + len(value), j:j + len(value)] = subset_rotmat
        row_names.extend(value)
        j += len(value)

        for i in range(len(value)):
            name = "%s_%s" % (key, i)
            rotated_experiments[name] = subset_experiments[:, i]
            [column_names.append(name)]

    rotation_matrix = pd.DataFrame(rotation_matrix, index=row_names,
                                   columns=column_names)

    return rotated_experiments, rotation_matrix


def run_constrained_prim(experiments, y, issignificant=True,
                         **kwargs):
    ''' Run PRIM repeatedly while constraining the maximum number of dimensions
    available in x

    Improved usage of PRIM as described in `Kwakkel (2019) <https://onlinelibrary.wiley.com/doi/full/10.1002/ffo2.8>`_.

    Parameters
    ----------
    x : numpy structured array
    y : numpy array
    issignificant : bool, optional
                    if True, run prim only on subsets of dimensions
                    that are significant for the initial PRIM on the
                    entire dataset.
    **kwargs : any additional keyword arguments are passed on to PRIM


    Returns
    -------
    PrimBox instance

    '''
    frontier = []
    merged_lims = []
    merged_qp = []

    alg = Prim(experiments, y, threshold=0.1, **kwargs)
    boxn = alg.find_box()

    dims = determine_dimres(boxn, issignificant=issignificant)

    # run prim for all possible combinations of dims
    subsets = []
    for n in range(1, len(dims) + 1):
        for subset in itertools.combinations(dims, n):
            subsets.append(subset)
    _logger.info("going to run PRIM {} times".format(len(subsets)))

    boxes = [boxn]
    for subset in subsets:
        with temporary_filter(__name__, INFO):
            x = experiments.loc[:, subset].copy()
            alg = Prim(x, y, threshold=0.1, **kwargs)
            box = alg.find_box()
            boxes.append(box)

    box_init = boxn.prim.box_init
    not_seen = NotSeen()

    for box in boxes:
        peeling = box.peeling_trajectory
        lims = box.box_lims

        logical = np.ones(box.peeling_trajectory.shape[0], dtype=np.bool)

        for i in range(box.peeling_trajectory.shape[0]):
            lim = lims[i]

            boxlim = box_init.copy()
            for column in lim:
                boxlim[column] = lim[column]

            boolean = is_significant(box, i) & not_seen(boxlim)

            logical[i] = boolean
            if boolean:
                merged_lims.append(boxlim)
                merged_qp.append(box.qp[i])
        frontier.append(peeling[logical])

    frontier = pd.concat(frontier)

    # remove dominated boxes
    peeling_trajectory = frontier.reset_index(drop=True)
    data = peeling_trajectory.iloc[:, [0, 1, -1]].copy().values
    data[:, 2] *= -1
    logical = is_pareto_efficient(data)

    # resort to ensure sensible ordering
    pt = peeling_trajectory[logical]
    pt = pt.reset_index(drop=True)
    pt = pt.sort_values(['res_dim', 'coverage'],
                        ascending=[True, False])

    box_lims = [lim for lim, entry in zip(merged_lims, logical) if entry]
    qps = [qp for qp, entry in zip(merged_qp, logical) if entry]
    sorted_lims = []
    sorted_qps = []
    for entry in pt.index:
        sorted_lims.append(box_lims[int(entry)])
        sorted_qps.append(qps[int(entry)])

    # ensuring index has normal order starting from 0
    pt = pt.reset_index(drop=True)
    pt.id = pt.index

    # create PrimBox
    box = PrimBox(boxn.prim, boxn.prim.box_init, boxn.prim.yi)
    box.peeling_trajectory = pt
    box.box_lims = sorted_lims
    box.qp = sorted_qps
    return box


def setup_prim(results, classify, threshold, incl_unc=[], **kwargs):
    """Helper function for setting up the prim algorithm

    Parameters
    ----------
    results : tuple
              tuple of DataFrame and dict with numpy arrays
              the return from :meth:`perform_experiments`.
    classify : str or callable
               either a string denoting the outcome of interest to
               use or a function.
    threshold : double
                the minimum score on the density of the last box
                on the peeling trajectory. In case of a binary
                classification, this should be between 0 and 1.
    incl_unc : list of str, optional
               list of uncertainties to include in prim analysis
    kwargs : dict
             valid keyword arguments for prim.Prim

    Returns
    -------
    a Prim instance

    Raises
    ------
    PrimException
        if data resulting from classify is not a 1-d array.
    TypeError
        if classify is not a string or a callable.
    """

    x, y, mode = sdutil._setup(results, classify, incl_unc)

    return Prim(x, y, threshold=threshold, mode=mode, **kwargs)


class PrimBox(object):
    '''A class that holds information for a specific box

    Attributes
    ----------
    coverage : float
               coverage of currently selected box
    density : float
               density of currently selected box
    mean : float
           mean of currently selected box
    res_dim : int
              number of restricted dimensions of currently selected box
    mass : float
           mass of currently selected box
    peeling_trajectory : DataFrame
                         stats for each box in peeling trajectory
    box_lims : list
               list of box lims for each box in peeling trajectory

    by default, the currently selected box is the last box on the
    peeling trajectory, unless this is changed via
    :meth:`PrimBox.select`.

    '''

    coverage = CurEntry('coverage')
    density = CurEntry('density')
    mean = CurEntry('mean')
    res_dim = CurEntry('res_dim')
    mass = CurEntry('mass')

    _frozen = False

    def __init__(self, prim, box_lims, indices):
        '''init

        Parameters
        ----------
        prim : Prim instance
        box_lims : DataFrame
        indices : ndarray


        '''

        self.prim = prim

        # peeling and pasting trajectory
        colums = ['coverage', 'density', 'mean', 'res_dim',
                  'mass', 'id']
        self.peeling_trajectory = pd.DataFrame(columns=colums)

        self.box_lims = []
        self.qp = []
        self._resampled = []
        self.yi_initial = indices[:]

        columns = ['name', 'lower', 'upper', 'minimum', 'maximum',
                   'qp_lower', 'qp_upper', 'id']
        self.boxes_quantitative = pd.DataFrame(columns=columns)

        columns = ['item', 'name', 'n_items', 'x', 'id']
        self.boxes_nominal = pd.DataFrame(columns=columns)

        self._cur_box = -1

        # indices van data in box
        self.update(box_lims, indices)

    def __getattr__(self, name):
        '''
        used here to give box_lim same behaviour as coverage, density,
        mean, res_dim, and mass. That is, it will return the box lim
        associated with the currently selected box.
        '''

        if name == 'box_lim':
            return self.box_lims[self._cur_box]
        else:
            raise AttributeError

[docs] def inspect(self, i=None, style='table', **kwargs): '''Write the stats and box limits of the user specified box to standard out. if i is not provided, the last box will be printed Parameters ---------- i : int, optional the index of the box, defaults to currently selected box style : {'table', 'graph'} the style of the visualization additional kwargs are passed to the helper function that generates the table or graph ''' if i is None: i = self._cur_box stats = self.peeling_trajectory.iloc[i].to_dict() stats['restricted_dim'] = stats['res_dim'] qp_values = self.qp[i] uncs = [(key, value) for key, value in qp_values.items()] uncs.sort(key=itemgetter(1)) uncs = [uncs[0] for uncs in uncs] if style == 'table': return self._inspect_table(i, uncs, qp_values, **kwargs) elif style == 'graph': return self._inspect_graph(i, uncs, qp_values, **kwargs) else: raise ValueError("style must be one of graph or table")
def _inspect_table(self, i, uncs, qp_values): '''Helper function for visualizing box statistics in table form''' # make the descriptive statistics for the box print(self.peeling_trajectory.iloc[i]) print() # make the box definition columns = pd.MultiIndex.from_product([['box {}'.format(i)], ['min', 'max', 'qp values']]) box_lim = pd.DataFrame(np.zeros((len(uncs), 3)), index=uncs, columns=columns) for unc in uncs: values = self.box_lims[i][unc] box_lim.loc[unc] = [values[0], values[1], str(qp_values[unc])] print(box_lim) print() def _inspect_graph(self, i, uncs, qp_values, ticklabel_formatter="{} ({})", boxlim_formatter="{: .2g}", table_formatter='{:.3g}'): '''Helper function for visualizing box statistics in graph form''' return sdutil.plot_box(self.box_lims[i], qp_values, self.prim.box_init, uncs, self.peeling_trajectory.at[i, 'coverage'], self.peeling_trajectory.at[i, "density"], ticklabel_formatter=ticklabel_formatter, boxlim_formatter=boxlim_formatter, table_formatter=table_formatter) def inspect_tradeoff(self): # TODO:: # make legend with res_dim color code a selector as well? # https://medium.com/dataexplorations/focus-generating-an-interactive-legend-in-altair-9a92b5714c55 boxes = [] nominal_vars = [] quantitative_dims = set(self.prim.x_float_colums.tolist() + self.prim.x_int_columns.tolist()) nominal_dims = set(self.prim.x_nominal_columns) box_zero = self.box_lims[0] for i, (entry, qp) in enumerate(zip(self.box_lims, self.qp)): qp = pd.DataFrame(qp, index=['qp_lower', 'qp_upper']) dims = qp.columns.tolist() quantitative_res_dim = [e for e in dims if e in quantitative_dims] nominal_res_dims = [e for e in dims if e in nominal_dims] # handle quantitative df = entry box = df[quantitative_res_dim] box.index = ['x1', 'x2'] box = box.T box['name'] = box.index box['id'] = int(i) box['minimum'] = box_zero[quantitative_res_dim].T.iloc[:, 0] box['maximum'] = box_zero[quantitative_res_dim].T.iloc[:, 1] box = box.join(qp.T) boxes.append(box) # handle nominal for dim in nominal_res_dims: # TODO:: qp values items = df[dim].values[0] for j, item in enumerate(items): # we need to have tick labeling to be dynamic? # adding it to the dict wont work, creates horrible figure # unless we can force a selection? name = f"{dim}, {qp.loc[qp.index[0], dim]: .2g}" entry = dict(name=name, n_items=len(items) + 1, item=item, id=int(i), x=j / len(items)) nominal_vars.append(entry) boxes = pd.concat(boxes) nominal_vars = pd.DataFrame(nominal_vars) width = 400 height = width point_selector = alt.selection_single(fields=['id']) peeling = self.peeling_trajectory.copy() peeling['id'] = peeling.index chart = alt.Chart(peeling).mark_circle( size=75).encode( x='coverage:Q', y='density:Q', color=alt.Color( 'res_dim:O', scale=alt.Scale( range=sns.color_palette( 'YlGnBu', n_colors=8).as_hex())), opacity=alt.condition( point_selector, alt.value(1), alt.value(0.4))).properties( selection=point_selector).properties( width=width, height=height) # conda update -c conda-forge altair to 2.1 # move this to encoding tooltip=[<list of items>] chart.encoding.tooltip = [ {"type": "ordinal", "field": "id"}, {"type": "quantitative", "field": "coverage", "format": ".2"}, {"type": "quantitative", "field": "density", "format": ".2"}, {"type": "ordinal", "field": "res_dim", } ] base = alt.Chart(boxes).encode( x=alt.X('x_lower:Q', axis=alt.Axis(grid=False, title='box limits', labels=False), scale=alt.Scale(domain=(0, 1), padding=0.1)), x2='x_upper:Q', y=alt.Y('name:N', scale=alt.Scale(padding=1.0)) ).transform_calculate( x_lower='(datum.x1-datum.minimum)/(datum.maximum-datum.minimum)', x_upper='(datum.x2-datum.minimum)/(datum.maximum-datum.minimum)' ).transform_filter( point_selector ).properties( width=width, ) lines = base.mark_rule() texts1 = base.mark_text( baseline='top', dy=5, align='left').encode( text=alt.Text('text:O')).transform_calculate( text=( 'datum.qp_lower>0?' 'format(datum.x1, ".2")+" ("+format(datum.qp_lower, ".1~g")+")" :' 'format(datum.x1, ".2")')) texts2 = base.mark_text( baseline='top', dy=5, align='right').encode( text=alt.Text('text:O'),x='x_upper:Q').transform_calculate( text=( 'datum.qp_upper>0?' 'format(datum.x2, ".2")+" ("+format(datum.qp_upper, ".1")+")" :' 'format(datum.x2, ".2")')) data = pd.DataFrame([dict(start=0, end=1)]) rect = alt.Chart(data).mark_rect(opacity=0.05).encode( x='start:Q', x2='end:Q', ) # TODO:: for qp can we do something with the y encoding here and # connecting this to a selection? # seems tricky, no clear way to control the actual labels # or can we use the text channel identical to the above? nominal = alt.Chart(nominal_vars).mark_point().encode( x='x:Q', y='name:N', ).transform_filter( point_selector ).properties( width=width, ) texts3 = nominal.mark_text(baseline='top', dy=5, align='center').encode( text='item:N' ) layered = alt.layer(lines, texts1, texts2, rect, nominal, texts3) return chart & layered
[docs] def resample(self, i=None, iterations=10, p=1 / 2): '''Calculate resample statistics for candidate box i Parameters ---------- i : int, optional iterations : int, optional p : float, optional Returns ------- DataFrame ''' if i is None: i = self._cur_box x = self.prim.x.loc[self.yi_initial, :] y = self.prim.y[self.yi_initial] if len(self._resampled) < iterations: with temporary_filter(__name__, INFO, 'find_box'): for j in range(len(self._resampled), iterations): _logger.info('resample {}'.format(j)) index = np.random.choice(x.index, size=int(x.shape[0] * p), replace=False) x_temp = x.loc[index, :].reset_index(drop=True) y_temp = y[index] box = Prim(x_temp, y_temp, threshold=0.1, peel_alpha=self.prim.peel_alpha, paste_alpha=self.prim.paste_alpha).find_box() self._resampled.append(box) counters = [] for _ in range(2): counter = {column: 0.0 for column in x.columns} counters.append(counter) coverage = self.peeling_trajectory.coverage[i] density = self.peeling_trajectory.density[i] for box in self._resampled: coverage_index = ( box.peeling_trajectory.coverage - coverage).abs().idxmin() density_index = ( box.peeling_trajectory.density - density).abs().idxmin() for counter, index in zip(counters, [coverage_index, density_index]): for unc in box.qp[index].keys(): counter[unc] += 1 / iterations scores = pd.DataFrame(counters, index=['reproduce coverage', 'reproduce density'], columns=box.box_lim.columns).T * 100 return scores.sort_values(by=['reproduce coverage', 'reproduce density'], ascending=False)
def select(self, i): ''' select an entry from the peeling and pasting trajectory and update the prim box to this selected box. Parameters ---------- i : int the index of the box to select. ''' if self._frozen: raise PrimException(("box has been frozen because PRIM " "has found at least one more recent " "box")) res_dim = sdutil._determine_restricted_dims(self.box_lims[i], self.prim.box_init) indices = sdutil._in_box(self.prim.x.loc[self.prim.yi_remaining, res_dim], self.box_lims[i][res_dim]) self.yi = self.prim.yi_remaining[indices] self._cur_box = i
[docs] def drop_restriction(self, uncertainty='', i=-1): '''Drop the restriction on the specified dimension for box i Parameters ---------- i : int, optional defaults to the currently selected box, which defaults to the latest box on the trajectory uncertainty : str Replace the limits in box i with a new box where for the specified uncertainty the limits of the initial box are being used. The resulting box is added to the peeling trajectory. ''' if i == -1: i = self._cur_box new_box_lim = self.box_lims[i].copy() new_box_lim.loc[:, uncertainty] = self.box_lims[0].loc[:, uncertainty] indices = sdutil._in_box(self.prim.x.loc[self.prim.yi_remaining, :], new_box_lim) indices = self.prim.yi_remaining[indices] self.update(new_box_lim, indices)
[docs] def update(self, box_lims, indices): '''update the box to the provided box limits. Parameters ---------- box_lims: DataFrame the new box_lims indices: ndarray the indices of y that are inside the box ''' self.yi = indices self.box_lims.append(box_lims) # peeling trajectory i = self.peeling_trajectory.shape[0] y = self.prim.y[self.yi] coi = self.prim.determine_coi(self.yi) restricted_dims = sdutil._determine_restricted_dims(self.box_lims[-1], self.prim.box_init) data = {'coverage': coi / self.prim.t_coi, 'density': coi / y.shape[0], 'mean': np.mean(y), 'res_dim': restricted_dims.shape[0], 'mass': y.shape[0] / self.prim.n, 'id': i} new_row = pd.DataFrame([data]) self.peeling_trajectory = self.peeling_trajectory.append( new_row, ignore_index=True, sort=True) # boxlims qp = self._calculate_quasi_p(i, restricted_dims) self.qp.append(qp) self._cur_box = len(self.peeling_trajectory) - 1
[docs] def show_ppt(self): '''show the peeling and pasting trajectory in a figure''' return sdutil.plot_ppt(self.peeling_trajectory)
[docs] def show_tradeoff(self, cmap=mpl.cm.viridis): # @UndefinedVariable '''Visualize the trade off between coverage and density. Color is used to denote the number of restricted dimensions. Parameters ---------- cmap : valid matplotlib colormap Returns ------- a Figure instance ''' return sdutil.plot_tradeoff(self.peeling_trajectory, cmap=cmap)
[docs] def show_pairs_scatter(self, i=None, dims=None, markers=None): ''' Make a pair wise scatter plot of all the restricted dimensions with color denoting whether a given point is of interest or not and the boxlims superimposed on top. Parameters ---------- i : int, optional dims : list of str, optional dimensions to show, defaults to all restricted dimensions Returns ------- seaborn PairGrid ''' if i is None: i = self._cur_box if dims is None: dims = sdutil._determine_restricted_dims(self.box_lims[i], self.prim.box_init) return sdutil.plot_pair_wise_scatter(self.prim.x.iloc[self.yi_initial,:], self.prim.y[self.yi_initial], self.box_lims[i], self.prim.box_init, dims, markers=markers)
[docs] def write_ppt_to_stdout(self): '''write the peeling and pasting trajectory to stdout''' print(self.peeling_trajectory) print("\n")
def _calculate_quasi_p(self, i, restricted_dims): '''helper function for calculating quasi-p values as discussed in Bryant and Lempert (2010). This is a one sided binomial test. Parameters ---------- i : int the specific box in the peeling trajectory for which the quasi-p values are to be calculated. Returns ------- dict ''' box_lim = self.box_lims[i] box_lim = box_lim[restricted_dims] # total nr. of cases in box Tbox = self.peeling_trajectory['mass'][i] * self.prim.n # total nr. of cases of interest in box Hbox = self.peeling_trajectory['coverage'][i] * self.prim.t_coi x = self.prim.x.loc[self.prim.yi_remaining, restricted_dims] y = self.prim.y[self.prim.yi_remaining] # TODO use apply on df? qp_values = box_lim.apply(calculate_qp, axis=0, result_type='expand', args=[x, y, Hbox, Tbox, box_lim, self.box_lims[0]]) qp_values = qp_values.to_dict(orient='list') return qp_values class Prim(sdutil.OutputFormatterMixin): '''Patient rule induction algorithm The implementation of Prim is tailored to interactive use in the context of scenario discovery Parameters ---------- x : DataFrame the independent variables y : 1d ndarray the dependent variable threshold : float the density threshold that a box has to meet obj_function : {LENIENT1, LENIENT2, ORIGINAL} the objective function used by PRIM. Defaults to a lenient objective function based on the gain of mean divided by the loss of mass. peel_alpha : float, optional parameter controlling the peeling stage (default = 0.05). paste_alpha : float, optional parameter controlling the pasting stage (default = 0.05). mass_min : float, optional minimum mass of a box (default = 0.05). threshold_type : {ABOVE, BELOW} whether to look above or below the threshold value mode : {RuleInductionType.BINARY, RuleInductionType.REGRESSION}, optional indicated whether PRIM is used for regression, or for scenario classification in which case y should be a binary vector update_function = {'default', 'guivarch'}, optional controls behavior of PRIM after having found a first box. use either the default behavior were all points are removed, or the procedure suggested by guivarch et al (2016) doi:10.1016/j.envsoft.2016.03.006 to simply set all points to be no longer of interest (only valid in binary mode). See also -------- :mod:`cart` ''' message = "{0} points remaining, containing {1} cases of interest" def __init__(self, x, y, threshold, obj_function=PRIMObjectiveFunctions.LENIENT1, peel_alpha=0.05, paste_alpha=0.05, mass_min=0.05, threshold_type=ABOVE, mode=sdutil.RuleInductionType.BINARY, update_function='default'): x = x.reset_index(drop=True) y = np.asarray(y).ravel() assert mode in {sdutil.RuleInductionType.BINARY, sdutil.RuleInductionType.REGRESSION} assert self._assert_mode(y, mode, update_function) # preprocess x try: x.drop(columns='scenario', inplace=True) except KeyError: pass x = x.reset_index(drop=True) x_float = x.select_dtypes([np.float, np.float32, np.float64, float]) self.x_float = x_float.values self.x_float_colums = x_float.columns.values x_int = x.select_dtypes([np.int, np.int32, np.int64, int]) self.x_int = x_int.values self.x_int_columns = x_int.columns.values self.x_numeric_columns = np.concatenate([self.x_float_colums, self.x_int_columns]) x_nominal = x.select_dtypes(exclude=np.number) # filter out dimensions with only single value for column in x_nominal.columns.values: if np.unique(x[column]).shape == (1,): x = x.drop(column, axis=1) _logger.info(("{} dropped from analysis " "because only a single category").format(column)) x_nominal = x.select_dtypes(exclude=np.number) self.x_nominal = x_nominal.values self.x_nominal_columns = x_nominal.columns.values self.n_cols = x.columns.shape[0] for column in self.x_nominal_columns: x[column] = x[column].astype('category') self.x = x self.y = y self.mode = mode self._update_yi_remaining = self._update_functions[update_function] if len(self.y.shape) > 1: raise PrimException("y is not a 1-d array") if self.y.shape[0] != len(self.x): raise PrimException("len(y) != len(x)") # store the remainder of the parameters self.paste_alpha = paste_alpha self.peel_alpha = peel_alpha self.mass_min = mass_min self.threshold = threshold self.threshold_type = threshold_type self.obj_func = self._obj_functions[obj_function] # set the indices self.yi = x.index.values # how many data points do we have self.n = self.y.shape[0] # how many cases of interest do we have? self.t_coi = self.determine_coi(self.yi) # initial box that contains all data self.box_init = sdutil._make_box(self.x) # make a list in which the identified boxes can be put self._boxes = [] self._update_yi_remaining(self) @property def boxes(self): boxes = [box.box_lim for box in self._boxes] if not boxes: return [self.box_init] return boxes @property def stats(self): stats = [] items = ['coverage', 'density', 'mass', 'res_dim'] for box in self._boxes: stats.append({key: getattr(box, key) for key in items}) return stats def find_box(self): '''Execute one iteration of the PRIM algorithm. That is, find one box, starting from the current state of Prim.''' # set the indices self._update_yi_remaining(self) # make boxes already found immutable for box in self._boxes: box._frozen = True if self.yi_remaining.shape[0] == 0: _logger.info("no data remaining") return # log how much data and how many coi are remaining _logger.info( self.message.format( self.yi_remaining.shape[0], self.determine_coi( self.yi_remaining))) # make a new box that contains all the remaining data points box = PrimBox(self, self.box_init, self.yi_remaining[:]) # perform peeling phase box = self._peel(box) _logger.debug("peeling completed") # perform pasting phase box = self._paste(box) _logger.debug("pasting completed") message = ("mean: {0}, mass: {1}, coverage: {2}, " "density: {3} restricted_dimensions: {4}") message = message.format(box.mean, box.mass, box.coverage, box.density, box.res_dim) if (self.threshold_type == ABOVE) &\ (box.mean >= self.threshold): _logger.info(message) self._boxes.append(box) return box elif (self.threshold_type == BELOW) &\ (box.mean <= self.threshold): _logger.info(message) self._boxes.append(box) return box else: # make a dump box _logger.info(('box does not meet threshold criteria, ' 'value is {}, returning dump box').format( box.mean)) box = PrimBox(self, self.box_init, self.yi_remaining[:]) self._boxes.append(box) return box
[docs] def determine_coi(self, indices): ''' Given a set of indices on y, how many cases of interest are there in this set. Parameters ---------- indices: ndarray a valid index for y Returns ------- int the number of cases of interest. Raises ------ ValueError if threshold_type is not either ABOVE or BELOW ''' y = self.y[indices] if self.threshold_type == ABOVE: coi = y[y >= self.threshold].shape[0] elif self.threshold_type == BELOW: coi = y[y <= self.threshold].shape[0] else: raise ValueError("threshold type is not one of ABOVE or BELOW") return coi
def _update_yi_remaining_default(self): ''' Update yi_remaining in light of the state of the boxes associated with this prim instance. ''' # set the indices logical = np.ones(self.yi.shape[0], dtype=np.bool) for box in self._boxes: logical[box.yi] = False self.yi_remaining = self.yi[logical] def _update_yi_remaining_guivarch(self): ''' Update yi_remaining in light of the state of the boxes associated with this prim instance using the modified version from Guivarch et al (2016) doi:10.1016/j.envsoft.2016.03.006 ''' # set the indices for box in self._boxes: self.y[box.yi] = 0 self.yi_remaining = self.yi def _peel(self, box): ''' Executes the peeling phase of the PRIM algorithm. Delegates peeling to data type specific helper methods. ''' mass_old = box.yi.shape[0] / self.n x_float = self.x_float[box.yi] x_int = self.x_int[box.yi] x_nominal = self.x_nominal[box.yi] # identify all possible peels possible_peels = [] for x, columns, dtype, in [(x_float, self.x_float_colums, 'float'), (x_int, self.x_int_columns, 'int'), (x_nominal, self.x_nominal_columns, 'object')]: for j, u in enumerate(columns): peels = self._peels[dtype](self, box, u, j, x) [possible_peels.append(entry) for entry in peels] if not possible_peels: # there is no peel identified, so return box return box # determine the scores for each peel in order # to identify the next candidate box scores = [] for entry in possible_peels: i, box_lim = entry obj = self.obj_func(self, self.y[box.yi], self.y[i]) non_res_dim = self.n_cols -\ sdutil._determine_nr_restricted_dims(box_lim, self.box_init) score = (obj, non_res_dim, box_lim, i) scores.append(score) scores.sort(key=itemgetter(0, 1), reverse=True) entry = scores[0] obj_score = entry[0] box_new, indices = entry[2:] mass_new = self.y[indices].shape[0] / self.n if (mass_new >= self.mass_min) &\ (mass_new < mass_old) &\ (obj_score > 0): box.update(box_new, indices) return self._peel(box) else: # else return received box return box def _real_peel(self, box, u, j, x): ''' returns two candidate new boxes, peel along upper and lower dimension Parameters ---------- box : a PrimBox instance u : str the uncertainty for which to peel j : int column for which to peel x : ndarray Returns ------- tuple two box lims and the associated indices ''' peels = [] for direction in ['upper', 'lower']: xj = x[:, j] peel_alpha = self.peel_alpha i = 0 if direction == 'upper': peel_alpha = 1 - self.peel_alpha i = 1 box_peel = get_quantile(xj, peel_alpha) if direction == 'lower': logical = xj >= box_peel indices = box.yi[logical] if direction == 'upper': logical = xj <= box_peel indices = box.yi[logical] temp_box = copy.deepcopy(box.box_lims[-1]) temp_box.loc[i, u] = box_peel peels.append((indices, temp_box)) return peels def _discrete_peel(self, box, u, j, x): ''' returns two candidate new boxes, peel along upper and lower dimension Parameters ---------- box : a PrimBox instance u : str the uncertainty for which to peel j : int column for which to peel x : ndarray Returns ------- tuple two box lims and the associated indices ''' peels = [] for direction in ['upper', 'lower']: peel_alpha = self.peel_alpha xj = x[:, j] box_lim = box.box_lims[-1] i = 0 if direction == 'upper': peel_alpha = 1 - self.peel_alpha i = 1 box_peel = get_quantile(xj, peel_alpha) box_peel = int(box_peel) # determine logical associated with peel value if direction == 'lower': if box_peel == box_lim.loc[i, u]: logical = (xj > box_lim.loc[i, u]) &\ (xj <= box_lim.loc[i + 1, u]) else: logical = (xj >= box_peel) &\ (xj <= box_lim.loc[i + 1, u]) if direction == 'upper': if box_peel == box_lim.loc[i, u]: logical = (xj < box_lim.loc[i, u]) &\ (xj >= box_lim.loc[i - 1, u]) else: logical = (xj <= box_peel) &\ (xj >= box_lim.loc[i - 1, u]) # determine value of new limit given logical if xj[logical].shape[0] == 0: if direction == 'upper': new_limit = np.max(xj) else: new_limit = np.min(xj) else: if direction == 'upper': new_limit = np.max(xj[logical]) else: new_limit = np.min(xj[logical]) indices = box.yi[logical] temp_box = copy.deepcopy(box_lim) temp_box.loc[i, u] = new_limit peels.append((indices, temp_box)) return peels def _categorical_peel(self, box, u, j, x): ''' returns candidate new boxes for each possible removal of a single category. So. if the box[u] is a categorical variable with 4 categories, this method will return 4 boxes. Parameters ---------- box : a PrimBox instance u : str the uncertainty for which to peel j : int column for which to peel x : ndarray Returns ------- tuple a list of box lims and the associated indices ''' entries = box.box_lims[-1].loc[0, u] if len(entries) > 1: peels = [] for entry in entries: bools = [] temp_box = box.box_lims[-1].copy() peel = copy.deepcopy(entries) peel.discard(entry) temp_box[u] = [peel, peel] if type(list(entries)[0]) not in (str, float, int, bool): for element in x[:, j]: if element != entry: bools.append(True) else: bools.append(False) logical = np.asarray(bools, dtype=bool) else: logical = x[:, j] != entry indices = box.yi[logical] peels.append((indices, temp_box)) return peels else: # no peels possible, return empty list return [] def _paste(self, box): ''' Executes the pasting phase of the PRIM. Delegates pasting to data type specific helper methods.''' mass_old = box.yi.shape[0] / self.n # need to break this down by dtype restricted_dims = sdutil._determine_restricted_dims(box.box_lims[-1], self.box_init) res_dim = set(restricted_dims) x = self.x.loc[self.yi_remaining, :] # identify all possible pastes possible_pastes = [] for columns, dtype, in [(self.x_float_colums, 'float'), (self.x_int_columns, 'int'), (self.x_nominal_columns, 'object')]: for i, u in enumerate(columns): if u not in res_dim: continue _logger.debug("pasting " + u) pastes = self._pastes[dtype](self, box, u, x, restricted_dims) [possible_pastes.append(entry) for entry in pastes] if not possible_pastes: # there is no peel identified, so return box return box # determine the scores for each peel in order # to identify the next candidate box scores = [] for entry in possible_pastes: i, box_lim = entry obj = self.obj_func(self, self.y[box.yi], self.y[i]) non_res_dim = len(x.columns) -\ sdutil._determine_nr_restricted_dims(box_lim, self.box_init) score = (obj, non_res_dim, box_lim, i) scores.append(score) scores.sort(key=itemgetter(0, 1), reverse=True) entry = scores[0] obj, _, box_new, indices = entry mass_new = self.y[indices].shape[0] / self.n mean_old = np.mean(self.y[box.yi]) mean_new = np.mean(self.y[indices]) if (mass_new >= self.mass_min) &\ (mass_new > mass_old) &\ (obj > 0) &\ (mean_new > mean_old): box.update(box_new, indices) return self._paste(box) else: # else return received box return box def _real_paste(self, box, u, x, resdim): ''' returns two candidate new boxes, pasted along upper and lower dimension Parameters ---------- box : a PrimBox instance u : str the uncertainty for which to peel x : ndarray Returns ------- tuple two box lims and the associated indices ''' pastes = [] boxlim = box.box_lims[-1] for i, direction in enumerate(['lower', 'upper']): box_paste = boxlim.copy() # box containing data candidate for pasting paste_box = boxlim.copy() minimum, maximum = self.box_init[u].values if direction == 'lower': paste_box.loc[:, u] = minimum, box_paste.loc[0, u] indices = sdutil._in_box(x[resdim], paste_box[resdim]) data = x.loc[indices, u] paste_value = minimum if data.size > 0: paste_value = get_quantile(data, 1 - self.paste_alpha) assert paste_value <= boxlim.loc[i, u] else: # direction == 'upper': paste_box.loc[:, u] = paste_box.loc[1, u], maximum indices = sdutil._in_box(x[resdim], paste_box[resdim]) data = x.loc[indices, u] paste_value = maximum if data.size > 0: paste_value = get_quantile(data, self.paste_alpha) assert paste_value >= box.box_lims[-1].loc[i, u] dtype = box_paste[u].dtype if dtype == np.int32: paste_value = np.int(paste_value) box_paste.loc[i, u] = paste_value logical = sdutil._in_box(x[resdim], box_paste[resdim]) indices = self.yi_remaining[logical] pastes.append((indices, box_paste)) return pastes def _categorical_paste(self, box, u, x, resdim): ''' Return a list of pastes, equal to the number of classes currently not on the box lim. Parameters ---------- box : a PrimBox instance u : str the uncertainty for which to peel x : ndarray Returns ------- tuple a list of box lims and the associated indices ''' box_lim = box.box_lims[-1] c_in_b = box_lim.loc[0, u] c_t = self.box_init.loc[0, u] if len(c_in_b) < len(c_t): pastes = [] possible_cs = c_t - c_in_b for entry in possible_cs: paste = copy.deepcopy(c_in_b) paste.add(entry) box_paste = box_lim.copy() box_paste.loc[:, u] = [paste, paste] indices = sdutil._in_box(x[resdim], box_paste[resdim]) indices = self.yi_remaining[indices] pastes.append((indices, box_paste)) return pastes else: # no pastes possible, return empty list return [] def _lenient1_obj_func(self, y_old, y_new): r''' the default objective function used by prim, instead of the original objective function, This function can cope with continuous, integer, and categorical uncertainties. The basic idea is that the gain in mean is divided by the loss in mass. .. math:: obj = \frac {\text{ave} [y_{i}\mid x_{i}\in{B-b}] - \text{ave} [y\mid x\in{B}]} {|n(y_{i})-n(y)|} where :math:`B-b` is the set of candidate new boxes, :math:`B` the old box and :math:`y` are the y values belonging to the old box. :math:`n(y_{i})` and :math:`n(y)` are the cardinality of :math:`y_{i}` and :math:`y` respectively. So, this objective function looks for the difference between the mean of the old box and the new box, divided by the change in the number of data points in the box. This objective function offsets a problem in case of categorical data where the normal objective function often results in boxes mainly based on the categorical data. ''' mean_old = np.mean(y_old) if y_new.shape[0] > 0: mean_new = np.mean(y_new) else: mean_new = 0 obj = 0 if mean_old != mean_new: if y_old.shape[0] > y_new.shape[0]: obj = (mean_new - mean_old) / (y_old.shape[0] - y_new.shape[0]) elif y_old.shape[0] < y_new.shape[0]: obj = (mean_new - mean_old) / (y_new.shape[0] - y_old.shape[0]) else: raise PrimException( '''mean is different {} vs {}, while shape is the same, this cannot be the case'''.format( mean_old, mean_new)) return obj def _lenient2_obj_func(self, y_old, y_new): ''' friedman and fisher 14.6 ''' mean_old = np.mean(y_old) if y_new.shape[0] > 0: mean_new = np.mean(y_new) else: mean_new = 0 obj = 0 if mean_old != mean_new: if y_old.shape == y_new.shape: raise PrimException( '''mean is different {} vs {}, while shape is the same, this cannot be the case'''.format( mean_old, mean_new)) change_mean = mean_new - mean_old change_mass = abs(y_old.shape[0] - y_new.shape[0]) mass_new = y_new.shape[0] obj = mass_new * change_mean / change_mass return obj def _original_obj_func(self, y_old, y_new): ''' The original objective function: the mean of the data inside the box''' if y_new.shape[0] > 0: return np.mean(y_new) else: return -1 def _assert_mode(self, y, mode, update_function): if mode == sdutil.RuleInductionType.BINARY: return set(np.unique(y)) == {0, 1} if update_function == 'guivarch': return False return True def _assert_dtypes(self, keys, dtypes): ''' helper fucntion that checks whether none of the provided keys has a dtype object as value. ''' for key in keys: if dtypes[key][0] == np.dtype(object): raise EMAError( "%s has dtype object and can thus not be rotated" % key) return True _peels = {'object': _categorical_peel, 'int': _discrete_peel, 'float': _real_peel} _pastes = {'object': _categorical_paste, 'int': _real_paste, 'float': _real_paste} # dict with the various objective functions available # todo:: move functions themselves to ENUM? _obj_functions = {PRIMObjectiveFunctions.LENIENT2: _lenient2_obj_func, PRIMObjectiveFunctions.LENIENT1: _lenient1_obj_func, PRIMObjectiveFunctions.ORIGINAL: _original_obj_func} _update_functions = {'default': _update_yi_remaining_default, 'guivarch': _update_yi_remaining_guivarch}