dse_do_utils package

Submodules

dse_do_utils.cpd25utilities module

dse_do_utils.cpd25utilities.add_file_as_data_asset_cpd25(file_name: str) None[source]

Adds a file located in /project_data/data_asset/ as a Data Asset to the Watson Studio project, so that it appears in the UI and can be exported.

Parameters:

file_name (str) – name of file, including extension
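
A minimal usage sketch, assuming myfile.csv has already been written to /project_data/data_asset/:

from dse_do_utils.cpd25utilities import add_file_as_data_asset_cpd25

add_file_as_data_asset_cpd25('myfile.csv')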

dse_do_utils.cpd25utilities.add_file_path_as_data_asset_cpd25(file_path: str, asset_name: Optional[str] = None) None[source]

Add a data file to the Watson Studio project. Applies to CPDv2.5. Works for any file. Allows the file to be viewed and downloaded from Watson Studio UI. Needs to be called after the file has been saved regularly in the file system. Typically, that would be in /project_data/data_asset/. Ensures the file is visible in the Data Assets of the Watson Studio UI.

Usage:

# Write some file as an example:
file_path = '/project_data/data_asset/myfile.csv'
with open(file_path, 'w+') as f:
     f.write("Hello World")
# Add file as a data asset:
add_file_as_data_asset_cpd25(file_path)
Beware that the same data now exists in 2 different places:
  • In the Cloud Object Storage (COS)

  • As a file in /project_data/data_asset/

Changing either of the two independently can cause inconsistencies.

Parameters:
  • file_path (str) – full file path, including the file name and extension

  • asset_name (str) – name of data asset. Default is None. If None, the asset_name will be extracted from the file_path.

dse_do_utils.cpd25utilities.add_file_path_as_data_asset_wsc(file_path: str, asset_name: Optional[str] = None, project=None) None[source]

Add a data file to the Watson Studio project. Applies to WS Cloud and CPDv2.5. Works for any file. Allows the file to be viewed and downloaded from Watson Studio UI. Needs to be called after the file has been saved regularly in the file system. Typically, that would be in:

  • CPDv2.5: /project_data/data_asset/

  • WS Cloud: /home/dsxuser/work/, or os.environ[‘PWD’], or ./, or no path

Ensures the file is visible in the Data Assets of the Watson Studio UI.

Parameters:
  • project (project_lib.Project) – required for WS Cloud

  • file_path (str) – full file path, including the file name and extension

  • asset_name (str) – name of data asset. Default is None. If None, the asset_name will be extracted from the file_path.

Usage:

# Write some file as an example:
file_path = '/project_data/data_asset/myfile.csv'
with open(file_path, 'w+') as f:
     f.write("Hello World")
# Add file as a data asset:
add_file_as_data_asset_cpd25(file_path)
dse_do_utils.cpd25utilities.write_data_asset_as_file_cpd25(asset_name: str, path: str = '/project_data/data_asset/') str[source]

Writes a named data asset to file. Assumes a data asset with asset_name exists. Makes the file accessible for things like:

  • Load from disk

  • Pip install

  • Module import

Parameters:
  • asset_name (str) – name of the asset

  • path (str, Optional) – Default is ‘/project_data/data_asset/’. Use path=’’ for current directory.
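
A hedged sketch of the 'Module import' use case, assuming a data asset named my_module.py exists in the project (the asset name and module name are hypothetical):

from dse_do_utils.cpd25utilities import write_data_asset_as_file_cpd25

file_path = write_data_asset_as_file_cpd25('my_module.py')
# The file is now in /project_data/data_asset/ and can be imported:
import sys
sys.path.insert(0, '/project_data/data_asset/')
import my_module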

dse_do_utils.cpd25utilities.write_data_asset_as_file_wsc(asset_name: str, path: str = '/home/dsxuser/work/', project=None) str[source]

Writes a named data asset to file (for WS Cloud). Assumes a data asset with asset_name exists. Makes the file accessible for things like:

  • Load from disk

  • Pip install

  • Module import

Parameters:
  • asset_name (str) – name of the asset

  • path (str, Optional) – Default (for WS Cloud) is ‘/home/dsxuser/work/’. Use path=’’ for current directory.

  • project (project_lib.Project) – required for WS Cloud. For CPD, leave as None.

dse_do_utils.datamanager module

class dse_do_utils.datamanager.DataManager(inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None)[source]

Bases: object

A DataManager is a container of original scenario and intermediate data.

It typically contains the input and output dictionaries with DataFrames that came from or will be inserted into a DO scenario. In addition it will hold any intermediate data. It holds methods that operate on and convert the data. When used in combination with an optimization engine, it should not contain the docplex code that creates or interacts with the docplex Model. (That is the task of the OptimizationEngine.)

One of the reasons to separate the DataManager from the OptimizationEngine is to re-use the DataManager, e.g. for output visualization notebooks.

A typical DataManager:
  • Prepares the input DataFrames (like selecting and renaming columns and indexing) and assigns them to a direct attribute.

  • Contains a set of methods that create intermediate data (‘pre-processing’). Intermediate data will also be assigned as a direct member property.
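
For illustration, a minimal sketch of such a DataManager subclass (the table name 'Customer' and its columns are hypothetical):

from dse_do_utils import DataManager

class MyDataManager(DataManager):
    def prepare_input_data_frames(self):
        super().prepare_input_data_frames()
        # Index the (hypothetical) Customer table and assign it as a direct attribute:
        if 'Customer' in self.inputs:
            self.customer = self.inputs['Customer'].set_index('customerId', verify_integrity=True)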

static apply_and_concat(dataframe, field, func, column_names)[source]

Adds multiple columns in one lambda apply function call.

Based on https://stackoverflow.com/questions/23690284/pandas-apply-function-that-returns-multiple-values-to-rows-in-pandas-dataframe

Usage:

def my_function(my_input_value):
    my_output_value_1 = 1
    my_output_value_2 = 2
    return (my_output_value_1, my_output_value_2)

df = apply_and_concat(df, 'my_input_column_name', my_function, ['my_output_column_name_1','my_output_column_name_2'])

df should have the column ‘my_input_column_name’. Result is that df will have 2 new columns: ‘my_output_column_name_1’ and ‘my_output_column_name_2’

Deprecated since version 0.2.2: Same can be done with plain Pandas.

Alternative in plain Pandas:

df[['my_output_column_name_1','my_output_column_name_2']] = df.apply(lambda row: pd.Series(my_function(row.my_input_column_name)), axis=1)
Parameters:
  • dataframe (DataFrame) – The DataFrame that the function is applied to

  • field (str) – the name of the input data column of dataframe

  • func – the function that will be applied. Should have one input argument and return a tuple with N elements.

  • column_names (list of str) – The names of the N output columns. Should match the number of values in the function return tuple.

Returns:

modified dataframe with N new columns

static df_crossjoin_ai(df1: DataFrame, df2: DataFrame, **kwargs) DataFrame[source]

Cross-join ‘Any Index’ Make a cross join (cartesian product) between two dataframes by using a constant temporary key. Accepts dataframes that are single or multi-indexed with named and un-named indices.

Parameters:
  • df1 (DataFrame) –

  • df2 (DataFrame) –

  • **kwargs – keyword arguments that will be passed to pd.merge()

Returns:

(DataFrame) cross join of df1 and df2

static df_crossjoin_mi(df1: DataFrame, df2: DataFrame, **kwargs) DataFrame[source]

Make a cross join (cartesian product) between two dataframes by using a constant temporary key. Assumes both input dataframes have a (single or multi) index. Returns a dataframe with a MultiIndex that is the cartesian product of the indices of the input dataframes. Creates a named MultiIndex if both input dataframes have their indices named. Otherwise will return an unnamed multi-index.

Parameters:
  • df1 (DataFrame) –

  • df2 (DataFrame) –

  • **kwargs – keyword arguments that will be passed to pd.merge()

Returns:

(DataFrame) cross join of df1 and df2

static df_crossjoin_si(df1: DataFrame, df2: DataFrame, **kwargs) DataFrame[source]

Make a cross join (cartesian product) between two dataframes by using a constant temporary key. Assumes both input dataframes have a single index column. Returns a dataframe with a MultiIndex that is the cartesian product of the indices of the input dataframes. See: https://github.com/pydata/pandas/issues/5401 See https://mkonrad.net/2016/04/16/cross-join–cartesian-product-between-pandas-dataframes.html

Parameters:
  • df1 (DataFrame) – dataframe 1

  • df2 (DataFrame) – dataframe 2

  • **kwargs – keyword arguments that will be passed to pd.merge()

Returns:

(DataFrame) cross join of df1 and df2
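
A small sketch of df_crossjoin_si with hypothetical data (df_crossjoin_mi and df_crossjoin_ai work analogously for other index configurations):

import pandas as pd
from dse_do_utils import DataManager

df1 = pd.DataFrame({'product': ['A', 'B'], 'size': [1, 2]}).set_index('product')
df2 = pd.DataFrame({'period': [1, 2, 3], 'demand': [10, 20, 30]}).set_index('period')
# Cartesian product: 6 rows with a MultiIndex (product, period):
df = DataManager.df_crossjoin_si(df1, df2)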

static extract_solution(df, extract_dvar_names: Optional[List[str]] = None, drop_column_names: Optional[List[str]] = None, drop: bool = True)[source]

Generalized routine to extract a solution value. Can remove the dvar column from the df to be able to have a clean df for export into scenario.

static get_parameter_value(params, param_name: str, param_type: Optional[str] = None, default_value=None, value_format: str = '%Y-%m-%d %H:%M:%S')[source]

Get value of parameter from the parameter table (DataFrame). Note that if the input table has a mix of data types in the value column, Pandas can change the data type of a parameter depending on what other values are used in other rows. This requires the explicit conversion to the expected data type.

Parameters:
  • params (indexed DataFrame with parameters) – Index = ‘param’, value in ‘value’ column.

  • param_name (str) – Name of parameter.

  • param_type (str) – Type of parameter. Valid param_type values are int, float, str, bool, datetime.

  • default_value – Value if param_name not in index.

  • value_format (str) – Format for datetime conversion.

Returns:

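For example, a minimal sketch with a hypothetical Parameters table:

import pandas as pd
from dse_do_utils import DataManager

params = pd.DataFrame({'param': ['timeLimit', 'enableExtraConstraint'],
                       'value': ['600', 'True']}).set_index('param')
time_limit = DataManager.get_parameter_value(params, 'timeLimit', param_type='int', default_value=60)
enable = DataManager.get_parameter_value(params, 'enableExtraConstraint', param_type='bool', default_value=False)
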
get_raw_table_by_name(table_name: str) Optional[DataFrame][source]

Get the ‘raw’ (non-indexed) table from inputs or outputs.

prep_parameters() DataFrame[source]

Pre-process the Parameter(s) input table. Assumes the inputs contains a table named Parameter or Parameters with key param and column value. Otherwise, creates a blank DataFrame instance.

prepare_data_frames()[source]
prepare_input_data_frames()[source]

Placeholder to process input data frames, in particular to set the index and to assign dataframes to a direct property of the DataManager. Make sure to test whether the table name exists in the input dict, so we can re-use this class in e.g. DashEnterprise apps where not the whole scenario is loaded.

Example:

if 'MyTable' in self.inputs:
    self.my_table = self.inputs['MyTable'].set_index('Id', verify_integrity=True)
prepare_output_data_frames()[source]

Placeholder to process output data frames. Processes the default ‘kpis’ table.

print_hello()[source]

FOR TESTING: Print some hello string.

Prints some message. To test reloading of the package from a notebook. Usage:

(In notebook cell #1)
from dse_do_utils import DataManager
dm = DataManager()
(In cell #2)
dm.print_hello()

Change the text of the string. Upload the module to WSL. If testing autoreload, rerun the second cell only. Verify it prints the updated string. If testing imp.reload, rerun the notebook from the start.

print_inputs_outputs_summary()[source]

Prints a summary of the input and output data. Prints the names of all input and output tables, along with the column names and the number of rows and columns.

set_parameters()[source]

Set the parameters as properties of the self.param object. This allows for easy access to the parameters, e.g. dm.param.time_limit. To be overridden. Make sure to call super().set_parameters().

Creates the self.param SimpleNamespace to be able to add the individual parameter properties. Creates the self.params pd.DataFrame to be able to easily extract the parameter values.
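
A minimal override sketch (the parameter name timeLimit is hypothetical):

from dse_do_utils import DataManager

class MyDataManager(DataManager):
    def set_parameters(self):
        super().set_parameters()  # creates self.param and self.params
        self.param.time_limit = self.get_parameter_value(
            self.params, 'timeLimit', param_type='int', default_value=600)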

dse_do_utils.deployeddomodel module

class dse_do_utils.deployeddomodel.DeployedDOModel(wml_credentials, space_name: Optional[str] = None, deployed_model_name: Optional[str] = None, deployment_id: Optional[str] = None, default_max_oaas_time_limit_sec: Optional[int] = None, default_max_run_time_sec: Optional[int] = 600, monitor_loop_delay_sec: int = 5)[source]

Bases: object

New DeployedDOModel for CPD3.5 based on ibm_watson_machine_learning.APIClient

Major steps:

  1. Create an instance of a DeployedDOModel, configuring parameters

Internally, the process uses the APIClient (formerly WatsonMachineLearningAPIClient) to communicate with the deployed model:

  1. Start the solve job.

  2. Monitor the running solve job. Runs in a loop, updates the current state.

  3. Once the job completes, get the optimization result.

In the code:

mdl.solve():
    solve_payload = self.get_solve_payload(inputs)  # Convert inputs to payload
    job_details, job_uid = self.execute_model(solve_payload)
    job_details = self.monitor_execution(job_details, job_uid)
    self.extract_solution(job_details)
    return job_details

Usage:

# Simplest, using all default options:
mdl = DeployedDOModel(wml_credentials, space_name, deployed_model_name)
job_details = mdl.solve(inputs)
print("Solve status: {}".format(mdl.solve_status))
print("Objective: {}".format(mdl.objective))
print("Output tables: {}".format(mdl.outputs.keys()))

TODOs:

  1. Debug mode

  2. Get the cplex log file?

  3. Done - Add kill and stop methods

  4. Done - Configure polling interval

execute_model(inputs: Dict[str, DataFrame], max_oaas_time_limit_sec: Optional[int])[source]
extract_solution(job_details)[source]
get_deployment_id(model_name: str)[source]

Find deployment_id from model_name.

static get_job_status(job_details)[source]

Job states can be: queued, running, completed, failed, canceled.

static get_outputs(job_details)[source]
static get_solve_details(job_details)[source]

After job has completed

static get_solve_details_objective(job_details)[source]

After job has completed. Note: not sure where the objective is. Can be PROGRESS_CURRENT_OBJECTIVE or PROGRESS_BEST_OBJECTIVE

get_solve_payload(inputs: Dict[str, DataFrame], max_oaas_time_limit_sec: Optional[int] = None)[source]
static get_solve_status(job_details)[source]

After job has completed

get_space_id(space_name: str)[source]

Find space_id from space_name.

monitor_execution(job_details, max_run_time_sec: Optional[int] = None)[source]

Monitor the model execution by periodically calling the API to get the current execution status. Result stored in self.execution_status_json and self.execution_status.

solve(inputs: Dict[str, DataFrame], max_oaas_time_limit_sec: Optional[int] = None, max_run_time_sec: Optional[int] = None)[source]

Master routine. Initializes the job, starts the execution, monitors the results, post-processes the solution and cleans up afterwards.

Parameters:
  • inputs (dict of DataFrames) – input tables

  • max_oaas_time_limit_sec (int) – will override the default from the constructor

  • max_run_time_sec (int) – will override the default from the constructor

Calls the following methods (in order):

self.retrieve_solve_configuration()
self.set_output_settings_in_solve_configuration()
self.execute_model()
self.monitor_execution()
self.retrieve_debug_materials()
self.cleanup()

dse_do_utils.domodeldeployer module

class dse_do_utils.domodeldeployer.DOModelDeployer(wml_credentials: Dict, model_name: str, scenario_name: str, space_name: str, package_paths: List[str] = [], file_paths: List[str] = [], deployment_name: str = 'xxx', deployment_description: str = 'xxx', project=None, tmp_dir: Optional[str] = None)[source]

Bases: object

Deploys a DO Model in WML. For use in CPD 4.0. Retrieves the model from the DO Model Builder.

Usage:

md = DOModelDeployer(wml_credentials, model_name, scenario_name, space_name,
                     package_paths, file_paths,
                     deployment_name, deployment_description)
deployment_uid = md.deploy_model()
print(deployment_uid)

How to add Python modules in the root:
  • Specify paths to modules (.py files) in `file_paths`. These modules are included in the root of the project and can be accessed using `from my_module import MyClass`. This is similar to the additional files in the DO Experiment. These files can be located anywhere in JupyterLab.

How to add a Python package:
  1. From conda

create_archive(main_file_path: str, path: str)[source]

Create archive. For now, assumes one folder 'model' with one file main.py.

Parameters:
  • main_file_path – file path of main.py file

  • path – folder where archive will be written

create_model_archive(path: str)[source]

Creates a model archive on the path. The archive contains one .py file: the do-model surrounded by boilerplate code to process the inputs and outputs dictionaries. Steps:
  1. Write a file path/main.py

  2. Create an archive file in path

  3. Add the main.py

  4. Add packages

  5. Add (module) files

create_model_directory() str[source]

Create a directory ‘model’ in the default path. Will remove/clear it first if it exists.

Returns:

path

create_package_extension(yaml_file_path: str) str[source]
create_software_specification(pkg_ext_ids: List[str] = []) str[source]

Allow for multiple package_extensions

create_zip_package_extension(package_zip_filepath: str) str[source]

See https://notebooks.githubusercontent.com/view/ipynb?browser=chrome&color_mode=auto&commit=37188b1a8b48be2bef34b35b55f01cba0d29ed19&device=unknown&enc_url=68747470733a2f2f7261772e67697468756275736572636f6e74656e742e636f6d2f49424d2f776174736f6e2d6d616368696e652d6c6561726e696e672d73616d706c65732f333731383862316138623438626532626566333462333562353566303163626130643239656431392f637064342e302f6e6f7465626f6f6b732f707974686f6e5f73646b2f6465706c6f796d656e74732f637573746f6d5f6c6962726172792f5573652532307363696b69742d6c6561726e253230616e64253230637573746f6d2532306c696272617279253230746f2532307072656469637425323074656d70657261747572652e6970796e62&logged_in=false&nwo=IBM%2Fwatson-machine-learning-samples&path=cpd4.0%2Fnotebooks%2Fpython_sdk%2Fdeployments%2Fcustom_library%2FUse+scikit-learn+and+custom+library+to+predict+temperature.ipynb&platform=android&repository_id=277618282&repository_type=Repository&version=98

deploy_archive(model_archive_file_path, yaml_file_path)[source]
deploy_model() str[source]

One call that deploys a model from the Model Builder scenario into WML. Creates a model archive from the extracted model code. Then uploads into WML and creates a deployment.

Returns:

Deployment UID necessary to call the deployment.

Return type:

deployment_uid (str)

get_scenario()[source]
get_wml_create_deployment_meta_props()[source]

Return the meta_props for the creation of the deployment. Separate method, so it can easily be overridden.

get_wml_create_store_model_meta_props(sw_spec_id)[source]

Return the meta_props for the store of the model. Separate method, so it can easily be overridden.

guid_from_space_name(space_name: str) str[source]

Get space_id from deployment space name. TODO: handle exception if space_name not found.

wml_create_deployment(model_uid) str[source]

Create deployment in WML.

Returns:

deployment_uid

wml_store_model(model_archive_file_path, yaml_file_path) str[source]

Stores model in WML.

Returns:

model_uid

write_main_file(file_path: str)[source]

Write the code for the main.py file. Adds the code template header and footer.

write_yaml_file(file_path: str = './main.yml')[source]

Write the code for the main.yml file.

dse_do_utils.domodelexporter module

class dse_do_utils.domodelexporter.DOModelExporter(do_model_names: List[str], **kwargs)[source]

Bases: object

DEPRECATED. These APIs are no longer available from CPDv3.5. Exports a DO model from CPD2.5 using curl/web-requests. By default, the export files are stored as datetime-stamped zip files in the Data Assets of the project.

Can be used in 4 ways:

  1. Typical: only specify a list of the DO Model names to export.

  2. Export from another project in same cluster.

  3. Export from another project in another cluster.

  4. Generates the full curl commands. Then copy and paste them into a terminal that supports curl.

  1. Typical use:

    Initialize the exporter with a list of DO Model names and call the method me.export_do_models(). Must be run in the same project and cluster. The DO Model export files are stored in the Data Assets of this project. Uses naming pattern: {do_model_name}_export_{YYYYMMDD_hhmm}.zip.:

    me = DOModelExporter(do_model_names = ['Model1', 'Model2'])
    me.export_do_models()
    
  2. Export from another project in same cluster:

    Need to specify the project_id. See below for how to get the project_id. Assumes current user is a collaborator on the other project (if not use the next use-case):

    me = DOModelExporter(do_model_names = ['ProductionPlanning'],
             project_id = 'ef365b2c-9f28-447a-a933-5de6560a1cfa')
    me.export_do_models()
    
  3. Export from another project in other cluster:

Specify access_token=None, user_name, password and project_id. Will retrieve the accessToken from the user_name and password:

    me = DOModelExporter(cpd_cluster_host = 'https://dse-cp4d25-cluster4.cpolab.ibm.com',
                 access_token = None,
                 user_name = user_name,
                 password = password,
                 do_model_names = ['ProductionPlanning'],
                 project_id = 'b7bf7fd8-aa50-4bd2-8364-02ea6d480895')
    me.export_do_models()
    
  4. Generate curl commands:

    1. Initialize the exporter: me = DOModelExporter(cluster_name, user_name, password, do_model_name, project_id)

    2. Get the access-token curl command: me.get_access_token_curl(). Extract the access_token string.

    3. Get the export-do-model curl command: me.get_do_model_export_curl(do_model_name, access_token).

    Usage:

    me = DOModelExporter(do_model_names=[],
                 user_name = user_name,
                 password = password)
    me.get_access_token_curl()
    access_token = 'xxxxxx'  # Get value from running the above curl command
    me.get_do_model_export_curl('ProductionPlanning', access_token)
    

    Curl commands can be run for instance from the Git Bash terminal that is part of Git for Windows.

  5. How to get the project_id:

    1. If not specifying a project_id in the constructor, it will get it automatically using the environment variable: os.environ[‘PROJECT_ID’].

    2. Run the os.environ[‘PROJECT_ID’] in a notebook in the target project.

    3. Parse the project_id from the Page Source of a web page in the target project.

      1. Manually.

        1. From any web-page of the CPD project, show the Page Source. (In Firefox: menu -> Web Developer -> Page Source)

        2. Do a find-in-page of the name of the project (control-F).

        3. Just before the first instance of the project name, there is a field data-id, e.g.: data-id="21c8ac71-26c1-49a5-a567-d4c69a0d8158". Copy the data-id value.

        4. Beware that the Page Source may contain other project names and project IDs, so search on the full project name.

      2. Using the method DOModelExporter.get_project_id(project_name, page_source)

      Usage:

      page_source = 'the page source copied from Page Source'
      project_id = DOModelExporter.get_project_id('Full_Project_Name', page_source)
      
  6. How to get the access_token:

    1. If not provided (i.e. no entry in the constructor arguments), exporter uses the environment variable os.environ[‘USER_ACCESS_TOKEN’].

    2. Run the os.environ[‘USER_ACCESS_TOKEN’] in a notebook in the target project.

    3. Specify access_token=None in the constructor arguments (explicitly passing None is NOT the same as omitting the argument!). And specify a user-name and password. Exporter will retrieve the accessToken by calling a web-API.

  7. How to get the cpd_cluster_host?

    1. If not provided, the exporter will use the environment variable os.environ[‘RUNTIME_ENV_APSX_URL’]

    2. For remote clusters. Beware of the URL of the cluster! DSE clusters may use some alias (e.g. dse-cp4d25-cluster4.datascienceelite.com) that is NOT accessible when running from within the same cluster. When running this from the same cluster, use the ‘original’ cluster name (e.g. dse-cp4d25-cluster4.cpolab.ibm.com).

export_do_models() None[source]

End-to-end run. Gets the access token and then the DO model export.

get_access_token_curl() str[source]

Return the curl command to retrieve the accessToken. Based on the cluster_name, user_name and password.

get_access_token_web() Response[source]

Runs web request to get the personal access-token. Based on the cluster_name, user_name and password. Stores it in self.access_token

get_do_model_export_curl(do_model_name: str, access_token: str) str[source]

Return the curl command to export the DO model. Based on the cluster_name, do_model_name and access_token.

get_do_model_export_web(do_model_name: str) Response[source]

Runs web-request to get DO model export. Based on the cluster_name, access_token, do_model_name. Stores result as a Data Asset

static get_project_id(project_name: str, page_source: str) str[source]

Extracts the project ID from a page source.

Parameters:
  • project_name (str) – full name of the project

  • page_source (str) – the page source of a page of the project in CPD

Returns:

project_id (str)

write_do_model_to_file(do_model_name: str, response: Response) str[source]

dse_do_utils.mapmanager module

class dse_do_utils.mapmanager.MapManager(data_manager=None, location=None, zoom_start=1, width='100%', height='100%', layer_control_position='topleft')[source]

Bases: object

Base class for building Folium map visualization.

Currently, the functionality is limited, but may be extended in the future.

Work-around for multi-line in popup: https://github.com/python-visualization/folium/issues/469

Popup work-around:

popup = (
    "Time: {time}<br>"
    "Speed: {speed} km/h<br>"
).format(time=row.name.strftime('%H:%M'),
         speed=str(round(row['spd'], 2)))

Tooltip doesn’t yet work in 0.50.0: https://github.com/python-visualization/folium/issues/785. Update: in 0.6.0 (WSL1.2.3) it does seem to work!

static add_full_screen(m)[source]

Adds a full-screen button in the top-left corner of the map. Unfortunately, the full-screen doesn’t work in a Jupyter cell. Seems to work ok here: http://nbviewer.jupyter.org/github/python-visualization/folium/blob/master/examples/Plugins.ipynb

add_layer_control(m)[source]
create_blank_map()[source]
static get_arrows(locations, color='blue', size=6, n_arrows=3, add_to=None)[source]

Add arrows to a hypothetical line between the first 2 locations in the locations list. Get a list of correctly placed and rotated arrows/markers to be plotted.

Parameters:
  • locations – List of lists of lat/lon that represent the start and end of the line, e.g. [[41.1132, -96.1993], [41.3810, -95.8021]]. The locations argument is a list so that it matches the input for folium.PolyLine.

  • color – Whatever folium can use. Default is ‘blue’

  • size – Size of arrow. Default is 6

  • n_arrows – Number of arrows to create. Default is 3.

  • add_to – map or FeatureGroup the arrows are added to.

Returns:

list of arrows/markers

Based on: https://medium.com/@bobhaffner/folium-lines-with-arrows-25a0fe88e4e
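
A hedged usage sketch, drawing a line on a folium map and adding the arrows to it:

import folium
from dse_do_utils.mapmanager import MapManager

m = folium.Map(location=[41.25, -96.0], zoom_start=7)
locations = [[41.1132, -96.1993], [41.3810, -95.8021]]
folium.PolyLine(locations=locations, color='blue').add_to(m)
MapManager.get_arrows(locations=locations, color='blue', n_arrows=3, add_to=m)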

static get_bearing(p1, p2)[source]

Returns compass bearing from p1 to p2

Parameters:
  • p1 – namedtuple with lat, lon

  • p2 – namedtuple with lat, lon

Returns:

compass bearing (float)

Notes: Based on https://gist.github.com/jeromer/2005586

static get_html_table(rows)[source]

Creates a 2-column HTML table for use in popups.

Parameters:

rows – List of sequences. Each sequence should have 2 string entries, one for each column.

Returns:

a HTML formatted table of two columns

Return type:

html

static get_popup_table(rows)[source]

Return a popup table to add as a popup/child to a folium element.

Usage:

popup_table = [
    ('property_1', 'value_1'),
    ('property_2', 'value_2'),
]
popup = MapManager.get_popup_table(popup_table)

Next, the popup object can be used in a popup argument of a Marker:

marker = folium.Marker(coord,
                   popup=popup,
                   icon=icon
                   )

Or added as a child:

county.add_child(popup)
Parameters:

rows – List of sequences. Each sequence should have 2 string entries, one for each column.

Returns:

popup (folium.Popup)

Notes: Beware that a quote in the texts causes a problem (no map shows). This can be avoided by replacing the “’” with something else. Unfortunately, the option parse_html=True does not prevent the problem, despite the suggestion in https://nbviewer.jupyter.org/github/python-visualization/folium/blob/master/examples/Popups.ipynb

static get_tooltip_table(rows: List[Tuple[str, str]]) str[source]

Get a tooltip table, based on the same definition as for get_popup_table. Convenience method. Is the same as MapManager.get_html_table(rows). Usage:

tooltip_table = [
    ('property_1', 'value_1'),
    ('property_2', 'value_2'),
]
tooltip = MapManager.get_tooltip_table(tooltip_table)
Parameters:

rows – list of tuples with name-value pairs

Returns (str):

text for a tooltip in table format

kansas_city_coord = [39.085594, -94.585241]

dse_do_utils.multiscenariomanager module

class dse_do_utils.multiscenariomanager.MultiScenarioManager(model_name: Optional[str] = None, scenario_names: List[str] = [], local_root: Optional[str] = None, project_id: Optional[str] = None, project_access_token: Optional[str] = None, project=None)[source]

Bases: object

Manages multiple scenarios from the same DO Model/Experiment. Can export all scenarios in one Excel spreadsheet, where it adds the scenario_name as an additional column. Also adds an additional ‘Scenario’ table. (This looks relevant for usage (filtering) in Cognos.) By default, writes an Excel file in datasets named “model_name + ‘_multi_output’.xlsx”

Usage 1 - All scenarios from Model:

model_name = 'My Model'
msm = MultiScenarioManager(model_name=model_name)
msm.get_multi_scenario_data()
msm.write_data_to_excel()

Usage 2 - Selected scenarios from Model:

model_name = 'My Model'
scenario_names = ['Scenario 1', 'Scenario 2']
msm = MultiScenarioManager(model_name=model_name, scenario_names=scenario_names)
msm.get_multi_scenario_data()
msm.write_data_to_excel()
add_data_file_to_project(file_path: str, file_name: Optional[str] = None) None[source]

Add a data file to the Watson Studio project. Applies to CP4Dv2.5 and WS Cloud. Needs to be called after the file has been saved regularly in the file system, in /project_data/data_asset/ (for CPD2.5) or /home/dsxuser/work/ (for WS Cloud). Ensures the file is visible in the Data Assets of the Watson Studio UI.

Parameters:
  • file_path (str) – full file path, including the file name and extension

  • file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.

env_is_wscloud() bool[source]

Return true if environment is WS Cloud

get_all_scenario_names()[source]

Deprecated. Replaced by get_scenarios_df

get_data_directory() str[source]

Returns the path to the datasets folder.

Returns:

path to the datasets folder

get_dd_client()[source]

Return the Client managing the DO scenario. Returns: new dd_scenario.Client

get_multi_scenario_data(scenario_names: Optional[List[str]] = None)[source]
get_root_directory() str[source]

Return the root directory of the file system. If system is WS, it will return the DSX root, otherwise the directory specified in the local_root.

Raises:

ValueError if root directory doesn’t exist.

get_scenarios_df(scenario_names: Optional[List[str]] = None) DataFrame[source]

Return scenarios as Dataframe. If scenario_names is None, will get all scenarios in Model. Else, just the ones matching the names. For now, the only column in the df is the scenario_name. More can be added later.

load_data_from_scenario(scenario_name)[source]

TODO: see if, by re-using a Client, this can be done faster.

static merge_scenario_data(data_by_scenario: Dict[str, Dict[str, DataFrame]]) Dict[str, DataFrame][source]

Add scenario_name as a column. Merge tables.

write_data_to_excel(excel_file_name: Optional[str] = None) None[source]

Write inputs and/or outputs to an Excel file in datasets. The inputs and outputs are those in the attributes self.inputs and self.outputs of the ScenarioManager.

If the excel_file_name is None, it will be generated from the model_name and scenario_name: MODEL_NAME + “_multi_output”

Parameters:

excel_file_name (str) – The file name for the Excel file.

dse_do_utils.optimizationengine module

class dse_do_utils.optimizationengine.MyProgressListener(mdl: docplex.mp.model.Model)[source]

Bases: SolutionListener

notify_progress(progress_data)[source]

This method is called from within the solve with a ProgressData instance.

Parameters:

progress_data – an instance of ProgressData containing data about the current point in the search tree.

class dse_do_utils.optimizationengine.OptimizationEngine(data_manager: Optional[DataManager] = None, name: str = 'MyOptimizationEngine', solve_kwargs=None, export_lp: bool = False, export_sav: bool = False, export_lp_path: Optional[str] = None, is_cpo_model: bool = False)[source]

Bases: object

add_mip_progress_kpis(mip_gap_kpi_name='Gap', solve_time_kpi_name='Solve Time', best_bound_kpi_name='Best Bound', solution_count_kpi_name='Solution Count', solve_phase_kpi_name='Solve Phase')[source]

Adds 5 KPIs to the self.mdl: ‘Gap’, ‘Solve Time’, ‘Best Bound’, ‘Solution Count’, ‘Solve Phase’.

Parameters:
  • mip_gap_kpi_name (str) –

  • solve_time_kpi_name (str) –

  • best_bound_kpi_name (str) –

  • solution_count_kpi_name (str) –

  • solve_phase_kpi_name (str) –

Returns:

binary_var_series(df, **kargs) Series[source]

Returns pd.Series[BinaryVarType]

static binary_var_series_s(mdl: docplex.mp.model.Model, df: DataFrame, **kargs) Series[source]

Returns pd.Series[BinaryVarType]

continuous_var_series(df, **kargs) Series[source]

Returns pd.Series[ContinuousVarType]

static continuous_var_series_s(mdl: docplex.mp.model.Model, df: DataFrame, **kargs) Series[source]

Returns pd.Series[ContinuousVarType].

cp_binary_var_series(df, **kwargs) Series[source]

Returns pd.Series[docplex.cp.expression.CpoIntVar]

static cp_binary_var_series_s(mdl: CpoModel, df: DataFrame, **kwargs) Series[source]

Returns pd.Series[docplex.cp.expression.CpoIntVar]. For **kwargs, see docplex.cp.expression.binary_var_list (http://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.expression.py.html#docplex.cp.expression.binary_var_list)

cp_integer_var_series(df, **kwargs) Series[source]

Returns pd.Series[docplex.cp.expression.CpoIntVar]

static cp_integer_var_series_s(mdl: CpoModel, df: DataFrame, **kwargs) Series[source]

Returns pd.Series[docplex.cp.expression.CpoIntVar]. For **kwargs, see docplex.cp.expression.integer_var_list (http://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.expression.py.html#docplex.cp.expression.integer_var_list)

cp_interval_var_series(df, **kargs) Series[source]

Returns pd.Series[docplex.cp.expression.CpoIntervalVar]

static cp_interval_var_series_s(mdl: CpoModel, df: DataFrame, **kwargs) Series[source]

Returns pd.Series[docplex.cp.expression.CpoIntervalVar]. For **kwargs, see docplex.cp.expression.interval_var_list (http://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.expression.py.html?highlight=interval_var_list#docplex.cp.expression.interval_var_list)

create_do_model(name: str, is_cpo_model: bool = False, **kwargs) Union[Model, CpoModel][source]

Create a model (.mdl). By default a CPLEX model (mp.Model), or a CP Optimizer model (cp.CpoModel).

Parameters:
  • name –

  • is_cpo_model – If True, create a cp.CpoModel

  • kwargs – additional kwargs for mdl initialization

Returns:

mp.Model or cp.CpoModel

export_as_cpo(local_root: Optional[str] = None, copy_to_csv: bool = False)[source]

Export .cpo file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. It can write a copy as a .csv file, so it can be exported to a local machine. If not in DSX, it will write to the local file system in the ‘local_root/datasets’ directory. Convenience method. Cpo-filename is based on the mdl.name.

Parameters:
  • local_root (str) – name of local directory. Will write the .cpo file here, if not in DSX

  • copy_to_csv (bool) – DEPRECATED. If true, will create a copy of the file with the extension .csv.

Returns:

path (str) path to cpo file

Raises:

ValueError if root directory can't be established.

static export_as_cpo_s(model, model_name: Optional[str] = None, local_root: Optional[str] = None, copy_to_csv: bool = False, **kwargs) str[source]

Export .cpo file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. It can write a copy as a .csv file, so it can be exported to a local machine. If not in DSX, it will write to the local file system in the ‘local_root/datasets’ directory.

Parameters:
  • model (docplex.cp.model) – The CPO model to be exported

  • model_name (str) – name of .cpo file. If none specified, will use the model.name. Specify if the model.name is not a valid file-name.

  • local_root (str) – name of local directory. Will write the .cpo file here, if not in DSX

  • copy_to_csv (bool) – If true, will create a copy of the file with the extension .csv.

  • **kwargs – Passed to model.export_model

Returns:

path (str) path to cpo file

Raises:

ValueError if root directory can't be established.

export_as_lp(local_root: Optional[str] = None, copy_to_csv: bool = False) str[source]

Export .lp file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. Convenience method. It can write a copy as a .csv file, so it can be exported to a local machine. If not in DSX, it will write to the local file system in the ‘local_root/datasets’ directory. Lp-filename is based on the mdl.name.

Parameters:
  • local_root (str) – name of local directory. Will write .lp file here, if not in DSX

  • copy_to_csv (bool) – DEPRECATED. If true, will create a copy of the file with the extension .csv.

Returns:

path (str) path to lp file

Raises:

ValueError if root directory can't be established.

export_as_lp_path(lp_file_name: str = 'my_lp_file') str[source]

Saves .lp file in self.export_lp_path. Note: Does not conflict with OptimizationEngine.export_as_lp() which has a different signature.

Returns:

file_path

static export_as_lp_s(model, model_name: Optional[str] = None, local_root: Optional[str] = None, copy_to_csv: bool = False) str[source]

Export .lp file of model in the ‘DSX_PROJECT_DIR.datasets’ folder. It can write a copy as a .csv file, so it can be exported to a local machine. If not in WSL, it will write to the local file system in the ‘local_root/datasets’ directory.

Parameters:
  • model (docplex.mp.model) – The CPLEX model to be exported

  • model_name (str) – name of .lp file. If none specified, will use the model.name. Specify if the model.name is not a valid file-name.

  • local_root (str) – name of local directory. Will write .lp file here, if not in WSL

  • copy_to_csv (bool) – DEPRECATED. If true, will create a copy of the file with the extension .csv.

Returns:

path (str) path to lp file

Raises:

ValueError if root directory can't be established.

get_kpi_output_table() DataFrame[source]
integer_var_series(df: DataFrame, **kargs) Series[source]

Create a Series of integer dvar for each row in the DF. Most effective method. Best practice. Result can be assigned to a column of the df. Usage: df[‘xDVar’] = mdl.integer_var_series(df, name = ‘xDVar’)

Parameters:
  • self (docplex.mp.model) – CPLEX Model

  • df (DataFrame) – dataframe

  • **kargs – arguments passed to mdl.integer_var_list method. E.g. ‘name’

Returns:

(pandas.Series) with integer dvars (IntegerVarType), index matches index of df
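
A small sketch (table, column and variable names are hypothetical), creating one integer dvar per row and using it in a constraint:

import pandas as pd
from dse_do_utils import DataManager, OptimizationEngine

df = pd.DataFrame({'orderId': [1, 2, 3], 'demand': [10, 20, 15]}).set_index('orderId')
# Assumes the constructor creates the docplex model at engine.mdl:
engine = OptimizationEngine(data_manager=DataManager(), name='MyModel')
df['xProduction'] = engine.integer_var_series(df, name='xProduction')
engine.mdl.add_constraints(row.xProduction >= row.demand for row in df.itertuples())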

static integer_var_series_s(mdl: docplex.mp.model.Model, df: DataFrame, **kargs) Series[source]

Returns pd.Series[IntegerVarType]

semicontinuous_var_series(df, lb, **kargs) Series[source]

Returns pd.Series[SemiContinuousVarType]

static semicontinuous_var_series_s(mdl: docplex.mp.model.Model, df: DataFrame, lb, **kargs) Series[source]

Returns pd.Series[SemiContinuousVarType].

semiinteger_var_series(df, lb, **kargs) Series[source]

Returns pd.Series[SemiIntegerVarType]

static semiinteger_var_series_s(mdl: docplex.mp.model.Model, df: DataFrame, lb, **kargs) Series[source]

Returns pd.Series[SemiIntegerVarType].

solve(refine_conflict: bool = False, **kwargs) SolveSolution[source]

dse_do_utils.plotly_cpd_workaround module

dse_do_utils.plotlymanager module

class dse_do_utils.plotlymanager.PlotlyManager(dm: DataManager)[source]

Bases: object

Holds methods that create Plotly charts. Pass in the DataManager as an input to the constructor.
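
For illustration, a minimal subclass sketch (assumes the constructor stores the DataManager as self.dm; the table and column names are hypothetical):

import plotly.express as px
from dse_do_utils.plotlymanager import PlotlyManager

class MyPlotlyManager(PlotlyManager):
    def demand_bar_chart(self):
        df = self.dm.demand.reset_index()  # hypothetical prepared DataFrame on the DataManager
        return px.bar(df, x='period', y='quantity', title='Demand')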

get_dash_tab_layout_m(page_id)[source]

DEPRECATED. Not used in dse_do_dashboard package. On the instance self, call the method named by get_tab_layout_{page_id}. Used in dse_do_dashboard Plotly-Dash dashboards

get_plotly_fig_m(id)[source]

DEPRECATED. Not used in dse_do_dashboard package. On the instance self, call the method named by id[‘index’] For use with pattern-matching callbacks. Assumes the id[‘index’] is the name of a method of this class and returns a fig. Used in dse_do_dashboard Plotly-Dash dashboards

dse_do_utils.scenariodbmanager module

class dse_do_utils.scenariodbmanager.AutoScenarioDbTable(db_table_name: str)[source]

Bases: ScenarioDbTable

Designed to automatically generate the table definition based on the DataFrame.

Main difference with the ‘regular’ ScenarioDbTable definition:
  • At create_schema, the table will NOT be created. Instead,

  • At insert_table_in_db_bulk, SQLAlchemy will automatically create a TABLE based on the DataFrame.

Advantages:
  • No need to define a custom ScenarioDbTable class per table

  • Automatically all columns are inserted

Disadvantages:
  • No primary and foreign key relations. Thus no checks.

  • Missing relationships means Cognos cannot automatically extract a data model

TODO: find out what will happen if the DataFrame structure changes and we’re doing a new insert

create_table_metadata(metadata, engine, schema, multi_scenario: bool = False)[source]

Use the engine to reflect the Table metadata. Called during initialization.

get_sa_table() Optional[Table][source]

Returns the SQLAlchemy Table. Can be None if the table is an AutoScenarioDbTable and not defined in Python code. TODO: automatically reflect if None. Is NOT working yet!

insert_table_in_db_bulk(df, mgr, connection=None)[source]
Parameters:
  • df (pd.DataFrame) –

  • mgr (ScenarioDbManager) –

  • connection – if not None, being run within a transaction

class dse_do_utils.scenariodbmanager.BusinessKpiTable(db_table_name: str = 'business_kpi', extended_columns_metadata: List[Column] = [])[source]

Bases: ScenarioDbTable

class dse_do_utils.scenariodbmanager.DatabaseType(value)[source]

Bases: Enum

Used in ScenarioDbManager.__init__ to specify the type of DB it is connecting to.

DB2 = 1
PostgreSQL = 2
SQLite = 0
class dse_do_utils.scenariodbmanager.DbCellUpdate(scenario_name, table_name, row_index, column_name, current_value, previous_value, row_idx)[source]

Bases: NamedTuple

column_name: str

Alias for field number 3

current_value: Any

Alias for field number 4

previous_value: Any

Alias for field number 5

row_idx: int

Alias for field number 6

row_index: List[Dict[str, Any]]

Alias for field number 2

scenario_name: str

Alias for field number 0

table_name: str

Alias for field number 1

class dse_do_utils.scenariodbmanager.KpiTable(db_table_name: str = 'kpis')[source]

Bases: ScenarioDbTable

class dse_do_utils.scenariodbmanager.ParameterTable(db_table_name: str = 'parameters', extended_columns_metadata: List[Column] = [])[source]

Bases: ScenarioDbTable

class dse_do_utils.scenariodbmanager.ScenarioDbManager(input_db_tables: Dict[str, ScenarioDbTable], output_db_tables: Dict[str, ScenarioDbTable], credentials=None, schema: Optional[str] = None, echo: bool = False, multi_scenario: bool = True, enable_transactions: bool = True, enable_sqlite_fk: bool = True, enable_astype: bool = True, enable_debug_print: bool = False, enable_scenario_seq: bool = False, db_type: DatabaseType = DatabaseType.DB2, use_custom_naming_convention: bool = False, future: bool = False)[source]

Bases: object

TODO: documentation!
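
A hedged usage sketch based on the constructor signature and the methods below (table names are hypothetical; assumes that credentials=None combined with db_type=DatabaseType.SQLite creates a local SQLite DB):

import pandas as pd
from dse_do_utils.scenariodbmanager import (
    AutoScenarioDbTable, DatabaseType, KpiTable, ScenarioDbManager, ScenarioTable)

input_db_tables = {
    'Scenario': ScenarioTable(),  # scenario table must be the first input table
    'MyInputTable': AutoScenarioDbTable('my_input_table'),
}
output_db_tables = {'kpis': KpiTable()}

dbm = ScenarioDbManager(input_db_tables, output_db_tables,
                        credentials=None, db_type=DatabaseType.SQLite)
dbm.create_schema()

inputs = {'MyInputTable': pd.DataFrame({'id': [1, 2], 'value': [10.0, 20.0]})}
dbm.replace_scenario_in_db('Scenario 1', inputs=inputs, outputs={})
inputs, outputs = dbm.read_scenario_from_db('Scenario 1')  # assumed to return (inputs, outputs)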

static add_scenario_name_to_dfs(scenario_name: str, inputs: Dict[str, DataFrame]) Dict[str, DataFrame][source]

Adds a scenario_name column to each df, or overwrites all values if that column already exists. This avoids the need for the MultiScenarioManager when loading and storing single scenarios.

static add_scenario_seq_to_dfs(scenario_seq: int, inputs: Dict[str, DataFrame]) Dict[str, DataFrame][source]

For the ScenarioSeq option. Adds a scenario_seq column to each df, or overwrites all values if that column already exists. This avoids the need for the MultiScenarioManager when loading and storing single scenarios.

create_schema()[source]

Drops all tables and re-creates the schema in the DB.

delete_scenario_from_db(scenario_name: str)[source]

Delete a scenario. Uses a transaction (when enabled).

static delete_scenario_name_column(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame])[source]

Drops the column scenario_name from any df in either inputs or outputs. This is used to create an inputs/outputs combination similar to loading a single scenario from the DO Experiment.

static delete_scenario_seq_column(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame])[source]

For the ScenarioSeq option. Drops the column scenario_seq from any df in either inputs or outputs. This is used to create an inputs/outputs combination similar to loading a single scenario from the DO Experiment.

drop_all_tables()[source]

Drops all tables in the current schema.

duplicate_scenario_in_db(source_scenario_name: str, target_scenario_name: str)[source]

Duplicate a scenario. Uses a transaction (when enabled).

get_custom_naming_convention() Dict[source]

Returns a custom naming convention. See https://docs.sqlalchemy.org/en/20/core/constraints.html#configuring-constraint-naming-conventions

get_scenario_db_table() ScenarioDbTable[source]

Scenario table must be the first in self.input_db_tables

get_scenario_sa_table() Table[source]

Returns the SQLAlchemy ‘scenario’ table.

get_scenarios_df() DataFrame[source]

Return all scenarios in df. Result is indexed by scenario_name. Main API to get all scenarios. The API called by a cached procedure in the dse_do_dashboard.DoDashApp.

insert_scenarios_from_zip(filepath: str)[source]

Insert (or replace) a set of scenarios from a .zip file into the DB. Zip is assumed to contain one or more .xlsx files. Others will be skipped. Name of .xlsx file will be used as the scenario name.

Parameters:

filepath – filepath of a zip file

Returns:

insert_scenarios_in_db(inputs={}, outputs={}, bulk: bool = True)[source]

DEPRECATED. If we need it back, requires re-evaluation and bulk support.

insert_tables_in_db(inputs: Dict[str, DataFrame] = {}, outputs: Dict[str, DataFrame] = {}, bulk: bool = True, auto_insert: bool = False, connection=None) int[source]

DEPRECATED. Was attempt to automatically insert a scenario without any schema definition. Currently, one would need to use the AutoScenarioDbTable in the constructor. If you want to automatically create such schema based on the inputs/outputs, then do that in the constructor. Not here. Note: the non-bulk ONLY works if the schema was created! I.e. only when using with self.create_schema.

read_multi_scenario_tables_from_db(scenario_names: List[str], input_table_names: Optional[List[str]] = None, output_table_names: Optional[List[str]] = None)[source]

Read a selected set of input and output tables from multiple scenarios. If input_table_names/output_table_names contains a ‘*’, then all input/output tables will be read. If an empty list or None, then no tables will be read.
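
For example, a hedged sketch (assumes a ScenarioDbManager instance dbm and the default 'kpis' output table; the return value is assumed to be an (inputs, outputs) pair of dicts of DataFrames):

inputs, outputs = dbm.read_multi_scenario_tables_from_db(
    scenario_names=['Scenario 1', 'Scenario 2'],
    input_table_names=['*'],      # read all input tables
    output_table_names=['kpis'])  # read only the kpis output table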

read_scenario_from_db(scenario_name: str, multi_threaded: bool = False)[source]

Single scenario load. Main API to read a complete scenario. Reads all tables for a single scenario. Returns all tables in one dict

Note: multi_threaded doesn’t seem to lead to performance improvement. Fixed: omit reading scenario table as an input.

read_scenario_input_tables_from_db(scenario_name: str) Dict[str, DataFrame][source]

Convenience method to load all input tables. Typically used at the start of an optimization model.

Returns:

The inputs and outputs. (The outputs are always empty.)

read_scenario_table_from_db(scenario_name: str, scenario_table_name: str) DataFrame[source]

Read a single table from the DB. Main API to read a single table. The API called by a cached procedure in the dse_do_dashboard.DoDashApp.

Parameters:
  • scenario_name – Name of scenario

  • scenario_table_name – Name of scenario table (not the DB table name)

Returns:

read_scenario_tables_from_db(scenario_name: str, input_table_names: Optional[List[str]] = None, output_table_names: Optional[List[str]] = None)[source]

Read a selected set of input and output tables from a scenario. If input_table_names/output_table_names contains a ‘*’, then all input/output tables will be read. If an empty list or None, then no tables will be read.

rename_scenario_in_db(source_scenario_name: str, target_scenario_name: str)[source]

Rename a scenario. Uses a transaction (when enabled).

replace_scenario_in_db(scenario_name: str, inputs: Dict[str, DataFrame] = {}, outputs: Dict[str, DataFrame] = {}, bulk=True)[source]

Insert or replace a scenario. Main API to insert/update a scenario. If the scenario exists, will delete rows first. Inserts scenario data in all tables. Inserts tables in order specified in OrderedDict. Inputs first, outputs second.

Parameters:
  • scenario_name

  • inputs

  • outputs

  • bulk

Returns:

replace_scenario_tables_in_db(scenario_name, inputs={}, outputs={})[source]

Untested

update_cell_changes_in_db(db_cell_updates: List[DbCellUpdate])[source]

Update a set of cells in the DB.

Parameters:

db_cell_updates

Returns:

update_scenario_output_tables_in_db(scenario_name, outputs: Dict[str, DataFrame])[source]

Main API to update output from a DO solve in the scenario. Deletes ALL output tables. Then inserts the given set of tables. Since this only touches the output tables, more efficient than replacing the whole scenario.

class dse_do_utils.scenariodbmanager.ScenarioDbTable(db_table_name: str, columns_metadata: List[Column] = [], constraints_metadata: List[ForeignKeyConstraint] = [])[source]

Bases: ABC

Abstract class. Subclass to define the table schema, i.e. column names, data types, primary and foreign keys. Only columns that are specified are included in the DB insert.

static add_scenario_name_to_fk_constraint(fkc: ForeignKeyConstraint)[source]

Creates a new ForeignKeyConstraint by adding the scenario_name.

static add_scenario_seq_to_fk_constraint(fkc: ForeignKeyConstraint)[source]

Creates a new ForeignKeyConstraint by adding the scenario_seq.

static camel_case_to_snake_case(name: str) str[source]
create_table_metadata(metadata, engine, schema, multi_scenario: bool = False) Table[source]

If multi_scenario, then add a primary key ‘scenario_name’.

engine and schema are used only for AutoScenarioDbTable to get the Table (metadata) by reflection.

property dbm
static df_column_names_to_snake_case(df: DataFrame) DataFrame[source]

Change all columns names from camelCase to snake_case.

property enable_scenario_seq
static extend_columns_constraints(columns: list[sqlalchemy.sql.schema.Column], constraints: list[sqlalchemy.sql.schema.ForeignKeyConstraint], columns_ext: Optional[list[sqlalchemy.sql.schema.Column]] = None, constraints_ext: Optional[list[sqlalchemy.sql.schema.ForeignKeyConstraint]] = None) tuple[list[sqlalchemy.sql.schema.Column], list[sqlalchemy.sql.schema.ForeignKeyConstraint]][source]

To be used in a ScenarioDbTable subclass __init__(). Helps to avoid mutable default arguments by allowing columns_ext and constraints_ext to be None.

Usage:

class MyTable(ScenarioDbTable):
    def __init__(self, db_table_name: str = 'my_table',
                 columns_ext: Optional[list[Column]] = None,
                 constraints_ext: Optional[list[ForeignKeyConstraint]] = None):
        columns = [
            Column('myKey', Integer(), primary_key=True),
            Column('myValue', Integer(), primary_key=False),
        ]
        constraints = []
        columns, constraints = self.extend_columns_constraints(columns, constraints, columns_ext, constraints_ext)
        super().__init__(db_table_name, columns, constraints)

static fixNanNoneNull(df) DataFrame[source]

Ensure that NaN values are converted to None, which in turn causes the value to be NULL in the DB. Apply before inserting the df into the DB. TODO VT20230106: what other incarnations of ‘NaN’ do we need to convert? Potentially: [‘N/A’, ‘na’, ‘NaN’, ‘nan’, ‘’, ‘None’]?

get_db_table_name() str[source]
get_df_column_names(df: DataFrame) List[str][source]

Get all column names that are both defined in the DB schema and present in the DataFrame df.

Parameters:

df

Returns:

get_df_column_names_2(df: DataFrame) (List[str], DataFrame)[source]
Get all column names that are defined in the DB schema.

If not present in the DataFrame df, adds the missing column with all None values.

Note 1 (VT 20220829): Note that the sqlalchemy.insert(db_table.table_metadata).values(row) does NOT properly handle columns that are missing in the row. It seems to simply truncate the columns if the row length is less than the number of columns. It does NOT match the column names! Thus the need to add columns, so we end up with proper None values in the row for the insert, specifying all columns in the table.

Note 2 (VT 20220829): Reducing the list of sqlalchemy.Column does NOT work in sqlalchemy.insert(db_table.table_metadata).values(row) The db_table.table_metadata is an object, not a List[sqlalchemy.Column]

Parameters:

df

Returns:

get_sa_column(db_column_name) Optional[Column][source]

Returns the SQLAlchemy.Column with the specified name. Uses the self.table_metadata (i.e. the sqlalchemy.Table), so works both for pre-defined tables and self-reflected tables like AutoScenarioDbTable

get_sa_table() Optional[Table][source]

Returns the SQLAlchemy Table. Can be None if the table is an AutoScenarioDbTable and not defined in Python code.

insert_table_in_db_bulk(df: DataFrame, mgr, connection=None, enable_astype: bool = True)[source]

Insert a DataFrame in the DB using ‘bulk’ insert, i.e. with one SQL insert. (Instead of row-by-row.)

Parameters:
  • df (pd.DataFrame) –

  • mgr (ScenarioDbManager) –

  • connection – if not None, being run within a transaction

  • enable_astype – if True, apply df.column.astype based on datatypes extracted from columns_metadata (i.e. sqlachemy.Column)

resolve_metadata_column_conflicts(columns_metadata: List[Column]) List[Column][source]
static sqlcol(df: DataFrame) Dict[source]
class dse_do_utils.scenariodbmanager.ScenarioSeqTable(db_table_name: str = 'scenario')[source]

Bases: ScenarioDbTable

class dse_do_utils.scenariodbmanager.ScenarioTable(db_table_name: str = 'scenario')[source]

Bases: ScenarioDbTable

dse_do_utils.scenariomanager module

class dse_do_utils.scenariomanager.Platform(value)[source]

Bases: Enum

An enumeration.

CPD25 = 3
CPD40 = 2
CPDaaS = 1
Local = 4
class dse_do_utils.scenariomanager.ScenarioManager(model_name: Optional[str] = None, scenario_name: Optional[str] = None, local_root: Optional[str] = None, project_id: Optional[str] = None, project_access_token: Optional[str] = None, project=None, template_scenario_name: Optional[str] = None, platform: Optional[Platform] = None, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, local_relative_data_path: str = 'assets/data_asset', data_directory: Optional[str] = None)[source]

Bases: object

A ScenarioManager is responsible for loading and storing the input and output DataFrame dictionaries. The data can be loaded from and stored into:

  • A DO scenario

  • An Excel spreadsheet

  • A set of csv files

Excel. Stores one DataFrame per sheet. Creates a __index__ sheet that keeps track of which DataFrame is input or output, and restores table names that are longer than the Excel maximum of 31 characters.

Usage 1 - Load data from Excel and store into DO scenario. Assumes DO model MyModel and an Excel file datasets/MyExcelFile.xlsx exists. The scenario will be created if it doesn’t exist or otherwise gets overwritten:

sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1')
inputs, outputs = sm.load_data_from_excel('MyExcelFile')
sm.write_data_into_scenario()

Usage 2 - Load data from DO scenario. Assumes the DO model MyModel and the scenario exist. Typical use in a #dd-ignore cell in a solve notebook:

sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1')
inputs, outputs = sm.load_data_from_scenario()

Usage 3 - Load data from all csv files in datasets and store into Excel. Stores into /datasets/excel_test.xlsx:

excel_output_file_name = 'excel_test'
csv_directory = os.path.join(os.environ['DSX_PROJECT_DIR'], 'datasets')
sm = ScenarioManager()
inputs, outputs = sm.load_data_from_csv(csv_directory)
sm.write_data_to_excel(excel_output_file_name)

Usage 4 - Load data from Excel and store into Excel. Assumes the Excel file datasets/MyExcelFile.xlsx exists. Will create a file datasets/MyExcelFileOutput.xlsx:

sm = ScenarioManager()
inputs, outputs = sm.load_data_from_excel('MyExcelFile')
# Do something with the inputs or outputs
sm.write_data_to_excel('MyExcelFileOutput')
static add_data_file_to_project_s(file_path: str, file_name: Optional[str] = None) None[source]

DEPRECATED: will never work on CP4DaaS since it requires the project_lib.Project. Adds a data file to the Watson Studio project. Applies to CP4Dv2.5. Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/. Ensures the file is visible in the Data Assets of the Watson Studio UI.

Parameters:
  • file_path (str) – full file path, including the file name and extension

  • file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.

add_data_file_using_project_lib(file_path: str, file_name: Optional[str] = None) None[source]

Add a data file to the Watson Studio project. Applies to CP4Dv2.5 and WS Cloud/CP4DaaS. Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/ (for CPD2.5) or /home/wsuser/work/ in CPDaaS. Ensures the file is visible in the Data Assets of the Watson Studio UI.

Parameters:
  • file_path (str) – full file path, including the file name and extension

  • file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.

add_data_file_using_ws_lib(file_path: str, file_name: Optional[str] = None) None[source]

Add a data file to the Watson Studio project using ibm_watson_studio_lib. Applies to CP4Dv4.0. TODO: where should the file be written? Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/ (for CPD2.5) or /home/wsuser/work/ in WS Cloud. Ensures the file is visible in the Data Assets of the Watson Studio UI.

Parameters:
  • file_path (str) – full file path, including the file name and extension

  • file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.

static add_data_file_using_ws_lib_s(file_path: str, file_name: Optional[str] = None) None[source]

Add a data file to the Watson Studio project using ibm_watson_studio_lib. Applies to CP4Dv4.0. TODO: where should the file be written? Needs to be called after the file has been saved regularly in the file system in /project_data/data_asset/ (for CPD2.5) or /home/dsxuser/work/ in WS Cloud. Ensures the file is visible in the Data Assets of the Watson Studio UI.

Parameters:
  • file_path (str) – full file path, including the file name and extension

  • file_name (str) – name of data asset. Default is None. If None, the file-name will be extracted from the file_path.

add_data_into_scenario(inputs=None, outputs=None)[source]

Adds data to a DO scenario. If table exists, does an overwrite/replace.

add_data_into_scenario_s(model_name: str, scenario_name: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None[source]

Adds tables to an existing scenario.

Replaces a table if it already exists. Assumes the scenario exists. Does not explicitly clear existing tables. Could be used in post-processing.

add_file_as_data_asset(file_path: str, asset_name: Optional[str] = None)[source]

Register an existing file as a data asset in CPD.

Parameters:
  • file_path – full path of the file

  • asset_name – name of asset. If None, will get the name from the file

Returns:

static add_file_as_data_asset_s(file_path: str, asset_name: Optional[str] = None, platform: Optional[Platform] = None)[source]

Register an existing file as a data asset in CPD. VT 2022-01-21: this method is incorrect for CPDaaS. Should use project_lib.

Parameters:
  • file_path – full path of the file

  • asset_name – name of asset. If None, will get the name from the file

  • platform – CPD40, CPD25, CPDaaS, or Local. If None, will autodetect.

Returns:

static clear_scenario_data(client, scenario, category=None)[source]

Clears all input and output tables from a scenario.

Current API requires the client.

Parameters:
  • client

  • scenario

  • category (string ['input','output']) – If None, clears all tables.

static create_new_scenario(client, model_builder, new_scenario_name: str, template_scenario_name=None)[source]

Creates a new scenario from a template. The template is found either from the template_scenario_name or, if this is None, from the new_scenario_name. If a scenario with the new name already exists, all input and output tables are cleared, thereby keeping the solve code. If no scenario with this name exists, a new blank scenario is created.

Parameters:
  • client (decision_optimization_client.Client) – Client managing the DO model

  • model_builder (decision_optimization_client.Experiment) – The DO model

  • new_scenario_name (str) – Name for the new scenario

  • template_scenario_name (str) – Name of an existing scenario

Returns:

A decision_optimization_client.Container of type scenario

Raises:
  • ValueError – new_scenario_name is None

  • ValueError – new_scenario_name is the same as template_scenario_name

static detect_platform() Platform[source]
static env_is_cpd25() bool[source]

Return true if environment is CPDv2.5

static env_is_cpd40() bool[source]

Return true if environment is CPDv4.0.2 and in particular supports ibm_watson_studio_lib to get access to data assets.

Notes

  • The import from ibm_watson_studio_lib import access_project_or_space does NOT fail in CPDaaS

  • The call wslib = access_project_or_space() does fail in CPDaaS, but with an ugly error message

  • The current (ugly) work-around is to always test for CPDaaS first, using the environment variable

  • TODO: prevent error/warning in CPDaaS

static env_is_dsx() bool[source]

Return true if environment is DSX

static env_is_wscloud() bool[source]

Return true if environment is WS Cloud

export_model_as_lp(mdl, model_name: Optional[str] = None) str[source]

Exports the model as an .lp file in the data assets.

Parameters:
  • mdl (docplex.mp.model) – the docplex model

  • model_name (str) – name of the model (excluding the .lp extension). If None, uses mdl.name

Returns:

full file path of lp file

Return type:

(str)

Note: now a method of ScenarioManager (instead of OptimizationEngine), so this can be included in a dd-ignore notebook cell. Avoids the dependency on dse-do-utils in the ModelBuilder.
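
Usage (a minimal sketch; the model content is illustrative):

from docplex.mp.model import Model

mdl = Model(name='my_model')
x = mdl.continuous_var(name='x')
mdl.add_constraint(x <= 10)
mdl.maximize(x)

sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1')
lp_file_path = sm.export_model_as_lp(mdl)  # returns the full path of the .lp file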

get_data_directory() str[source]

Returns the path to the datasets folder.

Returns:

path to the datasets folder

get_dd_client()[source]

Return the Client managing the DO scenario. Returns: new decision_optimization_client.Client

get_do_scenario(model_name, scenario_name)[source]

Returns a DO scenario.

Parameters:
  • model_name (str) – the name of the DO model

  • scenario_name (str) – the name of the scenario

Returns:

A dd-scenario.Container of type scenario

Raises:

ValueError – When either the model_name or the scenario_name doesn’t match an existing entity.

static get_kpis_table_as_dataframe(mdl) DataFrame[source]

Return a DataFrame with the KPI names and values in the mdl. This table is compatible with the representation in DO4WS and can be updated in the scenario.

Parameters:

mdl (docplex.mp.model.Model) –

Returns:

the KPIs in the mdl

Return type:

pd.DataFrame with columns NAME and VALUE
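
Usage (a minimal sketch, assuming mdl has been solved):

kpi_df = ScenarioManager.get_kpis_table_as_dataframe(mdl)
print(kpi_df)  # DataFrame with columns NAME and VALUE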

get_root_directory() str[source]

Return the root directory of the file system. If the system is WS, it returns the DSX root; otherwise the directory specified in local_root.

Raises:

ValueError – if the root directory doesn't exist.

TODO: review the options other than Local

static get_unique_file_name(path)[source]
insert_scenarios_from_zip(filepath: str)[source]

Insert (or replace) a set of scenarios from a .zip file into the DO Experiment. The zip is assumed to contain one or more .xlsx files; other files are skipped. The name of each .xlsx file is used as the scenario name.

load_data(load_from_excel=False, excel_file_name=None)[source]

Load data from either the DO scenario, or an Excel spreadsheet. The Excel spreadsheet is expected to be in the datasets folder, either in WS or local.

Returns:

the inputs and outputs dictionary of DataFrames

Return type:

inputs, outputs (tuple of dicts)

load_data_from_csv(csv_directory: str, input_csv_name_pattern: str = '*.csv', output_csv_name_pattern: Optional[str] = None, **kwargs) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]][source]

Load data from matching csv files in a directory. Uses glob.glob() to pattern-match files in the csv_directory. If you want to load one file, specify the full name including the .csv extension.

Parameters:
  • csv_directory (str) – Relative directory from the root

  • input_csv_name_pattern (str) – name pattern to find matching csv files for inputs

  • output_csv_name_pattern (str) – name pattern to find matching csv files for outputs

  • **kwargs – Set of optional arguments for the pd.read_csv() function
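
Usage (a minimal sketch; the directory and file patterns are illustrative):

sm = ScenarioManager(model_name='MyModel', scenario_name='Scenario_1')
inputs, outputs = sm.load_data_from_csv('datasets',
                                        input_csv_name_pattern='*.csv',
                                        output_csv_name_pattern='out_*.csv')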

static load_data_from_csv_s(csv_directory: str, csv_name_pattern: str = '*.csv', **kwargs) Dict[str, DataFrame][source]

Read data from all matching .csv files in a directory.

Parameters:
  • csv_directory (str) – the full path of a directory containing one or more .csv files.

  • csv_name_pattern (str) – name pattern to find matching csv files

  • **kwargs – Set of optional arguments for the pd.read_csv() function

Returns:

dict of DataFrames. Keys are the .csv file names.

Return type:

data

load_data_from_excel(excel_file_name: str) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]][source]

Load data from an Excel file located in the datasets folder of the root directory. Convenience method. If not run on WS, requires the root directory to be passed via the ScenarioManager constructor.

static load_data_from_excel_s(xl: ExcelFile, table_index_sheet: str = '_table_index_', input_table_names: Optional[List[str]] = None, output_table_names: Optional[List[str]] = None) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]][source]

Create DataFrames from the sheets of the Excel file and store them in a dictionary with the table_name as key. The table_name is either the name of the sheet, or the table_name as defined in the table_index_sheet.

In the default case, when input_table_names and output_table_names are None, the category of a table (i.e. input or output) is driven by the value in the table_index_sheet. If a table is not listed in the table_index_sheet, it is placed in the inputs.

However, to reduce the load time for certain applications, the set of tables to load can be restricted by specifying input_table_names and/or output_table_names. If either is not None, only those tables are loaded and categorized accordingly.

Note that if either input_table_names or output_table_names is used, the names refer, where applicable, to the table names as translated by the table_index_sheet (i.e. not the abbreviated names used as sheet names).

Parameters:
  • xl (pandas.ExcelFile) – Excel file

  • table_index_sheet (str) – Name of table index sheet

  • input_table_names (List[str]) – names of input tables to read

  • output_table_names (List[str]) – names of output tables to read

Returns:

A tuple of inputs and outputs dictionaries of DataFrames,

one df per sheet

Return type:

(Dict[str,DataFrame], Dict[str,DataFrame])
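
Usage (a minimal sketch; the file name and table names are illustrative):

import pandas as pd

xl = pd.ExcelFile('datasets/MyExcelFile.xlsx')
# Load only a subset of the tables to reduce load time:
inputs, outputs = ScenarioManager.load_data_from_excel_s(
    xl,
    input_table_names=['Customers', 'Orders'],
    output_table_names=['ProductionPlan'])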

load_data_from_parquet(directory: str, input_name_pattern: str = '*.parquet', output_name_pattern: Optional[str] = None, **kwargs) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]][source]

Load data from matching parquet files in a directory. Uses glob.glob() to pattern-match files in the directory. If you want to load one file, specify the full name including the .parquet extension.

Parameters:
  • directory (str) – Relative directory from the root

  • input_name_pattern (str) – name pattern to find matching parquet files for inputs

  • output_name_pattern (str) – name pattern to find matching parquet files for outputs

  • **kwargs – Set of optional arguments for the pd.read_parquet() function

static load_data_from_parquet_s(directory: str, file_name_pattern: str = '*.parquet', **kwargs) Dict[str, DataFrame][source]

Read data from all matching .parquet files in a directory.

Parameters:
  • directory (str) – the full path of a directory containing one or more .parquet files.

  • file_name_pattern (str) – name pattern to find matching parquet files

  • **kwargs – Set of optional arguments for the pd.read_parquet() function

Returns:

dict of DataFrames. Keys are the .parquet file names.

Return type:

data

load_data_from_scenario() Tuple[Dict[str, DataFrame], Dict[str, DataFrame]][source]

Loads the data from a DO scenario

load_data_from_scenario_s(model_name: str, scenario_name: str) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]][source]

Loads the data from a DO scenario. Returns empty dict if no tables.

static load_data_from_zip_csv_s(zip_file_path: str, file_size_limit: Optional[int] = None, **kwargs) Dict[str, DataFrame][source]

Read data from a zip file with .csv files.

Parameters:
  • zip_file_path (str) – the full path of a zip file containing one or more .csv files.

  • file_size_limit (int) – maximum file size in bytes. None implies no limit.

  • **kwargs – Set of optional arguments for the pd.read_csv() function

Returns:

dict of DataFrames. Keys are the .csv file names.

Return type:

data
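
Usage (a minimal sketch; the zip file path is illustrative):

dfs = ScenarioManager.load_data_from_zip_csv_s('/project_data/data_asset/scenario_data.zip')
for name, df in dfs.items():
    print(name, df.shape)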

print_table_names() None[source]

Print the names of the input and output tables. For development and debugging.

replace_data_in_scenario(inputs=None, outputs=None)[source]

Replaces all inputs, all outputs, or both. Note: you need to specify the inputs or outputs you want to replace explicitly as arguments; it will NOT take them from self.inputs or self.outputs. This way you control which to update, e.g. after a solve, update only the outputs, not the inputs. See the usage sketch below.
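
Usage (a minimal sketch; assumes sm is a ScenarioManager for an existing scenario):

# After a solve, replace only the outputs and leave the inputs untouched:
sm.replace_data_in_scenario(outputs=outputs)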

replace_data_into_scenario_s(model_name: str, scenario_name: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None[source]

Replaces all inputs, all outputs, or both.

If inputs/outputs are not None, clears the existing inputs/outputs first. Assumes the scenario exists. Does explicitly clear all existing input/output tables.

update_solve_output_into_scenario(mdl, outputs)[source]

Replaces all outputs and the KPIs table in the scenario.

Assumes the scenario exists. Will not change the inputs of the scenario. Generates the KPI table.

Limitations:
  • Does NOT update the objective

  • Does NOT update the log

Parameters:
  • mdl (docplex.mp.model) – the model that has been solved

  • outputs (Dict) – dictionary of DataFrames

write_data_into_scenario()[source]

Writes the data into a DO scenario. Create new scenario and write data.

write_data_into_scenario_s(model_name: str, scenario_name: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, template_scenario_name: Optional[str] = None) None[source]

Create new scenario and write data.

If the scenario exists: clears all existing data. If the scenario doesn't exist: creates a new one. If template_scenario_name is specified, that scenario is used as the template. If the existing scenario has a model, the model is kept. If there is no existing scenario, the user needs to add a model manually in DO. Tested: works reliably.

TODO: one small issue: if the scenario exists and has been solved before, this clears all inputs and outputs (including the KPIs), but not the objective value. The DO UI then shows the model as if it has been solved.

write_data_to_csv() None[source]

Write inputs and/or outputs to .csv files in the root/datasets folder.

Args: None

Returns: None

static write_data_to_csv_s(csv_directory: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None[source]

Write data to .csv files in a directory. File names are the names of the DataFrames (the dict keys).

Parameters:
  • csv_directory (str) – the full path of a directory for the .csv files.

  • inputs (Dict of DataFrames) – inputs

  • outputs (Dict of DataFrames) – outputs

Returns: None

write_data_to_excel(excel_file_name: Optional[str] = None, unique_file_name: bool = True, copy_to_csv: bool = False) str[source]

Write inputs and/or outputs to an Excel file in the datasets folder. Uses the inputs and outputs stored in the attributes self.inputs and self.outputs of the ScenarioManager.

If the excel_file_name is None, it is generated from the model_name and scenario_name: MODEL_NAME + “_” + SCENARIO_NAME + “_output”

If Excel has a file with the same name open, writing will throw a PermissionError. If so and the flag unique_file_name is set to True, the new file is saved under a unique name. If the file is not open in Excel, it is simply overwritten.

Parameters:
  • excel_file_name (str) – The file name for the Excel file.

  • unique_file_name (bool) – If True, generates a unique file name in case the existing file is opened(!) by Excel

  • copy_to_csv (bool) – If true, will create a copy of the file with the extension .csv. DEPRECATED, NON-FUNCTIONAL

static write_data_to_excel_s(writer: ExcelWriter, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, table_index_sheet: str = '_table_index_') None[source]

Writes all DataFrames in the inputs and outputs to the Excel writer, with sheet names based on the keys of the inputs/outputs. Due to the Excel limitation of at most 31 characters for a sheet name, table names longer than 31 characters are abbreviated to a unique name. The mapping between the original table name and the abbreviated name is recorded in a separate sheet, named by the table_index_sheet.

Parameters:
  • writer (pandas.ExcelWriter) – The Excel writer to write the file

  • inputs (Dict of DataFrames) – inputs

  • outputs (Dict of DataFrames) – outputs

  • table_index_sheet (str) – name for the index sheet
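
Usage (a minimal sketch; the file path is illustrative):

import pandas as pd

with pd.ExcelWriter('/project_data/data_asset/my_scenario.xlsx') as writer:
    ScenarioManager.write_data_to_excel_s(writer, inputs=inputs, outputs=outputs)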

write_data_to_parquet(directory: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None[source]

Write inputs and/or outputs to .parquet files in the target folder.

Parameters:

directory (str) – Relative directory from the root

Returns: None

static write_data_to_parquet_s(directory: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None) None[source]

Write data to .parquet files in a directory. File names are the names of the DataFrames (the dict keys).

Parameters:
  • directory (str) – the full path of a directory for the .parquet files.

  • inputs (Dict of DataFrames) – inputs

  • outputs (Dict of DataFrames) – outputs

Returns: None

static write_data_to_zip_csv_s(zip_file_path: str, inputs: Optional[Dict[str, DataFrame]] = None, outputs: Optional[Dict[str, DataFrame]] = None, **kwargs)[source]

Write data as a zip file with .csv files. inputs and outputs dictionaries are merged and written in same zip.

Parameters:
  • zip_file_path (str) – the full path of a zip file.

  • inputs – dict of input DataFrames

  • outputs – dict of output DataFrames

  • **kwargs – Set of optional arguments for the df.to_csv() function

Returns:

None

dse_do_utils.scenariopicker module

class dse_do_utils.scenariopicker.ScenarioPicker(model_name: Optional[str] = None, scenario_name: Optional[str] = None, project_id: Optional[str] = None, project_access_token: Optional[str] = None, project=None)[source]

Bases: object

Notebook widget to interactively select a scenario from the dd_scenario.Client.

Usage

Cell 1:

sp = ScenarioPicker(model_name = 'My_DO_Model')
sp.get_scenario_picker_ui()

Cell 2:

inputs, outputs = sp.load_selected_scenario_data()

Create a ScenarioPicker and pass the model name. The API get_scenario_picker_ui() returns a widget with a drop-down box with the available scenarios. In addition, there is a Refresh button that will run all cells below this cell. The next cell should reload the scenario data. The API load_selected_scenario_data() is a convenience method that internally uses a ScenarioManager to load the data from the DO scenario.

The selection of the scenario is maintained in the class variable ScenarioPicker.default_scenario. Therefore, a re-run of the cell keeps the last selected value. By adding:

ScenarioPicker.default_scenario = 'my_default_scenario'

before the creation of the scenario picker, one can force the default scenario to an initial value.

class ScenarioRefreshButton(**kwargs: Any)[source]

Bases: Button

A widget Refresh button that will refresh all cells below. Inner class of ScenarioPicker since it is only applicable in the context of the ScenarioPicker.

default_scenario = None
get_dd_client()[source]

Return the Client managing the DO scenario. Returns: new dd_scenario.Client

get_scenario_picker_ui()[source]

Return a combination of both the drop-down and the refresh button.

get_scenario_refresh_button()[source]

Return an instance of the Refresh button.

get_scenario_select_drop_down() Dropdown[source]

Return the drop-down button.

get_selected_scenario()[source]

Return the name of the selected scenario

load_selected_scenario_data()[source]

Convenience method. Creates a ScenarioManager and loads input and output data from the scenario selected by the picker. Returns: a tuple with the (inputs, outputs) data.

widgets = <module 'ipywidgets.widgets'>

dse_do_utils.scenariorunner module

class dse_do_utils.scenariorunner.RunConfig(insert_inputs_in_db: bool = False, insert_outputs_in_db: bool = False, new_schema: bool = False, insert_in_do: bool = False, write_output_to_excel: bool = False, enable_data_check: bool = False, enable_data_check_outputs: bool = False, data_check_bulk_insert: bool = False, log_level: str = 'DEBUG', export_lp: bool = False, export_sav: bool = False, export_lp_path: str = '', do_model_name: str = None, template_scenario_name: Optional[str] = None)[source]

Bases: object

data_check_bulk_insert: bool = False
do_model_name: str = None
enable_data_check: bool = False
enable_data_check_outputs: bool = False
export_lp: bool = False
export_lp_path: str = ''
export_sav: bool = False
insert_in_do: bool = False
insert_inputs_in_db: bool = False
insert_outputs_in_db: bool = False
log_level: str = 'DEBUG'
new_schema: bool = False
template_scenario_name: Optional[str] = None
write_output_to_excel: bool = False
class dse_do_utils.scenariorunner.ScenarioConfig(scenario_name: str = 'Scenario_x', parameters: Dict = None)[source]

Bases: object

parameters: Dict = None
scenario_name: str = 'Scenario_x'
class dse_do_utils.scenariorunner.ScenarioGenerator(inputs: Dict[str, DataFrame], scenario_config: ScenarioConfig)[source]

Bases: object

Generates a variation of a scenario, i.e. of the inputs dataset, driven by a ScenarioConfig. To be subclassed. This base class implements overrides of the Parameter table. The ScenarioGenerator is typically used in the context of a ScenarioRunner.

Usage:

class MyScenarioGenerator(ScenarioGenerator):
    def generate_scenario(self):
        new_inputs = super().generate_scenario()
        new_inputs['MyTable1'] = self.generate_my_table1().reset_index()
        new_inputs['MyTable2'] = self.generate_my_table2().reset_index()
        return new_inputs
generate_scenario()[source]

Generate a variation of the base_inputs. To be overridden. This default implementation changes the Parameter table based on the overrides in the ScenarioConfig.parameters.

Usage:

def generate_scenario(self):
    new_inputs = super().generate_scenario()
    new_inputs['MyTable'] = self.generate_my_table().reset_index()
    return new_inputs
get_parameters() DataFrame[source]

Applies overrides to the Parameter table based on the ScenarioConfig.parameters.
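
Usage (a minimal sketch; MyScenarioGenerator and the 'demandMultiplier' parameter are illustrative assumptions):

config = ScenarioConfig(scenario_name='HighDemand',
                        parameters={'demandMultiplier': 1.2})
generator = MyScenarioGenerator(inputs=base_inputs, scenario_config=config)
new_inputs = generator.generate_scenario()  # includes the Parameter overrides via get_parameters()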

class dse_do_utils.scenariorunner.ScenarioRunner(scenario_db_manager: ScenarioDbManager, optimization_engine_class: Type[OptimizationEngine], data_manager_class: Type[DataManager], scenario_db_manager_class: Type[ScenarioDbManager], scenario_generator_class: Optional[Type[ScenarioGenerator]] = None, do_model_name: str = 'my_model', schema: Optional[str] = None, local_root: Optional[str] = None, local_platform: Optional[int] = None, data_directory: Optional[str] = None)[source]

Bases: object

TODO: remove local_root, local_platform, replace by data_directory? (It seems to be working fine though)

create_new_db_schema()[source]
data_check_inputs(inputs: Dict[str, DataFrame], scenario_name: str = 'data_check', bulk: bool = False) Dict[str, DataFrame][source]

Use SQLite to validate data. Read data back and do a dm.prepare_data_frames. Does a deepcopy of the inputs to ensure the DB operations do not alter the inputs. Bulk can be set to True once the basic data issues have been resolved and performance needs to be improved. Set bulk to False to get more granular DB insert errors, i.e. per record. TODO: add a data_check() on the DataManager for additional checks.

data_check_outputs(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame], scenario_name: str = 'data_check', bulk: bool = False) Tuple[Dict[str, DataFrame], Dict[str, DataFrame]][source]

Use SQLite to validate data. Read data back and do a dm.prepare_data_frames. Does a deepcopy of the inputs to ensure the DB operations do not alter the inputs. Bulk can be set to True once the basic data issues have been resolved and performance needs to be improved. Set bulk to False to get more granular DB insert errors, i.e. per record. TODO: add a data_check() on the DataManager for additional checks.

generate_scenario(base_inputs: Dict[str, DataFrame], scenario_config: ScenarioConfig)[source]

Generate a derived scenario from a baseline scenario, based on the specifications in the scenario_config.

insert_in_do(inputs, outputs, scenario_config: ScenarioConfig, run_config: RunConfig)[source]
insert_inputs_in_db(inputs: Dict[str, DataFrame], run_config: RunConfig, scenario_name: str) Dict[str, DataFrame][source]
insert_outputs_in_db(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame], run_config: RunConfig, scenario_name: str)[source]
load_input_data_from_excel(excel_file_name) Dict[str, DataFrame][source]
run_model(inputs: Dict[str, DataFrame], run_config: RunConfig)[source]

Main method to run the optimization model.

run_multiple(scenario_configs: List[ScenarioConfig], run_config: RunConfig, base_inputs: Optional[Dict[str, DataFrame]] = None, excel_file_name: Optional[str] = None) None[source]

Creates the schema and/or loads data from Excel only once. Then runs all scenario_configs, each time applying the ScenarioGenerator to the base inputs. See the usage sketch below.
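
Usage (a minimal sketch; all class names, parameters and file names are illustrative assumptions):

runner = ScenarioRunner(
    scenario_db_manager=my_scenario_db_manager,
    optimization_engine_class=MyOptimizationEngine,
    data_manager_class=MyDataManager,
    scenario_db_manager_class=MyScenarioDbManager,
    scenario_generator_class=MyScenarioGenerator,
    do_model_name='my_model')
run_config = RunConfig(insert_outputs_in_db=True, write_output_to_excel=True)
scenario_configs = [
    ScenarioConfig(scenario_name='Base'),
    ScenarioConfig(scenario_name='HighDemand', parameters={'demandMultiplier': 1.2}),
]
runner.run_multiple(scenario_configs, run_config, excel_file_name='MyExcelFile')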

run_once(scenario_config: ScenarioConfig, run_config: RunConfig, base_inputs: Optional[Dict[str, DataFrame]] = None, excel_file_name: Optional[str] = None)[source]
write_output_data_to_excel(inputs: Dict[str, DataFrame], outputs: Dict[str, DataFrame], scenario_name: str)[source]

dse_do_utils.utilities module

dse_do_utils.utilities.add_sys_path(new_path)[source]

Adds a directory to Python’s sys.path

Does not add the directory if it does not exist or if it’s already on sys.path. Returns 1 if OK, -1 if new_path does not exist, 0 if it was already on sys.path. Based on: https://www.oreilly.com/library/view/python-cookbook/0596001673/ch04s23.html

Challenge: in order to use this function, we need to import the dse_do_utils package and thus we need to add its location to sys.path! This will work better once we can do a pip install dse-do-utils.
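
Usage (a minimal sketch; the directory is illustrative):

from dse_do_utils.utilities import add_sys_path

status = add_sys_path('/project_data/data_asset/my_modules')
# status: 1 = added, 0 = already on sys.path, -1 = directory does not exist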

dse_do_utils.utilities.convert_size(size_bytes: int)[source]

Returns string describing file size.

Parameters:

size_bytes (int) – size of the file in bytes

From https://stackoverflow.com/questions/5194057/better-way-to-convert-file-sizes-in-python

dse_do_utils.utilities.list_file_hierarchy(startpath: str) None[source]

Hierarchically print the contents of the folder tree, starting with the startpath.

Usage:

current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))
parent_dir_2 = os.path.abspath(os.path.join(parent_dir, os.pardir))
list_file_hierarchy(parent_dir_2) #List tree starting at the grand-parent of the current directory
Parameters:

startpath (str) – Root of the tree

Returns:

None

dse_do_utils.version module

Source of truth for the dse_do_utils version. The versions in setup.py and /docs/source/conf.py are automatically populated from here.

Best practice to keep version here, in a separate file. See https://stackoverflow.com/questions/458550/standard-way-to-embed-version-into-python-package

Module contents

dse_do_utils.module_reload()[source]

DEPRECATED. Requires updates for Python 3.6. Reloads all component modules. Use when you want to force a reload of this module with imp.reload().

This avoids having to code somewhat complex reloading logic in the notebook that is using this module.

Challenge with imp.reload of dse_do_utils: the following is NOT (!) sufficient:

import imp
import dse_do_utils
imp.reload(dse_do_utils)

The package dse_do_utils internally contains a number of sub-modules that each contain a part of the code. This keeps development easier and more organized. But to make importing easier, the classes are exposed in the top-level __init__.py, which allows for a simple import statement like from dse_do_utils import ScenarioManager. Unfortunately, reloading the top-level module dse_do_utils doesn't force a reload of the internal modules.

In case of subclassing, reloading needs to be done in the right order, i.e. first the parent classes.

Usage:

import imp
import dse_do_utils  # You have to do the import, otherwise not possible to do the next 2 steps
dse_do_utils.module_reload()  #This function
imp.reload(dse_do_utils)  # Necessary to ensure all following expressions `from dse_do_utils import class` are using the updated classes
from dse_do_utils import DataManager, OptimizationEngine, ScenarioManager, ScenarioPicker, DeployedDOModel, MapManager  # This needs to be done AFTER the reload to refresh the definitions

Note that this function assumes that the set of classes and component modules is not part of the update. If it is, you may need to add another reload:

import imp
import dse_do_utils  # You have to do the import, otherwise not possible to do the next 2 steps
imp.reload(dse_do_utils)  # To reload this function
dse_do_utils.module_reload()  #This function
imp.reload(dse_do_utils)  # Necessary to ensure all future expressions `from dse_do_utils import class` are using the updated classes
from dse_do_utils import DataManager, OptimizationEngine, ScenarioManager, ScenarioPicker, DeployedDOModel, MapManager  # This needs to be done AFTER the reload to refresh the definitions

If not using this function, in the notebook you would need to do the following (or the relevant parts of it):

import imp
import dse_do_utils
imp.reload(dse_do_utils.datamanager)
imp.reload(dse_do_utils.optimizationengine)
imp.reload(dse_do_utils.scenariomanager)
imp.reload(dse_do_utils.scenariopicker)
imp.reload(dse_do_utils.deployeddomodel)
imp.reload(dse_do_utils.mapmanager)
imp.reload(dse_do_utils)
from dse_do_utils import DataManager, OptimizationEngine, ScenarioManager, ScenarioPicker, DeployedDOModel, MapManager

Returns: