import seaborn as sns
import pandas
import numpy
import warnings
from ..workbench.analysis import feature_scoring
from ..viz import heatmap_table
from ..scope.box import Box
from ..util.arg_processing import design_check
from ..exceptions import MissingMeasuresWarning, MissingMeasuresError
[docs]def feature_scores(
scope,
design,
return_type='styled',
db=None,
random_state=None,
cmap='viridis',
measures=None,
shortnames=None,
):
"""
Calculate feature scores based on a design of experiments.
Args:
scope (emat.Scope): The scope that defines this analysis.
design (str or pandas.DataFrame): The name of the design
of experiments to use for feature scoring, or a single
pandas.DataFrame containing the experimental design and
results.
return_type ({'styled', 'figure', 'dataframe'}):
The format to return, either a heatmap figure as an SVG
render in and xmle.Elem, or a plain pandas.DataFrame,
or a styled dataframe.
db (emat.Database): If `design` is given as a string,
extract the experiments from this database.
random_state (int or numpy.RandomState, optional):
Random state to use.
cmap (string or colormap, default 'viridis'): matplotlib
colormap to use for rendering.
measures (Collection, optional): The performance measures
on which feature scores are to be generated. By default,
all measures are included.
shortnames (Scope or callable):
If given, use this function to convert the measure
names into more readable `shortname` values from the
scope, or by using a function that maps measures
names to something else.
Returns:
xmle.Elem or pandas.DataFrame:
Returns a rendered SVG as xml, or a DataFrame,
depending on the `return_type` argument.
This function internally uses feature_scoring from the EMA Workbench,
which in turn scores features using the "extra trees" regression
approach.
"""
design = design_check(design, scope, db)
# Split design into inputs and outcomes
inputs = design[[c for c in design.columns if c in scope.get_parameter_names()]]
outcomes = design[[c for c in design.columns if c in scope.get_measure_names()]]
# Remove experiments that have no outcomes in any dimension
missing_outcomes_experiments = outcomes.isna().all(axis=1)
if missing_outcomes_experiments.any():
n_missing = missing_outcomes_experiments.sum()
if n_missing == len(outcomes):
raise MissingMeasuresError(
f"All {n_missing} experiments have no outcomes",
)
inputs = inputs[~missing_outcomes_experiments]
outcomes = outcomes[~missing_outcomes_experiments]
warnings.warn(
f"There are {n_missing} experiments "
f"with no outcomes, ignoring them",
stacklevel=2,
category=MissingMeasuresWarning,
)
# prepare to remove input columns with NaN's
drop_inputs = list(inputs.columns[pandas.isna(inputs).sum()>0])
# prepare to remove constant inputs
for c in scope.get_constant_names():
if c in inputs.columns and c not in drop_inputs:
drop_inputs.append(c)
# prepare to remove outcome columns with NaN's,
drop_outcomes = list(outcomes.columns[pandas.isna(outcomes).sum()>0])
# prepare to remove outcomes that have been removed from the scope
scope_measures = set(scope.get_measure_names())
for c in outcomes.columns:
if c not in scope_measures and c not in drop_outcomes:
drop_outcomes.append(c)
if measures is not None and c not in measures and c not in drop_outcomes:
drop_outcomes.append(c)
# execute removals
outcomes_ = outcomes.drop(columns=drop_outcomes)
inputs_ = inputs.drop(columns=drop_inputs)
# use workbench to compute feature scores
fs = feature_scoring.get_feature_scores_all(inputs_, outcomes_, random_state=random_state)
# restore original row/col ordering
orig_col_order = [c for c in outcomes.columns if c in scope_measures]
fs = fs.reindex(index=inputs.columns, columns=orig_col_order)
# remove columns and rows that are entirely NA
drop_c = list(fs.columns[(~pandas.isna(fs)).sum() == 0])
drop_r = list(fs.index[(~pandas.isna(fs)).sum(axis=1) == 0])
fs = fs.drop(index=drop_r, columns=drop_c)
if shortnames is not None:
if shortnames is True:
shortnames = scope
from ..scope.scope import Scope
if isinstance(shortnames, Scope):
fs.columns = fs.columns.map(shortnames.shortname)
else:
fs.columns = fs.columns.map(shortnames)
# convert colormap to a light color palette for rendered outputs
if 'figure' in return_type.lower() or 'styled' in return_type.lower():
try:
cmap = sns.light_palette(cmap, as_cmap=True)
except ValueError:
pass
# create output based on `return_type`
if 'figure' in return_type.lower():
return heatmap_table(
fs.T,
xlabel='Model Parameters', ylabel='Performance Measures',
title='Feature Scoring' + (f' [{design.design_name_}]' if design.design_name_ else ''),
cmap=cmap,
)
elif return_type.lower() == 'styled':
from ..util.styling import feature_score_styling
return feature_score_styling(fs.T, cmap=cmap)
else:
return fs.T
def box_feature_scores(
scope,
box,
design,
return_type='styled',
db=None,
random_state=None,
cmap='viridis',
exclude_measures=True,
):
"""
Calculate feature scores for a box, based on a design of experiments.
Args:
scope (emat.Scope): The scope that defines this analysis.
box (emat.Box): The box the defines the target feature.
design (str or pandas.DataFrame): The name of the design of experiments
to use for feature scoring, or a single pandas.DataFrame containing the
experimental design and results.
return_type ({'styled', 'figure', 'dataframe'}):
The format to return, either a heatmap figure as an SVG render in and
xmle.Elem, or a plain pandas.DataFrame, or a styled dataframe.
db (emat.Database): If `design` is given as a string, extract the experiments
from this database.
random_state (int or numpy.RandomState, optional):
Random state to use.
cmap (string or colormap, default 'viridis'): matplotlib colormap
to use for rendering.
exclude_measures (bool, default True): Exclude measures from feature scoring.
Returns:
xmle.Elem or pandas.DataFrame:
Returns a rendered SVG as xml, or a DataFrame,
depending on the `return_type` argument.
This function internally uses feature_scoring from the EMA Workbench, which in turn
scores features using the "extra trees" classification approach.
"""
design = design_check(design, scope, db)
if exclude_measures:
if not set(box.thresholds.keys()).intersection(scope.get_measure_names()):
raise ValueError('no measures in box thresholds')
target = box.inside(design)
return target_feature_scores(
scope,
target,
design,
return_type=return_type,
db=db,
random_state=random_state,
cmap=cmap,
exclude_measures=exclude_measures,
exclude_parameters=box.thresholds.keys(),
)
def target_feature_scores(
scope,
target,
design,
return_type='styled',
db=None,
random_state=None,
cmap='viridis',
exclude_measures=True,
exclude_parameters=None,
):
"""
Calculate feature scores for a target selection, based on a design of experiments.
Args:
scope (emat.Scope): The scope that defines this analysis.
target (pandas.Series): The target feature, whose dtype should be bool.
design (str or pandas.DataFrame): The name of the design of experiments
to use for feature scoring, or a single pandas.DataFrame containing the
experimental design and results.
return_type ({'styled', 'figure', 'dataframe'}):
The format to return, either a heatmap figure as an SVG render in and
xmle.Elem, or a plain pandas.DataFrame, or a styled dataframe.
db (emat.Database): If `design` is given as a string, extract the experiments
from this database.
random_state (int or numpy.RandomState, optional):
Random state to use.
cmap (string or colormap, default 'viridis'): matplotlib colormap
to use for rendering.
exclude_measures (bool, default True): Exclude measures from feature scoring.
Returns:
xmle.Elem or pandas.DataFrame:
Returns a rendered SVG as xml, or a DataFrame,
depending on the `return_type` argument.
This function internally uses feature_scoring from the EMA Workbench, which in turn
scores features using the "extra trees" classification approach.
"""
import pandas, numpy
if isinstance(design, str):
if db is None:
raise ValueError('must give db to use design name')
design_name = design
design = db.read_experiment_all(scope.name, design)
elif isinstance(design, pandas.DataFrame):
design_name = None
else:
raise TypeError('must name design or give DataFrame')
# remove design columns with NaN's
drop_cols = set(design.columns[pandas.isna(design).sum()>0])
# remove design columns not in the scope
all_names = set(scope.get_all_names())
for c in design.columns:
if c not in all_names:
drop_cols.add(c)
# remove constants
for c in scope.get_constant_names():
if c in design.columns:
drop_cols.add(c)
# remove outcome columns if exclude_measures
if exclude_measures:
for meas in scope.get_measure_names():
if meas in design.columns:
drop_cols.add(meas)
if exclude_parameters is not None:
for meas in design.columns:
if meas in exclude_parameters:
drop_cols.add(meas)
design_ = design.drop(columns=list(drop_cols))
from ..workbench.analysis.scenario_discovery_util import RuleInductionType
target_name = getattr(target, 'name', None)
if not isinstance(target_name, str):
target_name = 'target'
fs = feature_scoring.get_feature_scores_all(
design_,
{target_name:target},
random_state=random_state,
mode=RuleInductionType.CLASSIFICATION,
)
# restore original row/col ordering
# orig_col_order = [c for c in outcomes.columns if c in scope_measures]
# fs = fs.reindex(
# index=design.columns,
# # columns=orig_col_order,
# )
if return_type.lower() in ('figure','styled'):
try:
cmap = sns.light_palette(cmap, as_cmap=True)
except ValueError:
pass
if return_type.lower() == 'figure':
return heatmap_table(
fs.T,
xlabel='Model Parameters', ylabel='Target',
title='Feature Scoring' + (f' [{design_name}]' if design_name else ''),
cmap=cmap,
)
elif return_type.lower() == 'styled':
return fs.T.style.background_gradient(cmap=cmap, axis=1, text_color_threshold=0.5)
else:
return fs.T
def _col_breakpoints(
data_col,
min_tail=5,
max_breaks=20,
break_spacing='linear',
):
arr = numpy.asarray(data_col).flatten()
if arr.size < min_tail*2:
raise ValueError("array too short for `min_tail`")
arr_s = numpy.partition(arr, [min_tail,-min_tail])
lo_end, hi_end = arr_s[[min_tail,-min_tail]]
inside_size = arr.size - (min_tail*2)
if arr.size == min_tail*2:
inside_breaks = 1
else:
inside_breaks = max(min(int(numpy.ceil(inside_size/min_tail)), max_breaks),2)
if break_spacing == 'linear':
return numpy.linspace(lo_end, hi_end, inside_breaks)
elif break_spacing == 'percentile':
qtiles = numpy.linspace(min_tail/arr.size, (arr.size-min_tail)/arr.size, inside_breaks)
return numpy.quantile(arr, qtiles)
raise ValueError(f'unknown `break_spacing` value {break_spacing}')
[docs]def threshold_feature_scores(
scope,
measure_name,
design,
return_type='styled',
*,
db=None,
random_state=None,
cmap='viridis',
z_min=0,
z_max=1,
min_tail=5,
max_breaks=20,
break_spacing='linear',
):
"""
Compute and display thresold feature scores for a performance measure.
This function is useful to detect and understand non-linear relationships
between performance measures and various input parameters.
Args:
scope (emat.Scope): The scope that defines this analysis.
measure_name (str): The name of an individual performance
measure to analyze.
design (str or pandas.DataFrame): The name of the design
of experiments to use for feature scoring, or a single
pandas.DataFrame containing the experimental design and
results.
return_type (str):
The format to return:
- 'dataframe' gives a plain pandas.DataFrame,
- 'styled' gives a colorized pandas.DataFrame,
- 'figure' gives a plotly violin plot,
- 'ridge figure' gives a plotly ridgeline figure.
Either plotly result can optionally have ".svg"
or ".png" added to render a static image in those
formats.
db (emat.Database): If `design` is given as a string,
extract the experiments from this database.
random_state (int or numpy.RandomState, optional):
Random state to use.
cmap (string or colormap, default 'viridis'): matplotlib
colormap to use for rendering. Ignored if `return_type`
is 'dataframe'.
z_min, z_max (float, optional): Trim the bottom and top of
the colormap range, respectively. Defaults to (0,1) which
will make the most relevant overall feature colored at the
top of the colorscale and the least relevant feature at
the bottom.
min_tail (int, default 5): The minimum number of observations
on each side of any threshold point. If this value is too
small, the endpoint feature scoring results are highly
unstable, but if it is too large then important nonlinearities
near the extreme points may not be detected. This is also
used as the minimum average number of observations between
threshold points.
max_breaks (int, default 20): The maximum number of distinct
threshold points to use. Setting this value higher improves
resolution but also requires more computational time.
break_spacing ({'linear', 'percentile'}): How to distribute
threshold breakpoints to test within the min-max range.
Returns:
plotly.graph_objs.Figure or DataFrame or styled DataFrame
"""
design = design_check(design, scope, db)
tracking = {}
breakpoints = _col_breakpoints(
design[measure_name],
min_tail=min_tail,
max_breaks=max_breaks,
break_spacing=break_spacing,
)
for j in breakpoints:
tracking[j] = dict(box_feature_scores(
scope,
Box(name="", lower_bounds={measure_name: j}),
design,
return_type='dataframe',
db=db,
random_state=random_state,
exclude_measures=True,
).iloc[0])
result = pandas.DataFrame(tracking)
name_order = []
for name in scope.get_parameter_names():
if name in result.index:
name_order.append(name)
for name in result.index:
if name not in name_order:
name_order.append(name)
result = result.reindex(index=name_order)
if return_type.lower() == 'styled':
return result.style.background_gradient(cmap=cmap, axis=0, text_color_threshold=0.5)
if 'figure' in return_type.lower():
import plotly.graph_objects as go
from matplotlib import cm
base_score = feature_scores(
scope=scope,
design=design,
return_type='dataframe',
db=None,
random_state=random_state,
measures=[measure_name],
)
traces = []
max_base_score = base_score.max().max()
tick_values = []
tick_labels = []
colormap = getattr(cm, cmap, cm.viridis)
if 'ridge' in return_type.lower():
ridge = True
gap = numpy.percentile(result.values.flatten(), 95)
linewidth = 3
area_alpha = 1.0
else:
ridge = False
gap = numpy.percentile(result.values.flatten(), 95) * 2
linewidth = 2
area_alpha = 1.0
for n_reversed in range(len(result)):
n = len(result) - n_reversed - 1
bs = base_score.loc[measure_name, result.index[n]] / max_base_score
if numpy.isnan(bs):
bs = 0
bs = bs * (z_max-z_min) + z_min
color = colormap(bs, bytes=True)
dark_color, light_color = _darker_and_lighter_color(numpy.asarray(color)/255)
linecolor_ = ", ".join(str(i) for i in dark_color[:3])
areacolor_ = ", ".join(str(i) for i in light_color[:3])
traces.append(
go.Scatter(
y=(numpy.zeros(len(result.columns)) if ridge else -result.iloc[n]) + n * gap,
x=result.columns,
fillcolor='rgba(0,0,0,0)',
visible=True,
showlegend=False,
line=dict(color=f'rgba({linecolor_}, 1.0)', width=0 if ridge else linewidth),
name=result.index[n],
hovertemplate='%{meta}<br>Rel Import: %{customdata:.3f}<extra>'+measure_name+': %{x:.3s}</extra>',
meta=[result.index[n]],
customdata=result.iloc[n],
)
)
traces.append(
go.Scatter(
y=result.iloc[n] + n * gap,
x=result.columns,
fill='tonexty',
name=result.index[n],
fillcolor=f'rgba({areacolor_}, {area_alpha})',
line=dict(color=f'rgba({linecolor_}, 1.0)', width=linewidth),
hovertemplate='%{meta}<br>Rel Import: %{customdata:.3f}<extra>'+measure_name+': %{x:.3s}</extra>',
meta=[result.index[n]],
customdata=result.iloc[n],
)
)
tick_values.append(n * gap + (gap/3 if ridge else 0))
tick_labels.append(result.index[n])
fig = go.Figure()
fig.add_traces(traces)
fig.update_layout(
xaxis_title_text=scope.shortname(measure_name),
yaxis_showgrid=False,
yaxis_zeroline=False,
yaxis_tickvals=tick_values,
yaxis_ticktext=tick_labels,
yaxis_tickmode='array',
showlegend=False,
margin=dict(t=0,b=0,l=0,r=0),
)
from ..util.rendering import render_plotly
return render_plotly(fig, return_type)
return result
def _max_luminosity_color(color, max_lum=0.333, bytes=False):
import matplotlib.colors as mc
import colorsys
try:
c = mc.cnames[color]
except:
c = color
c = colorsys.rgb_to_hls(*mc.to_rgb(c))
new_c = colorsys.hls_to_rgb(c[0], min(c[1], max_lum), c[2])
if bytes:
lev = lambda x: max(0,min(255,int(numpy.round(x*255))))
return tuple(lev(i) for i in new_c)
else:
return new_c
def _darker_and_lighter_color(color, lum_diff=0.3, bytes=False):
import matplotlib.colors as mc
import colorsys
try:
c = mc.cnames[color]
except:
c = color
hls = colorsys.rgb_to_hls(*mc.to_rgb(c))
dark = hls[1] * (1-lum_diff)
light = dark + lum_diff
new_dark_c = colorsys.hls_to_rgb(hls[0], dark, hls[2])
new_light_c = colorsys.hls_to_rgb(hls[0], light, hls[2])
if bytes:
lev = lambda x: max(0,min(255,int(numpy.round(x*255))))
return tuple(lev(i) for i in new_dark_c), tuple(lev(i) for i in new_light_c)
else:
return new_dark_c, new_light_c