Source code for emat.model.core_files.core_files

# -*- coding: utf-8 -*-

from typing import List, Union, Mapping, Dict, Tuple, Callable
import yaml
import os, sys, time
import shutil
import glob
import numpy as np
import pandas as pd
import subprocess
import warnings
import logging
from pathlib import Path
from ...model.core_model import AbstractCoreModel
from ...scope.scope import Scope
from ...database.database import Database
from ...util.docstrings import copydoc
from ...exceptions import *
from .parsers import *
from ...util.loggers import get_module_logger

_logger = get_module_logger(__name__)



def copy_model_outputs_1(
		local_model,
		remote_repository,
		file
):
	"""Copy one file from a local model's "Outputs" directory to the same spot in a repository."""
	shutil.copyfile(
		os.path.join(local_model, "Outputs", file),
		os.path.join(remote_repository, "Outputs", file)
	)

def copy_model_outputs_ext(
		local_model,
		remote_repository,
		basename,
		ext=('.bin', '.dcb')
):
	"""Copy a set of same-named model output files, one for each given file extension."""
	for x in ext:
		copy_model_outputs_1(
			local_model,
			remote_repository,
			os.path.splitext(basename)[0] + x
		)
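
# A minimal usage sketch for the helpers above, assuming a core model that
# writes paired .bin/.dcb output files; the paths and file name shown are
# hypothetical:
#
#     copy_model_outputs_ext(
#         local_model="/models/scenario_1",
#         remote_repository="/archive/scenario_1",
#         basename="trip_table.bin",
#     )
#
# This copies both "Outputs/trip_table.bin" and "Outputs/trip_table.dcb"
# from the local model to the repository.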

ALL = slice(None)  # equivalent to ':' in an indexing expression; selects an entire axis


class FilesCoreModel(AbstractCoreModel):
	"""
	Set up connections and paths to a file-reading core model.

	Args:
		configuration:
			The configuration for this core model. This can be passed as
			a dict, or as a str which gives the filename of a YAML file
			that will be loaded.
		scope:
			The exploration scope, as a Scope object or as a str which
			gives the filename of a YAML file that will be loaded.
		safe:
			Load the configuration YAML file in 'safe' mode. This can be
			disabled if the configuration requires custom Python types
			or is otherwise not compatible with safe mode. Loading
			configuration files with safe mode off is not secure and
			should not be done with files from untrusted sources.
		db:
			An optional Database to store experiments and results.
		name:
			A name for this model, given as an alphanumeric string.
			The name is required by ema_workbench operations. If not
			given, "FilesCoreModel" is used.
		local_directory:
			Optionally explicitly give this local_directory to use,
			overriding any directory set in the config file. If not
			given either here or in the config file, then Python's cwd
			is used.
	"""

	def __init__(
			self,
			configuration: Union[str, Mapping],
			scope: Union[Scope, str],
			safe: bool = True,
			db: Database = None,
			name: str = 'FilesCoreModel',
			local_directory: Path = None,
	):
		super().__init__(
			configuration=configuration,
			scope=scope,
			safe=safe,
			db=db,
			name=name,
			metamodel_id=0,
		)

		self._local_directory = local_directory

		self.model_path = os.path.expanduser(self.config['model_path'])
		"""Path: The directory of the 'live' model instance, relative to the local_directory."""

		self.rel_output_path = self.config.get('rel_output_path', 'Outputs')
		"""Path: The path to 'live' model outputs, relative to `model_path`."""

		self.archive_path = os.path.expanduser(self.config['model_archive'])
		"""Path: The directory where archived models are stored."""

		self._parsers = []

	@property
	def local_directory(self):
		"""Path: The current local working directory for this model."""
		return self._local_directory or self.config.get("local_directory", os.getcwd())

	@local_directory.setter
	def local_directory(self, value):
		self._local_directory = value

	def __getstate__(self):
		state = super().__getstate__()
		# The SQLite Database does not serialize for usage in other
		# threads or processes, so we will pass through the database path
		# to open another new connection on the other end, assuming it is
		# a file object that can be re-opened for other connections.
		db = getattr(self, 'db', None)
		from ...database import SQLiteDB
		if isinstance(db, SQLiteDB):
			if os.path.exists(db.database_path):
				state['_sqlitedb_path_'] = db.database_path
				state['_sqlitedb_readonly_'] = db.readonly
		return state

	def __setstate__(self, state):
		# When we are running on a dask worker, functions are executed
		# in a different thread from the worker itself, even if there is
		# only one thread. To prevent problems with SQLite, we check if
		# this is a worker and if there is only one thread, in which case
		# we can safely ignore the fact that the database is accessed
		# from a different thread than where it is created.
		from dask.distributed import get_worker
		try:
			worker = get_worker()
		except ValueError:
			n_threads = -1
		else:
			n_threads = worker.nthreads
		database_path = state.pop('_sqlitedb_path_', None)
		database_readonly = state.pop('_sqlitedb_readonly_', False)
		self.__dict__ = state
		if database_path and not database_readonly:
			from ...database import SQLiteDB
			if os.path.exists(database_path):
				self.db = SQLiteDB(
					database_path,
					initialize='skip',
					readonly=database_readonly,
					check_same_thread=(n_threads != 1),
				)

	@property
	def resolved_model_path(self):
		"""
		The model path to use.

		If `model_path` is set to an absolute path, then that path is
		returned, otherwise the `model_path` is joined onto the
		`local_directory`.

		Returns:
			str
		"""
		if self.model_path is None:
			raise MissingModelPathError('no model_path set for this core model')
		if os.path.isabs(self.model_path):
			return self.model_path
		else:
			return os.path.join(self.local_directory, self.model_path)

	@property
	def resolved_archive_path(self):
		"""
		The archive path to use.

		If `archive_path` is set to an absolute path, then that path is
		returned, otherwise the `archive_path` is joined onto the
		`local_directory`.

		Returns:
			str
		"""
		if self.archive_path is None:
			raise MissingArchivePathError('no archive set for this core model')
		if os.path.isabs(self.archive_path):
			return self.archive_path
		else:
			return os.path.join(self.local_directory, self.archive_path)
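
	# A sketch of the configuration keys consumed above, shown as the YAML
	# file that may be passed to the constructor; the values here are
	# illustrative only:
	#
	#     model_path: model-core           # required; joined onto local_directory unless absolute
	#     model_archive: archive           # required; joined onto local_directory unless absolute
	#     rel_output_path: Outputs         # optional; defaults to 'Outputs'
	#     local_directory: /projects/demo  # optional; defaults to Python's cwd
	#
	# With these values, resolved_model_path is "/projects/demo/model-core",
	# while an absolute model_path such as "/opt/model-core" would be
	# returned unchanged.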

	def add_parser(self, parser):
		"""
		Add a FileParser to extract performance measures.

		Args:
			parser (FileParser): The parser to add.
		"""
		if not isinstance(parser, FileParser):
			raise TypeError("parser must be an instance of FileParser")
		self._parsers.append(parser)

	def get_parser(self, idx):
		"""
		Access a FileParser, used to extract performance measures.

		Args:
			idx (int): The position of the parser to get.

		Returns:
			FileParser
		"""
		return self._parsers[idx]
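
	# A minimal sketch of attaching and retrieving a parser. It assumes the
	# core model writes a table "benefits.csv" into its output directory;
	# the file name and measure name are illustrative, not part of this
	# module:
	#
	#     from emat.model.core_files.parsers import TableParser, loc
	#
	#     model.add_parser(
	#         TableParser(
	#             "benefits.csv",
	#             {"net_benefit": loc["total", "value"]},
	#         )
	#     )
	#     model.get_parser(0).measure_names  # -> ['net_benefit']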

	def model_init(self, policy):
		super().model_init(policy)

	def get_experiment_archive_path(
			self,
			experiment_id=None,
			makedirs=False,
			parameters=None,
			run_id=None,
	):
		"""
		Returns a file system location to store model run outputs.

		For core models with long model run times, it is recommended
		to store the complete model run results in an archive. This
		will facilitate adding additional performance measures to the
		scope at a later time.

		Both the scope name and experiment id can be used to create the
		folder path.

		Args:
			experiment_id (int):
				The experiment id, which is also the row id of the
				experiment in the database. If this is omitted, an
				experiment id is read or created using the parameters.
			makedirs (bool, default False):
				If this archive directory does not yet exist, create it.
			parameters (dict, optional):
				The parameters for this experiment, used to create or
				lookup an experiment id. The parameters are ignored
				if `experiment_id` is given.
			run_id (UUID, optional):
				The run_id of this model run. If not given but a run_id
				attribute is stored in this FilesCoreModel instance,
				that value is used.

		Returns:
			str: Experiment archive path (no trailing backslashes).
		"""
		if experiment_id is None:
			if parameters is None:
				raise ValueError("must give `experiment_id` or `parameters`")
			db = getattr(self, 'db', None)
			if db is not None:
				with warnings.catch_warnings():
					warnings.simplefilter("ignore", category=MissingIdWarning)
					experiment_id = db.get_experiment_id(self.scope.name, parameters)
		if run_id is None:
			run_id = getattr(self, 'run_id', None)
		try:
			exp_dir_name = f"exp_{experiment_id:03d}"
		except (ValueError, TypeError):
			# experiment ids that are not integers (or are None) cannot
			# be zero-padded, so fall back to plain formatting
			exp_dir_name = f"exp_{experiment_id}"
		if run_id:
			exp_dir_name += f"_{run_id}"
		mod_results_path = os.path.join(
			self.resolved_archive_path,
			f"scp_{self.scope.name}",
			exp_dir_name,
		)
		if makedirs:
			os.makedirs(mod_results_path, exist_ok=True)
		return mod_results_path

	def setup(self, params):
		"""
		Configure the core model with the experiment variable values.

		This method is the place where the core model set up takes place,
		including creating or modifying files as necessary to prepare
		for a core model run. When running experiments, this method
		is called once for each core model experiment, where each
		experiment is defined by a set of particular values for both the
		exogenous uncertainties and the policy levers. These values are
		passed to the experiment only here, and not in the `run` method
		itself. This facilitates debugging, as the `setup` method can
		potentially be used without the `run` method, allowing the user
		to manually inspect the prepared files and ensure they are
		correct before actually running a potentially expensive model.

		Each input exogenous uncertainty or policy lever can potentially
		be used to manipulate multiple different aspects of the
		underlying core model. For example, a policy lever that includes
		a number of discrete future network "build" options might
		trigger the replacement of multiple related network definition
		files. Or, a single uncertainty relating to the cost of fuel
		might scale both a parameter linked to the modeled per-mile cost
		of operating an automobile, as well as the modeled total cost of
		fuel used by transit services.

		At the end of the `setup` method, a core model experiment should
		be ready to run using the `run` method.

		Classes derived from `FilesCoreModel` do not necessarily need to
		call `super().setup(params)`, but may find it convenient to do
		so, as this implementation provides some standard functionality,
		including validation of parameter names, managing existing
		archive directories, and logging the start time to the archive.

		Args:
			params (dict):
				experiment variables including both exogenous
				uncertainty and policy levers

		Raises:
			KeyError:
				if a defined experiment variable is not supported
				by the core model
		"""
		experiment_id = params.pop("_experiment_id_", None)

		# Validate parameter names
		scope_param_names = set(self.scope.get_parameter_names())
		for key in params.keys():
			if key not in scope_param_names:
				self.log(
					f"SETUP ERROR: '{key}' not found in scope parameters",
					level=logging.ERROR,
				)
				raise KeyError(f"'{key}' not found in scope parameters")

		# Get the experiment_id if stored
		db = getattr(self, 'db', None)
		if db is not None:
			run_id, experiment_id = self.db.new_run_id(
				self.scope.name, params, experiment_id=experiment_id
			)
		else:
			import uuid
			if experiment_id is None:
				experiment_id = getattr(self, 'experiment_id', None)
			run_id = uuid.uuid4()
		self.run_id = run_id
		self.experiment_id = experiment_id

		# Rename any existing archive directories
		if experiment_id is not None:
			orig_archive = self.get_experiment_archive_path(experiment_id)
			if os.path.exists(orig_archive):
				n = 1
				orig_archive = orig_archive.rstrip(os.path.sep)
				dirpath, basepath = os.path.split(orig_archive)
				new_archive = os.path.normpath(os.path.join(dirpath, f"{basepath}_OLD_{n}"))
				while os.path.exists(new_archive):
					n += 1
					new_archive = os.path.normpath(os.path.join(dirpath, f"{basepath}_OLD_{n}"))
				shutil.move(orig_archive, new_archive)
			os.makedirs(orig_archive, exist_ok=True)
			with open(os.path.join(orig_archive, "_emat_start_.log"), 'at') as f:
				f.write("Starting model run at {0}\n".format(time.strftime("%Y-%m-%d %H:%M:%S")))
				if run_id is not None:
					f.write(f"run_id = {run_id}\n")
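
	# A sketch of how a derived class might extend `setup`; writing the
	# experiment values to "params.yaml" is a stand-in for whatever file
	# manipulation the particular core model actually requires:
	#
	#     class MyModel(FilesCoreModel):
	#         def setup(self, params):
	#             super().setup(params)  # validate names, manage archive dirs
	#             with open(os.path.join(self.resolved_model_path, "params.yaml"), "wt") as f:
	#                 yaml.safe_dump(params, f)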

	@copydoc(AbstractCoreModel.load_measures)
	def load_measures(
			self,
			measure_names: List[str] = None,
			*,
			rel_output_path=None,
			abs_output_path=None,
	):
		if rel_output_path is not None and abs_output_path is not None:
			raise ValueError("cannot give both `rel_output_path` and `abs_output_path`")
		elif rel_output_path is None and abs_output_path is None:
			output_path = os.path.join(self.resolved_model_path, self.rel_output_path)
		elif rel_output_path is not None:
			output_path = os.path.join(self.resolved_model_path, rel_output_path)
		else:  # abs_output_path is not None
			output_path = abs_output_path

		if not os.path.isdir(output_path):
			raise NotADirectoryError(output_path)

		if measure_names is None:
			is_requested = lambda i: True
		else:
			requested_measure_names = set(measure_names)
			is_requested = lambda i: i in requested_measure_names

		results = {}

		for parser in self._parsers:
			if any(is_requested(name) for name in parser.measure_names):
				try:
					measures = parser.read(output_path)
				except FileNotFoundError as err:
					for name in parser.measure_names:
						if is_requested(name):
							warnings.warn(f'{name} unavailable, {err} not found')
				except Exception as err:
					for name in parser.measure_names:
						if is_requested(name):
							warnings.warn(f'{name} unavailable, {err!r}')
				else:
					for k, v in measures.items():
						if is_requested(k):
							results[k] = v

		# Also assign to outcomes_output, in addition to returning the
		# results, for ema_workbench compatibility.
		self.outcomes_output = results

		return results
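
	# For example, after a model run completes, measures can be read back
	# from the live output directory (the measure name is illustrative):
	#
	#     model.load_measures(['net_benefit'])  # just one measure
	#     model.load_measures()                 # everything the parsers provide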

	def load_archived_measures(self, experiment_id, measure_names=None):
		"""
		Load performance measures from an archived model run.

		Args:
			experiment_id (int): The id for the experiment to load.
			measure_names (Collection, optional): A subset of
				performance measure names to load. If not provided,
				all measures will be loaded.
		"""
		experiment_archive_path = self.get_experiment_archive_path(experiment_id)
		experiment_archive_zip = experiment_archive_path.rstrip("/\\") + ".zip"
		if os.path.exists(experiment_archive_zip):
			_logger.info(f"zipped archive found, loading from {experiment_archive_zip}")
			import tempfile, zipfile
			with tempfile.TemporaryDirectory() as tmpdir:
				zipfile.ZipFile(experiment_archive_zip).extractall(tmpdir)
				return self.load_measures(
					measure_names,
					abs_output_path=os.path.join(
						tmpdir,
						self.rel_output_path,
					)
				)
		else:
			_logger.info(f"loading from {experiment_archive_path}")
			return self.load_measures(
				measure_names,
				abs_output_path=os.path.join(
					experiment_archive_path,
					self.rel_output_path,
				)
			)
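
	# Typical usage (the experiment id is illustrative): re-read measures
	# from a previously archived run, e.g. after adding a new parser for a
	# measure that was not in scope when the experiment was first run:
	#
	#     model.load_archived_measures(42)
	#     model.load_archived_measures(42, measure_names=['net_benefit'])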

	def archive(self, params, model_results_path, experiment_id: int = 0):
		raise NotImplementedError

	def run(self):
		raise NotImplementedError

	def post_process(self, params, measure_names, output_path=None):
		"""
		Runs post processors associated with particular performance measures.

		This method is the place to conduct automatic post-processing of
		core model run results, in particular any post-processing that is
		expensive or that will write new output files into the core
		model's output directory. The core model run should already have
		been completed using `setup` and `run`. If the relevant
		performance measures do not require any post-processing to create
		(i.e. they can all be read directly from output files created
		during the core model run itself) then this method does not need
		to be overloaded for a particular core model implementation.

		The default implementation of this method is a no-op, but it is
		available to be overloaded for particular implementations.

		Args:
			params (dict):
				Dictionary of experiment variables, with keys as variable
				names and values as the experiment settings. Most
				post-processing scripts will not need to know the
				particular values of the inputs (exogenous uncertainties
				and policy levers), but this method receives the
				experiment input parameters as an argument in case one or
				more of these parameter values needs to be known in order
				to complete the post-processing.
			measure_names (List[str]):
				List of measures to be processed. Normally for the first
				pass of core model run experiments, post-processing will
				be completed for all performance measures. However, it is
				possible to use this argument to give only a subset of
				performance measures to post-process, which may be
				desirable if the post-processing of some performance
				measures is expensive. Additionally, this method may also
				be called on archived model results, allowing it to run
				to generate only a subset of (probably new) performance
				measures based on these archived runs.
			output_path (str, optional):
				Path to model outputs. If this is not given (typical for
				the initial run of core model experiments) then the
				local/default model directory is used. This argument is
				provided primarily to facilitate post-processing archived
				model runs to make new performance measures (i.e.
				measures that were not in-scope when the core model was
				actually run).

		Raises:
			KeyError:
				If post process is not available for specified measure
		"""
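
# A minimal sketch of the two methods a subclass must supply; the command
# line and copy strategy below are assumptions for illustration, not part
# of this module (dirs_exist_ok requires Python 3.8+):
#
#     class MyFilesModel(FilesCoreModel):
#
#         def run(self):
#             # launch the core model in its 'live' directory and wait
#             subprocess.run(
#                 ["run-model", "--config", "params.yaml"],
#                 cwd=self.resolved_model_path,
#                 check=True,
#             )
#
#         def archive(self, params, model_results_path, experiment_id=0):
#             # copy the live outputs into the experiment archive
#             shutil.copytree(
#                 os.path.join(self.resolved_model_path, self.rel_output_path),
#                 os.path.join(model_results_path, self.rel_output_path),
#                 dirs_exist_ok=True,
#             )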