Source code for ibm_watsonx_ai.script

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2023-2024.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

from __future__ import annotations
from typing import Any, TYPE_CHECKING, TypeAlias


import ibm_watsonx_ai._wrappers.requests as requests
from ibm_watsonx_ai.utils import (
    DATA_ASSETS_DETAILS_TYPE,
    modify_details_for_script_and_shiny,
)
from ibm_watsonx_ai.lifecycle import SpecStates
from ibm_watsonx_ai.metanames import ScriptMetaNames
from ibm_watsonx_ai.utils.utils import _get_id_from_deprecated_uid
from ibm_watsonx_ai.wml_resource import WMLResource
from ibm_watsonx_ai.wml_client_error import (
    WMLClientError,
    ApiRequestFailure,
    ForbiddenActionForGitBasedProject,
)
import os
import warnings

# Passed as the last argument of `self._list(...)` by the listing methods of
# `Script`; `Script.list` documents that only the first 50 records are shown
# when no limit is given.
_DEFAULT_LIST_LENGTH = 50
# Alias for the builtin `list`. `Script` defines a method that is itself named
# `list`, so annotations in this module refer to the builtin through this alias.
ListType: TypeAlias = list

if TYPE_CHECKING:
    from ibm_watsonx_ai import APIClient
    from pandas import DataFrame


class Script(WMLResource):
    """Store and manage script assets."""

    ConfigurationMetaNames = ScriptMetaNames()
    """MetaNames for script assets creation."""

    def __init__(self, client: APIClient) -> None:
        WMLResource.__init__(self, __name__, client)

    @staticmethod
    def _without_ml_version(details: dict) -> dict:
        """Return the ``metadata``/``entity`` pair of ``details`` without ``ml_version``.

        ``entity["script"]["ml_version"]`` is deleted in place when present; a
        missing ``script`` or ``ml_version`` key is ignored.

        :param details: dictionary with ``"metadata"`` and ``"entity"`` keys
        :type details: dict

        :return: ``{"metadata": ..., "entity": ...}`` built from ``details``
        :rtype: dict
        """
        entity = details["entity"]
        try:
            del entity["script"]["ml_version"]
        except KeyError:
            pass
        return {"metadata": details["metadata"], "entity": entity}

    def get_details(
        self,
        script_id: str | None = None,
        limit: int | None = None,
        get_all: bool | None = None,
        **kwargs: Any,
    ) -> dict:
        """Get script asset details. If no script_id is passed, details for all script assets are returned.

        :param script_id: unique ID of the script
        :type script_id: str, optional

        :param limit: limit number of fetched records
        :type limit: int, optional

        :param get_all: if True, it will get all entries in 'limited' chunks
        :type get_all: bool, optional

        :param kwargs: forwarded to ``_get_id_from_deprecated_uid`` so the ID can
            still be resolved when it is passed under a deprecated keyword name

        :return: metadata of the stored script asset
        :rtype:
            - **dict** - if script_id is not None
            - **{"resources": [dict]}** - if script_id is None

        **Example:**

        .. code-block:: python

            script_details = client.script.get_details(script_id)

        """
        script_id = _get_id_from_deprecated_uid(
            kwargs, script_id, "script", can_be_none=True
        )

        def get_required_elements(response: dict[str, Any]) -> dict[str, Any]:
            # Keep only "metadata" and (when present) "entity", and hide the
            # internal "ml_version" field of the script entity.
            response = modify_details_for_script_and_shiny(response)
            final_response = {"metadata": response["metadata"]}

            if "entity" in response:
                final_response["entity"] = response["entity"]
                try:
                    del final_response["entity"]["script"]["ml_version"]
                except KeyError:
                    pass

            return final_response

        return self._get_asset_based_resource(
            script_id, "script", get_required_elements, limit=limit, get_all=get_all
        )

    def store(self, meta_props: dict, file_path: str) -> dict:
        """Create a script asset and upload content to it.

        :param meta_props: name to be given to the script asset
        :type meta_props: dict

        :param file_path: path to the content file to be uploaded
        :type file_path: str

        :return: metadata of the stored script asset
        :rtype: dict

        :raises ForbiddenActionForGitBasedProject: if the client's project type
            is ``"local_git_storage"``

        **Example:**

        .. code-block:: python

            metadata = {
                client.script.ConfigurationMetaNames.NAME: 'my first script',
                client.script.ConfigurationMetaNames.DESCRIPTION: 'description of the script',
                client.script.ConfigurationMetaNames.SOFTWARE_SPEC_ID: '0cdb0f1e-5376-4f4d-92dd-da3b69aa9bda'
            }

            asset_details = client.script.store(meta_props=metadata, file_path="/path/to/file")

        """
        if self._client.project_type == "local_git_storage":
            raise ForbiddenActionForGitBasedProject(
                reason="Storing Scripts is not supported for git based project."
            )

        Script._validate_type(meta_props, "meta_props", dict, True)
        Script._validate_type(file_path, "file_path", str, True)

        script_meta = self.ConfigurationMetaNames._generate_resource_metadata(
            meta_props, with_validation=True, client=self._client
        )

        # The file extension selects the script language in `_create_asset`.
        _, extension = os.path.splitext(file_path)

        response = self._create_asset(script_meta, file_path, extension)
        return self._without_ml_version(response)

    def _validate_r_software_spec(self, software_spec_id: str) -> None:
        """Check that ``software_spec_id`` is an acceptable specification for an R script.

        The names of R specifications are grouped into supported, retired,
        constricted and deprecated, then resolved to IDs with
        ``software_specifications.get_id_by_name``.

        :param software_spec_id: ID of the software specification chosen for the script
        :type software_spec_id: str

        :raises WMLClientError: if the ID belongs to none of the groups, or to
            the retired group. Constricted and deprecated IDs only emit a
            ``RuntimeWarning``.
        """
        sw_specs_api = self._client.software_specifications

        supported_names = []  # names of supported r script software specifications
        retired_names = []  # names of retired r script software specifications
        constricted_names = []  # names of constricted r script software specifications
        deprecated_names = []  # names of deprecated r script software specifications

        if self._client.CPD_version >= 4.7:
            for sw_spec in sw_specs_api.get_details()["resources"]:
                spec_meta = sw_spec["metadata"]
                if "life_cycle" not in spec_meta:
                    continue

                sw_configuration = (
                    sw_spec["entity"]
                    .get("software_specification", {})
                    .get("software_configuration", {})
                )
                is_r_runtime = (
                    sw_configuration.get("platform", {}).get("name") == "r"
                    and len(sw_configuration.get("included_packages", [])) > 1
                )
                if not is_r_runtime:
                    continue

                life_cycle = spec_meta["life_cycle"]
                if SpecStates.RETIRED.value in life_cycle:
                    retired_names.append(spec_meta["name"])
                elif SpecStates.CONSTRICTED.value in life_cycle:
                    constricted_names.append(spec_meta["name"])
                elif SpecStates.DEPRECATED.value in life_cycle:
                    deprecated_names.append(spec_meta["name"])
                elif "rstudio" not in spec_meta["name"].lower():
                    supported_names.append(spec_meta["name"])
        elif self._client.CPD_version == 4.6:
            supported_names = ["runtime-22.2-r4.2"]
            deprecated_names = ["default_r3.6", "runtime-22.1-r3.6"]
        else:
            supported_names = ["default_r3.6", "runtime-22.1-r3.6"]
            deprecated_names = []

        supported_ids = [sw_specs_api.get_id_by_name(n) for n in supported_names]
        retired_ids = [sw_specs_api.get_id_by_name(n) for n in retired_names]
        constricted_ids = [sw_specs_api.get_id_by_name(n) for n in constricted_names]
        deprecated_ids = [sw_specs_api.get_id_by_name(n) for n in deprecated_names]

        known_ids = supported_ids + deprecated_ids + constricted_ids + retired_ids
        if software_spec_id not in known_ids:
            raise WMLClientError(
                f"For R scripts, only base software specs {', '.join(supported_names)} "
                "are supported. Specify "
                "the id you get via "
                "client.software_specifications.get_id_by_name(sw_name)"
            )

        # "{key}" is filled in below with the life-cycle state that matched.
        warning_error_msg = (
            "Provided software spec is {key} for R scripts. "
            + f"Only base software specs {', '.join(supported_names)} are supported."
            f" Specify the id you get via "
            f"client.software_specifications.get_id_by_name(sw_name)"
        )

        unsupported_ids_by_state = {
            SpecStates.RETIRED.value: retired_ids,
            SpecStates.CONSTRICTED.value: constricted_ids,
            SpecStates.DEPRECATED.value: deprecated_ids,
        }
        for key, spec_ids in unsupported_ids_by_state.items():
            if software_spec_id not in spec_ids:
                continue
            # Retired specifications are rejected; the other states only warn.
            if key == SpecStates.RETIRED.value:
                raise WMLClientError(warning_error_msg.format(key=key))
            warnings.warn(
                warning_error_msg.format(key=key),
                category=RuntimeWarning,
            )

    def _delete_asset_best_effort(self, asset_id: str) -> None:
        """Try to delete ``asset_id`` and ignore ordinary failures.

        Used to clean up a half-created asset right before an error is raised.
        Only ``Exception`` is suppressed, so ``KeyboardInterrupt`` and
        ``SystemExit`` still propagate.
        """
        try:
            self.delete(asset_id)
        except Exception:
            pass

    def _create_asset(
        self, script_meta: dict[str, Any], file_path: str, extension: str = ".py"
    ) -> dict[str, Any]:
        """Create the script asset, attach the file content and return the asset details.

        Steps: create the asset, create an attachment for it, upload the file
        to the attachment URL, then mark the attachment as complete.

        :param script_meta: resource metadata; ``script_meta["metadata"]["name"]``
            and ``script_meta["software_spec_uid"]`` are read, and
            ``script_meta["metadata"]["description"]`` is optional
        :type script_meta: dict

        :param file_path: path of the script file to upload
        :type file_path: str

        :param extension: file extension, ``".py"`` or ``".R"``
        :type extension: str

        :return: asset details shaped by ``_get_required_element_from_response``
        :rtype: dict

        :raises WMLClientError: for an empty software specification ID, an
            unsupported extension, an unacceptable R software specification,
            a file that cannot be read/uploaded, or a failed creation step
        """
        name = script_meta["metadata"]["name"]

        desc = script_meta["metadata"].get("description")
        if desc is None:
            desc = ""

        if script_meta.get("software_spec_uid") == "":
            raise WMLClientError(
                "Failed while creating a script asset, SOFTWARE_SPEC_ID cannot be empty"
            )

        if extension == ".py":
            lang = "python3"
        elif extension == ".R":
            lang = "R"
        else:
            raise WMLClientError(
                "This file type is not supported. It has to be either a python script(.py file ) or a "
                "R script"
            )

        # Check whether the given software spec is base or derived; the entity
        # used for asset creation depends on it.
        sw_spec_details = self._client.software_specifications.get_details(
            script_meta["software_spec_uid"]
        )

        if lang == "R":
            self._validate_r_software_spec(script_meta["software_spec_uid"])

        # A "base" specification is referenced through "base_id"; every other
        # type (including "derived") through "id".
        spec_type = sw_spec_details["entity"]["software_specification"]["type"]
        spec_ref_key = "base_id" if spec_type == "base" else "id"
        asset_meta = {
            "metadata": {
                "name": name,
                "description": desc,
                "asset_type": "script",
                "origin_country": "us",
                "asset_category": "USER",
            },
            "entity": {
                "script": {
                    "ml_version": "4.0.0",
                    "language": {"name": lang},
                    "software_spec": {spec_ref_key: script_meta["software_spec_uid"]},
                }
            },
        }

        href_definitions = self._client.service_instance._href_definitions

        # Step 1: Create an asset
        print("Creating Script asset...")

        if self._client.CLOUD_PLATFORM_SPACES:
            creation_url = href_definitions.get_assets_href()
        else:
            # if self._client.ICP_PLATFORM_SPACES
            creation_url = href_definitions.get_data_assets_href()

        creation_response = requests.post(
            creation_url,
            headers=self._client._get_headers(),
            params=self._client._params(),
            json=asset_meta,
        )
        asset_details = self._handle_response(
            201, "creating new asset", creation_response
        )
        if creation_response.status_code != 201:
            raise WMLClientError("Failed while creating a script asset. Try again.")

        # Step 2: Create attachment
        asset_id = asset_details["metadata"]["asset_id"]
        attachment_meta = {"asset_type": "script", "name": "attachment_" + asset_id}

        attachment_response = requests.post(
            href_definitions.get_attachments_href(asset_id),
            headers=self._client._get_headers(),
            params=self._client._params(),
            json=attachment_meta,
        )
        attachment_details = self._handle_response(
            201, "creating new attachment", attachment_response
        )
        if attachment_response.status_code != 201:
            print("SUCCESS")
            return self._get_required_element_from_response(asset_details)

        attachment_id = attachment_details["attachment_id"]
        attachment_url = attachment_details["url1"]

        # Step 3: Put content to attachment
        try:
            with open(file_path, "rb") as f:
                if not self._client.ICP_PLATFORM_SPACES:
                    put_response = requests.put(attachment_url, data=f.read())
                else:
                    put_response = requests.put(
                        self._credentials.url + attachment_url,
                        files={"file": (name, f, "application/octet-stream")},
                    )
        except Exception as e:
            # Remove the asset created in step 1 before reporting the failure.
            deletion_response = requests.delete(
                href_definitions.get_data_asset_href(asset_id),
                params=self._client._params(),
                headers=self._client._get_headers(),
            )
            print(deletion_response.status_code)
            raise WMLClientError("Failed while reading a file.", e) from e

        if put_response.status_code in (200, 201):
            # Step 4: Complete attachment
            complete_response = requests.post(
                href_definitions.get_attachment_complete_href(asset_id, attachment_id),
                headers=self._client._get_headers(),
                params=self._client._params(),
            )
            if complete_response.status_code == 200:
                print("SUCCESS")
                return self._get_required_element_from_response(asset_details)

        # The upload or its completion failed: clean up and report.
        self._delete_asset_best_effort(asset_id)
        raise WMLClientError("Failed while creating a script asset. Try again.")

    def list(self, limit: int | None = None) -> DataFrame:
        """List stored scripts in a table format.
        If limit is set to None, only the first 50 records are shown.

        :param limit: limit number of fetched records
        :type limit: int, optional

        :return: pandas.DataFrame with listed scripts
        :rtype: pandas.DataFrame

        **Example:**

        .. code-block:: python

            client.script.list()

        """
        Script._validate_type(limit, "limit", int, False)

        href = self._client.service_instance._href_definitions.get_search_script_href()

        data: dict[str, Any] = {"query": "*:*"}
        if limit is not None:
            data.update({"limit": limit})

        response = requests.post(
            href,
            params=self._client._params(),
            headers=self._client._get_headers(),
            json=data,
        )

        asset_details = self._handle_response(200, "list assets", response)["results"]
        space_values = [
            (
                m["metadata"]["name"],
                m["metadata"]["asset_type"],
                m["metadata"]["asset_id"],
            )
            for m in asset_details
        ]

        # `limit` was already sent with the search request, so None is passed here.
        table = self._list(
            space_values, ["NAME", "ASSET_TYPE", "ASSET_ID"], None, _DEFAULT_LIST_LENGTH
        )
        return table

    def download(
        self,
        asset_id: str | None = None,
        filename: str | None = None,
        rev_id: str | None = None,
        **kwargs: Any,
    ) -> str:
        """Download the content of a script asset.

        :param asset_id: unique ID of the script asset to be downloaded
        :type asset_id: str

        :param filename: filename to be used for the downloaded file
        :type filename: str

        :param rev_id: revision ID
        :type rev_id: str, optional

        :param kwargs: forwarded to ``_get_id_from_deprecated_uid`` so the IDs can
            still be resolved when passed under deprecated keyword names

        :return: path to the downloaded asset content
        :rtype: str

        :raises TypeError: if ``filename`` is not provided
        :raises ApiRequestFailure: if fetching the attachment content does not return 200
        :raises WMLClientError: if the attachment cannot be resolved or the file cannot be written

        **Example:**

        .. code-block:: python

            client.script.download(asset_id, "script_file")

        """
        # `filename` only has a default so that `asset_id` can be passed under
        # a deprecated keyword; it is still mandatory.
        if filename is None:
            raise TypeError(
                "download() missing 1 required positional argument: 'filename'"
            )

        asset_id = _get_id_from_deprecated_uid(
            kwargs, asset_id, "asset", can_be_none=False
        )
        rev_id = _get_id_from_deprecated_uid(kwargs, rev_id, "rev", can_be_none=True)

        # Backward compatibility in past `rev_id` was an int.
        if isinstance(rev_id, int):
            warnings.warn(
                "`rev_id` parameter type as int is deprecated, please convert to str instead",
                category=DeprecationWarning,
            )
            rev_id = str(rev_id)

        Script._validate_type(asset_id, "asset_id", str, True)
        Script._validate_type(rev_id, "rev_id", str, False)

        params = self._client._params()
        if rev_id is not None:
            params.update({"revision_id": rev_id})

        href_definitions = self._client.service_instance._href_definitions
        if not self._client.ICP_PLATFORM_SPACES:
            asset_url = href_definitions.get_asset_href(asset_id)
        else:
            asset_url = href_definitions.get_data_asset_href(asset_id)

        asset_response = requests.get(
            asset_url,
            params=params,
            headers=self._client._get_headers(),
        )
        asset_details = self._handle_response(200, "get assets", asset_response)

        attachment = asset_details["attachments"][0]
        attachment_id = attachment["id"]

        response = requests.get(
            href_definitions.get_attachment_href(asset_id, attachment_id),
            params=params,
            headers=self._client._get_headers(),
        )
        if response.status_code != 200:
            raise WMLClientError("Failed while downloading the asset " + asset_id)  # type: ignore

        attachment_signed_url = response.json()["url"]
        # The URL is used as returned for connection-backed attachments and
        # outside ICP; otherwise it is prefixed with the credentials URL.
        if "connection_id" in attachment or not self._client.ICP_PLATFORM_SPACES:
            att_response = requests.get(attachment_signed_url)
        else:
            att_response = requests.get(self._credentials.url + attachment_signed_url)

        if att_response.status_code != 200:
            raise ApiRequestFailure("Failure during downloading asset.", att_response)

        downloaded_asset = att_response.content
        try:
            with open(filename, "wb") as f:
                f.write(downloaded_asset)
            print(
                "Successfully saved data asset content to file: '{}'".format(filename)
            )
            return os.path.abspath(filename)
        except IOError as e:
            raise WMLClientError(
                "Saving asset with artifact_url to local file: '{}' failed.".format(
                    filename
                ),
                e,
            ) from e

    @staticmethod
    def get_id(asset_details: dict) -> str:
        """Get the unique ID of a stored script asset.

        :param asset_details: metadata of the stored script asset
        :type asset_details: dict

        :return: unique ID of the stored script asset
        :rtype: str

        **Example:**

        .. code-block:: python

            asset_id = client.script.get_id(asset_details)

        """
        Script._validate_type(asset_details, "asset_details", object, True)
        Script._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE)

        return WMLResource._get_required_element_from_dict(
            asset_details, "data_assets_details", ["metadata", "guid"]
        )

    @staticmethod
    def get_href(asset_details: dict) -> str:
        """Get the URL of a stored script asset.

        :param asset_details: details of the stored script asset
        :type asset_details: dict

        :return: href of the stored script asset
        :rtype: str

        **Example:**

        .. code-block:: python

            asset_details = client.script.get_details(asset_id)
            asset_href = client.script.get_href(asset_details)

        """
        Script._validate_type(asset_details, "asset_details", object, True)
        Script._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE)

        return WMLResource._get_required_element_from_dict(
            asset_details, "asset_details", ["metadata", "href"]
        )

    def update(
        self,
        script_id: str | None = None,
        meta_props: dict | None = None,
        file_path: str | None = None,
        **kwargs: Any,
    ) -> dict:
        """Update a script with metadata, attachment, or both.

        :param script_id: ID of the script
        :type script_id: str

        :param meta_props: changes for the script metadata
        :type meta_props: dict, optional

        :param file_path: file path to the new attachment
        :type file_path: str, optional

        :param kwargs: forwarded to ``_get_id_from_deprecated_uid`` so the ID can
            still be resolved when it is passed under a deprecated keyword name

        :return: updated metadata of the script
        :rtype: dict

        :raises WMLClientError: if neither ``meta_props`` nor ``file_path`` is
            given, or the script cannot be found
        :raises ApiRequestFailure: if fetching the script fails with a status
            other than 404

        **Example:**

        .. code-block:: python

            script_details = client.script.update(script_id, meta, content_path)

        """
        script_id = _get_id_from_deprecated_uid(
            kwargs, script_id, "script", can_be_none=False
        )
        Script._validate_type(script_id, "script_id", str, True)

        if meta_props is None and file_path is None:
            raise WMLClientError(
                "At least either meta_props or file_path has to be provided"
            )

        updated_details = None

        url = self._client.service_instance._href_definitions.get_asset_href(script_id)

        # STEPS
        # STEP 1. Get existing metadata
        # STEP 2. If meta_props provided, we need to patch meta
        #   a. Construct meta patch string and call /v2/assets/<asset_id> to patch meta
        #   b. Construct entity patch if required and call /v2/assets/<asset_id>/attributes/script to patch entity
        # STEP 3. If file_path provided, we need to patch the attachment
        #   a. If attachment already exists for the script, delete it
        #   b. POST call to get signed url for upload
        #   c. Upload to the signed url
        #   d. Mark upload complete
        # STEP 4. Get the updated script record and return

        # STEP 1
        response = requests.get(
            url, params=self._client._params(), headers=self._client._get_headers()
        )
        if response.status_code != 200:
            if response.status_code == 404:
                raise WMLClientError(
                    "Invalid input. Unable to get the details of script_id provided."
                )
            raise ApiRequestFailure("Failure during getting script to update.", response)

        details = self._handle_response(200, "Get script details", response)

        attachments_response = None

        # STEP 2a.
        # Patch meta if provided
        if meta_props is not None:
            self._validate_type(meta_props, "meta_props", dict, True)

            meta_patch_payload = []
            entity_patch_payload = []

            # Since we are dealing with direct asset apis, there can be metadata or entity patch or both
            if "name" in meta_props or "description" in meta_props:
                props_for_asset_meta_patch = {
                    key: value
                    for key, value in meta_props.items()
                    if key == "name" or key == "description"
                }
                meta_patch_payload = (
                    self.ConfigurationMetaNames._generate_patch_payload(
                        details, props_for_asset_meta_patch, with_validation=True
                    )
                )

            # STEP 2b.
            if "software_spec_uid" in meta_props:
                # `.get` keeps the "add" branch reachable when the script has
                # no software_spec entry at all.
                if details["entity"]["script"].get("software_spec"):
                    entity_patch_payload = [
                        {
                            "op": "replace",
                            "path": "/software_spec/base_id",
                            "value": meta_props["software_spec_uid"],
                        }
                    ]
                else:
                    # The added value must be a JSON object, not a string that
                    # merely looks like one.
                    entity_patch_payload = [
                        {
                            "op": "add",
                            "path": "/software_spec",
                            "value": {"base_id": meta_props["software_spec_uid"]},
                        }
                    ]

            if meta_patch_payload:
                meta_patch_url = (
                    self._client.service_instance._href_definitions.get_asset_href(
                        script_id
                    )
                )
                response_patch = requests.patch(
                    meta_patch_url,
                    json=meta_patch_payload,
                    params=self._client._params(),
                    headers=self._client._get_headers(),
                )
                updated_details = self._handle_response(
                    200, "script patch", response_patch
                )

            if entity_patch_payload:
                entity_patch_url = (
                    self._client.service_instance._href_definitions.get_asset_href(
                        script_id
                    )
                    + "/attributes/script"
                )
                response_patch = requests.patch(
                    entity_patch_url,
                    json=entity_patch_payload,
                    params=self._client._params(),
                    headers=self._client._get_headers(),
                )
                updated_details = self._handle_response(
                    200, "script patch", response_patch
                )

        if file_path is not None:
            if "attachments" in details and details["attachments"]:
                current_attachment_id = details["attachments"][0]["id"]
            else:
                current_attachment_id = None

            # STEP 3
            attachments_response = self._update_attachment_for_assets(
                "script", script_id, file_path, current_attachment_id
            )

        if attachments_response is not None and "success" not in attachments_response:
            self._update_msg(updated_details)

        # STEP 4
        # Have to fetch again to reflect updated asset and attachment ids
        url = self._client.service_instance._href_definitions.get_asset_href(script_id)

        response = requests.get(
            url, params=self._client._params(), headers=self._client._get_headers()
        )
        if response.status_code != 200:
            if response.status_code == 404:
                raise WMLClientError(
                    "Invalid input. Unable to get the details of script_id provided."
                )
            raise ApiRequestFailure("Failure during getting script to update.", response)

        response = self._get_required_element_from_response(
            self._handle_response(200, "Get script details", response)
        )
        return self._without_ml_version(response)

    def _update_msg(self, updated_details: Any) -> None:
        """Report a failed attachment update.

        :param updated_details: result of an earlier metadata patch, or None
            when no metadata was patched

        :raises WMLClientError: if ``updated_details`` is None; otherwise a
            message is printed instead, because the metadata part succeeded
        """
        if updated_details is not None:
            print(
                "Could not update the attachment because of server error."
                " However metadata is updated. Try updating attachment again later"
            )
        else:
            raise WMLClientError(
                "Unable to update attachment because of server error. Try again later"
            )

    def delete(self, asset_id: str | None = None, **kwargs: Any) -> dict | str:
        """Delete a stored script asset.

        :param asset_id: ID of the script asset
        :type asset_id: str

        :param kwargs: forwarded to ``_get_id_from_deprecated_uid`` so the ID can
            still be resolved when it is passed under a deprecated keyword name

        :return: status ("SUCCESS" or "FAILED") if deleted synchronously or dictionary with response
        :rtype: str | dict

        :raises WMLClientError: if the script still has deployments

        **Example:**

        .. code-block:: python

            client.script.delete(asset_id)

        """
        asset_id = _get_id_from_deprecated_uid(
            kwargs, asset_id, "asset", can_be_none=False
        )
        Script._validate_type(asset_id, "asset_id", str, True)

        if self._if_deployment_exist_for_asset(asset_id):
            raise WMLClientError(
                "Cannot delete script that has existing deployments. Please delete all associated deployments and try again"
            )

        response = requests.delete(
            self._client.service_instance._href_definitions.get_asset_href(asset_id),
            params=self._client._params(),
            headers=self._client._get_headers(),
        )

        # A 200 response carries asset details; anything else is checked
        # against the expected 204.
        if response.status_code == 200:
            return self._get_required_element_from_response(response.json())
        return self._handle_response(204, "delete assets", response)

    def create_revision(self, script_id: str | None = None, **kwargs: Any) -> dict:
        """Create a revision for the given script. Revisions are immutable once created.
        The metadata and attachment at `script_id` is taken and a revision is created out of it.

        :param script_id: ID of the script
        :type script_id: str

        :param kwargs: forwarded to ``_get_id_from_deprecated_uid`` so the ID can
            still be resolved when it is passed under a deprecated keyword name

        :return: revised metadata of the stored script
        :rtype: dict

        **Example:**

        .. code-block:: python

            script_revision = client.script.create_revision(script_id)

        """
        script_id = _get_id_from_deprecated_uid(
            kwargs, script_id, "script", can_be_none=False
        )

        # For CP4D, check if either space or project ID is set
        self._client._check_if_either_is_set()
        Script._validate_type(script_id, "script_id", str, True)

        print("Creating script revision...")

        response = self._get_required_element_from_response(
            self._create_revision_artifact_for_assets(script_id, "Script")
        )
        return self._without_ml_version(response)

    def list_revisions(
        self, script_id: str | None = None, limit: int | None = None, **kwargs: Any
    ) -> DataFrame:
        """Print all revisions for the given script ID in a table format.

        :param script_id: ID of the stored script
        :type script_id: str

        :param limit: limit number of fetched records
        :type limit: int, optional

        :param kwargs: forwarded to ``_get_id_from_deprecated_uid`` so the ID can
            still be resolved when it is passed under a deprecated keyword name

        :return: pandas.DataFrame with listed revisions
        :rtype: pandas.DataFrame

        **Example:**

        .. code-block:: python

            client.script.list_revisions(script_id)

        """
        script_id = _get_id_from_deprecated_uid(
            kwargs, script_id, "script", can_be_none=False
        )

        # For CP4D, check if either space or project ID is set
        self._client._check_if_either_is_set()
        Script._validate_type(script_id, "script_id", str, True)

        url = (
            self._client.service_instance._href_definitions.get_asset_href(script_id)
            + "/revisions"
        )

        # The records are read from the "resources" key of the helper's result.
        script_resources = self._get_with_or_without_limit(
            url, limit, "List Script revisions", summary=None, pre_defined=None
        )["resources"]

        script_values = [
            (
                m["metadata"]["asset_id"],
                m["metadata"]["revision_id"],
                m["metadata"]["name"],
                m["metadata"]["commit_info"]["committed_at"],
            )
            for m in script_resources
        ]

        table = self._list(
            script_values,
            ["ID", "REVISION_ID", "NAME", "REVISION_COMMIT"],
            limit,
            _DEFAULT_LIST_LENGTH,
        )
        return table

    def get_revision_details(
        self, script_id: str | None = None, rev_id: str | None = None, **kwargs: Any
    ) -> ListType:
        """Get metadata of the script revision.

        :param script_id: ID of the script
        :type script_id: str

        :param rev_id: ID of the revision. If this parameter is not provided, it returns the latest revision. If there is no latest revision, it returns an error.
        :type rev_id: str, optional

        :param kwargs: forwarded to ``_get_id_from_deprecated_uid`` so the IDs can
            still be resolved when passed under deprecated keyword names

        :return: metadata of the stored script(s)
        :rtype: list

        **Example:**

        .. code-block:: python

            script_details = client.script.get_revision_details(script_id, rev_id)

        """
        script_id = _get_id_from_deprecated_uid(
            kwargs, script_id, "script", can_be_none=False
        )
        rev_id = _get_id_from_deprecated_uid(kwargs, rev_id, "rev", can_be_none=True)

        # Backward compatibility in past `rev_id` was an int.
        if isinstance(rev_id, int):
            warnings.warn(
                "`rev_id` parameter type as int is deprecated, please convert to str instead",
                category=DeprecationWarning,
            )
            rev_id = str(rev_id)

        # For CP4D, check if either space or project ID is set
        self._client._check_if_either_is_set()
        Script._validate_type(script_id, "script_id", str, True)
        Script._validate_type(rev_id, "rev_id", str, False)

        if rev_id is None:
            rev_id = "latest"

        url = self._client.service_instance._href_definitions.get_asset_href(script_id)

        resources = self._get_with_or_without_limit(
            url,
            limit=None,
            op_name="asset_revision",
            summary=None,
            pre_defined=None,
            revision=rev_id,
        )["resources"]

        responses = [
            self._get_required_element_from_response(resource)
            for resource in resources
        ]
        return [self._without_ml_version(response) for response in responses]

    def _get_required_element_from_response(self, response_data: dict) -> dict:
        """Reshape a raw asset record into the ``{"metadata", "entity"}`` form returned to callers.

        ``metadata`` always holds name, guid (the asset ID), href, asset_type,
        created_at and last_updated_at. Depending on the client defaults and on
        what the record contains, it may also hold space_id or project_id,
        description, revision_id, attachment_id and revision_commit_date.

        :param response_data: raw asset record
        :type response_data: dict

        :return: dictionary with ``"metadata"`` and ``"entity"`` keys
        :rtype: dict

        :raises WMLClientError: if an optional part of the record cannot be read
        """
        WMLResource._validate_type(response_data, "scripts", dict)

        raw_metadata = response_data["metadata"]
        revision_id = None

        metadata = {
            "name": raw_metadata["name"],
            "guid": raw_metadata["asset_id"],
            "href": response_data["href"],
            "asset_type": raw_metadata["asset_type"],
            "created_at": raw_metadata["created_at"],
            "last_updated_at": raw_metadata["usage"]["last_updated_at"],
        }

        try:
            if self._client.default_space_id is not None:
                metadata["space_id"] = raw_metadata["space_id"]
            elif self._client.default_project_id is not None:
                metadata["project_id"] = raw_metadata["project_id"]

            if "description" in raw_metadata:
                metadata.update({"description": raw_metadata["description"]})

            if "revision_id" in raw_metadata:
                revision_id = raw_metadata["revision_id"]
                metadata.update({"revision_id": raw_metadata["revision_id"]})

            if "attachments" in response_data and response_data["attachments"]:
                metadata.update(
                    {"attachment_id": response_data["attachments"][0]["id"]}
                )

            if "commit_info" in raw_metadata and revision_id is not None:
                metadata.update(
                    {"revision_commit_date": raw_metadata["commit_info"]["committed_at"]}
                )

            new_el = {"metadata": metadata, "entity": response_data["entity"]}

            if self._client.default_project_id is not None:
                # Keep only the part of the href that follows the last ".com".
                href_without_host = response_data["href"].split(".com")[-1]
                new_el["metadata"].update({"href": href_without_host})

            return new_el
        except Exception as e:
            raise WMLClientError(
                f"Failed to read Response from down-stream service: {response_data}"
            ) from e