dse_do_utils package¶
Submodules¶
dse_do_utils.cpd25utilities module¶
- dse_do_utils.cpd25utilities.add_file_as_data_asset_cpd25(file_name: str) None [source]¶
Adds a file located in /project_data/data_asset/ as a Data Asset to the Watson Studio project. So that it appears in the UI and can be exported.
- Parameters:
file_name (str) – name of file, including extension
- dse_do_utils.cpd25utilities.add_file_path_as_data_asset_cpd25(file_path: str, asset_name: Optional[str] = None) None [source]¶
Add a data file to the Watson Studio project. Applies to CPDv2.5. Works for any file. Allows the file to be viewed and downloaded from Watson Studio UI. Needs to be called after the file has been saved regularly in the file system. Typically, that would be in /project_data/data_asset/. Ensures the file is visible in the Data Assets of the Watson Studio UI.
Usage:
# Write some file as an example: file_path = '/project_data/data_asset/myfile.csv' with open(file_path, 'w+') as f: f.write("Hello World") # Add file as a data asset: add_file_as_data_asset_cpd25(file_path)
- Beware that the same data now exists in 2 different places:
In the Cloud Object Storage (COS)
As a file in /project_data/data_asset/
Changing any of the 2 independently can cause inconsistencies.
- Parameters:
file_path (str) – full file path, including the file name and extension
asset_name (str) – name of data asset. Default is None. If None, the asset_name will be extracted from the file_path.
- dse_do_utils.cpd25utilities.add_file_path_as_data_asset_wsc(file_path: str, asset_name: Optional[str] = None, project=None) None [source]¶
Add a data file to the Watson Studio project. Applies to WS Cloud and CPDv2.5. Works for any file. Allows the file to be viewed and downloaded from Watson Studio UI. Needs to be called after the file has been saved regularly in the file system. Typically, that would be in:
CPDv2.5: /project_data/data_asset/
WS Cloud: /home/dsxuser/work/, or os.environ[‘PWD’], or ./, or no path
Ensures the file is visible in the Data Assets of the Watson Studio UI.
- Parameters:
project (project_lib.Project) – required for WS Cloud
file_path (str) – full file path, including the file name and extension
asset_name (str) – name of data asset. Default is None. If None, the asset_name will be extracted from the file_path.
Usage:
# Write some file as an example: file_path = '/project_data/data_asset/myfile.csv' with open(file_path, 'w+') as f: f.write("Hello World") # Add file as a data asset: add_file_as_data_asset_cpd25(file_path)
- dse_do_utils.cpd25utilities.write_data_asset_as_file_cpd25(asset_name: str, path: str = '/project_data/data_asset/') str [source]¶
Writes a named data asset to file. Assumes a data asset with asset_name exists. Makes the file accessible for things like:
Load from disk
Pip install
Module import
- Parameters:
asset_name (str) – name of the asset
path (str, Optional) – Default is ‘/project_data/data_asset/’. Use path=’’ for current directory.
- dse_do_utils.cpd25utilities.write_data_asset_as_file_wsc(asset_name: str, path: str = '/home/dsxuser/work/', project=None) str [source]¶
Writes a named data asset to file (for WS Cloud). Assumes a data asset with asset_name exists. Makes the file accessible for things like:
Load from disk
Pip install
Module import
- Parameters:
asset_name (str) – name of the asset
path (str, Optional) – Default (for WS Cloud) is ‘/home/dsxuser/work/’. Use path=’’ for current directory.
project (project_lib.Project) – required for WS Cloud. For CPD, leave as None.
dse_do_utils.datamanager module¶
- class dse_do_utils.datamanager.DataManager(inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None)[source]¶
Bases:
object
A DataManager is a container of original scenario and intermediate data.
It typically contains the input and output dictionaries with DataFrames that came from or will be inserted into a DO scenario. In addition, it will hold any intermediate data. It holds methods that operate on and convert the data. When used in combination with an optimization engine, it should not contain the docplex code that creates or interacts with the docplex Model. (That is the task of the OptimizationEngine.)
One of the reasons to separate the DataManager from the OptimizationEngine is to re-use the DataManager, e.g. for output visualization notebooks.
- A typical DataManager:
Prepares the input DataFrames (like selecting and renaming columns and indexing) and assigns them to a direct attribute.
Contains a set of methods that create intermediate data (‘pre-processing’). Intermediate data will also be assigned as a direct member property.
- static apply_and_concat(dataframe, field, func, column_names)[source]¶
Adds multiple columns in one lambda apply function call.
Usage:
def my_function(my_input_value): my_output_value_1 = 1 my_output_value_2 = 2 return (my_output_value1, my_output_value_2) df = apply_and_concat(df, 'my_input_column_name', my_function, ['my_output_column_name_1','my_output_column_name_2'])
df should have the column ‘my_input_column_name’. Result is that df will have 2 new columns: ‘my_output_column_name_1’ and ‘my_output_column_name_2’
Deprecated since version 0.2.2: Same can be done with plain Pandas.
Alternative in plain Pandas:
df[['my_output_column_name_1','my_output_column_name_2']] = df.apply(lambda row : pd.Series(my_function(row.my_input_column_name)), axis = 1)
- Parameters:
dataframe (DataFrame) – The DataFrame that the function is applied to
field (str) – the name of the input data column of dataframe
func – the function that will be applied. Should have one input argument and return a tuple with N elements.
column_names (list of str) – The names of the N output columns. Should match the number of values in the function return tuple.
- Returns:
modified dataframe with N new columns
- static df_crossjoin_ai(df1: DataFrame, df2: DataFrame, **kwargs) DataFrame [source]¶
Cross-join ‘Any Index’ Make a cross join (cartesian product) between two dataframes by using a constant temporary key. Accepts dataframes that are single or multi-indexed with named and un-named indices.
- Parameters:
df1 (DataFrame) –
df2 (DataFrame) –
pd.merge() (kwargs keyword arguments that will be passed to) –
- Returns:
(DataFrame) cross join of df1 and df2
- static df_crossjoin_mi(df1: DataFrame, df2: DataFrame, **kwargs) DataFrame [source]¶
Make a cross join (cartesian product) between two dataframes by using a constant temporary key. Assumes both input dataframes have a (single or multi) index. Returns a dataframe with a MultiIndex that is the cartesian product of the indices of the input dataframes. Creates a named MultiIndex if both input dataframes have their indices named. Otherwise will return an unnamed multi-index.
- Parameters:
df1 (DataFrame) –
df2 (DataFrame) –
pd.merge() (kwargs keyword arguments that will be passed to) –
- Returns:
(DataFrame) cross join of df1 and df2
- static df_crossjoin_si(df1: DataFrame, df2: DataFrame, **kwargs) DataFrame [source]¶
Make a cross join (cartesian product) between two dataframes by using a constant temporary key. Assumes both input dataframes have a single index column. Returns a dataframe with a MultiIndex that is the cartesian product of the indices of the input dataframes. See: https://github.com/pydata/pandas/issues/5401 See https://mkonrad.net/2016/04/16/cross-join–cartesian-product-between-pandas-dataframes.html
- Parameters:
df1 (DataFrame) – dataframe 1
df2 (DataFrame) – dataframe 2
pd.merge() (kwargs keyword arguments that will be passed to) –
- Returns:
(DataFrame) cross join of df1 and df2
- static extract_solution(df: pd.DataFrame, extract_dvar_names: Optional[List[str] | Dict[str, str]] = None, drop_column_names: List[str] = None, drop: bool = True, epsilon: float = None, round_decimals: int = None, solution_column_name_post_fix: str = 'Sol', allow_mixed_type_columns: bool = False) pd.DataFrame [source]¶
Generalized routine to extract a solution value. Can remove the dvar column from the df to be able to have a clean df for export into scenario.
In some cases, CPLEX extracted values for continuous dvars can have very small values instead of zeros. If epsilon has a value, this method will drop these small values to zero. And it will assume the values need to be positive, so it clips negative values at zero.
In some case, CPLEX extracted values for integer dvars can have very small values from the rounded integer value. If round_decimals is set to 0, the solution values will be rounded to the nearest integer. Use values larger than zero to round continuous dvars to their required precision.
- Parameters:
df – DataFrame
extract_dvar_names – list of column names with CPLEX dvars, or a Dict[str, str] where the keys are the dvar column names and the values the name of the solution column
drop_column_names – columns to be dropped (can be different and in addition to drop)
drop – if True drops all columns in extract_dvar_names
epsilon (float) – if not None, drop values below threshold to zero and clip negative values at zero
round_decimals (int) – round the solution value by number of decimals. If None, no rounding.
0 (If) –
value. (rounding to integer) –
solution_column_name_post_fix (str) – Postfix for the name of the solution column. Default = ‘Sol’
allow_mixed_type_columns (bool) – If True, will allow the column not to have the solution_value attribute, i.e. be a plain Python value, not a CPLEX dvar or expression
- static get_parameter_value(params, param_name: str, param_type: Optional[str] = None, default_value=None, value_format: str = '%Y-%m-%d %H:%M:%S')[source]¶
Get value of parameter from the parameter table (DataFrame). Note that if the input table has a mix of data types in the value column, Pandas can change the data type of a parameter depending on what other values are used in other rows. This requires the explicit conversion to the expected data type.
- Parameters:
params (indexed DataFrame with parameters) – Index = ‘param’, value in ‘value’ column.
param_name (str) – Name of parameter.
param_type (str) – Type of parameter. Valid param_type values are int, float, str, bool, datetime.
default_value – Value if param_name not in index.
value_format (str) – Format for datetime conversion.
Returns:
- get_raw_table_by_name(table_name: str) Optional[DataFrame] [source]¶
Get the ‘raw’ (non-indexed) table from inputs or outputs.
- prep_parameters() DataFrame [source]¶
Pre-process the Parameter(s) input table. Assumes the inputs contains a table named Parameter or Parameters with key param and column value. Otherwise, creates a blank DataFrame instance.
- prepare_input_data_frames()[source]¶
Placeholder to process input data frames, in particular to set the index and to assign dataframes to a direct property of the DataManager. Make sure to test if table-name exists in input dict so we can re-use this class in e.g. DashEnterprise apps where not the whole scenario is loaded.
Example:
if 'MyTable' in self.inputs: self.my_table = self.inputs['MyTable'].set_index('Id', verify_integrity=True)
- prepare_output_data_frames()[source]¶
Placeholder to process output data frames. Processes the default ‘kpis’ table.
- print_hello()[source]¶
FOR TESTING: Print some hello string.
Prints some message. To test reloading of the package from a notebook. Usage:
(In notebook cell #1) from dse_do_utils import DataManager dm = DataManager() (In cell #2) dm.print_hello()
Change the test of the string. Upload the module to WSL. If testing autoreload, rerun the second cell only. Verify it prints the updated string. If testing imp.reload, rerun the notebook from the start.
- print_inputs_outputs_summary()[source]¶
Prints a summary of the input and output data. Prints the names of all input and output tables, along with the column names and the number of rows and columns.
- set_parameters()[source]¶
Set the parameters as properties of the self.param object. This allows for easy access to the parameters, e.g. dm.param.time_limit To be overridden. Make sure to call the super().set_parameters()
Creates the self.param SimpleNamespace to be able to add the individual parameter properties. Creates the self.params pd.DataFrame to be able to easily extract the parameter values.
dse_do_utils.deployeddomodel module¶
- class dse_do_utils.deployeddomodel.DeployedDOModel(wml_credentials, space_name: Optional[str] = None, deployed_model_name: Optional[str] = None, deployment_id: Optional[str] = None, default_max_oaas_time_limit_sec: Optional[int] = None, default_max_run_time_sec: Optional[int] = 600, monitor_loop_delay_sec: int = 5)[source]¶
Bases:
object
New DeployedDOModel for CPD3.5 based on ibm_watson_machine_learning.APIClient
Major steps:
Create an instance of a DeployedDOModel, configuring parameters
Internally, the processes uses the APIClient (former WatsonMachineLearningAPIClient) to communicate with the deployed model:
Start the solve job.
Monitor the running solve job. Runs in a loop, updates the current state.
Once the job completes, get the optimization result.
In the code:
mdl.solve(): solve_payload = self.get_solve_payload(inputs) # Convert inputs to payload job_details, job_uid = self.execute_model_v1(solve_payload) job_details = self.monitor_execution_v1(job_details, job_uid) self.extract_solution_v1(job_details) return job_details
Usage:
# Simplest, using all default options: mdl = DeployedDOModel(wml_credentials, space_name, deployed_model_name) job_details = mdl.solve(inputs) print("Solve status: {}".format(mdl.solve_status)) print("Objective: {}".format(mdl.objective)) print("Output tables: {}".format(mdl.outputs.keys()))
TODOs:
Debug mode
Get the cplex log file?
Done - Add kill and stop methods
Done - Configure polling interval
- execute_model_v2(inputs: Dict[str, DataFrame], max_oaas_time_limit_sec: Optional[int]) str [source]¶
- Parameters:
inputs – inputs dict
max_oaas_time_limit_sec – int - number of seconds for the WML job time limit.
- Returns:
str
- Return type:
job_uid
- static get_job_status(job_details: dict) str [source]¶
Extract job_status from the job_details :returns: queued, running, completed, failed, canceled. :rtype: Job state can be
- get_job_status_v2(job_uid: str) str [source]¶
Retrieves the job_status from a job.
- Parameters:
job_uid –
- Returns:
job_status: str - The job_status. Either: queued, running, completed, failed, canceled
- static get_log(job_details: dict) List[str] [source]¶
Extracts the log.txt from the job_details, if it exists.
- Returns:
List[str] - the lines of the log.txt
- Return type:
log_lines
- static get_solve_details_objective(job_details: dict)[source]¶
After job has completed. Note: not sure where the objective is. Can be PROGRESS_CURRENT_OBJECTIVE or PROGRESS_BEST_OBJECTIVE
- get_solve_payload(inputs: Dict[str, DataFrame], max_oaas_time_limit_sec: Optional[int] = None)[source]¶
- static get_solve_status(job_details: dict) str [source]¶
Retreives the solve_status from job_details. After job has completed
- monitor_execution_v2(job_uid: str, max_run_time_sec: Optional[int] = None) str [source]¶
Monitor the model execution by periodically calling the API to get the current execution status. Result stored in self.execution_status_json and self.execution_status. Time-out after max_run_time_sec. Job will be deleted if total monitor time exceeds this limit.
- Parameters:
job_uid – str
max_run_time_sec – int - Number of seconds maximum processing time (queued + run time) before the job must complete
- Returns:
str
- Return type:
job_status
- solve(inputs: Dict[str, DataFrame], max_oaas_time_limit_sec: Optional[int] = None, max_run_time_sec: Optional[int] = None) dict [source]¶
Master routine. Initializes the job, starts the execution, monitors the results, post-processes the solution and cleans-up after.
- Parameters:
inputs (dict of DataFrames) – input tables
max_oaas_time_limit_sec (int) – will override the default from the constructor
max_run_time_sec (int) – will override the default from the constructor
Calls the following methods (in order):
self.retrieve_solve_configuration() self.set_output_settings_in_solve_configuration() self.execute_model_v1() self.monitor_execution_v1() self.retrieve_debug_materials() self.cleanup()
- solve_v2(inputs: Dict[str, DataFrame], max_oaas_time_limit_sec: Optional[int] = None, max_run_time_sec: Optional[int] = None) dict [source]¶
Master routine. Initializes the job, starts the execution, monitors the results, post-processes the solution and cleans-up after.
- Parameters:
inputs (dict of DataFrames) – input tables
max_oaas_time_limit_sec (int) – will override the default from the constructor
max_run_time_sec (int) – will override the default from the constructor
Calls the following methods (in order):
self.retrieve_solve_configuration() self.set_output_settings_in_solve_configuration() self.execute_model_v1() self.monitor_execution_v1() self.retrieve_debug_materials() self.cleanup()
dse_do_utils.domodeldeployer module¶
- class dse_do_utils.domodeldeployer.DOModelDeployer(wml_credentials: Dict, model_name: str, scenario_name: str, space_name: str, package_paths: List[str] = [], file_paths: List[str] = [], deployment_name: str = 'xxx', deployment_description: str = 'xxx', project=None, tmp_dir: Optional[str] = None)[source]¶
Bases:
object
Deploys a DO Model in WML. For use in CPD 4.0. Retrieves the model from the DO Model Builder.
Usage:
md = DOModelDeployer(wml_credentials, model_name, scenario_name, space_name, package_paths, file_paths, deployment_name, deployment_description) deployment_uid = md.deploy_model() print(deployment_uid) How to add Python modules in the root: - Specify paths to modules (.py files) in `file_paths`. These modules are included in the root of the project and can be accessed using `from my_module import MyClass`. This is similar to the additional files in the DO Experiment. These files can be located anywhere in JupyterLab. How to add a Python package: 1. From conda
- create_archive(main_file_path: str, path: str)[source]¶
Create archive. For now assume one folder model with one file main.py
- Parameters:
main_file_path – file path of main.py file
path – folder where archive will be written
- create_model_archive(path: str)[source]¶
Creates a model archive on the path: The archive contains one .py file: the do-model surrounded by boilerplate code to process the inputs and outputs dictionaries. Steps: 1. Write a file path/main.py 2. Creates an archive file in path 3. Adds the main.py 4. Adds packages 5. Adds (module) files
- create_model_directory() str [source]¶
Create a directory ‘model’ in the default path. Will remove/clear first if exists.
- Returns:
path
- create_software_specification(pkg_ext_ids: List[str] = []) str [source]¶
Allow for multiple package_extensions
- deploy_model() str [source]¶
One call that deploys a model from the Model Builder scenario into WML. Creates a model archive from the extracted model code. Then uploads into WML and creates a deployment.
- Returns:
Deployment UID necessary to call the deployment.
- Return type:
deployment_uid (str)
- get_wml_create_deployment_meta_props()[source]¶
Return the meta_props for the creation of the deployment Separate method, so can easily be overridden
- get_wml_create_store_model_meta_props(sw_spec_id)[source]¶
Return the meta_props for the store of the model Separate method, so can easily be overridden
- guid_from_space_name(space_name: str) str [source]¶
Get space_id from deployment space name. TODO: handle exception if space_name not found.
- wml_store_model(model_archive_file_path, yaml_file_path) str [source]¶
Stores model in WML :returns: model_uid
- class dse_do_utils.domodeldeployer.DOModelDeployerLocal(wml_credentials: Dict, space_name: str, package_paths: Optional[List[str]] = None, file_paths: Optional[List[str]] = None, deployment_name: Optional[str] = 'xxx', deployment_description: Optional[str] = 'xxx', tmp_dir: Optional[str] = None)[source]¶
Bases:
object
EXPERIMENTAL Please note this code is experimental. There are a number of aspects HARD-CODED. Please review the source code and override where necessary.
Deploys a DO Model in WML. For use in CPD 4.0 and SaaS. Retrieves the model from the DO Model Builder.
Usage:
md = DOModelDeployer(wml_credentials, space_name, package_paths, file_paths, deployment_name, deployment_description) deployment_uid = md.deploy_model() print(deployment_uid)
- create_archive(main_file_path: str, path: str)[source]¶
Create archive. For now assume one folder model with one file main.py
- Parameters:
main_file_path – file path of main.py file
path – folder where archive will be written
- create_model_archive(path: str)[source]¶
Creates a model archive on the path: The archive contains one .py file: the do-model surrounded by boilerplate code to process the inputs and outputs dictionaries. Steps: 1. Write a file path/main.py 2. Creates an archive file in path 3. Adds the main.py 4. Adds packages 5. Adds (module) files
- create_model_directory() str [source]¶
Create a directory ‘model’ in the default path. Will remove/clear first if exists.
- Returns:
path
- create_software_specification(pkg_ext_ids: Optional[List[str]] = None) str [source]¶
Allow for multiple package_extensions
- create_zip_package_extension(package_zip_filepath: str) str [source]¶
See https://notebooks.githubusercontent.com/view/ipynb?browser=chrome&color_mode=auto&commit=37188b1a8b48be2bef34b35b55f01cba0d29ed19&device=unknown&enc_url=68747470733a2f2f7261772e67697468756275736572636f6e74656e742e636f6d2f49424d2f776174736f6e2d6d616368696e652d6c6561726e696e672d73616d706c65732f333731383862316138623438626532626566333462333562353566303163626130643239656431392f637064342e302f6e6f7465626f6f6b732f707974686f6e5f73646b2f6465706c6f796d656e74732f637573746f6d5f6c6962726172792f5573652532307363696b69742d6c6561726e253230616e64253230637573746f6d2532306c696272617279253230746f2532307072656469637425323074656d70657261747572652e6970796e62&logged_in=false&nwo=IBM%2Fwatson-machine-learning-samples&path=cpd4.0%2Fnotebooks%2Fpython_sdk%2Fdeployments%2Fcustom_library%2FUse+scikit-learn+and+custom+library+to+predict+temperature.ipynb&platform=android&repository_id=277618282&repository_type=Repository&version=98 TYPE can only be ‘pip_zip’ and package format must be a .zip (See https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/ml-create-custom-software-spec.html?context=cpdaas)
- deploy_model() str [source]¶
One call that deploys a model from the Model Builder scenario into WML. Creates a model archive from the extracted model code. Then uploads into WML and creates a deployment.
- Returns:
Deployment UID necessary to call the deployment.
- Return type:
deployment_uid (str)
- get_model_py() str [source]¶
Return the optimization model. Assume the usual inputs and outputs dicts.
- get_wml_create_deployment_meta_props()[source]¶
Return the meta_props for the creation of the deployment Separate method, so can easily be overridden
Note that 1 node is too slow: somehow causes extended running overhead, where the CPLEX model itself can solve quickly Also, 2 nodes still only gives CPLEX one thread.
- get_wml_create_store_model_meta_props(sw_spec_id)[source]¶
Return the meta_props for the store of the model Separate method, so can easily be overridden
- guid_from_space_name(space_name: str) str [source]¶
Get space_id from deployment space name. TODO: handle exception if space_name not found.
- wml_store_model(model_archive_file_path, yaml_file_path) str [source]¶
Stores model in WML :returns: model_uid
dse_do_utils.domodelexporter module¶
- class dse_do_utils.domodelexporter.DOModelExporter(do_model_names: List[str], **kwargs)[source]¶
Bases:
object
DEPRECATED. These APis are no longer available from CPDv3.5 Exports a DO model from CPD2.5 using curl/web-requests. By default, the export files are stored as datetime-stamped zip files in the Data Assets of the project.
Can be used in 4 ways:
Typical: only specify a list of the DO Model names to export.
Export from another project in same cluster.
Export from another project in another cluster.
Generates the full curl commands. Then copy and paste them into a terminal that supports curl.
- Typical use:
Initialize the exporter with a list of DO Model names and call the method me.export_do_models(). Must be run in the same project and cluster. The DO Model export files are stored in the Data Assets of this project. Uses naming pattern: {do_model_name}_export_{YYYYMMDD_hhmm}.zip.:
me = DOModelExporter(do_model_names = ['Model1', 'Model2']) me.export_do_models()
- Export from another project in same cluster:
Need to specify the project_id. See below for how to get the project_id. Assumes current user is a collaborator on the other project (if not use the next use-case):
me = DOModelExporter(do_model_names = ['ProductionPlanning'], project_id = 'ef365b2c-9f28-447a-a933-5de6560a1cfa') me.export_do_models()
- Export from another project in other cluster:
Specify access_toke=None, user_name, password and project_id. Will retrieve the accessToken from the user-name and password:
me = DOModelExporter(cpd_cluster_host = 'https://dse-cp4d25-cluster4.cpolab.ibm.com', access_token = None, user_name = user_name, password = password, do_model_names = ['ProductionPlanning'], project_id = 'b7bf7fd8-aa50-4bd2-8364-02ea6d480895') me.export_do_models()
Generate curl commands:
Initialize the exporter: me = DOModelExporter(cluster_name, user_name, password, do_model_name, project_id)
Get the access-token curl command: me.get_access_token_curl(). Extract the access_token string.
Get the export-do-model curl command: me.get_do_model_export_curl(do_model_name, access_token).
Usage:
me = DOModelExporter(do_model_names=[], user_name = user_name, password = password) me.get_access_token_curl() access_token = 'xxxxxx' # Get value from running the above curl command me.get_do_model_export_curl('ProductionPlanning', access_token)
Curl commands can be run for instance from the Git Bash terminal that is part of Git for Windows.
How to get the project_id:
If not specifying a project_id in the constructor, it will get it automatically using the environment variable: os.environ[‘PROJECT_ID’].
Run the os.environ[‘PROJECT_ID’] in a notebook in the target project.
Parse the project_id from the Page Source of a web page in the target project.
Manually.
From any web-page of the CPD project, show the Page Source. (In Firefox: menu -> Web Developer -> Page Source)
Do a find-in-page of the name of the project (control-F).
Just before the first instance of the project name, there is a field data_id, e.g.: data-id=”21c8ac71-26c1-49a5-a567-d4c69a0d8158”. Copy the data_id value.
Beware that the Page Source may contain other project-names and projects-IDs, so search on the full project name.
Using the method DOModelExporter.get_project_id(project_name, page_source)
Usage:
page_source = 'the page source copied from Page Source' project_id = DOModelExporter.get_project_id('Full_Project_Name', page_source)
How to get the access_token:
If not provided (i.e. no entry in the constructor arguments), exporter uses the environment variable os.environ[‘USER_ACCESS_TOKEN’].
Run the os.environ[‘USER_ACCESS_TOKEN’] in a notebook in the target project.
Specify access_token=None in the constructor arguments (this is NOT the same as specifying a None value!). And specify a user-name and password. Exporter will retrieve the accessToken by calling a web-API.
How to get the cpd_cluster_host?
If not provided, the exporter will use the environment variable os.environ[‘RUNTIME_ENV_APSX_URL’]
For remote clusters. Beware of the URL of the cluster! DSE clusters may use some alias (e.g. dse-cp4d25-cluster4.datascienceelite.com) that is NOT accessable when running from within the same cluster.<br> When running this from the same cluster, use the ‘original’ cluster name (e.g. dse-cp4d25-cluster4.cpolab.ibm.com).
- export_do_models() None [source]¶
End-to-end run. Gets the access token and then the DO model export.
- get_access_token_curl() str [source]¶
Return the curl command to retreive the accessToken. Based on the cluster_name, user_name and password.
- get_access_token_web() Response [source]¶
Runs web request to get the personal access-token. Based on the cluster_name, user_name and password. Stores it in self.access_token
- get_do_model_export_curl(do_model_name: str, access_token: str) str [source]¶
Return the curl command to retreive the accessToken. Based on the cluster_name, user_name and password.
- get_do_model_export_web(do_model_name: str) Response [source]¶
Runs web-request to get DO model export. Based on the cluster_name, access_token, do_model_name. Stores result as a Data Asset
dse_do_utils.mapmanager module¶
- class dse_do_utils.mapmanager.MapManager(data_manager=None, location=None, zoom_start=1, width='100%', height='100%', layer_control_position='topleft')[source]¶
Bases:
object
Base class for building Folium map visualization.
Currently, the functionality is limited, but may be extended in the future.
Work-around for multi-line in popup: https://github.com/python-visualization/folium/issues/469
Popup work-around:
popup = ( "Time: {time}<br>" "Speed: {speed} km/h<br>" ).format(time=row.name.strftime('%H:%M'), speed=str(round(row['spd'],2))
Tooltip doesn’t yet work in 0.50.0:<br> https://github.com/python-visualization/folium/issues/785 Update: in 0.6.0 (WSL1.2.3) it does seem to work!
- add_bar_chart_in_map(m, coord, quantities=None, tooltips=None, bar_width=20, bar_height_per_unit=1, border_colors: Optional[List] = None, background_colors: Optional[List] = None)[source]¶
Draws a bar chart at the coord. Anchor on the botton-left. Bars expand to the right. Will add as many bars as there are quantities, cycling through pre-defined colors.
See also: https://stackoverflow.com/questions/60131314/folium-draw-star-marker https://python-visualization.github.io/folium/plugins.html#folium.plugins.BeautifyIcon
- static add_full_screen(m)[source]¶
Adds a full-screen button in the top-left corner of the map. Unfortunately, the full-screen doesn’t work in a Jupyter cell. Seems to work ok here: http://nbviewer.jupyter.org/github/python-visualization/folium/blob/master/examples/Plugins.ipynb
- static get_arrows(locations, color='blue', size=6, n_arrows=3, add_to=None)[source]¶
Add arrows to a hypothetical line between the first 2 locations in the locations list. Get a list of correctly placed and rotated arrows/markers to be plotted.
- Parameters:
locations – List of lists of lat lon that represent the start and end of the line. eg [[41.1132, -96.1993],[41.3810, -95.8021]] The locations is a list so that it matches the input for the folium.PolyLine.
color – Whatever folium can use. Default is ‘blue’
size – Size of arrow. Default is 6
n_arrows – Number of arrows to create. Default is 3.
add_to – map or FeatureGroup the arrows are added to.
- Returns:
list of arrows/markers
Based on: https://medium.com/@bobhaffner/folium-lines-with-arrows-25a0fe88e4e
- static get_bearing(p1, p2)[source]¶
Returns compass bearing from p1 to p2
Parameters p1 : namedtuple with lat lon p2 : namedtuple with lat lon
Return compass bearing of type float
Notes Based on https://gist.github.com/jeromer/2005586
- static get_html_table(rows)[source]¶
Creates 2 column html table for use in popups. :param rows: List of sequences. Each sequence should have 2 string entries, one for each column
- Returns:
a HTML formatted table of two columns
- Return type:
html
- static get_popup_table(rows)[source]¶
Return a popup table to add as a popup/child to a folium element.
Usage:
popup_table = [ ('property_1', 'value_1'), ('property_2', 'value_2'), ] popup = MapManager.get_popup_table(popup_table)
Next, the popup object can be used in a popup argument of a Marker:
marker = folium.Marker(coord, popup=popup, icon=icon )
Or added as a child:
county.add_child(popup)
- Parameters:
rows – List of sequences. Each sequence should have 2 string entries, one for each column.
- Returns:
popup (folium.Popup)
Notes Beware that a quote in the texts causes a problem (no map shows). This can be avoided by replacing the “’” with something else. Unfortunately the option parse_html=True does not prevent the problem. Despite the suggestion in https://nbviewer.jupyter.org/github/python-visualization/folium/blob/master/examples/Popups.ipynb
- static get_tooltip_table(rows: List[Tuple[str, str]]) str [source]¶
Get a tooltip table, based on the same definition as for get_popup_table`. Convenience method. Is the same as MapManager.get_html_table(rows). Usage:
tooltip_table = [ ('property_1', 'value_1'), ('property_2', 'value_2'), ] tooltip = MapManager.get_tooltip_table(tooltip_table)
- Parameters:
rows – list of tuples with name-value pairs
- Returns (str):
text for a tooltip in table format
- kansas_city_coord = [39.085594, -94.585241]¶
dse_do_utils.multiscenariomanager module¶
- class dse_do_utils.multiscenariomanager.MultiScenarioManager(model_name: Optional[str] = None, scenario_names: List[str] = [], local_root: Optional[str] = None, project_id: Optional[str] = None, project_access_token: Optional[str] = None, project=None)[source]¶
Bases:
object
Manages multiple scenarios from same DO Model/Experiment. Can export all scenarios in one Excel spreadsheet, where it adds the scenario_name as an additional column. Also adds an additional ‘Scenario’ table. (This looks relevant for usage (filtering) in Cognos.) By default, writes an Excel file in datasets named “model_name + ‘_multi_output’.xlsx”
Usage 1 - All scenarios from Model:
model_name = 'My Model' msm = MultiScenarioManager(model_name=model_name) msm.get_multi_scenario_data() msm.write_data_to_excel()
Usage 2 - Selected scenarios from Model:
model_name = 'My Model' scenario_names = ['Scenario 1', 'Scenario 2'] msm = MultiScenarioManager(model_name=model_name, scenario_names=scenario_names) msm.get_multi_scenario_data() msm.write_data_to_excel()
- add_data_file_to_project(file_path: str, file_name: Optional[str] = None) None [source]¶
Add a data file to the Watson Studio project. Applies to CP4Dv2.5 and WS Cloud Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/ (for CPD2.5) or /home/dsxuser/work/ in WS Cloud. Ensures the file is visible in the Data Assets of the Watson Studio UI.
- Parameters:
file_path (str) – full file path, including the file name and extension
file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.
- get_data_directory() str [source]¶
Returns the path to the datasets folder.
- Returns:
path to the datasets folder
- get_dd_client()[source]¶
Return the Client managing the DO scenario. Returns: new dd_scenario.Client
- get_root_directory() str [source]¶
Return the root directory of the file system. If system is WS, it will return the DSX root, otherwise the directory specified in the local_root. :raises ValueError if root directory doesn’t exist.:
- get_scenarios_df(scenario_names: Optional[List[str]] = None) DataFrame [source]¶
Return scenarios as Dataframe. If scenario_names is None, will get all scenarios in Model. Else, just the ones matching the names. For now, the only column in the df is the scenario_name. More can be added later.
- load_data_from_scenario(scenario_name)[source]¶
TODO: see of by re-using a Client, this can be done faster
- static merge_scenario_data(data_by_scenario: Dict[str, Dict[str, DataFrame]]) Dict[str, DataFrame] [source]¶
Add scenario_name as column. Merge tables
- write_data_to_excel(excel_file_name: Optional[str] = None) None [source]¶
Write inputs and/or outputs to an Excel file in datasets. The inputs and outputs as in the attributes self.inputs and self.outputs of the ScenarioManager
If the excel_file_name is None, it will be generated from the model_name and scenario_name: MODEL_NAME + “_multi_output”
- Parameters:
excel_file_name (str) – The file name for the Excel file.
dse_do_utils.optimizationengine module¶
- class dse_do_utils.optimizationengine.MyProgressListener(mdl: <module 'docplex.mp.model' from 'C:\\Projects\\DSX-DO-common-utils\\DSE_DO_Utils_PublicGitHub\\dse-decision-optimization-utilities_v3\\venv\\lib\\site-packages\\docplex\\mp\\model.py'>)[source]¶
Bases:
SolutionListener
- class dse_do_utils.optimizationengine.OptimizationEngine(data_manager: Optional[DM] = None, name: str = 'MyOptimizationEngine', solve_kwargs=None, export_lp: bool = False, export_sav: bool = False, export_lp_path: Optional[str] = None, is_cpo_model: bool = False)[source]¶
Bases:
Generic
[DM
]- add_mip_progress_kpis(mip_gap_kpi_name='Gap', solve_time_kpi_name='Solve Time', best_bound_kpi_name='Best Bound', solution_count_kpi_name='Solution Count', solve_phase_kpi_name='Solve Phase')[source]¶
Adds 5 KPIs to the self.mdl: ‘Gap’, ‘Solve Time’, ‘Best Bound’, ‘Solution Count’, ‘Solve Phase’.
- Parameters:
mip_gap_kpi_name (str) –
solve_time_kpi_name (str) –
best_bound_kpi_name (str) –
solution_count_kpi_name (str) –
solve_phase_kpi_name (str) –
Returns:
- static binary_var_series_s(mdl: <module 'docplex.mp.model' from 'C:\\Projects\\DSX-DO-common-utils\\DSE_DO_Utils_PublicGitHub\\dse-decision-optimization-utilities_v3\\venv\\lib\\site-packages\\docplex\\mp\\model.py'>, df: ~pandas.core.frame.DataFrame, **kargs) Series [source]¶
Returns pd.Series[BinaryVarType]
- static continuous_var_series_s(mdl: <module 'docplex.mp.model' from 'C:\\Projects\\DSX-DO-common-utils\\DSE_DO_Utils_PublicGitHub\\dse-decision-optimization-utilities_v3\\venv\\lib\\site-packages\\docplex\\mp\\model.py'>, df: ~pandas.core.frame.DataFrame, **kargs) Series [source]¶
Returns pd.Series[ContinuousVarType].
- cp_binary_var_series(df, **kwargs) Series [source]¶
Returns pd.Series[docplex.cp.expression.CpoIntVar]
- static cp_binary_var_series_s(mdl: CpoModel, df: DataFrame, **kwargs) Series [source]¶
Returns pd.Series[docplex.cp.expression.CpoIntVar]. For **kargs, see docplex.cp.expression.integer_var_list (http://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.expression.py.html#docplex.cp.expression.binary_var_list)
- cp_integer_var_series(df, **kwargs) Series [source]¶
Returns pd.Series[docplex.cp.expression.CpoIntVar]
- static cp_integer_var_series_s(mdl: CpoModel, df: DataFrame, **kwargs) Series [source]¶
Returns pd.Series[docplex.cp.expression.CpoIntVar]. For **kwargs, see docplex.cp.expression.integer_var_list (http://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.expression.py.html#docplex.cp.expression.integer_var_list)
- static cp_integer_var_series_s_v2(mdl: CpoModel, df: DataFrame, min=None, max=None, name=None, domain=None) Series [source]¶
Returns pd.Series[docplex.cp.expression.CpoIntVar]. If name is not None, will generate unique names based on pattern: ‘{name}_{index of df}’ If multi-index df, keys are separated by ‘_’, e.g. ‘xDvar_1_2_3’
- cp_interval_var_series(df, **kargs) Series [source]¶
Returns pd.Series[docplex.cp.expression.CpoIntervalVar]
- static cp_interval_var_series_s(mdl: CpoModel, df: DataFrame, **kwargs) Series [source]¶
Returns pd.Series[cp.CpoIntervalVar]. For **kargs, see docplex.cp.expression.interval_var_list (http://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.expression.py.html?highlight=interval_var_list#docplex.cp.expression.interval_var_list)
- create_do_model(name: str, is_cpo_model: bool = False, **kwargs) Union[Model, CpoModel] [source]¶
Create a model (.mdl). By default a CPLEX model (mp.Model), or a CP Optimizer model (cp.Model) :param name: :param is_cpo_model: Is True, create a cp.Model :param kwargs: additional kwags for mdl initialization :return: mp.Model or cp.CpoModel
- export_as_cpo(local_root: Optional[str] = None, copy_to_csv: bool = False)[source]¶
Export .cpo file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. It can write a copy as a .csv file, so it can be exported to a local machine. If not in DSX, it will write to the local file system in the ‘local_root/datasets’ directory. Convenience method. Cpo-filename is based on the mdl.name.
- Parameters:
local_root (str) – name of local directory. Will write .lp file here, if not in DSX
copy_to_csv (bool) – DEPRECATED. If true, will create a copy of the file with the extension .csv.
- Returns:
path (str) path to cpo file
- Raises:
ValueError if root directory can't be established. –
- static export_as_cpo_s(model, model_name: Optional[str] = None, local_root: Optional[str] = None, copy_to_csv: bool = False, **kwargs) str [source]¶
Export .cpo file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. It can write a copy as a .csv file, so it can be exported to a local machine. If not in DSX, it will write to the local file system in the ‘local_root/datasets’ directory.
- Parameters:
model (docplex.cp.model) – The CPLEX model to be exported
model_name (str) – name of .lp file. If none specified, will use the model.name. Specify if the model.name is not a valid file-name.
local_root (str) – name of local directory. Will write .lp file here, if not in DSX
copy_to_csv (bool) – If true, will create a copy of the file with the extension .csv.
**kwargs – Passed to model.export_model
- Returns:
path (str) path to cpo file
- Raises:
ValueError if root directory can't be established. –
- export_as_lp(local_root: Optional[str] = None, copy_to_csv: bool = False) str [source]¶
Export .lp file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. Convenience method. It can write a copy as a .csv file, so it can be exported to a local machine. If not in DSX, it will write to the local file system in the ‘local_root/datasets’ directory. Lp-filename is based on the mdl.name.
- Parameters:
local_root (str) – name of local directory. Will write .lp file here, if not in DSX
copy_to_csv (bool) – DEPRECATED. If true, will create a copy of the file with the extension .csv.
- Returns:
path (str) path to lp file
- Raises:
ValueError if root directory can't be established. –
- export_as_lp_path(lp_file_name: str = 'my_lp_file') str [source]¶
Saves .lp file in self.export_lp_path Note: Does not conflict with OptimizationEngine.export_as_lp() which has a different signature. :return: file_path
- static export_as_lp_s(model, model_name: Optional[str] = None, local_root: Optional[str] = None, copy_to_csv: bool = False) str [source]¶
Export .lp file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. It can write a copy as a .csv file, so it can be exported to a local machine. If not in WSL, it will write to the local file system in the ‘local_root/datasets’ directory.
- Parameters:
model (docplex.mp.model) – The CPLEX model to be exported
model_name (str) – name of .lp file. If none specified, will use the model.name. Specify if the model.name is not a valid file-name.
local_root (str) – name of local directory. Will write .lp file here, if not in WSL
copy_to_csv (bool) – DEPRECATED. If true, will create a copy of the file with the extension .csv.
- Returns:
path (str) path to lp file
- Raises:
ValueError if root directory can't be established. –
- integer_var_series(df: DataFrame, **kargs) Series [source]¶
Create a Series of integer dvar for each row in the DF. Most effective method. Best practice. Result can be assigned to a column of the df. Usage: df[‘xDVar’] = mdl.integer_var_series(df, name = ‘xDVar’)
- Parameters:
self (docplex.mp.model) – CPLEX Model
df (DataFrame) – dataframe
**kargs – arguments passed to mdl.integer_var_list method. E.g. ‘name’
- Returns:
(pandas.Series) with integer dvars (IntegerVarType), index matches index of df
- static integer_var_series_s(mdl: <module 'docplex.mp.model' from 'C:\\Projects\\DSX-DO-common-utils\\DSE_DO_Utils_PublicGitHub\\dse-decision-optimization-utilities_v3\\venv\\lib\\site-packages\\docplex\\mp\\model.py'>, df: ~pandas.core.frame.DataFrame, **kargs) Series [source]¶
Returns pd.Series[IntegerVarType]
- semicontinuous_var_series(df, lb, **kargs) Series [source]¶
Returns pd.Series[SemiContinuousVarType]
- static semicontinuous_var_series_s(mdl: <module 'docplex.mp.model' from 'C:\\Projects\\DSX-DO-common-utils\\DSE_DO_Utils_PublicGitHub\\dse-decision-optimization-utilities_v3\\venv\\lib\\site-packages\\docplex\\mp\\model.py'>, df: ~pandas.core.frame.DataFrame, lb, **kargs) Series [source]¶
Returns pd.Series[SemiContinuousVarType].
- static semiinteger_var_series_s(mdl: <module 'docplex.mp.model' from 'C:\\Projects\\DSX-DO-common-utils\\DSE_DO_Utils_PublicGitHub\\dse-decision-optimization-utilities_v3\\venv\\lib\\site-packages\\docplex\\mp\\model.py'>, df: ~pandas.core.frame.DataFrame, lb, **kargs) Series [source]¶
Returns pd.Series[SemiIntegerVarType].
dse_do_utils.plotly_cpd_workaround module¶
dse_do_utils.plotlymanager module¶
- class dse_do_utils.plotlymanager.PlotlyManager(dm: DM)[source]¶
Bases:
Generic
[DM
]Holds method that create Plotly charts. Pass-in the DM as an input in the constructor.
- get_dash_tab_layout_m(page_id)[source]¶
DEPRECATED. Not used in dse_do_dashboard package. On the instance self, call the method named by get_tab_layout_{page_id}. Used in dse_do_dashboard Plotly-Dash dashboards
- get_multi_scenario_compare_selected() bool [source]¶
Returns True if the user has selected multi-scenario compare.
- get_multi_scenario_table(table_name: str) Optional[DataFrame] [source]¶
Gets the df from the table named table_name in either inputs or outputs. If necessary (i.e. when using scenario_seq), merges the Scenario table, so it has the scenario_name as column. DataFrame is NOT indexed!
- get_plotly_fig_m(id)[source]¶
DEPRECATED. Not used in dse_do_dashboard package. On the instance self, call the method named by id[‘index’] For use with pattern-matching callbacks. Assumes the id[‘index’] is the name of a method of this class and returns a fig. Used in dse_do_dashboard Plotly-Dash dashboards
- get_reference_scenario_compare_selected() bool [source]¶
Returns True if the user has selected (single) reference-scenario compare
- plotly_kpi_compare_bar_charts(figs_per_row: int = 3, orientation: str = 'v') [[<class 'plotly.graph_objs._figure.Figure'>]] [source]¶
Generalized compare of KPIs between scenarios. Creates a list-of-list of go.Figure, i.e. rows of figures, for the PlotlyRowsVisualizationPage. Each KPI gets its own bar-chart, comparing the scenarios.
- Supports 3 cases:
Multi-scenario compare based on the Reference Scenarios multi-checkbox select on the Home page.
Compare the current select scenario with the Reference Scenario selected on the Home page.
Single scenario view based on the currently selected scenario
- Parameters:
figs_per_row – int - Maximum number of figures per row
orientation – str - h’ (horizontal) or `v (vertical)
- Returns:
figures in rows ([[go.Figure]]) - bar-charts in rows
dse_do_utils.scenariodbmanager module¶
- class dse_do_utils.scenariodbmanager.AutoScenarioDbTable(db_table_name: str)[source]¶
Bases:
ScenarioDbTable
Designed to automatically generate the table definition based on the DataFrame.
- Main difference with the ‘regular’ ScenarioDbTable definition:
At ‘create_schema`, the table will NOT be created. Instead,
At ‘insert_table_in_db_bulk’ SQLAlchemy will automatically create a TABLE based on the DataFrame.
- Advantages:
No need to define a custom ScenarioDbTable class per table
Automatically all columns are inserted
- Disadvantages:
No primary and foreign key relations. Thus no checks.
Missing relationships means Cognos cannot automatically extract a data model
TODO: find out what will happen if the DataFrame structure changes and we’re doing a new insert
- create_table_metadata(metadata, engine, schema, multi_scenario: bool = False)[source]¶
Use the engine to reflect the Table metadata. Called during initialization.
- get_sa_table() Optional[Table] [source]¶
Returns the SQLAlchemy Table. Can be None if table is a AutoScenarioDbTable and not defined in Python code. TODO: automatically reflect if None. Is NOT working yet!
- insert_table_in_db_bulk(df, mgr, connection=None)[source]¶
- Parameters:
df (pd.DataFrame) –
mgr (ScenarioDbManager) –
connection – if not None, being run within a transaction
- class dse_do_utils.scenariodbmanager.BusinessKpiTable(db_table_name: str = 'business_kpi', extended_columns_metadata: List[Column] = [])[source]¶
Bases:
ScenarioDbTable
- class dse_do_utils.scenariodbmanager.DatabaseType(value)[source]¶
Bases:
Enum
Used in ScenarioDbManager.__init__ to specify the type of DB it is connecting to.
- DB2 = 1¶
- PostgreSQL = 2¶
- SQLite = 0¶
- class dse_do_utils.scenariodbmanager.DbCellUpdate(scenario_name, table_name, row_index, column_name, current_value, previous_value, row_idx)[source]¶
Bases:
NamedTuple
- column_name: str¶
Alias for field number 3
- current_value: Any¶
Alias for field number 4
- previous_value: Any¶
Alias for field number 5
- row_idx: int¶
Alias for field number 6
- row_index: List[Dict[str, Any]]¶
Alias for field number 2
- scenario_name: str¶
Alias for field number 0
- table_name: str¶
Alias for field number 1
- class dse_do_utils.scenariodbmanager.KpiTable(db_table_name: str = 'kpis')[source]¶
Bases:
ScenarioDbTable
- class dse_do_utils.scenariodbmanager.ParameterTable(db_table_name: str = 'parameters', extended_columns_metadata: List[Column] = [])[source]¶
Bases:
ScenarioDbTable
- class dse_do_utils.scenariodbmanager.ScenarioDbManager(input_db_tables: Dict[str, ScenarioDbTable], output_db_tables: Dict[str, ScenarioDbTable], credentials=None, schema: Optional[str] = None, echo: bool = False, multi_scenario: bool = True, enable_transactions: bool = True, enable_sqlite_fk: bool = True, enable_astype: bool = True, enable_debug_print: bool = False, enable_scenario_seq: bool = True, db_type: DatabaseType = DatabaseType.SQLite, use_custom_naming_convention: bool = False, future: bool = True)[source]¶
Bases:
object
TODO: documentation!
- static add_scenario_name_to_dfs(scenario_name: str, inputs: Dict[str, DataFrame]) Dict[str, DataFrame] [source]¶
Adds a scenario_name column to each df. Or overwrites all values of that column already exists. This avoids to need for the MultiScenarioManager when loading and storing single scenarios.
- static add_scenario_seq_to_dfs(scenario_seq: int, inputs: Dict[str, DataFrame]) Dict[str, DataFrame] [source]¶
For ScenarioSeq option Adds a scenario_seq column to each df. Or overwrites all values of that column already exists. This avoids to need for the MultiScenarioManager when loading and storing single scenarios.
- delete_scenario_from_db(scenario_name: str)[source]¶
Delete a scenario. Uses a transaction (when enabled).
- static delete_scenario_name_column(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame])[source]¶
Drops the column scenario_name from any df in either inputs or outputs. This is used to create a inputs/outputs combination similar to loading a single scenario from the DO Experiment.
- static delete_scenario_seq_column(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame])[source]¶
For ScenarioSeq option Drops the column scenario_seq from any df in either inputs or outputs. This is used to create a inputs/outputs combination similar to loading a single scenario from the DO Experiment.
- duplicate_scenario_in_db(source_scenario_name: str, target_scenario_name: Optional[str] = None) str [source]¶
Duplicate a scenario. Uses a transaction (when enabled).
- get_custom_naming_convention() Dict [source]¶
Sets a custom naming convention See https://docs.sqlalchemy.org/en/20/core/constraints.html#configuring-constraint-naming-conventions Returns:
- get_scenario_db_table() ScenarioDbTable [source]¶
Scenario table must be the first in self.input_db_tables
- get_scenarios_df() DataFrame [source]¶
Return all scenarios in df. Result is indexed by scenario_name. Main API to get all scenarios. The API called by a cached procedure in the dse_do_dashboard.DoDashApp.
- insert_scenarios_from_zip(filepath: str)[source]¶
Insert (or replace) a set of scenarios from a .zip file into the DB. Zip is assumed to contain one or more .xlsx files. Others will be skipped. Name of .xlsx file will be used as the scenario name.
- Parameters:
filepath – filepath of a zip file
- Returns:
- insert_scenarios_in_db(inputs={}, outputs={}, bulk: bool = True)[source]¶
DEPRECATED. If we need it back, requires re-evaluation and bulk support.
- insert_tables_in_db(inputs: Dict[str, DataFrame] = {}, outputs: Dict[str, DataFrame] = {}, bulk: bool = True, auto_insert: bool = False, connection=None) int [source]¶
DEPRECATED. Was attempt to automatically insert a scenario without any schema definition. Currently, one would need to use the AutoScenarioDbTable in the constructor. If you want to automatically create such schema based on the inputs/outputs, then do that in the constructor. Not here. Note: the non-bulk ONLY works if the schema was created! I.e. only when using with self.create_schema.
- read_multi_scenario_tables_from_db(scenario_names: List[str], input_table_names: Optional[List[str]] = None, output_table_names: Optional[List[str]] = None)[source]¶
Read selected set input and output tables from multiple scenarios. If input_table_names/output_table_names contains a ‘*’, then all input/output tables will be read. If empty list or None, then no tables will be read.
- read_scenario_from_db(scenario_name: str, multi_threaded: bool = False)[source]¶
Single scenario load. Main API to read a complete scenario. Reads all tables for a single scenario. Returns all tables in one dict
Note: multi_threaded doesn’t seem to lead to performance improvement. Fixed: omit reading scenario table as an input.
- read_scenario_input_tables_from_db(scenario_name: str) Dict[str, DataFrame] [source]¶
Convenience method to load all input tables. Typically used at start if optimization model. :returns The inputs and outputs. (The outputs are always empty.)
- read_scenario_table_from_db(scenario_name: str, scenario_table_name: str) DataFrame [source]¶
Read a single table from the DB. Main API to read a single table. The API called by a cached procedure in the dse_do_dashboard.DoDashApp.
- Parameters:
scenario_name – Name of scenario
scenario_table_name – Name of scenario table (not the DB table name)
- Returns:
- read_scenario_tables_from_db(scenario_name: str, input_table_names: Optional[List[str]] = None, output_table_names: Optional[List[str]] = None)[source]¶
Read selected set input and output tables from scenario. If input_table_names/output_table_names contains a ‘*’, then all input/output tables will be read. If empty list or None, then no tables will be read.
- rename_scenario_in_db(source_scenario_name: str, target_scenario_name: str)[source]¶
Rename a scenario. Uses a transaction (when enabled).
- replace_scenario_in_db(scenario_name: str, inputs: Dict[str, DataFrame] = {}, outputs: Dict[str, DataFrame] = {}, bulk=True)[source]¶
Insert or replace a scenario. Main API to insert/update a scenario. If the scenario exists, will delete rows first. Inserts scenario data in all tables. Inserts tables in order specified in OrderedDict. Inputs first, outputs second.
- Parameters:
scenario_name –
inputs –
outputs –
bulk –
- Returns:
- update_cell_changes_in_db(db_cell_updates: List[DbCellUpdate])[source]¶
Update a set of cells in the DB.
- Parameters:
db_cell_updates –
- Returns:
- update_scenario_output_tables_in_db(scenario_name, outputs: Dict[str, DataFrame])[source]¶
Main API to update output from a DO solve in the scenario. Deletes ALL output tables. Then inserts the given set of tables. Since this only touches the output tables, more efficient than replacing the whole scenario.
- class dse_do_utils.scenariodbmanager.ScenarioDbTable(db_table_name: str, columns_metadata=None, constraints_metadata=None)[source]¶
Bases:
ABC
Abstract class. Subclass to be able to define table schema definition, i.e. column name, data types, primary and foreign keys. Only columns that are specified and included in the DB insert.
- static add_scenario_name_to_fk_constraint(fkc: ForeignKeyConstraint)[source]¶
Creates a new ForeignKeyConstraint by adding the scenario_name.
- static add_scenario_seq_to_fk_constraint(fkc: ForeignKeyConstraint)[source]¶
Creates a new ForeignKeyConstraint by adding the scenario_seq.
- create_table_metadata(metadata, engine, schema, multi_scenario: bool = False) Table [source]¶
If multi_scenario, then add a primary key ‘scenario_name’.
engine, schema is used only for AutoScenarioDbTable to get the Table (metadata) by reflection
- property dbm¶
- static df_column_names_to_snake_case(df: DataFrame) DataFrame [source]¶
Change all columns names from camelCase to snake_case.
- property enable_scenario_seq¶
- static extend_columns_constraints(columns: list[sqlalchemy.sql.schema.Column], constraints: list[sqlalchemy.sql.schema.ForeignKeyConstraint], columns_ext: Optional[list[sqlalchemy.sql.schema.Column]] = None, constraints_ext: Optional[list[sqlalchemy.sql.schema.ForeignKeyConstraint]] = None) tuple[list[sqlalchemy.sql.schema.Column], list[sqlalchemy.sql.schema.ForeignKeyConstraint]] [source]¶
To be used in ScenarioDbTableSubClass.__init__() Helps to avoid mutable default arguments by allowing columns_ext and constraints_ext to be None.
Usage:
- class MyTable(ScenarioDbTable):
- def __init__(self, db_table_name: str = ‘my_table’,
columns_ext: Optional[list[Column]] = None, constraints_ext: Optional[list[ForeignKeyConstraint]] = None):
- columns = [
Column(‘myKey’, Integer(), primary_key=True), Column(‘myValue’, Integer(), primary_key=False),
] constraints = [] columns, constraints = self.extend_columns_constraints(columns, constraints, columns_ext, constraints_ext) super().__init__(db_table_name, columns, constraints)
- static fixNanNoneNull(df) DataFrame [source]¶
Ensure that NaN values are converted to None. Which in turn causes the value to be NULL in the DB. Apply before insert df to DB. TODO VT20230106: what other incarnations of ‘NaN’ do we need to convert? Potentially: [‘N/A’, ‘na’, ‘NaN’, ‘nan’, ‘’, ‘None’]?
- get_df_column_names(df: DataFrame) List[str] [source]¶
Get all column names that are both defined in the DB schema and present in the DataFrame df.
- Parameters:
df –
- Returns:
- get_df_column_names_2(df: ~pandas.core.frame.DataFrame) -> (typing.List[str], <class 'pandas.core.frame.DataFrame'>)[source]¶
- Get all column names that are defined in the DB schema.
If not present in the DataFrame df, adds the missing column with all None values.
Note 1 (VT 20220829): Note that the sqlalchemy.insert(db_table.table_metadata).values(row) does NOT properly handle columns that are missing in the row. It seems to simply truncate the columns if the row length is less than the number of columns. It does NOT match the column names! Thus the need to add columns, so we end up with proper None values in the row for the insert, specifying all columns in the table.
Note 2 (VT 20220829): Reducing the list of sqlalchemy.Column does NOT work in sqlalchemy.insert(db_table.table_metadata).values(row) The db_table.table_metadata is an object, not a List[sqlalchemy.Column]
- Parameters:
df –
- Returns:
- get_sa_column(db_column_name) Optional[Column] [source]¶
Returns the SQLAlchemy.Column with the specified name. Uses the self.table_metadata (i.e. the sqlalchemy.Table), so works both for pre-defined tables and self-reflected tables like AutoScenarioDbTable
- get_sa_table() Optional[Table] [source]¶
Returns the SQLAlchemy Table. Can be None if table is a AutoScenarioDbTable and not defined in Python code.
- insert_table_in_db_bulk(df: DataFrame, mgr, connection=None, enable_astype: bool = True)[source]¶
Insert a DataFrame in the DB using ‘bulk’ insert, i.e. with one SQL insert. (Instead of row-by-row.)
- Parameters:
df (pd.DataFrame) –
mgr (ScenarioDbManager) –
connection – if not None, being run within a transaction
enable_astype – if True, apply df.column.astype based on datatypes extracted from columns_metadata (i.e. sqlachemy.Column)
- class dse_do_utils.scenariodbmanager.ScenarioSeqTable(db_table_name: str = 'scenario')[source]¶
Bases:
ScenarioDbTable
- class dse_do_utils.scenariodbmanager.ScenarioTable(db_table_name: str = 'scenario')[source]¶
Bases:
ScenarioDbTable
dse_do_utils.scenariomanager module¶
- class dse_do_utils.scenariomanager.Platform(value)[source]¶
Bases:
Enum
An enumeration.
- CPD25 = 3¶
- CPD40 = 2¶
- CPDaaS = 1¶
- Local = 4¶
- class dse_do_utils.scenariomanager.ScenarioManager(model_name: Optional[str] = None, scenario_name: Optional[str] = None, local_root: Optional[Union[str, Path]] = None, project_id: Optional[str] = None, project_access_token: Optional[str] = None, project=None, template_scenario_name: Optional[str] = None, platform: Optional[Platform] = None, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, local_relative_data_path: str = 'assets/data_asset', data_directory: Optional[str] = None)[source]¶
Bases:
object
A ScenarioManager is responsible for loading and storing the input and output DataFrame dictionaries. The data can be loaded from and stored into:
A DO scenario
An Excel spreadsheet
A set of csv files
Excel. Stores one DataFrame per sheet. Creates a __index__ sheet that keeps track which DataFrame is input or output, and it restores table names that longer than the maximum of 31 in Excel.
Usage 1 - Load data from Excel and store into DO scenario. Assumes DO model MyModel and an Excel file datasets/MyExcelFile.xlsx exists. The scenario will be created if it doesn’t exist or otherwise gets overwritten:
sm = ScenarioManager(model_name='MyModel, scenario_name='Scenario_1) inputs, outputs = sm.load_data_from_excel('MyExcelFile') sm.write_data_into_scenario()
Usage 2 - Load data from DO scenario. Assumes DO model MyModel and scenario exists. Typical use in a #dd-ignore cell in a solves notebook:
sm = ScenarioManager(model_name='MyModel, scenario_name='Scenario_1) inputs, outputs = sm.load_data_from_scenario()
Usage 3 - Load data from all csv files in datasets into Excel.<br> Stores into /datasets/excel_test.xlsx.:
excel_output_file_name = 'excel_test' csv_directory = os.path.join(os.environ['DSX_PROJECT_DIR'], 'datasets') sm = ScenarioManager() inputs, outputs = sm.load_data_from_csv(csv_directory) sm.write_data_to_excel(excel_output_file_name)
Usage 4 - Load data from Excel and store into Excel. Assumes Excel file datasets/MyExcelFile.xlsx exists. Will create a file datasets/MyExcelFileOutput.xlsx.:
sm = ScenarioManager() inputs, outputs = sm.load_data_from_excel('MyExcelFile') # Do something with the inputs or outputs sm.write_data_to_excel('MyExcelFileOutput')
- static add_data_file_to_project_s(file_path: str, file_name: Optional[str] = None) None [source]¶
DEPRECATED: will never work on CP4DaaS since it requires the project_lib.Project Add a data file to the Watson Studio project. Applies to CP4Dv2.5. Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/. Ensures the file is visible in the Data Assets of the Watson Studio UI.
- Parameters:
file_path (str) – full file path, including the file name and extension
file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.
- add_data_file_using_project_lib(file_path: str, file_name: Optional[str] = None) None [source]¶
Add a data file to the Watson Studio project. Applies to CP4Dv2.5 and WS Cloud/CP4DaaS Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/ (for CPD2.5) or /home/wsuser/work/ in CPDaaS. Ensures the file is visible in the Data Assets of the Watson Studio UI.
- Parameters:
file_path (str) – full file path, including the file name and extension
file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.
- add_data_file_using_ws_lib(file_path: str, file_name: Optional[str] = None) None [source]¶
Add a data file to the Watson Studio project using the ibm_watson_studio_lib . Applies to CP4Dv4.0 TODO: where should the file be written? Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/ (for CPD2.5) or /home/wsuser/work/ in WS Cloud. Ensures the file is visible in the Data Assets of the Watson Studio UI.
- Parameters:
file_path (str) – full file path, including the file name and extension
file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.
- static add_data_file_using_ws_lib_s(file_path: str, file_name: Optional[str] = None) None [source]¶
Add a data file to the Watson Studio project using the ibm_watson_studio_lib . Applies to CP4Dv4.0 TODO: where should the file be written? Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/ (for CPD2.5) or /home/dsxuser/work/ in WS Cloud. Ensures the file is visible in the Data Assets of the Watson Studio UI.
- Parameters:
file_path (str) – full file path, including the file name and extension
file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.
- add_data_into_scenario(inputs=None, outputs=None)[source]¶
Adds data to a DO scenario. If table exists, does an overwrite/replace.
- add_data_into_scenario_s(model_name: str, scenario_name: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None [source]¶
Adds tables in existing scenario.
Replaces table, if table exists. Assumes scenario exists. Does not explicitly clear existing tables. Could be used in post-processing.
- add_file_as_data_asset(file_path: str, asset_name: Optional[str] = None)[source]¶
Register an existing file as a data asset in CPD.
- Parameters:
file_path – full path of the file
asset_name – name of asset. If None, will get the name from the file
- Returns:
- static add_file_as_data_asset_s(file_path: str, asset_name: Optional[str] = None, platform: Optional[Platform] = None)[source]¶
Register an existing file as a data asset in CPD. VT 2022-01-21: this method is incorrect for CPDaaS. Should use project_lib.
- Parameters:
file_path – full path of the file
asset_name – name of asset. If None, will get the name from the file
platform – CPD40, CPD25, CPSaaS, or Local. If None, will autodetect.
- Returns:
- static clear_scenario_data(client, scenario, category=None)[source]¶
Clears all input and output tables from a scenario.
Current API requires the client.
- Parameters:
client –
scenario –
category (string ['input','output']) – If None, clears all tables.
- static create_new_scenario(client, model_builder, new_scenario_name: str, template_scenario_name=None)[source]¶
Creates a new scenario from a template. The template is found either from the template_scenario_name, or if this is None, from the new_scenario_name. If a scenario with the new name already exists, all input and output tables are cleared. Thereby keeping the solve code. Creates a new blank scenario if a scenario with this name doesn’t exist.
- Parameters:
client (decision_optimization_client.Client) – Client managing the DO model
model_builder (decision_optimization_client.Experiment) – The DO model
new_scenario_name (str) – Name for the new scenario
template_scenario_name (str) – Name of an existing scenario
- Returns:
A decision_optimization_client.Container of type scenario
- Raises:
ValueError – new_scenario_name is None
ValueError – new_scenario_name is the same as template_scenario_name
- static env_is_cpd40() bool [source]¶
Return true if environment is CPDv4.0.2 and in particular supports ibm_watson_studio_lib to get access to data assets.
Notes
The import from ibm_watson_studio_lib import access_project_or_space does NOT fail in CPDaaS
The wslib = access_project_or_space() does fail in CPDaaS, however with an ugly error message
Current ugly work-around is to always first test for CPDaaS using the environment variable
TODO: prevent error/warning in CPDaaS
- export_model_as_lp(mdl, model_name: Optional[str] = None) str [source]¶
Exports the model as an .lp file in the data assets.
- Parameters:
mdl (docplex.mp.model) – the docplex model
model_name (str) – name of model (excluding the .lp). If no model_name, it uses the mdl.name
- Returns:
full file path of lp file
- Return type:
(str)
Note: now a method of ScenarioManager (instead of OptimizationEngine), so this can be included in a dd-ignore notebook cell. Avoids the dependency on dse-do-utils in the ModelBuilder.
- get_data_directory() str [source]¶
Returns the path to the datasets folder.
- Returns:
path to the datasets folder
- get_dd_client()[source]¶
Return the Client managing the DO scenario. Returns: new decision_optimization_client.Client
- get_do_scenario(model_name, scenario_name)[source]¶
Returns a DO scenario.
- Parameters:
model_name (str) – the name of the DO model
scenario_name (str) – the name of the scenario
- Returns:
A dd-scenario.Container of type scenario
- Raises:
ValueError – When either the model_name or the scenario_name doesn’t match an existing entity.
- static get_kpis_table_as_dataframe(mdl) DataFrame [source]¶
Return a DataFrame with the KPI names and values in the mdl. This table is compatible with the representation in DO4WS and can be updated in the scenario.
- Parameters:
mdl (docplex.mp.model.Model) –
- Returns:
the KPIs in the mdl
- Return type:
pd.DataFrame with columns NAME and VALUE
- get_root_directory() str [source]¶
Return the root directory of the file system. If system is WS, it will return the DSX root, otherwise the directory specified in the local_root. :raises ValueError if root directory doesn’t exist.:
TODO: review the options other than Local
- insert_scenarios_from_zip(filepath: str)[source]¶
Insert (or replace) a set of scenarios from a .zip file into the DO Experiment. Zip is assumed to contain one or more .xlsx files. Others will be skipped. Name of .xlsx file will be used as the scenario name.
- load_data(load_from_excel=False, excel_file_name=None)[source]¶
Load data from either the DO scenario, or an Excel spreadsheet. The Excel spreadsheet is expected to be in the datasets folder, either in WS or local.
- Returns:
the inputs and outputs dictionary of DataFrames
- Return type:
inputs, outputs (tuple of dicts)
- load_data_from_csv(csv_directory: str, input_csv_name_pattern: str = '*.csv', output_csv_name_pattern: Optional[str] = None, **kwargs) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]] [source]¶
Load data from matching csv files in a directory. Uses glob.glob() to pattern-match files in the csv_directory. If you want to load one file, specify the full name including the .csv extension.
- Parameters:
csv_directory (str) – Relative directory from the root
input_csv_name_pattern (str) – name pattern to find matching csv files for inputs
output_csv_name_pattern (str) – name pattern to find matching csv files for outputs
**kwargs – Set of optional arguments for the pd.read_csv() function
- static load_data_from_csv_s(csv_directory: str, csv_name_pattern: str = '*.csv', **kwargs) Dict[str, DataFrame] [source]¶
Read data from all matching .csv files in a directory.
- Parameters:
csv_directory (str) – the full path of a directory containing one or more .csv files.
csv_name_pattern (str) – name pattern to find matching csv files
**kwargs – Set of optional arguments for the pd.read_csv() function
- Returns:
dict of DataFrames. Keys are the .csv file names.
- Return type:
data
- load_data_from_excel(excel_file_name: str) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]] [source]¶
Load data from an Excel file located in the datasets folder of the root directory. Convenience method. If run not on WS, requires the root_dir property passed in the ScenarioManager constructor
- static load_data_from_excel_s(xl: ExcelFile, table_index_sheet: str = '_table_index_', input_table_names: Optional[List[str]] = None, output_table_names: Optional[List[str]] = None) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]] [source]¶
Create dataFrames from the sheets of the Excel file. Store in dictionary df_dict with table_name as key. The table_name is either the name of the sheet, or the table_name as defined in the table_index_sheet.
In the default case, when the input_table_names or output_table_names are None, the category of the table (i.e. input or output) is driven off the value in the table_index_sheet. If not listed in table_index_sheet, it is placed in the inputs.
However, to reduce the load time for certain applications, we can restrict the tables it loads by specifying them in the input_table_names or output_table_names. If one of them is not None, it wil only load those tables and categorize them accordingly.
Note that if either input_table_names or output_table_names is used, if applicable, they would refer to the translated tables names by the table_index_sheet. (I.e. not the abbreviated names used in the sheet names.)
- Parameters:
xl (pandas.ExcelFile) – Excel file
table_index_sheet (str) – Name of table index sheet
input_table_names (List[str]) – names of input tables to read
output_table_names (List[str]) – names of output tables to read
- Returns:
- A tuple of inputs and outputs dictionaries of DataFrames,
one df per sheet
- Return type:
(Dict[str,DataFrame], Dict[str,DataFrame])
- load_data_from_parquet(directory: str, input_name_pattern: str = '*.parquet', output_name_pattern: Optional[str] = None, **kwargs) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]] [source]¶
Load data from matching parquet files in a directory. Uses glob.glob() to pattern-match files in the directory. If you want to load one file, specify the full name including the .parquet extension.
- Parameters:
directory (str) – Relative directory from the root
input_name_pattern (str) – name pattern to find matching parquet files for inputs
output_name_pattern (str) – name pattern to find matching parquet files for outputs
**kwargs – Set of optional arguments for the pd.read_parquet() function
- static load_data_from_parquet_s(directory: str, file_name_pattern: str = '*.parquet', **kwargs) Dict[str, DataFrame] [source]¶
Read data from all matching .parquet files in a directory.
- Parameters:
directory (str) – the full path of a directory containing one or more .parquet files.
file_name_pattern (str) – name pattern to find matching parquet files
**kwargs – Set of optional arguments for the pd.read_parquet() function
- Returns:
dict of DataFrames. Keys are the .parquet file names.
- Return type:
data
- load_data_from_scenario() Tuple[Dict[str, DataFrame], Dict[str, DataFrame]] [source]¶
Loads the data from a DO scenario
- load_data_from_scenario_s(model_name: str, scenario_name: str) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]] [source]¶
Loads the data from a DO scenario. Returns empty dict if no tables.
- static load_data_from_zip_csv_s(zip_file_path: str, file_size_limit: Optional[int] = None, **kwargs) Dict[str, DataFrame] [source]¶
Read data from a zip file with .csv files.
- Parameters:
zip_file_path (str) – the full path of a zip file containing one or more .csv files.
file_size_limit (int) – maximum file size in bytes. None implies no limit.
**kwargs – Set of optional arguments for the pd.read_csv() function
- Returns:
dict of DataFrames. Keys are the .csv file names.
- Return type:
data
- print_table_names() None [source]¶
Print the names of the input and output tables. For development and debugging.
- replace_data_in_scenario(inputs=None, outputs=None)[source]¶
Replaces all input, output or both. Note: you will need to specify the inputs or outputs you want to replace explicitly as input arguments. It will NOT get them from self.inputs or self.outputs! In this way, you can control which to update. E.g. after a solve, only update the outputs, not the inputs.
- replace_data_into_scenario_s(model_name: str, scenario_name: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None [source]¶
Replaces all input, output or both.
If input/output are not None, clears inputs/outputs first Assumes scenario exists. Does explicitly clear all existing input/output tables.
- update_solve_output_into_scenario(mdl, outputs)[source]¶
Replaces all output and KPIs table in the scenario.
Assumes the scenario exists. Will not change the inputs of the scenario. Generates the KPI table.
- Limitations:
Does NOT update the objective
Does NOT update the log
- Parameters:
mdl (docplex.mp.model) – the model that has been solved
outputs (Dict) – dictionary of DataFrames
- write_data_into_scenario()[source]¶
Writes the data into a DO scenario. Create new scenario and write data.
- write_data_into_scenario_s(model_name: str, scenario_name: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, template_scenario_name: Optional[str] = None) None [source]¶
Create new scenario and write data.
If scenario exists: clears all existing data. If scenario doesn’t exists: create new one. If template_scenario_name is specified, use that as template. If the existing scenario has a model, it keeps the model. If there is no existing scenario, the user needs to add a model manually in DO. Tested: works reliably.
TODO: one small issue: if scenario exists and has been solved before, it clears all inputs and outputs (including the KPIs), but not the objective value. The DO UI shows as if the model has been solved.
- write_data_to_csv() None [source]¶
Write inputs and/or outputs to .csv files in the root/datasets folder.
Args: None Returns: None
- static write_data_to_csv_s(csv_directory: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None [source]¶
Write data to .csv files in a directory. Name as name of DataFrame.
- Parameters:
csv_directory (str) – the full path of a directory for the .csv files.
inputs (Dict of DataFrames) – inputs
outputs (Dict of DataFrames) – outputs
Returns: None
- write_data_to_excel(excel_file_name: Optional[str] = None, unique_file_name: bool = True, copy_to_csv: bool = False) str [source]¶
Write inputs and/or outputs to an Excel file in datasets. The inputs and outputs as in the attributes self.inputs and self.outputs of the ScenarioManager
If the excel_file_name is None, it will be generated from the model_name and scenario_name: MODEL_NAME + “_” + SCENARIO_NAME + “_output”
If Excel has a file with the same name opened, it will throw a PermissionError. If so and the flag unique_file_name is set to True, it will save the new file with a unique name. I.e., if the file is not opened by Excel, the file is overwritten.
- Parameters:
excel_file_name (str) – The file name for the Excel file.
unique_file_name (bool) – If True, generates a unique file name in case the existing file is opened(!) by Excel
copy_to_csv (bool) – If true, will create a copy of the file with the extension .csv. DEPRECATED, NON-FUNCTIONAL
- static write_data_to_excel_s(writer: ExcelWriter, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, table_index_sheet: str = '_table_index_') None [source]¶
Writes all dataframes in the inputs and outputs to the Excel writer, with sheet-names based on the keys of the inputs/outputs. Due to the Excel limitation of maximum 31 characters for the sheet-name, tables names longer than the 31 characters will be abbreviated with a unique name. The mapping between the original table-name and abbreviated name is recorded in a separate sheet named by the table_index_sheet.
- Parameters:
writer (pandas.ExcelWriter) – The Excel writer to write the file
inputs (Dict of DataFrames) – inputs
outputs (Dict of DataFrames) – outputs
table_index_sheet (str) – name for the index sheet
- write_data_to_parquet(directory: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None [source]¶
Write inputs and/or outputs to .parquet files in the target folder.
- Parameters:
directory (str) – Relative directory from the root
Returns: None
- static write_data_to_parquet_s(directory: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None [source]¶
Write data to .parquet files in a directory. Name as name of DataFrame.
- Parameters:
directory (str) – the full path of a directory for the .parquet files.
inputs (Dict of DataFrames) – inputs
outputs (Dict of DataFrames) – outputs
Returns: None
- static write_data_to_zip_csv_s(zip_file_path: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, **kwargs)[source]¶
Write data as a zip file with .csv files. inputs and outputs dictionaries are merged and written in same zip.
- Parameters:
zip_file_path (str) – the full path of a zip file.
inputs – dict of input DataFrames
outputs – dict of input DataFrames
**kwargs – Set of optional arguments for the df.to_csv() function
- Returns:
None
dse_do_utils.scenariopicker module¶
- class dse_do_utils.scenariopicker.ScenarioPicker(model_name: Optional[str] = None, scenario_name: Optional[str] = None, project_id=typing.Optional[str], project_access_token=typing.Optional[str], project=None)[source]¶
Bases:
object
Notebook widget to interactively select a scenario from the dd_scenario.Client.
Usage
Cell 1:
sp = ScenarioPicker(model_name = 'My_DO_Model') sp.get_scenario_picker_ui()
Cell 2:
inputs, outputs = sp.load_selected_scenario_data()
Create a ScenarioPicker and pass the model name. The API get_scenario_picker_ui() returns a widget with a drop-down box with the available scenarios. In addition, there is a Refresh button that will run all cells below this cell. The next cell should reload the scenario data. The API load_selected_scenario_data() is a convenience method that internally uses a ScenarioManager to load the data from the DO scenario.
The selection of the scenario is maintained in the class variable ScenarioPicker.default_scenario. Therefore, a re-run of the cell keeps the last selected value. By adding:
ScenarioPicker.default_scenario = 'my_default_scenario'
before the creation of the scenario picker, one can force the default scenario to an initial value.
- class ScenarioRefreshButton(**kwargs: Any)[source]¶
Bases:
Button
A widget Refresh button that will refresh all cells below. Inner class of ScenarioPicker since it is only applicable in the context of the ScenarioPicker.
- default_scenario = None¶
- get_dd_client()[source]¶
Return the Client managing the DO scenario. Returns: new dd_scenario.Client
- get_scenario_picker_ui()[source]¶
Return a combination of both the drop-down and the refresh button.
- load_selected_scenario_data()[source]¶
Convenience method. Creates a ScenarioManager and loads input and output data from the scenario selected by the picker. :returns: A tuple with the (inputs, outputs) data
- widgets = <module 'ipywidgets.widgets' from 'C:\\Projects\\DSX-DO-common-utils\\DSE_DO_Utils_PublicGitHub\\dse-decision-optimization-utilities_v3\\venv\\lib\\site-packages\\ipywidgets\\widgets\\__init__.py'>¶
dse_do_utils.scenariorunner module¶
- class dse_do_utils.scenariorunner.RunConfig(insert_inputs_in_db: bool = False, insert_outputs_in_db: bool = False, new_schema: bool = False, insert_in_do: bool = False, write_output_to_excel: bool = False, enable_data_check: bool = False, enable_data_check_outputs: bool = False, data_check_bulk_insert: bool = False, log_level: str = 'DEBUG', export_lp: bool = False, export_sav: bool = False, enable_refine_conflict: bool = False, export_lp_path: str = '', do_model_name: str = None, template_scenario_name: Optional[str] = None)[source]¶
Bases:
object
- data_check_bulk_insert: bool = False¶
- do_model_name: str = None¶
- enable_data_check: bool = False¶
- enable_data_check_outputs: bool = False¶
- enable_refine_conflict: bool = False¶
- export_lp: bool = False¶
- export_lp_path: str = ''¶
- export_sav: bool = False¶
- insert_in_do: bool = False¶
- insert_inputs_in_db: bool = False¶
- insert_outputs_in_db: bool = False¶
- log_level: str = 'DEBUG'¶
- new_schema: bool = False¶
- template_scenario_name: Optional[str] = None¶
- write_output_to_excel: bool = False¶
- class dse_do_utils.scenariorunner.ScenarioConfig(scenario_name: str = 'Scenario_x', parameters: Dict = None)[source]¶
Bases:
object
- parameters: Dict = None¶
- scenario_name: str = 'Scenario_x'¶
- class dse_do_utils.scenariorunner.ScenarioGenerator(inputs: Dict[str, DataFrame], scenario_config: SC)[source]¶
Bases:
Generic
[SC
]Generates a variation of a scenario, i.e. inputs dataset, driven by a ScenarioConfig. To be subclassed. This base class implements overrides of the Parameter table. The ScenarioGenerator is typically used in the context of a ScenarioRunner.
Usage:
class MyScenarioGenerator(ScenarioGenerator): def generate_scenario(self): new_inputs = super().generate_scenario() new_inputs['MyTable1'] = self.generate_my_table1().reset_index() new_inputs['MyTable2'] = self.generate_my_table2().reset_index() return new_inputs
- generate_scenario()[source]¶
Generate a variation of the base_inputs. To be overridden. This default implementation changes the Parameter table based on the overrides in the ScenarioConfig.parameters.
Usage:
def generate_scenario(self): new_inputs = super().generate_scenario() new_inputs['MyTable'] = self.generate_my_table().reset_index() return new_inputs
- class dse_do_utils.scenariorunner.ScenarioRunner(scenario_db_manager: ScenarioDbManager, optimization_engine_class: Type[Core01OptimizationEngine], data_manager_class: Type[Core01DataManager], scenario_db_manager_class: Type[ScenarioDbManager], scenario_generator_class: Optional[Type[ScenarioGenerator]] = None, do_model_name: str = 'my_model', schema: Optional[str] = None, local_root: Optional[str] = None, local_platform: Optional[Union[int, Platform]] = None, data_directory: Optional[str] = None)[source]¶
Bases:
object
TODO: remove local_root, local_platform, replace by data_directory? (It seems to be working fine though)
- data_check_inputs(inputs: Dict[str, DataFrame], scenario_name: str = 'data_check', bulk: bool = False) Dict[str, DataFrame] [source]¶
Use SQLite to validate data. Read data back and do a dm.prepare_data_frames. Does a deepcopy of the inputs to ensure the DB operations do not alter the inputs. Bulk can be set to True once the basic data issues have been resolved and performance needs to be improved. Set bulk to False to get more granular DB insert errors, i.e. per record. TODO: add a data_check() on the DataManager for additional checks.
- data_check_outputs(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame], scenario_name: str = 'data_check', bulk: bool = False) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]] [source]¶
Use SQLite to validate data. Read data back and do a dm.prepare_data_frames. Does a deepcopy of the inputs to ensure the DB operations do not alter the inputs. Bulk can be set to True once the basic data issues have been resolved and performance needs to be improved. Set bulk to False to get more granular DB insert errors, i.e. per record. TODO: add a data_check() on the DataManager for additional checks.
- generate_scenario(base_inputs: Dict[str, DataFrame], scenario_config: ScenarioConfig)[source]¶
Generate a derived scenario from a baseline scenario on the specifications in the scenario_config. :param base_inputs: :param scenario_config: :return:
- insert_in_do(inputs, outputs, scenario_config: ScenarioConfig, run_config: RunConfig)[source]¶
- insert_inputs_in_db(inputs: Dict[str, DataFrame], run_config: RunConfig, scenario_name: str) Dict[str, DataFrame] [source]¶
- insert_outputs_in_db(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame], run_config: RunConfig, scenario_name: str)[source]¶
- run_model(inputs: Dict[str, DataFrame], run_config: RunConfig)[source]¶
Main method to run the optimization model.
- run_multiple(scenario_configs: List[ScenarioConfig], run_config: RunConfig, base_inputs: Optional[Dict[str, DataFrame]] = None, excel_file_name: Optional[str] = None) None [source]¶
Only once create schema and/or load data from Excel. Then it will run all scenario_configs, each time applying the ScenarioGenerator on the base inputs.
- run_once(scenario_config: ScenarioConfig, run_config: RunConfig, base_inputs: Optional[Dict[str, DataFrame]] = None, excel_file_name: Optional[str] = None)[source]¶
dse_do_utils.utilities module¶
- dse_do_utils.utilities.add_sys_path(new_path)[source]¶
Adds a directory to Python’s sys.path
Does not add the directory if it does not exist or if it’s already on sys.path. Returns 1 if OK, -1 if new_path does not exist, 0 if it was already on sys.path. Based on: https://www.oreilly.com/library/view/python-cookbook/0596001673/ch04s23.html
Challenge: in order to use this function, we need to import the dse_do_utils package and thus we need to add it’s location it to sys.path! This will work better once we can do a pip install dse-do_utils.
- dse_do_utils.utilities.convert_size(size_bytes: int)[source]¶
Returns string describing file size.
- Parameters:
size_bytes (int) – size if file in bytes
From https://stackoverflow.com/questions/5194057/better-way-to-convert-file-sizes-in-python
- dse_do_utils.utilities.df_itertuples_with_index_names(df: DataFrame)[source]¶
Alternative for df.itertuples() where we add the index as named attributes to the tuple. This allows access to the index column in the same way as a regular column. This will make it much easier to access the values of the named index.
Normally with df.itertuples() one must access the values of the Index by position, e.g.:
for row in df.itertuples(): (index_a, index_b) = row.Index print(index_a)
One would have to ensure to extract all index columns and know the order in the Index. However, with this function we can do:
for row in df_itertuples_with_index_names(df): print(row.index_a)
Test:
# Create a sample df index = pd.MultiIndex.from_product([range(2), range(3)], names=['index_a', 'index_b']) df = pd.DataFrame({'my_column': range(len(index))}, index=index) # Loop over itertuples alternative: for row in df_itertuples_with_index_names(df): print(row.index_a)
Index columns are added at the tail of the tuple, so to be compatible with code that uses the position of the fields in the tuple. Inspired by https://stackoverflow.com/questions/46151666/iterate-over-pandas-dataframe-with-multiindex-by-index-names.
Notes
Does NOT work when df.Index has no names
TODO: does not work if only Index and no columns TODO: test the combinations where row or Index are not tuples. Is row always a tuple?
- dse_do_utils.utilities.list_file_hierarchy(startpath: str) None [source]¶
Hierarchically print the contents of the folder tree, starting with the startpath.
Usage:
current_dir = os.getcwd() parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir)) parent_dir_2 = os.path.abspath(os.path.join(parent_dir, os.pardir)) list_file_hierarchy(parent_dir_2) #List tree starting at the grand-parent of the current directory
- Parameters:
startpath (str) – Root of the tree
- Returns:
None
dse_do_utils.version module¶
Source of truth for the dse_do_utils version. The versions in setup.py and /docs/source/conf.py are automatically populated from here.
Best practice to keep version here, in a separate file. See https://stackoverflow.com/questions/458550/standard-way-to-embed-version-into-python-package