"""sqlite_db:
Methods for creating and deleting a sqlite3 database for emat.
A Sqlite3 database is a single file.
The class knows the set of sql files needed to create the necessary tables
"""
import atexit
import logging
import os
import re
import sqlite3
import uuid
import warnings
from typing import AbstractSet, List

import numpy as np
import pandas as pd

from . import sql_queries as sq
from ..database import Database
from ...exceptions import DatabaseVersionWarning, DatabaseVersionError, DatabaseError, ReadOnlyDatabaseError
from ...util.deduplicate import reindex_duplicates
from ...util.docstrings import copydoc
from ...util.loggers import get_module_logger

_logger = get_module_logger(__name__)

# store numpy int64 values as plain Python ints in SQLite
sqlite3.register_adapter(np.int64, int)
def _to_uuid(b):
    """Convert a value to a UUID, falling back to sentinel values on failure."""
    if isinstance(b, uuid.UUID):
        return b
    if isinstance(b, bytes):
        if len(b) == 16:
            return uuid.UUID(bytes=b)
        else:
            try:
                return uuid.UUID(b.decode('utf8'))
            except Exception:
                # unrecognizable bytes: return a recognizable sentinel UUID
                return uuid.UUID(bytes=b'\xDE\xAD\xBE\xEF' * 4)
    if pd.isna(b):
        # missing values map to the all-zeros UUID
        return uuid.UUID(bytes=b'\x00' * 16)
    try:
        return uuid.UUID(b)
    except Exception:
        # unrecognizable value: return a recognizable sentinel UUID
        return uuid.UUID(bytes=b'\xDE\xAD\xBE\xEF' * 4)
class SQLiteDB(Database):
"""
SQLite implementation of the :class:`Database` abstract base class.
Args:
database_path (str, optional): file path and name of database file
If not given, a database is initialized in-memory.
initialize (bool or 'skip', default False):
Whether to initialize emat database file. The value of this argument
is ignored if `database_path` is not given (as in-memory databases
must always be initialized). If given as 'skip' then no setup
scripts are run, and it is assumed that all relevant tables already
exist in the database.
readonly (bool, default False):
Whether to open the database connection in readonly mode.
check_same_thread (bool, default True):
By default, check_same_thread is True and only the creating thread
may use the connection. If set False, the returned connection may be
shared across multiple threads. The dask distributed evaluator has
workers that run code in a separate thread from the model class object,
so setting this to False is necessary to enable SQLite connections on
the workers.
update (bool, default True):
Whether to attempt a database schema update if the open file appears
to be an out-of-date database format.
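    Example:
        A minimal sketch of typical constructor calls (the file name here
        is illustrative)::

            >>> db = SQLiteDB()  # transient in-memory database
            >>> db = SQLiteDB("my-project.db", initialize=True)  # new file
            >>> db = SQLiteDB("my-project.db", readonly=True)  # existing file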
"""
def __init__(
self,
database_path=":memory:",
initialize=False,
readonly=False,
check_same_thread=True,
update=True,
):
super().__init__(readonly=readonly)
if database_path[-3:] == '.gz':
import tempfile, os, shutil, gzip
if not os.path.isfile(database_path):
raise FileNotFoundError(database_path)
self._tempdir = tempfile.TemporaryDirectory()
tempfilename = os.path.join(self._tempdir.name, os.path.basename(database_path[:-3]))
with open(tempfilename, "wb") as tmp:
shutil.copyfileobj(gzip.open(database_path), tmp)
database_path = tempfilename
elif database_path == 'tempfile':
import tempfile, os
self._tempdir = tempfile.TemporaryDirectory()
database_path = os.path.join(self._tempdir.name, "emat-temp.db")
self.database_path = database_path
if self.database_path == ":memory:":
initialize = True
# in order:
self.modules = {}
if initialize == 'skip':
self.conn = self.__create(
[],
wipe=False,
check_same_thread=check_same_thread,
)
elif initialize:
self.conn = self.__create(
["emat_db_init.sql", "meta_model.sql"],
wipe=True,
check_same_thread=check_same_thread,
)
elif readonly:
self.conn = sqlite3.connect(
f'file:{database_path}?mode=ro',
uri=True,
check_same_thread=check_same_thread,
)
else:
self.conn = self.__create(
["emat_db_init.sql", "meta_model.sql"],
wipe=False,
check_same_thread=check_same_thread,
)
if not readonly:
self.conn.execute("PRAGMA foreign_keys = ON")
with self.conn:
self.conn.cursor().execute(sq.SET_VERSION_DATABASE)
self.conn.cursor().execute(sq.SET_MINIMUM_VERSION_DATABASE)
# Warn if opening a database that requires a more recent version of tmip-emat.
        try:
            _min_ver = list(self.conn.cursor().execute(sq.GET_MINIMUM_VERSION_DATABASE))
            if len(_min_ver):
                _min_ver_number = _min_ver[0][0]
                from ... import __version__
                # encode version "X.Y.Z" as the integer X*1000000 + Y*1000 + Z
                ver = (np.asarray([int(i.replace('a', '')) for i in __version__.split(".")])
                       @ np.asarray([1000000, 1000, 1]))
                if _min_ver_number > ver:
                    warnings.warn(
                        f"Database requires emat version {_min_ver_number}",
                        category=DatabaseVersionWarning,
                    )
        except Exception:
            pass
if update and not self.readonly:
# update old databases
if 'design' in self._raw_query(table='ema_experiment')['name'].to_numpy():
self.update_database(sq.UPDATE_DATABASE_ema_design_experiment)
if 'run_source' not in self._raw_query(table='ema_experiment_run')['name'].to_numpy():
self.update_database(sq.UPDATE_DATABASE_ema_experiment_run_ADD_run_source, on_error='raise')
if (
'scope_id' not in self._raw_query(table='ema_scope')['name'].to_numpy()
or 'measure_source' in self._raw_query(table='ema_experiment_measure')['name'].to_numpy()
or "UNIQUE" not in self._raw_query("SELECT sql FROM sqlite_master WHERE name='ema_scope_measure'").loc[0,'sql']
):
if 'measure_run' not in self._raw_query(table='ema_experiment_measure')['name'].to_numpy():
self.update_database(sq.UPDATE_DATABASE_ema_experiment_measure_ADD_measure_run)
self.__apply_sql_script(self.conn, 'emat_db_rebuild.sql')
            try:
                self.update_database_for_run_ids()
            except Exception:
                _logger.exception("UPDATE FAIL")
        try:
            if not self.readonly:
                self.__apply_sql_script(self.conn, 'emat_db_init_views.sql')
        except Exception:
            _logger.exception("VIEWS FAIL")
atexit.register(self.conn.close)
def __create(self, filenames, wipe=False, check_same_thread=None):
"""
Call sql files to create sqlite database file
"""
# close connection and delete file if exists
if self.database_path != ":memory:" and wipe:
self.__delete_database()
try:
conn = sqlite3.connect(self.database_path, check_same_thread=check_same_thread)
except sqlite3.OperationalError as err:
raise DatabaseError(f'error on connecting to {self.database_path}') from err
for filename in filenames:
self.__apply_sql_script(conn, filename)
return conn
def __apply_sql_script(self, connection, filename):
with connection:
cur = connection.cursor()
_logger.info("running script " + filename)
contents = (
self.__read_sql_file(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
filename
)
)
)
for q in contents.split(';'):
z = cur.execute(q).fetchall()
if z:
_logger.error(f"Unexpected output in database script {filename}:\n{q}\n{z}")
    def vacuum(self):
"""
Vacuum the SQLite database.
        SQLite files grow over time, and may allocate data inefficiently
        as more rows are added. This command may make the database file
        faster to access or smaller on disk.
"""
self.conn.cursor().execute('VACUUM')
    def update_database_for_run_ids(self):
"""
Add run_id to runs that do not have one.
This command is for updating older files, and should not be needed
on recently created database files.
"""
# add run_id to runs that do not have one
if 'measure_source' in self._raw_query(table='ema_experiment_measure')['name'].to_numpy():
# measure source and experiment id
experiments_with_missing_run_ids = self._raw_query("""
SELECT DISTINCT experiment_id, measure_source FROM ema_experiment_measure WHERE measure_run IS NULL
""")
experiments_with_missing_run_ids['run_id'] = experiments_with_missing_run_ids['experiment_id'].apply(uuid.uuid1)
experiments_with_missing_run_ids['run_rowid'] = 0
else:
# experiment id only
experiments_with_missing_run_ids = self._raw_query("""
SELECT DISTINCT experiment_id FROM ema_experiment_measure WHERE measure_run IS NULL
""")
            experiments_with_missing_run_ids['run_id'] = experiments_with_missing_run_ids['experiment_id'].apply(uuid.uuid1)
experiments_with_missing_run_ids['run_rowid'] = 0
experiments_with_missing_run_ids['measure_source'] = 0
if len(experiments_with_missing_run_ids) == 0:
self.log("found no experiments with missing run_id's")
return
self.log(f"found {len(experiments_with_missing_run_ids)} experiments with missing run_id, updating database")
experiments_with_missing_run_ids['run_id'] = experiments_with_missing_run_ids['run_id'].apply(lambda x: x.bytes)
with self.conn:
cur = self.conn.cursor()
# remove otherwise duplicate rows from ema_experiment_measure
cur.execute("""
DELETE FROM
ema_experiment_measure
WHERE
rowid NOT IN (
SELECT min(rowid)
FROM ema_experiment_measure
GROUP BY
experiment_id,
measure_id,
measure_run
)
""")
# write new run_ids to run table
qry = """
INSERT INTO ema_experiment_run (run_id, experiment_id, run_status, run_valid, run_location, run_source)
VALUES (@run_id, @experiment_id, 'existing', 1, NULL, @measure_source)
"""
for j in experiments_with_missing_run_ids.index:
cur.execute(qry, dict(experiments_with_missing_run_ids.loc[j]))
experiments_with_missing_run_ids.loc[j, 'run_rowid'] = int(cur.lastrowid)
            # then update measures table with run ids
if 'measure_source' in self._raw_query(table='ema_experiment_measure')['name'].to_numpy():
qry = """
UPDATE
ema_experiment_measure
SET
measure_run = @run_rowid
WHERE
measure_run IS NULL
AND experiment_id = @experiment_id
AND measure_source = @measure_source
"""
else:
qry = """
UPDATE
ema_experiment_measure
SET
measure_run = @run_rowid
WHERE
measure_run IS NULL
AND experiment_id = @experiment_id
"""
for j in experiments_with_missing_run_ids.index:
bindings = dict(experiments_with_missing_run_ids.loc[j])
cur.execute(qry, bindings)
    def update_database(self, queries, on_error='ignore'):
"""
        Update database for compatibility with tmip-emat 0.4 and later
"""
if self.readonly:
raise DatabaseVersionError("cannot open or update an old database in readonly")
else:
            warnings.warn(
                "updating database file",
                category=DatabaseVersionWarning,
            )
with self.conn:
cur = self.conn.cursor()
            for u in queries:
                try:
                    cur.execute(u)
                except Exception:
                    if on_error in ('log', 'raise'):
                        _logger.error(f"SQL Query:\n{u}\n")
                    if on_error == 'raise':
                        raise
    def __repr__(self):
        scopes = self.read_scope_names()
        if len(scopes) == 1:
            return f'<emat.SQLiteDB with scope "{scopes[0]}">'
        elif len(scopes) == 0:
            return '<emat.SQLiteDB with no scopes>'
        elif len(scopes) <= 4:
            return f'<emat.SQLiteDB with {len(scopes)} scopes: "{", ".join(scopes)}">'
        else:
            return f'<emat.SQLiteDB with {len(scopes)} scopes>'
def __read_sql_file(self, filename):
"""
helper function to load sql files to create database
"""
sql_file_path = filename
_logger.debug(sql_file_path)
with open(sql_file_path, 'r') as fil:
all_lines = fil.read()
return all_lines
def __delete_database(self):
"""
Delete the sqlite database file
"""
if os.path.exists(self.database_path):
os.remove(self.database_path)
def _raw_query(self, qry=None, table=None, bindings=None):
if qry is None and table is not None:
qry = f"PRAGMA table_info({table});"
with self.conn:
cur = self.conn.cursor()
if isinstance(bindings, pd.DataFrame):
cur.executemany(qry, (dict(row) for _,row in bindings.iterrows()))
elif bindings is None:
cur.execute(qry)
else:
cur.execute(qry, bindings)
        try:
            cols = [i[0] for i in cur.description]
        except Exception:
            df = None
        else:
            df = pd.DataFrame(cur.fetchall(), columns=cols)
return df
    def get_db_info(self):
"""
Get a short string describing this Database
Returns:
str
"""
return f"SQLite @ {self.database_path}"
    def init_xlm(self, parameter_list, measure_list):
"""
Initialize or extend set of experiment variables and measures
Initialize database with universe of risk variables,
policy variables, and performance measures. All variables and measures
defined in scopes must be defined in this set.
This method only needs to be run
once after creating a new database.
Args:
parameter_list (List[tuple]):
Experiment variable tuples (variable name, type)
where variable name is a string and
type is 'uncertainty', 'lever', or 'constant'
            measure_list (List[tuple]):
                Performance measure tuples (name, transform), where
                name is a string and transform is a defined transformation
                used in metamodeling; currently supported transforms are
                {'log', None}.
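        Example:
            A minimal sketch; the variable and measure names here are
            hypothetical::

                >>> db.init_xlm(
                ...     parameter_list=[('exp_var1', 'uncertainty'),
                ...                     ('policy1', 'lever')],
                ...     measure_list=[('pm_1', 'log'), ('pm_2', None)],
                ... )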
"""
with self.conn:
cur = self.conn.cursor()
# experiment variables - description and type (risk or strategy)
for xl in parameter_list:
cur.execute(sq.CONDITIONAL_INSERT_XL, xl)
# performance measures - description
for m in measure_list:
cur.execute(sq.CONDITIONAL_INSERT_M, m)
def _write_scope(self, scope_name, sheet, scp_xl, scp_m, content=None):
"""
Save the emat scope information to the database.
Generally users should not call this function directly,
use `store_scope` instead.
Args:
scope_name (str):
The scope name, used to identify experiments,
performance measures, and results associated with this model.
Multiple scopes can be stored in the same database.
sheet (str):
Yaml file name with scope definition.
scp_xl (List[str]):
Scope parameter names - both uncertainties and policy levers
scp_m (List[str]):
Scope performance measure names
content (Scope, optional):
Scope object to be pickled and stored in the database.
Raises:
KeyError:
                If the scope name already exists, the scp_xl parameters are
                not available, or the performance measures are not initialized
                in the database.
"""
if self.readonly:
raise ReadOnlyDatabaseError
with self.conn:
cur = self.conn.cursor()
if content is not None:
import gzip, cloudpickle
blob = gzip.compress(cloudpickle.dumps(content))
else:
blob = None
try:
cur.execute(sq.INSERT_SCOPE, [scope_name, sheet, blob])
except sqlite3.IntegrityError:
raise KeyError(f'scope named "{scope_name}" already exists')
for xl in scp_xl:
cur.execute(sq.INSERT_SCOPE_XL, [scope_name, xl])
if cur.rowcount < 1:
raise KeyError('Experiment Variable {0} not present in database'
.format(xl))
for m in scp_m:
cur.execute(sq.INSERT_SCOPE_M, [scope_name, m])
if cur.rowcount < 1:
raise KeyError('Performance measure {0} not present in database'
.format(m))
    def update_scope(self, scope):
"""
Update the emat scope information in the database.
Args:
scope (Scope): scope to update
"""
if self.readonly:
raise ReadOnlyDatabaseError
from ...scope.scope import Scope
assert isinstance(scope, Scope)
scope_name = scope.name
with self.conn:
cur = self.conn.cursor()
if scope is not None:
import gzip, cloudpickle
blob = gzip.compress(cloudpickle.dumps(scope))
else:
raise ValueError("unpickleable scope")
cur.execute(
sq.UPDATE_SCOPE_CONTENT,
{'scope_name':scope_name, 'scope_pickle':blob},
)
for xl in scope.get_uncertainties()+scope.get_levers():
cur.execute(
sq.CONDITIONAL_INSERT_XL,
(xl.name, xl.ptype),
)
cur.execute(
sq.INSERT_SCOPE_XL.replace("INSERT", "INSERT OR IGNORE"),
[scope_name, xl.name],
)
for m in scope.get_measures():
cur.execute(
sq.CONDITIONAL_INSERT_M,
(m.name, m.transform),
)
cur.execute(
sq.INSERT_SCOPE_M.replace("INSERT", "INSERT OR IGNORE"),
[scope_name, m.name],
)
    @copydoc(Database.store_scope)
def store_scope(self, scope):
"""
Save an emat.Scope directly to the database.
Args:
scope (Scope): The scope object to store.
Raises:
KeyError: If scope name already exists.
"""
if self.readonly:
raise ReadOnlyDatabaseError
return scope.store_scope(self)
    def read_scope(self, scope_name=None):
"""
Load the pickled scope from the database.
Args:
scope_name (str, optional):
The name of the scope to load. If not
given and there is only one scope stored
in the database, that scope is loaded. If not
given and there are multiple scopes stored in
the database, a KeyError is raised.
Returns:
Scope
        Raises:
            KeyError: If a name is given but is not found in
                the database.
            ValueError: If no name is given and there is not
                exactly one scope stored in the database.
"""
cur = self.conn.cursor()
if scope_name is None:
scope_names = self.read_scope_names()
if len(scope_names) > 1:
raise ValueError("multiple scopes stored in database, you must identify one:\n -"+"\n -".join(scope_names))
elif len(scope_names) == 0:
raise ValueError("no scopes stored in database")
else:
scope_name = scope_names[0]
try:
blob = cur.execute(sq.GET_SCOPE, [scope_name]).fetchall()[0][0]
except IndexError:
raise KeyError(f"scope '{scope_name}' not found")
if blob is None:
return blob
import gzip, cloudpickle
try:
return cloudpickle.loads(gzip.decompress(blob))
except ModuleNotFoundError as err:
if "ema_workbench" in err.msg:
import sys
from ... import workbench
sys.modules['ema_workbench'] = workbench
return cloudpickle.loads(gzip.decompress(blob))
else:
raise
    @copydoc(Database.add_scope_meas)
def add_scope_meas(self, scope_name, scp_m):
raise NotImplementedError("deprecated: use update_scope instead")
    @copydoc(Database.delete_scope)
def delete_scope(self, scope_name):
if self.readonly:
raise ReadOnlyDatabaseError
with self.conn:
cur = self.conn.cursor()
cur.execute(sq.DELETE_SCOPE, [scope_name])
    def write_experiment_parameters(
self,
scope_name,
design_name,
xl_df,
force_ids=False,
):
"""
        Write experiment definitions to the database.
This method records values for each experiment parameter,
for each experiment in a design of one or more experiments.
Args:
scope_name (str):
A scope name, used to identify experiments,
performance measures, and results associated with this
exploratory analysis. The scope with this name should
already have been stored in this database.
design_name (str):
An experiment design name. This name should be unique
within the named scope, and typically will include a
reference to the design sampler, for example:
'uni' - generated by univariate sensitivity test design
'lhs' - generated by latin hypercube sample design
The design_name is used primarily to load groups of
related experiments together.
xl_df (pandas.DataFrame):
The columns of this DataFrame are the experiment
parameters (i.e. policy levers, uncertainties, and
constants), and each row is an experiment.
            force_ids (bool, default False):
                Force the experiment id's saved into the database to match
                the id's in the index of `xl_df`, or raise an error if
                this cannot be completed, either because that id is in
                use for a different experiment, or because this experiment
                is already saved with a different id.
Returns:
list: the experiment id's of the newly recorded experiments
Raises:
UserWarning: If scope name does not exist
TypeError: If not all scope variables are defined in the
exp_def
ValueError: If `force_ids` is True but the same experiment
already has a different id.
sqlite3.IntegrityError: If `force_ids` is True but a different
experiment is already using the given id.
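        Example:
            A minimal sketch; the parameter names are hypothetical and
            must already exist in the stored scope::

                >>> xl_df = pd.DataFrame({'exp_var1': [0.1, 0.2],
                ...                       'policy1': [True, False]})
                >>> ex_ids = db.write_experiment_parameters('my_scope', 'lhs', xl_df)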
"""
if self.readonly:
raise ReadOnlyDatabaseError
if design_name is None:
design_name = 'ad hoc'
if isinstance(design_name, dict):
design_name_map = {}
for k,v in design_name.items():
if isinstance(v, str):
from ...util.seq_grouping import seq_int_group_expander
design_name_map[k] = seq_int_group_expander(v)
else:
design_name_map[k] = v
else:
design_name_map = {design_name: xl_df.index}
with self.conn:
scope_name = self._validate_scope(scope_name, 'design_name')
# local cursor because we'll depend on lastrowid
fcur = self.conn.cursor()
# get list of experiment variables - except "one"
scp_xl = fcur.execute(sq.GET_SCOPE_XL, [scope_name]).fetchall()
if len(scp_xl) == 0:
                raise UserWarning(
                    'named scope {0} not found - experiments will '
                    'not be recorded'.format(scope_name)
                )
for k in design_name_map:
try:
fcur.execute(sq.INSERT_DESIGN, [scope_name, k])
except sqlite3.OperationalError as err:
raise DatabaseError from err
### split experiments into novel and duplicate ###
# first join to existing experiments
existing_experiments = self.read_experiment_parameters(scope_name, None)
combined_experiments = pd.concat([
existing_experiments,
xl_df.set_index(np.full(len(xl_df), -1, dtype=int)),
])
combined_experiments_reindexed = reindex_duplicates(combined_experiments)
xl_df_ = combined_experiments_reindexed.iloc[-len(xl_df):]
novel_flag = xl_df_.index.isin([-1])
novel_experiments = xl_df.loc[novel_flag]
duplicate_experiments = xl_df_.loc[~novel_flag]
ex_ids = []
for ex_id_as_input, row in novel_experiments.iterrows():
if force_ids:
# create new experiment and set id
fcur.execute(sq.INSERT_EXPERIMENT_WITH_ID, [scope_name, ex_id_as_input])
ex_id = ex_id_as_input
else:
# create new experiment and get id
fcur.execute(sq.INSERT_EXPERIMENT, [scope_name])
ex_id = fcur.lastrowid
                # set each parameter from the experiment definition
for xl in scp_xl:
try:
value = row[xl[0]]
fcur.execute(sq.INSERT_EX_XL, [ex_id, value, xl[0]])
except TypeError:
_logger.error(f'Experiment definition missing {xl[0]} variable')
raise
# Add this experiment id to this design
ex_ids.append(ex_id)
for design_name_, design_experiment_ids in design_name_map.items():
if ex_id_as_input in design_experiment_ids:
try:
fcur.execute(sq.INSERT_DESIGN_EXPERIMENT, [scope_name, design_name_, ex_id])
except Exception as err:
_logger.error(str(err))
_logger.error(f"scope_name, design_name, ex_id= {scope_name, design_name_, ex_id}")
raise
for ex_id, ex_id_as_input in zip(duplicate_experiments.index, xl_df.loc[~novel_flag].index):
if force_ids and ex_id != ex_id_as_input:
raise ValueError(f"cannot change experiment id {ex_id_as_input} to {ex_id}")
# Add this experiment id to this design
ex_ids.append(ex_id)
for design_name_, design_experiment_ids in design_name_map.items():
if ex_id_as_input in design_experiment_ids:
try:
fcur.execute(sq.INSERT_DESIGN_EXPERIMENT, [scope_name, design_name_, ex_id])
except Exception as err:
_logger.error(str(err))
_logger.error(f"scope_name, design_name, ex_id= {scope_name, design_name_, ex_id}")
raise
return ex_ids
    def read_experiment_id(self, scope_name, *args, **kwargs):
"""
Read the experiment id previously defined in the database
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
parameters (dict): keys are experiment parameters, values are the
experimental values to look up. Subsequent positional or keyword
arguments are used to update parameters.
Returns:
int: the experiment id of the identified experiment
Raises:
ValueError: If scope name does not exist
ValueError: If multiple experiments match an experiment definition.
This can happen, for example, if the definition is incomplete.
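        Example:
            An illustrative sketch, assuming an experiment with these
            (hypothetical) parameter values was stored previously::

                >>> ex_id = db.read_experiment_id(
                ...     'my_scope', {'exp_var1': 0.1, 'policy1': True},
                ... )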
"""
scope_name = self._validate_scope(scope_name, 'design_name')
parameters = {}
for a in args:
if a is not None:
parameters.update(a)
parameters.update(kwargs)
xl_df = pd.DataFrame(parameters, index=[0])
result = self.read_experiment_ids(scope_name, xl_df)
return result[0]
    def get_experiment_id(self, scope_name=None, *args, **kwargs):
"""
Read or create an experiment id in the database.
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
parameters (dict): keys are experiment parameters, values are the
experimental values to look up. Subsequent positional or keyword
arguments are used to update parameters.
Returns:
int: the experiment id of the identified experiment
Raises:
ValueError: If scope name does not exist
ValueError: If multiple experiments match an experiment definition.
This can happen, for example, if the definition is incomplete.
"""
scope_name = self._validate_scope(scope_name, 'design_name')
from ...exceptions import MissingIdWarning
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=MissingIdWarning)
ex_id = self.read_experiment_id(scope_name, *args, **kwargs)
if ex_id is None:
parameters = self.read_scope(scope_name).get_parameter_defaults()
for a in args:
if a is not None:
parameters.update(a)
parameters.update(kwargs)
df = pd.DataFrame(parameters, index=[0])
ex_id = self.write_experiment_parameters(scope_name, None, df)[0]
return ex_id
    def set_experiment_id(self, scope_name=None, experiment_id=None, *args, **kwargs):
"""
Set an experiment id in the database.
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
parameters (dict): keys are experiment parameters, values are the
experimental values to look up. Subsequent positional or keyword
arguments are used to update parameters.
Returns:
int: the experiment id of the identified experiment
Raises:
ValueError:
If scope name does not exist, or another experiments
already has this id, or this experiment already has a
different id.
"""
scope_name = self._validate_scope(scope_name, 'design_name')
from ...exceptions import MissingIdWarning
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=MissingIdWarning)
ex_id = self.read_experiment_id(scope_name, *args, **kwargs)
if ex_id is None:
# check the id is not yet used
if experiment_id in self.read_all_experiment_ids(scope_name):
raise ValueError(f"experiment_id {experiment_id} already exists")
parameters = self.read_scope(scope_name).get_parameter_defaults()
for a in args:
if a is not None:
parameters.update(a)
parameters.update(kwargs)
df = pd.DataFrame(parameters, index=[0])
ex_id = self.write_experiment_parameters(scope_name, None, df)[0]
else:
if ex_id != experiment_id:
raise ValueError(f"these parameters already have experiment_id "
f"{ex_id}, cannot change to {experiment_id}")
return ex_id
    def new_run_id(
self,
scope_name=None,
parameters=None,
location=None,
experiment_id=None,
source=0,
):
"""
Create a new run_id in the database.
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
parameters (dict): keys are experiment parameters, values are the
experimental values to look up. Subsequent positional or keyword
arguments are used to update parameters.
location (str or True, optional): An identifier for this location
(i.e. this computer). If set to True, the name of this node
is found using the `platform` module.
experiment_id (int, optional): The experiment id associated
with this run. If given, the parameters are ignored.
source (int, default 0): The metamodel_id of the source for this
run, or 0 for a core model run.
Returns:
Tuple[Int,Int]:
The run_id and experiment_id of the identified experiment
Raises:
ValueError: If scope name does not exist
ValueError: If multiple experiments match an experiment definition.
This can happen, for example, if the definition is incomplete.
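        Example:
            A minimal sketch, assuming experiment 1 already exists in the
            (hypothetical) scope 'my_scope'::

                >>> run_id, ex_id = db.new_run_id(
                ...     'my_scope', experiment_id=1, location=True,
                ... )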
"""
if self.readonly:
raise ReadOnlyDatabaseError
scope_name = self._validate_scope(scope_name, 'design_name')
if experiment_id is None:
if parameters is None:
raise ValueError('must give experiment_id or parameters')
experiment_id = self.get_experiment_id(scope_name, parameters)
run_id = uuid.uuid1()
if location is True:
import platform
location = platform.node()
with self.conn:
cursor = self.conn.cursor()
cursor.execute(
sq.NEW_EXPERIMENT_RUN,
dict(
run_id=run_id.bytes,
experiment_id=experiment_id,
run_location=location,
run_source=source,
)
)
return run_id, experiment_id
    def existing_run_id(
self,
run_id,
scope_name=None,
parameters=None,
location=None,
experiment_id=None,
source=0,
):
"""
Store an existing run_id in the database.
Args:
run_id (bytes or UUID): run id to be stored
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
parameters (dict): keys are experiment parameters, values are the
experimental values to look up. Subsequent positional or keyword
arguments are used to update parameters.
location (str or True, optional): An identifier for this location
(i.e. this computer). If set to True, the name of this node
is found using the `platform` module.
experiment_id (int, optional): The experiment id associated
with this run. If given, the parameters are ignored.
source (int, default 0): The metamodel_id of the source for this
run, or 0 for a core model run.
Returns:
Tuple[Int,Int]:
The run_id and experiment_id of the identified experiment
Raises:
ValueError: If scope name does not exist
ValueError: If multiple experiments match an experiment definition.
This can happen, for example, if the definition is incomplete.
"""
if self.readonly:
raise ReadOnlyDatabaseError
scope_name = self._validate_scope(scope_name, 'design_name')
if experiment_id is None:
if parameters is None:
raise ValueError('must give experiment_id or parameters')
experiment_id = self.get_experiment_id(scope_name, parameters)
run_id = _to_uuid(run_id).bytes
if location is True:
import platform
location = platform.node()
with self.conn:
cursor = self.conn.cursor()
cursor.execute(
sq.NEW_EXPERIMENT_RUN.replace("INSERT", "INSERT OR IGNORE"),
dict(
run_id=run_id,
experiment_id=experiment_id,
run_location=location,
run_source=source,
)
)
return run_id, experiment_id
    def read_experiment_ids(
self,
scope_name,
xl_df,
):
"""
Read the experiment ids previously defined in the database.
This method is used to recover the experiment id, if the
set of parameter values is known but the id of the experiment
is not known.
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
xl_df (pandas.DataFrame):
The columns of this DataFrame are experiment parameters
to lookup, and each row is a full experiment. It is
possible to include only a subset of the columns, and
matches will be returned if that subset matches only one
experiment.
Returns:
list: the experiment id's of the identified experiments
Raises:
ValueError: If scope name does not exist
ValueError: If multiple experiments match an experiment definition.
This can happen, for example, if the definition is incomplete.
"""
# local cursor
fcur = self.conn.cursor()
ex_ids = []
missing_ids = 0
try:
# get list of experiment variables - except "one"
scp_xl = fcur.execute(sq.GET_SCOPE_XL, [scope_name]).fetchall()
if len(scp_xl) == 0:
                raise ValueError(
                    'named scope {0} not found - experiment ids '
                    'not available'.format(scope_name)
                )
scp_xl = [i[0] for i in scp_xl]
use_cols = set(c for c in xl_df.columns if c in scp_xl)
for row in xl_df.itertuples(index=False, name=None):
candidate_ids = None
# get all ids by value
for par_name, par_value in zip(xl_df.columns, row):
if par_name not in use_cols:
continue
possible_ids = set([i[0] for i in fcur.execute(
sq.GET_EXPERIMENT_IDS_BY_VALUE,
[scope_name, par_name, par_value],
).fetchall()])
if candidate_ids is None:
candidate_ids = possible_ids
else:
candidate_ids &= possible_ids
if len(candidate_ids) > 1:
raise ValueError('multiple matching experiment ids found')
elif len(candidate_ids) == 1:
ex_ids.append(candidate_ids.pop())
else:
                    missing_ids += 1
ex_ids.append(None)
finally:
fcur.close()
if missing_ids:
from ...exceptions import MissingIdWarning
warnings.warn(f'missing {missing_ids} ids', category=MissingIdWarning)
return ex_ids
    def read_all_experiment_ids(
self,
scope_name,
design_name=None,
grouped=False,
):
"""
Read the experiment ids previously defined in the database
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
design_name (str or None):
Name of experiment design. Set to None to find experiments
across all designs in aggregate. Set to '*' to get a
dictionary giving the experiment ids in each design
individually.
grouped (bool, default False):
The default return value is a list of all experiment id's,
but by setting this to True, this method instead returns
a human-readable string giving contiguous ranges of
experiment id's.
Returns:
list or dict or str: the experiment id's of the identified experiments
Raises:
ValueError: If scope name does not exist
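        Example:
            Illustrative usage; the scope and design names are
            hypothetical, and actual ids depend on database contents::

                >>> ids = db.read_all_experiment_ids('my_scope')  # flat list
                >>> by_design = db.read_all_experiment_ids('my_scope', '*')  # dict keyed by design
                >>> ranges = db.read_all_experiment_ids('my_scope', grouped=True)  # e.g. '1-100'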
"""
if design_name == "*":
result = {}
for design_name_ in self.read_design_names(scope_name):
result[design_name_] = self.read_all_experiment_ids(
scope_name=scope_name,
design_name=design_name_,
grouped=grouped,
)
return result
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, 'design_name')
if design_name is None:
experiment_ids = [i[0] for i in cur.execute(sq.GET_EXPERIMENT_IDS_ALL,
[scope_name] ).fetchall()]
else:
experiment_ids = [i[0] for i in cur.execute(sq.GET_EXPERIMENT_IDS_IN_DESIGN,
[scope_name, design_name] ).fetchall()]
if grouped:
from ...util.seq_grouping import seq_int_grouper
return seq_int_grouper(experiment_ids)
return experiment_ids
def _validate_scope(self, scope_name, design_parameter_name="design_name"):
"""Validate the scope argument to a function."""
known_scopes = self.read_scope_names()
        if len(known_scopes) == 0:
            raise ValueError('there are no stored scopes')
        # None (or 0) is allowed when there is only one scope in the DB; use it.
        if scope_name is None or scope_name == 0:
            if len(known_scopes) == 1:
                return known_scopes[0]
            raise ValueError(f'there are {len(known_scopes)} scopes, must identify scope explicitly')
if scope_name not in known_scopes:
oops = self.read_scope_names(design_name=scope_name)
if oops:
if design_parameter_name:
des = f", {design_parameter_name}='{oops[0]}'"
else:
des = ''
raise ValueError(f'''no scope named "{scope_name}", did you mean '''
f'''"...(scope='{scope_name}'{des},...)"?''')
raise ValueError(f'no scope named "{scope_name}" is stored in the database, only {known_scopes}')
return scope_name
    def read_experiment_parameters(
self,
scope_name,
design_name=None,
only_pending=False,
design=None,
*,
experiment_ids=None,
ensure_dtypes=True,
):
"""
Read experiment definitions from the database.
Read the values for each experiment parameter per experiment.
Args:
scope_name (str):
A scope name, used to identify experiments,
performance measures, and results associated with this
exploratory analysis.
design_name (str, optional): If given, only experiments
associated with both the scope and the named design
are returned, otherwise all experiments associated
with the scope are returned.
only_pending (bool, default False): If True, only pending
experiments (which have no performance measure results
stored in the database) are returned.
design (str, optional): Deprecated. Use design_name.
experiment_ids (Collection, optional):
A collection of experiment id's to load. If given,
both `design_name` and `only_pending` are ignored.
ensure_dtypes (bool, default True): If True, the scope
associated with these experiments is also read out
of the database, and that scope file is used to
format experimental data consistently (i.e., as
float, integer, bool, or categorical).
Returns:
emat.ExperimentalDesign:
The experiment parameters are returned in a subclass
of a normal pandas.DataFrame, which allows attaching
the `design_name` as meta-data to the DataFrame.
Raises:
ValueError: if `scope_name` is not stored in this database
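        Example:
            A short sketch; the scope and design names are hypothetical::

                >>> params = db.read_experiment_parameters('my_scope', 'lhs')
                >>> todo = db.read_experiment_parameters(
                ...     'my_scope', 'lhs', only_pending=True,
                ... )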
"""
if design is not None:
if design_name is None:
design_name = design
warnings.warn("the `design` argument is deprecated for "
"read_experiment_parameters, use `design_name`", DeprecationWarning)
elif design != design_name:
raise ValueError("cannot give both `design_name` and `design`")
scope_name = self._validate_scope(scope_name, 'design_name')
cur = self.conn.cursor()
if experiment_ids is not None:
query = sq.GET_EX_XL_IDS_IN
if isinstance(experiment_ids, int):
experiment_ids = [experiment_ids]
subquery = ",".join(f"?{n+2}" for n in range(len(experiment_ids)))
query = query.replace("???", subquery)
bindings = [scope_name, *experiment_ids]
elif design_name is None:
query = sq.GET_EX_XL_ALL
bindings = dict(scope_name=scope_name)
else:
query = sq.GET_EXPERIMENT_PARAMETERS
bindings = dict(scope_name=scope_name, design_name=design_name)
xl_df = pd.DataFrame(cur.execute(
query,
bindings,
).fetchall())
        if not xl_df.empty:
xl_df = xl_df.pivot(index=0, columns=1, values=2)
xl_df.index.name = 'experiment'
xl_df.columns.name = None
column_order = (
self.read_constants(scope_name)
+ self.read_uncertainties(scope_name)
+ self.read_levers(scope_name)
)
from ...experiment.experimental_design import ExperimentalDesign
result = ExperimentalDesign(xl_df[[i for i in column_order if i in xl_df.columns]])
result.design_name = design_name
if ensure_dtypes:
scope = self.read_scope(scope_name)
if scope is not None:
result = scope.ensure_dtypes(result)
if only_pending:
valid_results = self.read_experiment_measures(scope_name, design_name, source=0)
valid_ex_ids = valid_results.index.get_level_values(0)
result = result.loc[~result.index.isin(valid_ex_ids)]
return result
    def write_experiment_measures(
self,
scope_name,
source,
m_df,
run_ids=None,
experiment_id=None,
):
"""
Write experiment results to the database.
Write the performance measure results for each experiment
in the scope - if the scope does not exist, nothing is recorded.
Note that the design_name is not required to write experiment
measures, as the individual experiments from any design are
uniquely identified by the experiment id's.
Args:
scope_name (str):
A scope name, used to identify experiments,
performance measures, and results associated with this
exploratory analysis. The scope with this name should
already have been stored in this database.
source (int):
An indicator of performance measure source. This should
be 0 for a bona-fide run of the associated core models,
or some non-zero metamodel_id number.
m_df (pandas.DataFrame or dict):
The columns of this DataFrame are the performance
measure names, and row indexes are the experiment id's.
If given as a `dict` instead of a DataFrame, the keys are
treated as columns and an `experiment_id` must be provided.
run_ids (pandas.Index, optional):
Provide an optional index of universally unique run ids
(UUIDs) for these results. The UUIDs can be used to help
identify problems and organize model runs.
experiment_id (int, optional):
Provide an experiment_id. This is only used if the
`m_df` is provided as a dict instead of a DataFrame
Raises:
UserWarning: If scope name does not exist
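        Example:
            A minimal sketch; the measure name is hypothetical and must
            already exist in the stored scope::

                >>> m_df = pd.DataFrame({'pm_1': [1.5, 2.5]}, index=ex_ids)
                >>> db.write_experiment_measures('my_scope', 0, m_df)  # source 0 = core model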
"""
if self.readonly:
raise ReadOnlyDatabaseError
if experiment_id is not None:
if not isinstance(m_df, dict):
raise ValueError("only give an experiment_id with a dict as `m_df`")
m_df = pd.DataFrame(m_df, index=[experiment_id])
# split run_ids from multiindex
if m_df.index.nlevels == 2:
if run_ids is not None:
raise ValueError('run_ids cannot be given when they are embedded in m_df.index')
run_ids = m_df.index.get_level_values(1)
m_df.index = m_df.index.get_level_values(0)
with self.conn:
scope = None # don't load unless needed
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
scp_m = cur.execute(sq.GET_SCOPE_M, [scope_name]).fetchall()
if len(scp_m) == 0:
                raise UserWarning(
                    'named scope {0} not found - experiments will '
                    'not be recorded'.format(scope_name)
                )
if run_ids is None:
# generate new run_ids if none is given
run_ids = []
for experiment_id in m_df.index:
run_id, experiment_id = self.new_run_id(
scope_name,
experiment_id=experiment_id,
source=source,
)
run_ids.append(run_id)
else:
for (run_id, experiment_id) in zip(run_ids, m_df.index):
self.existing_run_id(
run_id,
scope_name,
experiment_id=experiment_id,
source=source,
)
for m in scp_m:
dataseries = None
measure_name = m[0]
if measure_name not in m_df.columns:
if scope is None:
scope = self.read_scope(scope_name)
formula = getattr(scope[measure_name], 'formula', None)
                    if formula:
                        dataseries = m_df.eval(formula).rename(measure_name)
if measure_name in m_df.columns:
dataseries = m_df[measure_name]
if dataseries is not None:
                    for (ex_id, value), uid in zip(dataseries.items(), run_ids):
_logger.debug(f"write_experiment_measures: writing {measure_name} = {value} @ {ex_id}/{uid}")
uid = _to_uuid(uid)
# index is experiment id
bindings = dict(
experiment_id=ex_id,
measure_value=value,
measure_source=source,
measure_name=measure_name,
measure_run=uid.bytes,
)
                        try:
                            if not pd.isna(value):
                                cur.execute(sq.INSERT_EX_M, bindings)
                        except Exception:
_logger.error(f"Error saving {value} to m {m[0]} for ex {ex_id}")
for binding, bval in bindings.items():
_logger.error(f"bindings[{binding}]={bval} ({type(bval)})")
_logger.error(str(cur.execute(sq._DEBUG_INSERT_EX_M, bindings).fetchall()))
raise
else:
_logger.debug(f"write_experiment_measures: no dataseries for {measure_name}")
    def write_ex_m_1(
self,
scope_name,
source,
ex_id,
m_name,
m_value,
run_id=None,
):
"""Write a single performance measure result for an experiment
Write the performance measure result for an experiment
in the scope - if the scope does not exist, nothing is recorded
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
source (int): indicator of performance measure source
(0 = core model or non-zero = meta-model id)
ex_id (int): experiment id
m_name (str): performance measure name
m_value (numeric): performance measure value
            run_id (uuid, optional):
                Provide an optional universally unique run id
                (UUID) for this result. The UUID can be used to help
                identify problems and organize model runs.
Raises:
UserWarning: If scope name does not exist
"""
if self.readonly:
raise ReadOnlyDatabaseError
with self.conn:
scope_name = self._validate_scope(scope_name, None)
cur = self.conn.cursor()
scp_m = cur.execute(sq.GET_SCOPE_M, [scope_name]).fetchall()
if len(scp_m) == 0:
                raise UserWarning(
                    'named scope {0} not found - experiments will '
                    'not be recorded'.format(scope_name)
                )
for m in scp_m:
if m[0] == m_name:
                    try:
                        if not pd.isna(m_value):
cur.execute(
sq.INSERT_EX_M,
dict(
experiment_id=ex_id,
measure_value=m_value,
measure_source=source,
measure_name=m[0],
measure_run=run_id,
)
)
                except Exception:
_logger.error(f"Error saving {m_value} to m {m[0]} for ex {ex_id}")
raise
    def read_experiment_all(
self,
scope_name,
design_name=None,
source=None,
*,
only_pending=False,
only_incomplete=False,
only_complete=False,
only_with_measures=False,
ensure_dtypes=True,
with_run_ids=False,
runs=None,
formulas=True,
):
"""
Read experiment definitions and results
Read the values from each experiment variable and the
results for each performance measure per experiment.
Args:
scope_name (str or Scope):
A scope name, used to identify experiments,
performance measures, and results associated with this
exploratory analysis.
design_name (str or Collection[str], optional):
The experimental design name (a single `str`) or
a collection of design names to read.
source (int, optional): The source identifier of the
experimental outcomes to load. If not given, but
there are only results from a single source in the
database, those results are returned. If there are
results from multiple sources, an error is raised.
only_pending (bool, default False): If True, only pending
experiments (which have no performance measure results
stored in the database) are returned. Experiments that
have any results, even if only partial results, are
excluded.
only_incomplete (bool, default False): If True, only incomplete
experiments (which have at least one missing performance
measure result that is not stored in the database) are
returned. Only complete experiments (that have every
performance measure populated) are excluded.
only_complete (bool, default False): If True, only complete
experiments (which have no missing performance measure
results stored in the database) are returned.
only_with_measures (bool, default False): If True, only
experiments with at least one stored performance measure
are returned.
ensure_dtypes (bool, default True): If True, the scope
associated with these experiments is also read out
of the database, and that scope file is used to
format experimental data consistently (i.e., as
float, integer, bool, or categorical).
with_run_ids (bool, default False): Whether to use a
two-level pd.MultiIndex that includes both the
experiment_id (which always appears in the index)
as well as the run_id (which only appears in the
index if this argument is set to True).
            runs ({None, 'all', 'valid', 'invalid', 'valid_mean'}, default None):
                By default, this method returns the one and only
                valid model run matching the given `design_name`
                and `source` (if any) for any experiment, and fails
                if there is more than one such valid run. Set this to
                'valid' or 'invalid' to get all valid or invalid model
                runs (instead of raising an exception). Set to 'all' to get
                everything, including both valid and invalidated results.
                Set to 'valid_mean' to get the average of all valid runs
                for each experiment.
formulas (bool, default True): If the scope includes
formulaic measures (computed directly from other
measures) then compute these values and include them in
the results.
Returns:
emat.ExperimentalDesign:
The experiment parameters are returned in a subclass
of a normal pandas.DataFrame, which allows attaching
the `design_name` as meta-data to the DataFrame.
Raises:
ValueError
When no source is given but the database contains
results from multiple sources.
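        Example:
            A hedged sketch of common calls (the scope and design names
            are hypothetical)::

                >>> df = db.read_experiment_all('my_scope', 'lhs')
                >>> df = db.read_experiment_all(
                ...     'my_scope', runs='valid', with_run_ids=True,
                ... )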
"""
from ...scope.scope import Scope
if isinstance(scope_name, Scope):
scope = scope_name
scope_name = scope.name
else:
scope = None
df_p = self.read_experiment_parameters(
scope_name=scope_name,
design_name=design_name,
ensure_dtypes=ensure_dtypes,
)
df_m = self.read_experiment_measures(
scope_name=scope or scope_name,
design_name=design_name,
runs=runs,
formulas=formulas,
)
ex_xlm = pd.merge(
df_p,
df_m.reset_index().set_index('experiment'),
how='outer',
on='experiment',
).reset_index()
if runs == 'valid_mean':
ex_xlm = ex_xlm.set_index(['experiment'])
else:
ex_xlm = ex_xlm.set_index(['experiment', 'run'])
if not with_run_ids:
ex_xlm.index = ex_xlm.index.droplevel(1)
if only_incomplete:
retain = np.zeros(len(ex_xlm), dtype=bool)
for meas_name in self.read_measures(scope_name):
if meas_name not in ex_xlm.columns:
retain[:] = True
break
else:
retain[:] |= pd.isna(ex_xlm[meas_name])
ex_xlm = ex_xlm.loc[retain, :]
if scope is None:
column_order = (
self.read_constants(scope_name)
+ self.read_uncertainties(scope_name)
+ self.read_levers(scope_name)
+ self.read_measures(scope_name)
)
else:
column_order = (
scope.get_constant_names()
+ scope.get_uncertainty_names()
+ scope.get_lever_names()
+ scope.get_measure_names()
)
result = ex_xlm[[i for i in column_order if i in ex_xlm.columns]]
if only_complete:
result = result[~result.isna().any(axis=1)]
elif only_incomplete:
result = result[result.isna().any(axis=1)]
if only_with_measures:
result_measures = result[[i for i in self.read_measures(scope_name) if i in result.columns]]
result = result[~result_measures.isna().all(axis=1)]
elif only_pending:
result_measures = result[[i for i in self.read_measures(scope_name) if i in result.columns]]
result = result[result_measures.isna().all(axis=1)]
if ensure_dtypes:
if scope is None:
scope = self.read_scope(scope_name)
if scope is not None:
result = scope.ensure_dtypes(result)
from ...experiment.experimental_design import ExperimentalDesign
result = ExperimentalDesign(result)
result.design_name = design_name
return result
    def read_experiment_measures(
self,
scope_name,
design_name=None,
experiment_id=None,
source=None,
design=None,
runs=None,
formulas=True,
with_validity=False,
):
"""
Read experiment results from the database.
Args:
scope_name (str or Scope):
A scope or just its name, used to identify experiments,
performance measures, and results associated with this
exploratory analysis.
design_name (str, optional): If given, only experiments
associated with both the scope and the named design
are returned, otherwise all experiments associated
with the scope are returned.
experiment_id (int, optional): The id of the experiment to
retrieve. If omitted, get all experiments matching the
named scope and design.
source (int, optional): The source identifier of the
experimental outcomes to load. If not given, but
there are only results from a single source in the
database, those results are returned. If there are
results from multiple sources, an error is raised.
design (str): Deprecated, use `design_name`.
runs ({None, 'all', 'valid', 'invalid', 'valid_mean'}, default None):
By default, this method returns the most recent timestamped
valid model run matching the given `design_name`
and `source` (if any) for any experiment. Set this to
'valid' or 'invalid' to get all valid or invalid model
runs (instead of raising an exception). Set to 'all' to get
everything, including both valid and invalidated results.
Set to 'valid_mean' to get the average of all valid runs
instead of the single most recent one.
formulas (bool, default True): If the scope includes
formulaic measures (computed directly from other
measures) then compute these values and include them in
the results.
Returns:
results (pandas.DataFrame): performance measures
Raises:
ValueError
When the database contains multiple sets of results
matching the given `design_name` and/or `source`
(if any) for any experiment.
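        Example:
            Illustrative usage; the scope and design names are
            hypothetical::

                >>> m = db.read_experiment_measures('my_scope', 'lhs')
                >>> m_avg = db.read_experiment_measures(
                ...     'my_scope', 'lhs', runs='valid_mean',
                ... )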
"""
assert runs in (None, 'all', 'valid', 'invalid', 'valid_mean')
if design is not None:
if design_name is None:
design_name = design
warnings.warn("the `design` argument is deprecated for "
"read_experiment_parameters, use `design_name`", DeprecationWarning)
elif design != design_name:
raise ValueError("cannot give both `design_name` and `design`")
from ...scope.scope import Scope
if isinstance(scope_name, Scope):
scope = scope_name
scope_name = scope.name
else:
scope = None
scope_name = self._validate_scope(scope_name, 'design_name')
sql = sq.GET_EXPERIMENT_MEASURES_MASTER
if design_name is None:
sql = sql.replace("AND ed.design = @design_name", "")
if source is None:
sql = sql.replace("AND run_source = @measure_source", "")
if experiment_id is None:
sql = sql.replace("AND eem.experiment_id = @experiment_id", "")
if runs in ('all', ):
sql = sql.replace("AND run_valid = 1", "")
elif runs == 'invalid':
sql = sql.replace("AND run_valid = 1",
"AND run_valid = 0")
if runs in ('all', 'valid', 'invalid', 'valid_mean'):
sql = re.sub(
r"/\* most recent .* end most recent \*/",
"ema_experiment_run",
sql,
flags=re.DOTALL
)
arg = dict(
scope_name=scope_name,
design_name=design_name,
experiment_id=experiment_id,
measure_source=source,
)
cur = self.conn.cursor()
try:
ex_m = pd.DataFrame(cur.execute(sql, arg).fetchall())
        except Exception:
_logger.error(f"ERROR ON READ MEASURES query=\n{sql}")
_logger.error(f"ERROR ON READ MEASURES arg=\n{arg}")
raise
        if not ex_m.empty:
if runs is None:
# by default, raise a value error if there are runs from multiple sources
try:
ex_m.pivot(index=0, columns=2, values=3)
except ValueError:
raise ValueError("duplicate experiment ids suggest results "
"from more than one model source are stored\n"
"set `runs='valid'` to return results from all "
"sources or set the `source` argument.")
ex_m = ex_m.pivot(index=(0,1), columns=2, values=3).astype(np.float64)
if isinstance(ex_m.index, pd.MultiIndex):
fix_levels = [_to_uuid(i) for i in ex_m.index.levels[1]]
zero_uuid = uuid.UUID(bytes=b'\x00' * 16)
try:
zero_index = fix_levels.index(zero_uuid)
except ValueError:
zero_index = len(fix_levels)
fix_levels.append(zero_uuid) # add a zero level
            if len(fix_levels):
                ex_m.index = ex_m.index.set_levels(fix_levels, level=1)
            fix_codes = np.array(ex_m.index.codes[1], copy=True)
            fix_codes[fix_codes == -1] = zero_index
            ex_m.index = ex_m.index.set_codes(fix_codes, level=1)
ex_m.index.names = ['experiment', 'run']
elif len(ex_m.index) == 0:
ex_m.index = pd.MultiIndex(
levels=[[],[]],
codes=[[],[]],
names=['experiment', 'run'],
)
ex_m.columns.name = None
if formulas:
if scope is None:
scope = self.read_scope(scope_name)
for measure in scope.get_measures():
if getattr(measure, 'formula', None):
ex_m[measure.name] = ex_m.eval(measure.formula)
if scope is None:
column_order = (
self.read_measures(scope_name)
)
else:
column_order = scope.get_measure_names()
result = ex_m[[i for i in column_order if i in ex_m.columns]]
if runs == 'valid_mean':
return result.groupby('experiment').mean()
return result
    def read_experiment_measure_sources(
self,
scope_name,
design_name=None,
experiment_id=None,
design=None,
):
"""
Read all source ids from the results stored in the database.
Args:
scope_name (str):
A scope name, used to identify experiments,
performance measures, and results associated with this
exploratory analysis.
design_name (str, optional): If given, only experiments
associated with both the scope and the named design
are returned, otherwise all experiments associated
with the scope are returned.
experiment_id (int, optional): The id of the experiment to
retrieve. If omitted, get all experiments matching the
named scope and design.
design (str): Deprecated, use `design_name`.
Returns:
List[Int]: performance measure source ids
"""
if design is not None:
if design_name is None:
design_name = design
warnings.warn("the `design` argument is deprecated for "
"read_experiment_parameters, use `design_name`", DeprecationWarning)
elif design != design_name:
raise ValueError("cannot give both `design_name` and `design`")
scope_name = self._validate_scope(scope_name, 'design_name')
if design_name is None:
sql = sq.GET_EXPERIMENT_MEASURE_SOURCES
arg = {'scope_name':scope_name}
if experiment_id is not None:
sql += " AND eem.experiment_id = @experiment_id"
arg['experiment_id'] = experiment_id
else:
sql = sq.GET_EXPERIMENT_MEASURE_SOURCES_BY_DESIGN
arg = {'scope_name': scope_name, 'design_name':design_name, }
if experiment_id is not None:
sql += " AND eem.experiment_id = @experiment_id"
arg['experiment_id'] = experiment_id
cur = self.conn.cursor()
return [i[0] for i in cur.execute(sql, arg).fetchall()]
    def delete_experiments(
self,
scope_name,
design_name=None,
design=None,
):
"""
Delete experiment definitions and results.
The method removes the linkage between experiments and the
identified experimental design. Experiment parameters and results
are only removed if they are also not linked to any other experimental
design stored in the database.
Args:
scope_name (str): scope name, used to identify experiments,
performance measures, and results associated with this run
design_name (str): Only experiments
associated with both the scope and the named design
are deleted.
design (str): Deprecated, use `design_name`.
"""
if self.readonly:
raise ReadOnlyDatabaseError
with self.conn:
if design is not None:
if design_name is None:
design_name = design
warnings.warn("the `design` argument is deprecated for "
"read_experiment_parameters, use `design_name`", DeprecationWarning)
elif design != design_name:
raise ValueError("cannot give both `design_name` and `design`")
scope_name = self._validate_scope(scope_name, 'design_name')
cur = self.conn.cursor()
cur.execute(sq.DELETE_DESIGN_EXPERIMENTS, [scope_name, design_name])
cur.execute(sq.DELETE_LOOSE_EXPERIMENTS, [scope_name,])
    def delete_experiment_measures(
self,
run_ids=None,
):
"""
Delete experiment performance measure results.
The method removes only the performance measures, not the
parameters. This can be useful if a set of corrupted model
results was stored in the database.
Args:
run_ids (Collection, optional):
A collection of run_id's for which measures shall
be deleted. Note that no scope or design are given here,
experiments must be individually identified.
"""
if self.readonly:
raise ReadOnlyDatabaseError
if isinstance(run_ids, uuid.UUID):
run_ids = [run_ids]
elif isinstance(run_ids, (bytes, str)):
run_ids = [_to_uuid(run_ids)]
with self.conn:
cur = self.conn.cursor()
for run_id in run_ids:
                try:
                    b = _to_uuid(run_id).bytes
                except Exception:
                    raise ValueError(f'cannot interpret run_id "{run_id}"')
cur.execute(
sq.DELETE_RUN_ID,
dict(run_id=b),
)
    def invalidate_experiment_runs(
self,
run_ids=None,
queries=None,
**kwargs,
):
"""
        Invalidate experiment performance measure results.
        The method marks the performance measures as invalid; it does not
        remove the parameters, nor does it actually delete any data from
        the database. This can be useful if a set of corrupted model
        results was stored in the database: the invalid runs can be
        ignored in analysis, while keeping tabs on the results and
        knowing if an attempt is made to store them again.
Args:
run_ids (Collection, optional):
A collection of run_id's for which measures shall
be deleted. Note that no scope or design are given here,
experiments must be individually identified.
            queries (Collection[str], optional):
                A collection of query commands that will select invalid
                experiment runs. The queries are run against a DataFrame
                of parameters and measures.
Returns:
n_runs_invalidated (int or list):
The number of runs that were actually invalidated in the
database (total, if run_ids is given, otherwise per query)
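        Example:
            A sketch of query-based invalidation; the measure name and
            threshold are hypothetical::

                >>> db.invalidate_experiment_runs(
                ...     queries=["pm_1 < 0"],  # flag runs with impossible results
                ...     scope_name='my_scope',
                ... )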
"""
if queries is None and run_ids is None:
raise ValueError("must give run_ids or queries")
if queries is not None and run_ids is not None:
raise ValueError("must give run_ids or queries, not both")
if queries is not None:
n_runs_invalidated = list()
scope_name = kwargs.pop('scope_name', None)
kwargs['with_run_ids'] = True
kwargs['runs'] = 'valid'
if isinstance(queries, str):
queries = [queries]
for q in queries:
bad_runs = self.read_experiment_all(
scope_name,
**kwargs,
).query(q)
if not bad_runs.empty:
n_runs_invalidated.append(
self.invalidate_experiment_runs(bad_runs)
)
else:
n_runs_invalidated.append(0)
return n_runs_invalidated
if self.readonly:
raise ReadOnlyDatabaseError
if isinstance(run_ids, (uuid.UUID, bytes, str)):
run_ids = [_to_uuid(run_ids)]
elif isinstance(run_ids, pd.MultiIndex) and run_ids.nlevels==2:
run_ids = run_ids.get_level_values(1)
elif isinstance(run_ids, pd.DataFrame) and run_ids.index.nlevels==2:
run_ids = run_ids.index.get_level_values(1)
elif isinstance(run_ids, pd.Series) and run_ids.index.nlevels==2 and run_ids.dtype==bool:
run_ids = run_ids[run_ids].index.get_level_values(1)
n_runs_invalidated = 0
with self.conn:
cur = self.conn.cursor()
for run_id in run_ids:
if isinstance(run_id, (uuid.UUID, str, bytes)):
b = _to_uuid(run_id).bytes
else:
raise TypeError(f"Error run_id type {type(run_id)} = {run_id}")
cur.execute(
sq.INVALIDATE_RUN_ID,
dict(run_id=b),
)
n_runs_invalidated += cur.rowcount
return n_runs_invalidated
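# Usage sketch (added for illustration, not part of the original module):
# invalidate runs flagged by a pandas query expression. The scope name and
# the measure name `total_cost` are hypothetical; each query is evaluated
# against a DataFrame of parameters and measures via DataFrame.query.
#
#     db = SQLiteDB("my_project.db")
#     n_bad = db.invalidate_experiment_runs(
#         queries=["total_cost < 0"],
#         scope_name="transport_scope",
#     )
#     # n_bad is a one-item list: the count of runs invalidated per query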
[docs] def write_experiment_all(
self,
scope_name,
design_name,
source,
xlm_df,
run_ids=None,
):
"""
Write experiment definitions and results
Writes the values from each experiment variable and the
results for each performance measure per experiment
Args:
scope_name (str):
A scope name, used to identify experiments,
performance measures, and results associated with this
exploratory analysis. The scope with this name should
already have been stored in this database.
design_name (str or dict):
An experiment design name. This name should be unique
within the named scope, and typically will include a
reference to the design sampler, for example:
'uni' - generated by univariate sensitivity test design
'lhs' - generated by latin hypercube sample design
The design_name is used primarily to load groups of
related experiments together. Alternatively, a dict
mapping design names to subsets of the DataFrame index
can be given, to store several designs in a single call.
source (int):
An indicator of performance measure source. This should
be 0 for a bona fide run of the associated core models,
or some non-zero metamodel_id number.
xlm_df (pandas.DataFrame):
The columns of this DataFrame are the experiment
parameters (i.e. policy levers, uncertainties, and
constants) and performance measures, and each row
is an experiment.
run_ids (pandas.Index, optional):
Provide an optional index of universally unique run ids
(UUIDs) for these results. The UUIDs can be used to help
identify problems and organize model runs.
Raises:
DesignExistsError: If scope and design already exist
TypeError: If not all scope variables are defined in the
experiment
"""
if self.readonly:
raise ReadOnlyDatabaseError
with self.conn:
scope_name = self._validate_scope(scope_name, 'design_name')
fcur = self.conn.cursor()
if isinstance(design_name, str):
design_name_map = {design_name: xlm_df.index}
else:
design_name_map = design_name
for k in design_name_map:
exist = pd.DataFrame(fcur.execute(
sq.GET_EXPERIMENT_PARAMETERS_AND_MEASURES,
[scope_name, k],
).fetchall())
if not exist.empty:
from ...exceptions import DesignExistsError
raise DesignExistsError(
'scope {0} with design {1} found, '
'must be deleted before recording'
.format(scope_name, k)
)
# get list of experiment variables
scp_xl = fcur.execute(sq.GET_SCOPE_XL, [scope_name]).fetchall()
scp_m = fcur.execute(sq.GET_SCOPE_M, [scope_name]).fetchall()
experiment_ids = self.write_experiment_parameters(
scope_name,
design_name_map,
xlm_df[[z[0] for z in scp_xl]],
)
xlm_df.index = experiment_ids
self.write_experiment_measures(
scope_name,
source,
xlm_df[[z[0] for z in scp_m if z[0] in xlm_df.columns]],
run_ids=run_ids,
)
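# Usage sketch (added for illustration, not part of the original module):
# store a design and its core-model results in one call. The column names
# and the scope/design names are hypothetical; the scope must already be
# stored and its parameters and measures must match the DataFrame columns.
#
#     import pandas as pd
#     xlm_df = pd.DataFrame({
#         'exp_var1': [1.0, 2.0],   # a policy lever or uncertainty
#         'pm_1':     [3.5, 4.5],   # a performance measure
#     })
#     db.write_experiment_all('transport_scope', 'lhs', source=0, xlm_df=xlm_df)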
[docs] @copydoc(Database.read_scope_names)
def read_scope_names(self, design_name=None) -> list:
cur = self.conn.cursor()
if design_name is None:
scopes = [i[0] for i in cur.execute(sq.GET_SCOPE_NAMES).fetchall()]
else:
scopes = [i[0] for i in cur.execute(sq.GET_SCOPES_CONTAINING_DESIGN_NAME,
[design_name]).fetchall()]
return scopes
[docs] @copydoc(Database.read_design_names)
def read_design_names(self, scope_name:str) -> list:
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
designs = [i[0] for i in cur.execute(sq.GET_DESIGN_NAMES, [scope_name]).fetchall()]
return designs
[docs] @copydoc(Database.read_uncertainties)
def read_uncertainties(self, scope_name:str) -> list:
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
scp_x = cur.execute(sq.GET_SCOPE_X, [scope_name]).fetchall()
return [i[0] for i in scp_x]
[docs] @copydoc(Database.read_levers)
def read_levers(self, scope_name:str) -> list:
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
scp_l = cur.execute(sq.GET_SCOPE_L, [scope_name]).fetchall()
return [i[0] for i in scp_l]
[docs] @copydoc(Database.read_constants)
def read_constants(self, scope_name:str) -> list:
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
scp_c = cur.execute(sq.GET_SCOPE_C, [scope_name]).fetchall()
return [i[0] for i in scp_c]
[docs] @copydoc(Database.read_measures)
def read_measures(self, scope_name: str) -> list:
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
scp_m = cur.execute(sq.GET_SCOPE_M, [scope_name]).fetchall()
return [i[0] for i in scp_m]
[docs] @copydoc(Database.read_box)
def read_box(self, scope_name: str, box_name: str, scope=None):
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
from ...scope.box import Box
parent = self.read_box_parent_name(scope_name, box_name)
box = Box(name=box_name, scope=scope,
parent=parent)
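# Each threshold row encodes its meaning in t_type:
#   -2 = lower bound, -1 = upper bound,
#    0 = relevant feature (no threshold), >=1 = member of an allowed set.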
for row in cur.execute(sq.GET_BOX_THRESHOLDS, [scope_name, box_name]):
par_name, t_value, t_type = row
if t_type == -2:
box.set_lower_bound(par_name, t_value)
elif t_type == -1:
box.set_upper_bound(par_name, t_value)
elif t_type == 0:
box.relevant_features.add(par_name)
elif t_type >= 1:
box.add_to_allowed_set(par_name, t_value)
return box
[docs] @copydoc(Database.read_box_names)
def read_box_names(self, scope_name: str):
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
names = cur.execute(sq.GET_BOX_NAMES, [scope_name]).fetchall()
return [i[0] for i in names]
[docs] @copydoc(Database.read_box_parent_name)
def read_box_parent_name(self, scope_name: str, box_name:str):
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
try:
return cur.execute(sq.GET_BOX_PARENT_NAME, [scope_name, box_name]).fetchall()[0][0]
except IndexError:
return None
[docs] @copydoc(Database.read_box_parent_names)
def read_box_parent_names(self, scope_name: str):
cur = self.conn.cursor()
scope_name = self._validate_scope(scope_name, None)
names = cur.execute(sq.GET_BOX_PARENT_NAMES, [scope_name]).fetchall()
return {i[0]:i[1] for i in names}
[docs] @copydoc(Database.read_boxes)
def read_boxes(self, scope_name: str=None, scope=None):
from ...scope.box import Boxes
if scope is not None:
scope_name = scope.name
scope_name = self._validate_scope(scope_name, None)
names = self.read_box_names(scope_name)
u = Boxes(scope=scope)
for name in names:
box = self.read_box(scope_name, name, scope=scope)
u.add(box)
return u
[docs] @copydoc(Database.write_box)
def write_box(self, box, scope_name=None):
with self.conn:
from ...scope.box import Box, Bounds
assert isinstance(box, Box)
try:
scope_name_ = box.scope.name
except AttributeError:
scope_name_ = scope_name
if scope_name is not None and scope_name != scope_name_:
raise ValueError("scope_name mismatch")
scope_name = scope_name_
scope_name = self._validate_scope(scope_name, None)
cur = self.conn.cursor()
p_ = set(self.read_uncertainties(scope_name) + self.read_levers(scope_name))
m_ = set(self.read_measures(scope_name))
if box.parent_box_name is None:
cur.execute(sq.INSERT_BOX, [scope_name, box.name])
else:
cur.execute(sq.INSERT_SUBBOX, [scope_name, box.name,
box.parent_box_name])
for t_name, t_vals in box._thresholds.items():
if t_name in p_:
sql_cl = sq.CLEAR_BOX_THRESHOLD_P
sql_in = sq.SET_BOX_THRESHOLD_P
elif t_name in m_:
sql_cl = sq.CLEAR_BOX_THRESHOLD_M
sql_in = sq.SET_BOX_THRESHOLD_M
else:
warnings.warn(f"{t_name} not identifiable as parameter or measure")
continue
cur.execute(sql_cl, [scope_name, box.name, t_name])
if isinstance(t_vals, Bounds):
if t_vals.lowerbound is not None:
cur.execute(sql_in, [scope_name, box.name, t_name, t_vals.lowerbound, -2])
if t_vals.upperbound is not None:
cur.execute(sql_in, [scope_name, box.name, t_name, t_vals.upperbound, -1])
elif isinstance(t_vals, AbstractSet):
for n, t_val in enumerate(t_vals, start=1):
cur.execute(sql_in, [scope_name, box.name, t_name, t_val, n])
else:
raise NotImplementedError(str(type(t_vals)))
for t_name in box.relevant_features:
if t_name in p_:
sql_cl = sq.CLEAR_BOX_THRESHOLD_P
sql_in = sq.SET_BOX_THRESHOLD_P
elif t_name in m_:
sql_cl = sq.CLEAR_BOX_THRESHOLD_M
sql_in = sq.SET_BOX_THRESHOLD_M
else:
warnings.warn(f"{t_name} not identifiable as parameter or measure")
continue
cur.execute(sql_cl, [scope_name, box.name, t_name])
cur.execute(sql_in, [scope_name, box.name, t_name, None, 0])
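# Usage sketch (added for illustration, not part of the original module):
# persist a box that restricts one lever. The box name and parameter name
# are hypothetical; `scope` is an emat.Scope whose name is already stored
# in this database.
#
#     from emat.scope.box import Box
#     box = Box(name='tight_budget', scope=scope)
#     box.set_upper_bound('budget_lever', 0.5)
#     db.write_box(box)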
[docs] @copydoc(Database.write_boxes)
def write_boxes(self, boxes, scope_name=None):
with self.conn:
if boxes.scope is not None:
if scope_name is not None and scope_name != boxes.scope.name:
raise ValueError('scope name mismatch')
scope_name = boxes.scope.name
for box in boxes.values():
self.write_box(box, scope_name)
[docs] def log(self, message, level=logging.INFO):
"""
Log a message into the SQLite database
Args:
message (str): A message to log, will be stored verbatim.
level (int): A logging level, can be used to filter messages.
"""
message = str(message)
if not self.readonly:
with self.conn:
cur = self.conn.cursor()
cur.execute("INSERT INTO ema_log(level, content) VALUES (?,?)", [level, str(message)])
_logger.log(level, message)
[docs] def merge_log(self, other):
"""
Merge the log from another SQLiteDB into this database log.
Args:
other (emat.SQLiteDB): Source of log to merge.
"""
if self.readonly:
raise ReadOnlyDatabaseError
if not hasattr(other, 'conn'):
return
with self.conn:
with other.conn:
selfc = self.conn.cursor()
otherc = other.conn.cursor()
other_q = f"""
SELECT
timestamp, level, content
FROM
ema_log
ORDER BY
rowid
"""
for timestamp, level, content in otherc.execute(other_q):
selfc.execute(
"""
INSERT INTO ema_log(timestamp, level, content)
SELECT ?1, ?2, ?3
WHERE NOT EXISTS(
SELECT 1 FROM ema_log
WHERE timestamp=?1 AND level=?2 AND content=?3
)
""",
[timestamp, level, content],
)
[docs] def print_log(self, file=None, limit=20, order="DESC", level=logging.INFO, like=None):
"""
Print logged messages from the SQLite database
Args:
file (file-like object):
Where to print the output, defaults to the
current sys.stdout.
limit (int or None, default 20):
Maximum number of messages to print
order ({'DESC','ASC'}):
Print log messages in descending or ascending
chronological order.
level (int):
A logging level; only messages logged at this
level or higher are printed.
like (str, optional):
An SQL LIKE pattern; only messages whose content
matches the pattern are printed.
"""
if order.upper() not in ("ASC", "DESC"):
raise ValueError("order must be ASC or DESC")
if not isinstance(level, int):
raise ValueError("level must be an integer")
# `level` and `order` are validated above; bind `like` as a query
# parameter to avoid SQL injection via the pattern string.
params = []
qry = f"""
SELECT
datetime(timestamp, 'localtime'), content
FROM
ema_log
WHERE
level >= {level}
"""
if like:
qry += " AND content LIKE ?"
params.append(like)
qry += f"""
ORDER BY
timestamp {order}, rowid {order}
"""
if limit is not None:
qry += f" LIMIT {int(limit)}"
with self.conn:
cur = self.conn.cursor()
for row in cur.execute(qry, params):
print(" - ".join(row), file=file)
[docs] def mark_run_invalid(
self,
run_id
):
"""
Mark a particular run_id as invalid.
Args:
run_id (str): The run to mark as invalid.
"""
if self.readonly:
raise ReadOnlyDatabaseError
with self.conn:
cursor = self.conn.cursor()
# run_id values are stored as 16-byte UUIDs, so convert before matching
cursor.execute('''
UPDATE
ema_experiment_run
SET
run_valid = 0
WHERE
run_id = @run_id
''', dict(run_id=_to_uuid(run_id).bytes))
if cursor.rowcount:
self.log(f"MARKED AS INVALID RUN_ID {run_id}")
else:
_logger.warning(f"FAILED TO MARK AS INVALID RUN_ID {run_id}")
def get_run_list(self):
"""Read the full table of experiment runs as a pandas.DataFrame indexed by run_id."""
runs = self._raw_query(
"""
SELECT
run_id,
experiment_id,
run_source,
run_status,
run_valid,
run_timestamp,
run_location
FROM
ema_experiment_run
"""
)
runs['run_id'] = runs['run_id'].apply(lambda x: uuid.UUID(bytes=x))
return runs.set_index('run_id')
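# Usage sketch (added for illustration, not part of the original module):
# count the invalid runs recorded in this database.
#
#     runs = db.get_run_list()
#     n_invalid = (runs['run_valid'] == 0).sum()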
[docs] def merge_database(
self,
other,
force=False,
dryrun=False,
on_conflict='ignore',
max_diffs_in_log=50,
):
"""
Merge results from another database.
Only results from a matching identical scope are
merged. Metamodels are copied without checking
if they are duplicate.
Args:
other (emat.Database):
The other database from which to draw data.
force (bool, default False):
By default, database results are merged only
when the scopes match exactly between the current
and `other` database files. Setting `force` to
`True` will merge results from the `other`
database file as long as the scope names match,
the names of the scoped parameters match,
and there is some overlap in the names of the
scoped performance measures.
dryrun (bool, default False):
Allows a dry run of the merge, to check how many
experiments would be merged, without actually
importing any data into the current database.
on_conflict ({'ignore','replace'}, default 'ignore'):
When corresponding performance measures for the
same experiment exist in both the current and
`other` databases, the merge will either ignore
the updated value or replace it with the value
from the other database. This only applies to
conflicts in performance measures; conflicts in
parameters always result in distinct experiments.
max_diffs_in_log (int, default 50):
When there are fewer than this many changes in
a set of experiments, the individual experiment_ids
that have been changed are reported.
"""
if self.readonly:
raise ReadOnlyDatabaseError
assert isinstance(other, Database)
from ...util.deduplicate import count_diff_rows, report_diff_rows
other_db_path = getattr(other, 'database_path', None)
if other_db_path:
self.log(f"merging from database at {other_db_path}")
for scope_name in other.read_scope_names():
scope_other = other.read_scope(scope_name)
if scope_name not in self.read_scope_names():
self.store_scope(scope_other)
scope_self = self.read_scope(scope_name)
if force:
# Force merge if parameter names match, and there are some measures in common
if set(scope_self.get_parameter_names()) != set(scope_other.get_parameter_names()):
self.log(f"not merging scope name {scope_name}, parameter names do not match current database")
continue
common_measure_names = list(set(scope_self.get_measure_names()) & set(scope_other.get_measure_names()))
if len(common_measure_names) == 0:
self.log(f"not merging scope name {scope_name}, measure names do not overlap current database")
continue
else:
# Unforced merge only when scopes are identical
if scope_self != scope_other:
self.log(f"not merging scope name {scope_name}, content does not match current database")
continue
common_measure_names = scope_self.get_measure_names()
self.log(f"merging scope name {scope_name}")
# transfer metamodels
source_mapping = {}
other_metamodel_ids = other.read_metamodel_ids(scope_name)
for other_metamodel_id in other_metamodel_ids:
other_metamodel = other.read_metamodel(scope_name, other_metamodel_id)
new_id = self.get_new_metamodel_id(scope_name)
other_metamodel.metamodel_id = new_id
if not dryrun:
self.write_metamodel(
scope_name,
metamodel=other_metamodel,
metamodel_id=new_id,
)
source_mapping[other_metamodel_id] = new_id
# transfer runs
n_runs_added = 0
try:
other_runs = other.get_run_list()
except AttributeError:
other_runs = pd.DataFrame(
columns=[
'run_id',
'experiment_id',
'run_source',
'run_status',
'run_valid',
'run_timestamp',
'run_location',
]
)
# transfer experiments
design_names = other.read_design_names(scope_name)
for design_name in design_names:
xl_df = other.read_experiment_parameters(scope_name, design_name)
proposed_design_name = design_name
design_match = False
experiment_id_map = None
if proposed_design_name in self.read_design_names(scope_name):
xl_df_self = self.read_experiment_parameters(scope_name, design_name)
if not xl_df.reset_index(drop=True).equals(
xl_df_self.reset_index(drop=True)
):
n = 2
while proposed_design_name in self.read_design_names(scope_name):
proposed_design_name = f"{design_name}_{n}"
n += 1
else:
design_match = True
experiment_id_map = pd.Series(data=xl_df_self.index, index=xl_df.index)
if not design_match:
# transfer parameters
if proposed_design_name == design_name:
self.log(f"experiment parameters updates for {scope_name}/{design_name}")
else:
self.log(f"experiment parameters updates for {scope_name}/{design_name} -> {proposed_design_name}")
if not dryrun:
self.write_experiment_parameters(scope_name, proposed_design_name, xl_df)
sources = other.read_experiment_measure_sources(scope_name, design_name)
# transfer relevant runs
if experiment_id_map is not None:
experiment_id_map_ = experiment_id_map
else:
experiment_id_map_ = pd.Series(data=xl_df.index, index=xl_df.index)
relevant_runs = other_runs['experiment_id'].map(experiment_id_map_)
copy_runs = other_runs.copy()
copy_runs['experiment_id'] = relevant_runs
copy_runs = copy_runs[~relevant_runs.isna()]
with self.conn:
n_runs_added = 0
cursor = self.conn.cursor()
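# itertuples yields (index, *columns); the index of copy_runs is the
# run_id (a UUID, from get_run_list), so row[0].bytes is the 16-byte
# key stored in ema_experiment_run.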
for row in copy_runs.itertuples():
binding = dict(
run_id=row[0].bytes,
experiment_id=row[1],
run_source=row[2],
run_status=row[3],
run_valid=row[4],
run_timestamp=row[5],
run_location=row[6]
)
cursor.execute(
"""
INSERT OR IGNORE INTO ema_experiment_run (
run_id ,
experiment_id ,
run_source ,
run_status ,
run_valid ,
run_timestamp ,
run_location
) VALUES (
@run_id ,
@experiment_id ,
@run_source ,
@run_status ,
@run_valid ,
@run_timestamp ,
@run_location
)
""",
binding
)
if cursor.rowcount:
n_runs_added += 1
if n_runs_added:
self.log(f"added {n_runs_added} run_ids to database")
# transfer measures
for source in sources:
if source == 0:
# source is core model, make updates but do not overwrite non-null values
m_df0 = self.read_experiment_measures(scope_name, design_name, source=source, runs='all')
m_df1 = other.read_experiment_measures(scope_name, design_name, source=source, runs='all')[common_measure_names]
if experiment_id_map is not None:
reindexed = m_df1.index.levels[0].map(experiment_id_map)
m_df1.index = m_df1.index.set_levels(reindexed, level=0)
df0copy = m_df0.copy()
try:
if on_conflict == 'replace':
m_df = m_df1.combine_first(m_df0)
else:
m_df = m_df0.combine_first(m_df1)
except Exception:
print("m_df0\n",m_df0)
print("m_df1\n",m_df1)
raise
n_diffs = count_diff_rows(m_df, df0copy)
self.log(f"experiment measures {n_diffs} updates "
f"for {scope_name}/{design_name}")
if n_diffs < max_diffs_in_log:
list_diffs, list_adds, list_drops = report_diff_rows(m_df, df0copy)
if list_diffs:
self.log(f" {scope_name}/{design_name} experiment measures updated: "
+", ".join(str(_) for _ in list_diffs))
if list_adds:
self.log(f" {scope_name}/{design_name} experiment measures added: "
+", ".join(str(_) for _ in list_adds))
if list_drops: # should be none
self.log(f" {scope_name}/{design_name} experiment measures dropped: "
+", ".join(str(_) for _ in list_drops))
if not dryrun:
self.write_experiment_measures(scope_name, 0, m_df)
else: # source is not core model, just copy
m_df = other.read_experiment_measures(scope_name, design_name, source=source)[common_measure_names]
if not dryrun:
self.write_experiment_measures(scope_name, source_mapping[source], m_df)
# merge logs
if isinstance(other, SQLiteDB):
self.merge_log(other)
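# Usage sketch (added for illustration, not part of the original module):
# preview and then perform a merge from an archived database file. The
# file paths are hypothetical.
#
#     archive = SQLiteDB("archive.db", readonly=True)
#     db.merge_database(archive, dryrun=True)   # log what would change
#     db.merge_database(archive, on_conflict='replace')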