Source code for dse_do_utils.scenariomanager

# Copyright IBM All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

# -----------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------
# ScenarioManager - for CPD4.0 using decision_optimization_client instead of dd_scenario
# -----------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------
import os
import glob
import pathlib
import zipfile
import tempfile

import docplex
import pandas as pd
from typing import Sequence, List, Dict, Tuple, Optional, Union

from dse_do_utils.utilities import convert_size
from pathlib import Path

# Typing aliases
Inputs = Dict[str, pd.DataFrame]
Outputs = Dict[str, pd.DataFrame]
InputsOutputs = Tuple[Inputs, Outputs]

# Platform
# Different platforms require different approaches for writing data assets.
# ScenarioManager will try to detect the platform automatically. However, these checks are sensitive and unsupported.
# Therefore, to allow better control and avoid dependency on underlying conditions, the user can explicitly set the platform.
import enum
class Platform(enum.Enum):
    CPDaaS = 1  # As of Nov 2021, CPDaaS uses project_lib
    CPD40 = 2   # CPD 4.0 uses ibm_watson_studio_lib
    CPD25 = 3   # CPD 2.5-3.5 uses project_lib (but differently from CPDaaS)
    Local = 4
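
# A minimal sketch (hypothetical names and path) of pinning the platform explicitly
# instead of relying on the auto-detection, e.g. when the environment checks misfire:
#
#   sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1',
#                        platform=Platform.Local, local_root='/path/to/my/project')
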
class ScenarioManager(object):
    """A ScenarioManager is responsible for loading and storing the input and output DataFrame dictionaries.

    The data can be loaded from and stored into:

    * A DO scenario
    * An Excel spreadsheet
    * A set of csv files

    Excel: stores one DataFrame per sheet. Creates a `_table_index_` sheet that keeps track of which
    DataFrame is input or output, and restores table names that are longer than the Excel maximum
    of 31 characters for sheet names.

    Usage 1 - Load data from Excel and store into DO scenario.
    Assumes DO model `MyModel` and an Excel file `datasets/MyExcelFile.xlsx` exist.
    The scenario will be created if it doesn't exist, or otherwise gets overwritten::

        sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1')
        inputs, outputs = sm.load_data_from_excel('MyExcelFile')
        sm.write_data_into_scenario()

    Usage 2 - Load data from DO scenario.
    Assumes DO model `MyModel` and the scenario exist.
    Typical use in a `#dd-ignore` cell in a solve notebook::

        sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1')
        inputs, outputs = sm.load_data_from_scenario()

    Usage 3 - Load data from all csv files in datasets into Excel.
    Stores into `/datasets/excel_test.xlsx`::

        excel_output_file_name = 'excel_test'
        csv_directory = os.path.join(os.environ['DSX_PROJECT_DIR'], 'datasets')
        sm = ScenarioManager()
        inputs, outputs = sm.load_data_from_csv(csv_directory)
        sm.write_data_to_excel(excel_output_file_name)

    Usage 4 - Load data from Excel and store into Excel.
    Assumes Excel file `datasets/MyExcelFile.xlsx` exists. Will create a file `datasets/MyExcelFileOutput.xlsx`::

        sm = ScenarioManager()
        inputs, outputs = sm.load_data_from_excel('MyExcelFile')
        # Do something with the inputs or outputs
        sm.write_data_to_excel('MyExcelFileOutput')
    """

    def __init__(self, model_name: Optional[str] = None, scenario_name: Optional[str] = None,
                 local_root: Optional[Union[str, Path]] = None,
                 project_id: Optional[str] = None, project_access_token: Optional[str] = None, project=None,
                 template_scenario_name: Optional[str] = None,
                 platform: Optional[Platform] = None,
                 inputs: Inputs = None, outputs: Outputs = None,
                 local_relative_data_path: str = 'assets/data_asset',
                 data_directory: str = None):
        """Create a ScenarioManager.

        The template_scenario_name is the name of a scenario with an (empty but) valid model that has been
        successfully run at least once. When creating a new scenario, the template scenario is copied.
        This ensures the new scenario can be updated with output generated from running the Jupyter notebook.
        This is a work-around for the problem that the DO Experiment will not show outputs updated/generated
        from a notebook unless the scenario has been solved successfully at least once.

        The way files are added as data assets differs between platforms and has been changing frequently
        with newer versions of CPD/CPDaaS. The ScenarioManager will try to detect the platform to choose the
        appropriate method. However, these checks are sensitive and not supported by the platform.
        Therefore, the ScenarioManager allows explicit control via the argument `platform`.
        Valid choices are: `CPDaaS`, `CPD40`, `CPD25`, and `Local`.

        Args:
            model_name (str):
            scenario_name (str):
            local_root (str): Path of root when running on a local computer
            project_id (str): Project-id, when running in WS Cloud, also requires a project_access_token
            project_access_token (str): When running in WS Cloud, also requires a project_id
            project (project_lib.Project): alternative for project_id and project_access_token for WS Cloud
            template_scenario_name (str): If scenario doesn't exist, create a new one. If template_scenario_name is specified, use that as template.
            platform (Platform): Optionally control the platform (`CPDaaS`, `CPD40`, `CPD25`, and `Local`). If None, will try to detect automatically.
            local_relative_data_path (str): relative directory from the local_root. Used as default data_directory
            data_directory (str): Full path of data directory. Will override the platform-dependent default.
        """
        self.model_name = model_name
        self.scenario_name = scenario_name
        self.local_root = local_root
        self.project_id = project_id
        self.project_access_token = project_access_token
        self.project = project
        self.inputs = inputs
        self.outputs = outputs
        self.template_scenario_name = template_scenario_name
        self.local_relative_data_path = local_relative_data_path
        self.data_directory = data_directory
        if platform is None:
            platform = ScenarioManager.detect_platform()
        self.platform = platform
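
    # A minimal sketch (hypothetical names) of the template-scenario work-around described above:
    # 'Template' is an existing scenario that has been solved successfully at least once, so a freshly
    # created 'Scenario_1' can show outputs generated from a notebook in the DO Experiment:
    #
    #   sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1',
    #                        template_scenario_name='Template')
    #   inputs, outputs = sm.load_data_from_excel('MyExcelFile')
    #   sm.write_data_into_scenario()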
    def load_data(self, load_from_excel=False, excel_file_name=None):
        """Load data from either the DO scenario, or an Excel spreadsheet.
        The Excel spreadsheet is expected to be in the `datasets` folder, either in WS or local.

        Returns:
            inputs, outputs (tuple of dicts): the inputs and outputs dictionaries of DataFrames
        """
        if load_from_excel:
            self.inputs, self.outputs = self.load_data_from_excel(excel_file_name)
        else:
            self.inputs, self.outputs = self.load_data_from_scenario_s(self.model_name, self.scenario_name)
        return self.inputs, self.outputs
    def get_data_directory(self) -> str:
        """Return the path to the datasets folder.

        :return: path to the datasets folder
        """
        # Added in v0.5.4.3 to force the data_directory on any platform, e.g. when using Storage Volumes
        if self.data_directory is not None:
            return self.data_directory

        if self.platform == Platform.CPDaaS:
            data_dir = os.environ['PWD']  # typically '/home/wsuser/work'
        elif self.platform == Platform.CPD40:
            from ibm_watson_studio_lib import access_project_or_space
            wslib = access_project_or_space()
            data_dir = wslib.mount.get_base_dir()
        elif self.platform == Platform.CPD25:
            # Note that the data dir in CPD25 is not a real directory and is NOT in the hierarchy of the JupyterLab folder
            data_dir = '/project_data/data_asset'  # Do NOT use os.path.join!
        elif self.platform == Platform.Local:
            if self.local_root is None:
                raise ValueError('The local_root should be specified if loading a file from outside of Watson Studio')
            data_dir = os.path.join(self.local_root, self.local_relative_data_path)
        else:
            # TODO: get_root_directory requires updates
            data_dir = os.path.join(self.get_root_directory(), self.local_relative_data_path)
        return data_dir
    def get_root_directory(self) -> str:
        """Return the root directory of the file system.
        If the system is WS, it returns the platform root, otherwise the directory specified in the local_root.

        Raises:
            ValueError: if the root directory doesn't exist.

        TODO: review the options other than Local
        """
        if self.platform == Platform.CPDaaS:
            root_dir = '.'  # '/home/wsuser/work' or use os.environ['PWD']
        elif self.platform == Platform.CPD40:
            root_dir = "/userfs"
        elif self.platform == Platform.CPD25:
            root_dir = '.'
        elif self.platform == Platform.Local:
            if self.local_root is None:
                raise ValueError('The local_root should be specified if loading a file from outside of Watson Studio')
            root_dir = self.local_root
        else:
            root_dir = '.'

        # Assert that root_dir actually exists
        if not os.path.isdir(root_dir):
            raise ValueError("Root directory `{}` does not exist.".format(root_dir))
        return root_dir
    def add_data_file_using_project_lib(self, file_path: str, file_name: Optional[str] = None) -> None:
        """Add a data file to the Watson Studio project.
        Applies to CP4Dv2.5 and WS Cloud/CP4DaaS.
        Needs to be called after the file has been saved regularly in the file system,
        in `/project_data/data_asset/` (for CPD2.5) or `/home/wsuser/work/` (for CPDaaS).
        Ensures the file is visible in the Data Assets of the Watson Studio UI.

        Args:
            file_path (str): full file path, including the file name and extension
            file_name (str): name of the data asset. Default is None. If None, the file name will be extracted from the file_path.
        """
        # Add to Project
        if self.project is None:
            from project_lib import Project
            self.project = Project.access()
        if file_name is None:
            file_name = os.path.basename(file_path)
        with open(file_path, 'rb') as f:
            self.project.save_data(file_name=file_name, data=f, overwrite=True)
    def add_data_file_using_ws_lib(self, file_path: str, file_name: Optional[str] = None) -> None:
        """Add a data file to the Watson Studio project using ibm_watson_studio_lib.
        Applies to CP4Dv4.0.
        Needs to be called after the file has been saved regularly in the file system.
        Ensures the file is visible in the Data Assets of the Watson Studio UI.
        TODO: where should the file be written?

        Args:
            file_path (str): full file path, including the file name and extension
            file_name (str): name of the data asset. Default is None. If None, the file name will be extracted from the file_path.
        """
        # Add to Project
        if file_name is None:
            file_name = os.path.basename(file_path)
        with open(file_path, 'rb') as f:
            from ibm_watson_studio_lib import access_project_or_space
            wslib = access_project_or_space()
            wslib.save_data(asset_name_or_item=file_name, data=f.read(), overwrite=True)

    # Notes:
    # * wslib.upload_file(file_path=file_path, file_name=file_name, overwrite=True) CANNOT(!) overwrite an existing asset
    # * Unlike with project_lib, we need to do an f.read()
    # * ibm_watson_studio_lib is not (yet?) available in CPDaaS, but if it becomes so, then similar to project_lib
    #   it may need a handle to self.project. Hence this non-static method.
    @staticmethod
    def add_data_file_using_ws_lib_s(file_path: str, file_name: Optional[str] = None) -> None:
        """Add a data file to the Watson Studio project using ibm_watson_studio_lib.
        Applies to CP4Dv4.0.
        Needs to be called after the file has been saved regularly in the file system.
        Ensures the file is visible in the Data Assets of the Watson Studio UI.
        TODO: where should the file be written?

        Args:
            file_path (str): full file path, including the file name and extension
            file_name (str): name of the data asset. Default is None. If None, the file name will be extracted from the file_path.
        """
        # Add to Project
        if file_name is None:
            file_name = os.path.basename(file_path)
        with open(file_path, 'rb') as f:
            from ibm_watson_studio_lib import access_project_or_space
            wslib = access_project_or_space()
            wslib.save_data(asset_name_or_item=file_name, data=f.read(), overwrite=True)

    # Note that wslib.upload_file(file_path=file_path, file_name=file_name, overwrite=True) CANNOT(!) overwrite an existing asset
    # Unlike with project_lib, we need to do an f.read()
    @staticmethod
    def add_data_file_to_project_s(file_path: str, file_name: Optional[str] = None) -> None:
        """DEPRECATED: will never work on CP4DaaS since it requires the project_lib.Project.
        Add a data file to the Watson Studio project.
        Applies to CP4Dv2.5.
        Needs to be called after the file has been saved regularly in the file system in `/project_data/data_asset/`.
        Ensures the file is visible in the Data Assets of the Watson Studio UI.

        Args:
            file_path (str): full file path, including the file name and extension
            file_name (str): name of the data asset. Default is None. If None, the file name will be extracted from the file_path.
        """
        # Add to Project
        if file_name is None:
            file_name = os.path.basename(file_path)
        with open(file_path, 'rb') as f:
            from project_lib import Project
            project = Project.access()
            project.save_data(file_name=file_name, data=f, overwrite=True)
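
    # A minimal sketch (hypothetical path and DataFrame `df`) of the save-then-register pattern
    # on CPD 4.0: first write the file to the regular file system, then register it so it becomes
    # visible under Data Assets in the Watson Studio UI:
    #
    #   file_path = '/userfs/my_results.csv'
    #   df.to_csv(file_path, index=False)
    #   ScenarioManager.add_data_file_using_ws_lib_s(file_path)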
    # -----------------------------------------------------------------
    # Read and write from/to DO scenario - value-added
    # -----------------------------------------------------------------
    def load_data_from_scenario(self) -> InputsOutputs:
        """Load the data from a DO scenario."""
        self.inputs, self.outputs = self.load_data_from_scenario_s(self.model_name, self.scenario_name)
        return self.inputs, self.outputs
    def write_data_into_scenario(self):
        """Write the data into a DO scenario. Creates a new scenario and writes the data."""
        return self.write_data_into_scenario_s(self.model_name, self.scenario_name, self.inputs, self.outputs,
                                               self.template_scenario_name)
    def add_data_into_scenario(self, inputs=None, outputs=None):
        """Add data to a DO scenario. If a table exists, it is overwritten/replaced."""
        return self.add_data_into_scenario_s(self.model_name, self.scenario_name, inputs, outputs)
    def replace_data_in_scenario(self, inputs=None, outputs=None):
        """Replace all inputs, outputs, or both.
        Note: you need to specify the inputs or outputs you want to replace explicitly as input arguments.
        It will NOT get them from self.inputs or self.outputs! This way you can control which to update,
        e.g. after a solve, only update the outputs, not the inputs.
        """
        return self.replace_data_into_scenario_s(self.model_name, self.scenario_name, inputs, outputs)
    def update_solve_output_into_scenario(self, mdl, outputs):
        """Replace all output tables and the KPIs table in the scenario.
        Assumes the scenario exists. Will not change the inputs of the scenario.
        Generates the KPI table.

        Limitations:

        * Does NOT update the objective
        * Does NOT update the log

        Args:
            mdl (docplex.mp.model): the model that has been solved
            outputs (Dict): dictionary of DataFrames
        """
        outputs['kpis'] = ScenarioManager.get_kpis_table_as_dataframe(mdl)
        self.outputs = outputs  # Not necessary for the update!
        self.replace_data_in_scenario(inputs=None, outputs=outputs)
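
    # A minimal sketch (hypothetical model and tables) of the intended post-solve flow: solve the
    # docplex model, collect the output DataFrames, then push the outputs plus the generated KPI
    # table back into the existing scenario without touching its inputs:
    #
    #   mdl.solve()
    #   outputs = {'solution': solution_df}
    #   sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1')
    #   sm.update_solve_output_into_scenario(mdl, outputs)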
    # -----------------------------------------------------------------
    # Read and write from/to DO scenario - base functions
    # -----------------------------------------------------------------
    def get_do_scenario(self, model_name, scenario_name):
        """Return a DO scenario.

        Args:
            model_name (str): the name of the DO model
            scenario_name (str): the name of the scenario

        Returns:
            A decision_optimization_client.Container of type `scenario`

        Raises:
            ValueError: when either the model_name or the scenario_name doesn't match an existing entity.
        """
        client = self.get_dd_client()
        dd_model_builder = client.get_experiment(name=model_name)
        if dd_model_builder is None:
            raise ValueError('No DO model with name `{}` exists'.format(model_name))
        scenario = dd_model_builder.get_scenario(name=scenario_name)
        if scenario is None:
            raise ValueError('No DO scenario with name `{}` exists in model `{}`'.format(scenario_name, model_name))
        return scenario
    def load_data_from_scenario_s(self, model_name: str, scenario_name: str) -> InputsOutputs:
        """Load the data from a DO scenario. Returns empty dicts if there are no tables."""
        scenario = self.get_do_scenario(model_name, scenario_name)
        # Load all input and output data as maps {table_name: DataFrame}
        inputs = scenario.get_tables_data(category='input')
        outputs = scenario.get_tables_data(category='output')
        return inputs, outputs
    def write_data_into_scenario_s(self, model_name: str, scenario_name: str,
                                   inputs: Optional[Inputs] = None, outputs: Optional[Outputs] = None,
                                   template_scenario_name: Optional[str] = None) -> None:
        """Create a new scenario and write the data.
        If the scenario exists, clears all existing data.
        If the scenario doesn't exist, creates a new one; if template_scenario_name is specified, uses that as template.
        If the existing scenario has a model, it keeps the model. If there is no existing scenario,
        the user needs to add a model manually in DO.
        Tested: works reliably.
        TODO: one small issue: if the scenario exists and has been solved before, this clears all inputs and
        outputs (including the KPIs), but not the objective value. The DO UI then shows as if the model has been solved.
        """
        # Create scenario
        client = self.get_dd_client()
        dd_model_builder = client.get_experiment(name=model_name)
        if dd_model_builder is None:
            raise ValueError('No DO model with name `{}` exists'.format(model_name))
        scenario = ScenarioManager.create_new_scenario(client, dd_model_builder, new_scenario_name=scenario_name,
                                                       template_scenario_name=template_scenario_name)
        if inputs is not None:
            for table in inputs:
                scenario.add_table_data(table, inputs[table], category='input')
        if outputs is not None:
            for table in outputs:
                scenario.add_table_data(table, outputs[table], category='output')
    def add_data_into_scenario_s(self, model_name: str, scenario_name: str,
                                 inputs: Optional[Inputs] = None, outputs: Optional[Outputs] = None) -> None:
        """Add tables to an existing scenario. Replaces a table if it exists.
        Assumes the scenario exists. Does not explicitly clear existing tables.
        Could be used in post-processing.
        """
        scenario = self.get_do_scenario(model_name, scenario_name)
        if inputs is not None:
            for table in inputs:
                scenario.add_table_data(table, inputs[table], category='input')
        if outputs is not None:
            for table in outputs:
                scenario.add_table_data(table, outputs[table], category='output')
    def replace_data_into_scenario_s(self, model_name: str, scenario_name: str,
                                     inputs: Optional[Inputs] = None, outputs: Optional[Outputs] = None) -> None:
        """Replace all inputs, outputs, or both.
        If inputs/outputs are not None, clears the existing inputs/outputs first.
        Assumes the scenario exists. Does explicitly clear all existing input/output tables.
        """
        client = self.get_dd_client()
        scenario = self.get_do_scenario(model_name, scenario_name)
        if inputs is not None:
            ScenarioManager.clear_scenario_data(client, scenario, category='input')
            for table in inputs:
                scenario.add_table_data(table, inputs[table], category='input')
        if outputs is not None:
            ScenarioManager.clear_scenario_data(client, scenario, category='output')
            for table in outputs:
                scenario.add_table_data(table, outputs[table], category='output')
    # -----------------------------------------------------------------
    # Scenario operations
    # -----------------------------------------------------------------
    @staticmethod
    def clear_scenario_data(client, scenario, category=None):
        """Clear all input and output tables from a scenario.
        The current API requires the client.

        Args:
            client: the decision_optimization_client.Client
            scenario: the scenario container
            category (str in ['input', 'output']): If None, clears all tables.
        """
        for table in scenario.get_tables(category):
            client.delete_table(scenario, table)  # API on Client-only for now

    # TODO: test
    @staticmethod
    def create_new_scenario(client, model_builder, new_scenario_name: str, template_scenario_name=None):
        """Create a new scenario from a template.
        The template is found either from the template_scenario_name, or if this is None, from the new_scenario_name.
        If a scenario with the new name already exists, all input and output tables are cleared,
        thereby keeping the solve code. Creates a new blank scenario if a scenario with this name doesn't exist.

        Args:
            client (decision_optimization_client.Client): Client managing the DO model
            model_builder (decision_optimization_client.Experiment): The DO model
            new_scenario_name (str): Name for the new scenario
            template_scenario_name (str): Name of an existing scenario

        Returns:
            A decision_optimization_client.Container of type scenario

        Raises:
            ValueError: new_scenario_name is None
            ValueError: new_scenario_name is the same as template_scenario_name
        """
        if new_scenario_name is None:
            raise ValueError('The new_scenario_name cannot be None')

        if template_scenario_name is not None:
            if new_scenario_name == template_scenario_name:
                raise ValueError(
                    'The new_scenario_name `{}` must be different from the template_scenario_name `{}`'.format(
                        new_scenario_name, template_scenario_name))
            # Copy and clear data from template scenario
            template_scenario = model_builder.get_scenario(name=template_scenario_name)
            if template_scenario is not None:
                # Delete existing target scenario if it exists
                scenario = model_builder.get_scenario(name=new_scenario_name)
                if scenario is not None:
                    model_builder.delete_container(scenario)
                # Copy scenario from template
                scenario = template_scenario.copy(new_scenario_name)
                # Clear the data in the scenario
                ScenarioManager.clear_scenario_data(client, scenario)
            else:
                raise ValueError(
                    "No scenario with template_scenario_name `{}` exists in model".format(template_scenario_name))
        else:
            # If the scenario does not already exist, create a new blank scenario. Else, clear the data.
            scenario = model_builder.get_scenario(name=new_scenario_name)
            if scenario is None:
                # Create a new scenario (does not have solve code)
                scenario = model_builder.create_scenario(name=new_scenario_name)
            else:
                # Existing scenario probably already has solve code, so maintain that.
                ScenarioManager.clear_scenario_data(client, scenario)
        return scenario
    @staticmethod
    def get_kpis_table_as_dataframe(mdl) -> pd.DataFrame:
        """Return a DataFrame with the KPI names and values in the mdl.
        This table is compatible with the representation in DO4WS and can be updated in the scenario.

        Args:
            mdl (docplex.mp.model.Model)

        Returns:
            pd.DataFrame with columns NAME and VALUE: the KPIs in the mdl
        """
        if mdl.solution is not None:
            all_kpis = [(kp.name, kp.compute()) for kp in mdl.iter_kpis()]
        else:
            all_kpis = []
        df_kpis = pd.DataFrame(all_kpis, columns=['NAME', 'VALUE'])
        return df_kpis
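
    # A minimal sketch of how the KPI table is derived from a docplex model
    # (assumes a local CPLEX runtime is available for the solve):
    #
    #   from docplex.mp.model import Model
    #   mdl = Model(name='kpi_demo')
    #   x = mdl.continuous_var(name='x', ub=10)
    #   mdl.add_kpi(2 * x, 'double_x')
    #   mdl.maximize(x)
    #   mdl.solve()
    #   df_kpis = ScenarioManager.get_kpis_table_as_dataframe(mdl)  # NAME='double_x', VALUE=20.0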
    # -----------------------------------------------------------------
    # Read and write from/to Excel - value-added functions
    # -----------------------------------------------------------------
    def load_data_from_excel(self, excel_file_name: str) -> InputsOutputs:
        """Load data from an Excel file located in the `datasets` folder of the root directory.
        Convenience method.
        If not run on WS, requires the `local_root` property passed in the ScenarioManager constructor.
        """
        if pathlib.Path(excel_file_name).suffix == '.xlsx':
            file_name = excel_file_name
        else:
            file_name = excel_file_name + '.xlsx'

        if self.platform == Platform.CPDaaS:
            # For CPDaaS only: the file doesn't exist in /home/wsuser/work/. We have to get it from the project.
            file = self.project.get_file(file_name)
            file.seek(0)
            xl = pd.ExcelFile(file)
        else:
            datasets_dir = self.get_data_directory()
            excel_file_path = os.path.join(datasets_dir, file_name)
            xl = pd.ExcelFile(excel_file_path)

        # Read data from Excel
        self.inputs, self.outputs = ScenarioManager.load_data_from_excel_s(xl)
        return self.inputs, self.outputs
    def write_data_to_excel(self, excel_file_name: str = None, unique_file_name: bool = True,
                            copy_to_csv: bool = False) -> str:
        """Write inputs and/or outputs to an Excel file in datasets.
        The inputs and outputs are as in the attributes `self.inputs` and `self.outputs` of the ScenarioManager.

        If the excel_file_name is None, it will be generated from the model_name and scenario_name:
        MODEL_NAME + "_" + SCENARIO_NAME + "_output"

        If Excel has a file with the same name opened, writing throws a PermissionError.
        If so and the flag `unique_file_name` is True, it will save the new file under a unique name.
        I.e., if the file is not opened by Excel, the file is overwritten.

        Args:
            excel_file_name (str): The file name for the Excel file.
            unique_file_name (bool): If True, generates a unique file name in case the existing file is opened(!) by Excel
            copy_to_csv (bool): If True, will create a copy of the file with the extension `.csv`. DEPRECATED, NON-FUNCTIONAL
        """
        if excel_file_name is None:
            if self.model_name is not None and self.scenario_name is not None:
                excel_file_name = "{}_{}_output".format(self.model_name, self.scenario_name)
            else:
                raise ValueError("The argument excel_file_name can only be None if both the model_name '{}' and the scenario_name '{}' have been specified.".format(self.model_name, self.scenario_name))

        # Save the Excel file:
        if pathlib.Path(excel_file_name).suffix != '.xlsx':
            excel_file_name = excel_file_name + '.xlsx'
        data_dir = self.get_data_directory()
        excel_file_path_1 = os.path.join(data_dir, excel_file_name)
        if unique_file_name:
            try:
                writer_1 = pd.ExcelWriter(excel_file_path_1, engine='xlsxwriter')
            except PermissionError:
                excel_file_path_1 = self.get_unique_file_name(excel_file_path_1)
                writer_1 = pd.ExcelWriter(excel_file_path_1, engine='xlsxwriter')
        else:
            writer_1 = pd.ExcelWriter(excel_file_path_1, engine='xlsxwriter')
        ScenarioManager.write_data_to_excel_s(writer_1, inputs=self.inputs, outputs=self.outputs)
        writer_1.close()  # .save() in older pandas versions
        self.add_file_as_data_asset(excel_file_path_1, excel_file_name)
        return excel_file_path_1
    @staticmethod
    def get_unique_file_name(path):
        """Generate a unique file path by post-fixing '(<counter>)' until the path doesn't exist."""
        filename, extension = os.path.splitext(path)
        counter = 1
        while os.path.exists(path):
            path = filename + "(" + str(counter) + ")" + extension
            counter += 1
        return path
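
    # For example, assuming 'results.xlsx' already exists (e.g. locked by Excel):
    #
    #   ScenarioManager.get_unique_file_name('results.xlsx')  # -> 'results(1).xlsx'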
    def add_file_as_data_asset(self, file_path: str, asset_name: str = None):
        """Register an existing file as a data asset in CPD.

        :param file_path: full path of the file
        :param asset_name: name of the asset. If None, will get the name from the file
        :return:
        """
        if asset_name is None:
            asset_name = os.path.basename(file_path)
        if self.platform in [Platform.CPD40]:
            self.add_data_file_using_ws_lib_s(file_path, asset_name)
        elif self.platform in [Platform.CPD25, Platform.CPDaaS]:
            self.add_data_file_using_project_lib(file_path, asset_name)
        else:
            # i.e. Local: do not register as a data asset
            pass
    @staticmethod
    def add_file_as_data_asset_s(file_path: str, asset_name: str = None, platform: Platform = None):
        """Register an existing file as a data asset in CPD.
        VT 2022-01-21: this method is incorrect for CPDaaS. Should use project_lib.

        :param file_path: full path of the file
        :param asset_name: name of the asset. If None, will get the name from the file
        :param platform: CPD40, CPD25, CPDaaS, or Local. If None, will auto-detect.
        :return:
        """
        if asset_name is None:
            asset_name = os.path.basename(file_path)
        if platform is None:
            platform = ScenarioManager.detect_platform()
        if platform in [Platform.CPD40, Platform.CPDaaS]:
            ScenarioManager.add_data_file_using_ws_lib_s(file_path, asset_name)
        elif platform == Platform.CPD25:
            ScenarioManager.add_data_file_to_project_s(file_path, asset_name)
        else:
            # i.e. Local: do not register as a data asset
            pass
    # -----------------------------------------------------------------
    # Read and write from/to Excel - base functions
    # -----------------------------------------------------------------
    @staticmethod
    def load_data_from_excel_s(xl: pd.ExcelFile, table_index_sheet: str = '_table_index_',
                               input_table_names: Optional[List[str]] = None,
                               output_table_names: Optional[List[str]] = None) -> InputsOutputs:
        """Create DataFrames from the sheets of the Excel file.
        Store them in a dictionary with the table_name as key.
        The table_name is either the name of the sheet, or the table_name as defined in the table_index_sheet.

        In the default case, when input_table_names and output_table_names are None, the category of a table
        (i.e. input or output) is driven by the value in the table_index_sheet. If a table is not listed in
        the table_index_sheet, it is placed in the inputs.

        However, to reduce the load time for certain applications, we can restrict the tables to load by
        specifying them in input_table_names or output_table_names. If one of them is not None, only those
        tables are loaded and categorized accordingly.
        Note that input_table_names and output_table_names refer to the *translated* table names from the
        table_index_sheet (i.e. not the abbreviated names used as sheet names).

        Args:
            xl (pandas.ExcelFile): Excel file
            table_index_sheet (str): Name of the table index sheet
            input_table_names (List[str]): names of input tables to read
            output_table_names (List[str]): names of output tables to read

        Returns:
            (Dict[str,DataFrame], Dict[str,DataFrame]): A tuple of inputs and outputs dictionaries of DataFrames,
            one df per sheet
        """
        # Check for the table_index sheet:
        table_index_df = None
        if (table_index_sheet is not None) and (table_index_sheet in xl.sheet_names):
            table_index_df = xl.parse(table_index_sheet)
            table_index_df.set_index('sheet_name', inplace=True)

        # Load all sheets:
        inputs = {}
        outputs = {}
        for sheet in xl.sheet_names:
            if sheet != table_index_sheet:  # Do not load the table_index as a DataFrame
                table_name = sheet  # default if no abbreviation
                category = 'input'
                # Translate the table_name if possible:
                if table_index_df is not None:
                    if sheet in table_index_df.index.values:
                        table_name = table_index_df.loc[sheet].table_name
                        if 'category' in table_index_df.columns.values:
                            category = table_index_df.loc[sheet].category
                if input_table_names is None and output_table_names is None:
                    # Categorize according to the table_index_sheet
                    if category == 'output':
                        outputs[table_name] = xl.parse(sheet)
                    else:
                        inputs[table_name] = xl.parse(sheet)
                else:
                    # Categorize according to the input/output_table_names
                    if input_table_names is not None and table_name in input_table_names:
                        inputs[table_name] = xl.parse(sheet)
                    elif output_table_names is not None and table_name in output_table_names:
                        outputs[table_name] = xl.parse(sheet)
        return inputs, outputs
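
    # A minimal sketch (hypothetical file and table names) of restricting the load to specific
    # tables; the names refer to the *translated* table names from the '_table_index_' sheet,
    # not the abbreviated sheet names:
    #
    #   xl = pd.ExcelFile('MyExcelFile.xlsx')
    #   inputs, outputs = ScenarioManager.load_data_from_excel_s(
    #       xl, input_table_names=['customers', 'orders'], output_table_names=['solution'])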
    @staticmethod
    def _create_truncted_post_fixed_name(long_name: str, max_length: int, index: int) -> str:
        """Create a truncated name post-fixed with '_<index>', such that the total length of the string is <= max_length."""
        post_fix = '_' + str(index)
        return long_name[:max_length - len(post_fix)] + post_fix

    @staticmethod
    def _create_unique_abbreviated_name(long_name: str, max_length: int, existing_names: Sequence[str]) -> str:
        """Create a unique, abbreviated name that is not a member of the existing_names set.
        The name is made unique by post-fixing '_<index>', where index is an increasing integer starting at 0.
        """
        name = long_name
        if len(name) > max_length:
            name = ScenarioManager._create_truncted_post_fixed_name(long_name, max_length, 0)
        for index in range(1, 9999):
            if name in existing_names:
                name = ScenarioManager._create_truncted_post_fixed_name(long_name, max_length, index)
            else:
                break
        return name
    @staticmethod
    def write_data_to_excel_s(writer: pd.ExcelWriter, inputs: Optional[Inputs] = None,
                              outputs: Optional[Outputs] = None,
                              table_index_sheet: str = '_table_index_') -> None:
        """Write all DataFrames in the inputs and outputs to the Excel writer,
        with sheet names based on the keys of the inputs/outputs.

        Due to the Excel limitation of a maximum of 31 characters for the sheet name, table names longer
        than 31 characters will be abbreviated to a unique name. The mapping between the original table name
        and the abbreviated name is recorded in a separate sheet named by the table_index_sheet.

        Args:
            writer (pandas.ExcelWriter): The Excel writer to write the file
            inputs (Dict of DataFrames): inputs
            outputs (Dict of DataFrames): outputs
            table_index_sheet (str): name for the index sheet
        """
        table_index = []  # to hold dicts with keys 'table_name', 'sheet_name', 'category' ('input' or 'output')
        sheet_names = set()
        if (inputs is not None) and (type(inputs) is dict):
            for table_name, df in inputs.items():
                # Truncate the table name to 31 characters due to the sheet-name limit in Excel:
                sheet_name = ScenarioManager._create_unique_abbreviated_name(table_name, 31, sheet_names)
                sheet_names.add(sheet_name)
                df.to_excel(writer, sheet_name, index=False)
                # Store row in table_index
                table_index.append({'table_name': table_name, 'sheet_name': sheet_name, 'category': 'input'})
        if (outputs is not None) and (type(outputs) is dict):
            for table_name, df in outputs.items():
                # Truncate the table name to 31 characters due to the sheet-name limit in Excel:
                sheet_name = ScenarioManager._create_unique_abbreviated_name(table_name, 31, sheet_names)
                sheet_names.add(sheet_name)
                df.to_excel(writer, sheet_name, index=False)
                # Store row in table_index
                table_index.append({'table_name': table_name, 'sheet_name': sheet_name, 'category': 'output'})
        # Add the table_index sheet if applicable:
        if (len(table_index) > 0) and (table_index_sheet is not None):
            index_df = pd.DataFrame(table_index)
            index_df.to_excel(writer, table_index_sheet, index=False)
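
    # A minimal sketch of the writer round-trip (hypothetical table names): a table name longer
    # than 31 characters gets an abbreviated sheet name, and the mapping is recorded in the
    # '_table_index_' sheet so load_data_from_excel_s can restore the original name:
    #
    #   writer = pd.ExcelWriter('out.xlsx', engine='xlsxwriter')
    #   ScenarioManager.write_data_to_excel_s(
    #       writer,
    #       inputs={'a_very_long_input_table_name_well_over_31_characters': df_in},
    #       outputs={'solution': df_out})
    #   writer.close()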
    # -----------------------------------------------------------------
    # Load data from csv
    # -----------------------------------------------------------------
    def load_data_from_csv(self, csv_directory: str, input_csv_name_pattern: str = "*.csv",
                           output_csv_name_pattern: Optional[str] = None, **kwargs) -> InputsOutputs:
        """Load data from matching csv files in a directory.
        Uses glob.glob() to pattern-match files in the csv_directory.
        If you want to load one file, specify the full name including the `.csv` extension.

        Args:
            csv_directory (str): Relative directory from the root
            input_csv_name_pattern (str): name pattern to find matching csv files for inputs
            output_csv_name_pattern (str): name pattern to find matching csv files for outputs
            **kwargs: Set of optional arguments for the pd.read_csv() function
        """
        root_dir = self.get_root_directory()
        csv_full_directory = os.path.join(root_dir, csv_directory)

        # Read data
        self.inputs = {}
        self.outputs = {}
        if input_csv_name_pattern is not None:
            self.inputs = ScenarioManager.load_data_from_csv_s(csv_full_directory, input_csv_name_pattern, **kwargs)
        if output_csv_name_pattern is not None:
            self.outputs = ScenarioManager.load_data_from_csv_s(csv_full_directory, output_csv_name_pattern, **kwargs)
        return self.inputs, self.outputs
    @staticmethod
    def load_data_from_csv_s(csv_directory: str, csv_name_pattern: str = "*.csv",
                             **kwargs) -> Dict[str, pd.DataFrame]:
        """Read data from all matching .csv files in a directory.

        Args:
            csv_directory (str): the full path of a directory containing one or more .csv files.
            csv_name_pattern (str): name pattern to find matching csv files
            **kwargs: Set of optional arguments for the pd.read_csv() function

        Returns:
            data: dict of DataFrames. Keys are the .csv file names (without the extension).
        """
        inputs = {}
        # os.path.join is safe for both Unix and Windows:
        for file_path in glob.glob(os.path.join(csv_directory, csv_name_pattern)):
            df = pd.read_csv(file_path, **kwargs)
            table_name = pathlib.Path(file_path).stem  # the file name without the '.csv' extension
            inputs[table_name] = df
        return inputs
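
    # A minimal sketch (hypothetical directory and file): load a single csv by its full name,
    # passing pd.read_csv options through **kwargs:
    #
    #   inputs = ScenarioManager.load_data_from_csv_s('/path/to/datasets', 'orders.csv',
    #                                                 dtype={'order_id': str})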
    def write_data_to_csv(self) -> None:
        """Write inputs and/or outputs to .csv files in the data directory.

        Returns:
            None
        """
        csv_directory = self.get_data_directory()
        ScenarioManager.write_data_to_csv_s(csv_directory, inputs=self.inputs, outputs=self.outputs)
    @staticmethod
    def write_data_to_csv_s(csv_directory: str, inputs: Optional[Inputs] = None,
                            outputs: Optional[Outputs] = None) -> None:
        """Write data to .csv files in a directory. Each file is named after its DataFrame key.

        Args:
            csv_directory (str): the full path of a directory for the .csv files.
            inputs (Dict of DataFrames): inputs
            outputs (Dict of DataFrames): outputs

        Returns:
            None
        """
        if inputs is not None:
            for table_name, df in inputs.items():
                file_path = os.path.join(csv_directory, table_name + ".csv")
                print("Writing {}".format(file_path))
                df.to_csv(file_path, index=False)
        if outputs is not None:
            for table_name, df in outputs.items():
                file_path = os.path.join(csv_directory, table_name + ".csv")
                print("Writing {}".format(file_path))
                df.to_csv(file_path, index=False)

    # -----------------------------------------------------------------
    # Load data from parquet
    # -----------------------------------------------------------------
    def load_data_from_parquet(self, directory: str, input_name_pattern: str = "*.parquet",
                               output_name_pattern: Optional[str] = None, **kwargs) -> InputsOutputs:
        """Load data from matching parquet files in a directory.
        Uses glob.glob() to pattern-match files in the directory.
        If you want to load one file, specify the full name including the `.parquet` extension.

        Args:
            directory (str): Relative directory from the root
            input_name_pattern (str): name pattern to find matching parquet files for inputs
            output_name_pattern (str): name pattern to find matching parquet files for outputs
            **kwargs: Set of optional arguments for the pd.read_parquet() function
        """
        root_dir = self.get_root_directory()
        full_directory = os.path.join(root_dir, directory)

        # Read data from parquet
        if input_name_pattern is not None:
            self.inputs = ScenarioManager.load_data_from_parquet_s(full_directory, input_name_pattern, **kwargs)
        if output_name_pattern is not None:
            self.outputs = ScenarioManager.load_data_from_parquet_s(full_directory, output_name_pattern, **kwargs)
        return self.inputs, self.outputs
    @staticmethod
    def load_data_from_parquet_s(directory: str, file_name_pattern: str = "*.parquet",
                                 **kwargs) -> Dict[str, pd.DataFrame]:
        """Read data from all matching .parquet files in a directory.

        Args:
            directory (str): the full path of a directory containing one or more .parquet files.
            file_name_pattern (str): name pattern to find matching parquet files
            **kwargs: Set of optional arguments for the pd.read_parquet() function

        Returns:
            data: dict of DataFrames. Keys are the .parquet file names (without the extension).
        """
        inputs = {}
        # os.path.join is safe for both Unix and Windows:
        for file_path in glob.glob(os.path.join(directory, file_name_pattern)):
            df = pd.read_parquet(file_path, **kwargs)
            table_name = pathlib.Path(file_path).stem
            inputs[table_name] = df
        return inputs
    def write_data_to_parquet(self, directory: str, inputs: Optional[Inputs] = None,
                              outputs: Optional[Outputs] = None) -> None:
        """Write inputs and/or outputs to .parquet files in the target folder.

        Args:
            directory (str): Relative directory from the root
            inputs (Dict of DataFrames): inputs
            outputs (Dict of DataFrames): outputs

        Returns:
            None
        """
        root_dir = self.get_root_directory()
        directory_path = os.path.join(root_dir, directory)
        ScenarioManager.write_data_to_parquet_s(directory_path, inputs=inputs, outputs=outputs)
    @staticmethod
    def write_data_to_parquet_s(directory: str, inputs: Optional[Inputs] = None,
                                outputs: Optional[Outputs] = None) -> None:
        """Write data to .parquet files in a directory. Each file is named after its DataFrame key.

        Args:
            directory (str): the full path of a directory for the .parquet files.
            inputs (Dict of DataFrames): inputs
            outputs (Dict of DataFrames): outputs

        Returns:
            None
        """
        if inputs is not None:
            for table_name, df in inputs.items():
                file_path = os.path.join(directory, table_name + ".parquet")
                print("Writing input {}".format(file_path))
                df.to_parquet(file_path, index=False)
        if outputs is not None:
            for table_name, df in outputs.items():
                file_path = os.path.join(directory, table_name + ".parquet")
                print("Writing output {}".format(file_path))
                df.to_parquet(file_path, index=False)
    # -----------------------------------------------------------------
    # Read from / write to zipped set of csv files
    # -----------------------------------------------------------------
    @staticmethod
    def load_data_from_zip_csv_s(zip_file_path: str, file_size_limit: Optional[int] = None,
                                 **kwargs) -> Dict[str, pd.DataFrame]:
        """Read data from a zip file with .csv files.

        Args:
            zip_file_path (str): the full path of a zip file containing one or more .csv files.
            file_size_limit (int): maximum file size in bytes. None implies no limit.
            **kwargs: Set of optional arguments for the pd.read_csv() function

        Returns:
            data: dict of DataFrames. Keys are the .csv file names.
        """
        inputs = {}
        with zipfile.ZipFile(zip_file_path, "r") as f:
            for csv_file in f.infolist():
                if pathlib.Path(csv_file.filename).suffix.lower() == '.csv':
                    table_name = pathlib.Path(csv_file.filename).stem
                    # print(f"Reading table = {table_name}. File-size = {convert_size(csv_file.file_size)}")
                    if file_size_limit is None or csv_file.file_size <= file_size_limit:
                        df = pd.read_csv(f.open(csv_file.filename), **kwargs)
                        inputs[table_name] = df
        return inputs
    @staticmethod
    def write_data_to_zip_csv_s(zip_file_path: str, inputs: Inputs = None, outputs: Outputs = None, **kwargs):
        """Write data as a zip file with .csv files.
        The inputs and outputs dictionaries are merged and written into the same zip.

        Args:
            zip_file_path (str): the full path of the zip file.
            inputs: dict of input DataFrames
            outputs: dict of output DataFrames
            **kwargs: Set of optional arguments for the df.to_csv() function

        Returns:
            None
        """
        dfs = {}
        if inputs is not None:
            dfs = {**dfs, **inputs}
        if outputs is not None:
            dfs = {**dfs, **outputs}
        with zipfile.ZipFile(zip_file_path, 'w') as zip_file:
            with tempfile.TemporaryDirectory() as tmpdir:
                for table_name, df in dfs.items():
                    filename = table_name + ".csv"
                    file_path = os.path.join(tmpdir, filename)
                    df.to_csv(file_path, index=False, **kwargs)
                    zip_file.write(file_path, arcname=filename, compress_type=zipfile.ZIP_DEFLATED)
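
    # A minimal sketch of a zip round-trip (hypothetical path): write the inputs and outputs
    # as .csv files into one zip, then read them all back as a single dict of DataFrames:
    #
    #   ScenarioManager.write_data_to_zip_csv_s('/tmp/data.zip', inputs=inputs, outputs=outputs)
    #   tables = ScenarioManager.load_data_from_zip_csv_s('/tmp/data.zip')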
    # -----------------------------------------------------------------
    # Utils
    # -----------------------------------------------------------------
    @staticmethod
    def detect_platform() -> Platform:
        """Detect the platform: CPDaaS, CPD 4.0, CPD 2.5, or Local."""
        if ScenarioManager.env_is_wscloud():
            platform = Platform.CPDaaS
        elif ScenarioManager.env_is_cpd40():
            platform = Platform.CPD40
        elif ScenarioManager.env_is_cpd25():
            platform = Platform.CPD25
        else:
            platform = Platform.Local
        return platform
    @staticmethod
    def env_is_cpd40() -> bool:
        """Return True if the environment is CPDv4.0.2, in particular if it supports ibm_watson_studio_lib
        to get access to data assets.

        Notes:

        - The `from ibm_watson_studio_lib import access_project_or_space` does NOT fail in CPDaaS
        - The `wslib = access_project_or_space()` does fail in CPDaaS, however with an ugly error message
        - The current (ugly) work-around is to always first test for CPDaaS using the environment variable
        - TODO: prevent the error/warning in CPDaaS
        """
        try:
            from ibm_watson_studio_lib import access_project_or_space
            wslib = access_project_or_space()
            wslib.mount.get_base_dir()
            is_cpd40 = True
        except Exception:
            is_cpd40 = False
        return is_cpd40
    @staticmethod
    def env_is_dsx() -> bool:
        """Return True if the environment is DSX."""
        return 'DSX_PROJECT_DIR' in os.environ
    @staticmethod
    def env_is_cpd25() -> bool:
        """Return True if the environment is CPDv2.5."""
        return 'PWD' in os.environ
    @staticmethod
    def env_is_wscloud() -> bool:
        """Return True if the environment is WS Cloud."""
        # return 'PWD' in os.environ and os.environ['PWD'] == '/home/wsuser/work'
        return 'RUNTIME_ENV_APSX_URL' in os.environ and os.environ['RUNTIME_ENV_APSX_URL'] == 'https://api.dataplatform.cloud.ibm.com'
    def get_dd_client(self):
        """Return the Client managing the DO scenario.

        Returns:
            a new decision_optimization_client.Client
        """
        from decision_optimization_client import Client
        if self.project is not None:
            pc = self.project.project_context
            return Client(pc=pc)
        elif (self.project_id is not None) and (self.project_access_token is not None):
            # When in WS Cloud:
            from project_lib import Project
            # The project access token is an authorization token used to access project resources
            # like data sources and connections, and is used by platform APIs.
            project = Project(project_id=self.project_id, project_access_token=self.project_access_token)
            pc = project.project_context
            return Client(pc=pc)
        else:
            # In WSL/CPD:
            return Client()
    def print_table_names(self) -> None:
        """Print the names of the input and output tables. For development and debugging."""
        print("Input tables: {}".format(", ".join(self.inputs.keys())))
        print("Output tables: {}".format(", ".join(self.outputs.keys())))
    def export_model_as_lp(self, mdl, model_name: Optional[str] = None) -> str:
        """Export the model as an .lp file in the data assets.

        Args:
            mdl (docplex.mp.model): the docplex model
            model_name (str): name of the model (excluding the `.lp` extension). If None, uses `mdl.name`.

        Returns:
            (str): full file path of the .lp file

        Note: this is now a method of ScenarioManager (instead of OptimizationEngine), so it can be included
        in a dd-ignore notebook cell. This avoids the dependency on dse-do-utils in the ModelBuilder.
        """
        # Get the model name:
        if model_name is None:
            model_name = mdl.name
        datasets_dir = self.get_data_directory()
        lp_file_name = model_name + '.lp'
        lp_file_path = os.path.join(datasets_dir, lp_file_name)
        mdl.export_as_lp(lp_file_path)  # Writes the .lp file
        self.add_file_as_data_asset(lp_file_path, lp_file_name)
        return lp_file_path
    def insert_scenarios_from_zip(self, filepath: str):
        """Insert (or replace) a set of scenarios from a .zip file into the DO Experiment.
        The zip is assumed to contain one or more .xlsx files; other files will be skipped.
        The name of each .xlsx file is used as the scenario name.
        """
        with zipfile.ZipFile(filepath, 'r') as zip_file:
            for info in zip_file.infolist():
                scenario_name = pathlib.Path(info.filename).stem
                file_extension = pathlib.Path(info.filename).suffix
                if file_extension == '.xlsx':
                    xl = pd.ExcelFile(zip_file.read(info))
                    self.inputs, self.outputs = ScenarioManager.load_data_from_excel_s(xl)
                    self.print_table_names()
                    self.write_data_into_scenario_s(self.model_name, scenario_name, self.inputs, self.outputs,
                                                    self.template_scenario_name)
                    print(f"Uploaded scenario: '{scenario_name}' from '{info.filename}'")
                else:
                    print(f"File '{info.filename}' in zip is not a .xlsx. Skipped.")
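
    # A minimal sketch (hypothetical file names) of bulk-loading scenarios from a zip of .xlsx
    # files; each workbook becomes a scenario named after its file name:
    #
    #   sm = ScenarioManager(model_name='MyModel', template_scenario_name='Template')
    #   sm.insert_scenarios_from_zip('scenarios.zip')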