Source code for ibm_watson_machine_learning.assets

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2019-2024.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

from __future__ import print_function
import ibm_watson_machine_learning._wrappers.requests as requests

from ibm_watson_machine_learning.messages.messages import Messages
from ibm_watson_machine_learning.utils import DATA_ASSETS_DETAILS_TYPE
from ibm_watson_machine_learning.metanames import AssetsMetaNames
from ibm_watson_machine_learning.wml_resource import WMLResource
from ibm_watson_machine_learning.wml_client_error import WMLClientError, ApiRequestFailure
from warnings import warn
import os

_DEFAULT_LIST_LENGTH = 50


[docs] class Assets(WMLResource): """Store and manage data assets.""" ConfigurationMetaNames = AssetsMetaNames() """MetaNames for Data Assets creation.""" def __init__(self, client): WMLResource.__init__(self, __name__, client) self._ICP = client.ICP
[docs] def get_details(self, asset_uid=None): """Get data asset details. If no asset_uid is passed, details for all assets will be returned. :param asset_uid: Unique id of asset :type asset_uid: str :return: metadata of the stored data asset :rtype: dict **Example** .. code-block:: python asset_details = client.data_assets.get_details(asset_uid) """ return self._get_asset_based_resource(asset_uid, 'data_asset', self._get_required_element_from_response)
[docs] def create(self, name, file_path): """Create a data asset and upload content to it. :param name: name to be given to the data asset :type name: str :param file_path: path to the content file to be uploaded :type file_path: str :return: metadata of the stored data asset :rtype: dict **Example** .. code-block:: python asset_details = client.data_assets.create(name="sample_asset", file_path="/path/to/file") """ # quick support for COS credentials instead of local path # TODO add error handling and cleaning (remove the file) WMLResource._chk_and_block_create_update_for_python36(self) Assets._validate_type(name, u'name', str, True) Assets._validate_type(file_path, u'file_path', str, True) return self._create_asset(name, file_path)
[docs] def store(self, meta_props): """Create a data asset and upload content to it. :param meta_props: meta data of the space configuration. To see available meta names use: .. code-block:: python client.data_assets.ConfigurationMetaNames.get() :type meta_props: dict **Example** Example for data asset creation for files : .. code-block:: python metadata = { client.data_assets.ConfigurationMetaNames.NAME: 'my data assets', client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description', client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 'sample.csv' } asset_details = client.data_assets.store(meta_props=metadata) Example of data asset creation using connection: .. code-block:: python metadata = { client.data_assets.ConfigurationMetaNames.NAME: 'my data assets', client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description', client.data_assets.ConfigurationMetaNames.CONNECTION_ID: '39eaa1ee-9aa4-4651-b8fe-95d3ddae', client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 't1/sample.csv' } asset_details = client.data_assets.store(meta_props=metadata) Example for data asset creation with database sources type connection: .. code-block:: python metadata = { client.data_assets.ConfigurationMetaNames.NAME: 'my data assets', client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description', client.data_assets.ConfigurationMetaNames.CONNECTION_ID: '23eaf1ee-96a4-4651-b8fe-95d3dadfe', client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 't1' } asset_details = client.data_assets.store(meta_props=metadata) """ ##For CP4D, check if either spce or project ID is set WMLResource._chk_and_block_create_update_for_python36(self) self._client._check_if_either_is_set() # quick support for COS credentials instead of local path # TODO add error handling and cleaning (remove the file) Assets._validate_type(meta_props, u'meta_props', dict, True) name = meta_props[self.ConfigurationMetaNames.NAME] file_path = meta_props[self.ConfigurationMetaNames.DATA_CONTENT_NAME] description = "" connection_id = meta_props.get(self.ConfigurationMetaNames.CONNECTION_ID) if not connection_id and not os.path.isfile(file_path): warn(f"No connection_id specified and file: {file_path} does not exist.") if self.ConfigurationMetaNames.DESCRIPTION in meta_props: description = meta_props[self.ConfigurationMetaNames.DESCRIPTION] return self._create_asset(name, file_path, connection_id=connection_id, description=description)
def _create_asset(self, name, file_path, connection_id=None, description=None): ##Step1: Create a data asset desc = description if desc is None: desc = "" try: import mimetypes except Exception as e: raise WMLClientError(Messages.get_message(message_id="module_mimetypes_not_found"), e) mime_type = mimetypes.MimeTypes().guess_type(file_path)[0] if mime_type is None: mime_type = "application/octet-stream" asset_meta = { "metadata": { "name": name, "description": desc, "asset_type": "data_asset", "origin_country": "us", "asset_category": "USER" }, "entity": { "data_asset": { "mime_type": mime_type } } } if connection_id is not None: asset_meta["metadata"].update({"tags": ["connected-data"]}) # Step1 : Create an asset print(Messages.get_message(message_id="creating_data_asset")) if self._client.WSD: # For WSD the asset creation is done within _wsd_create_asset function using polyglot # Thus using the same for data_assets type input_payload = { "name": name, "mime_type": mime_type } meta_props = { "name": name } details = Assets._wsd_create_asset(self, "data_asset", input_payload, meta_props, file_path) return self._get_required_element_from_response(details) else: creation_response = requests.post( self._client.service_instance._href_definitions.get_data_assets_href(), headers=self._client._get_headers(), params=self._client._params(), json=asset_meta ) asset_details = self._handle_response(201, u'creating new asset', creation_response) # Step2: Create attachment if creation_response.status_code == 201: asset_id = asset_details["metadata"]["asset_id"] attachment_name = file_path.split("/")[-1] attachment_meta = { "asset_type": "data_asset", "name": attachment_name, "mime": mime_type } if connection_id is not None: attachment_meta.update({"connection_id": connection_id, "connection_path": file_path, "is_remote": True}) attachment_response = requests.post( self._client.service_instance._href_definitions.get_attachments_href(asset_id), headers=self._client._get_headers(), params=self._client._params(), json=attachment_meta ) attachment_details = self._handle_response(201, u'creating new attachment', attachment_response) if attachment_response.status_code == 201: if connection_id is None: attachment_id = attachment_details["attachment_id"] attachment_url = attachment_details["url1"] # Step3: Put content to attachment try: with open(file_path, 'rb') as _file: if not self._ICP: put_response = requests.put( attachment_url, data=_file ) else: put_response = requests.put( self._wml_credentials['url'] + attachment_url, files={'file': (name, _file, 'file')} ) except Exception as e: deletion_response = requests.delete( self._client.service_instance._href_definitions.get_data_asset_href(asset_id), params=self._client._params(), headers=self._client._get_headers() ) print(deletion_response.status_code) raise WMLClientError(Messages.get_message(message_id="failed_while_creating_a_data_asset"), e) if put_response.status_code == 201 or put_response.status_code == 200: # Step4: Complete attachment complete_response = requests.post( self._client.service_instance._href_definitions.get_attachment_complete_href(asset_id, attachment_id), headers=self._client._get_headers(), params=self._client._params() ) if complete_response.status_code == 200: print(Messages.get_message(message_id="success")) return self._get_required_element_from_response(asset_details) else: self._delete(asset_id) raise WMLClientError( Messages.get_message(message_id="failed_while_creating_a_data_asset")) else: self._delete(asset_id) raise WMLClientError(Messages.get_message(message_id="failed_while_creating_a_data_asset")) else: print(Messages.get_message(message_id="success")) return self._get_required_element_from_response(asset_details) else: self._delete(asset_id) raise WMLClientError(Messages.get_message(message_id="failed_while_creating_a_data_asset")) else: raise WMLClientError(Messages.get_message(message_id="failed_while_creating_a_data_asset"))
[docs] def list(self, limit=None, return_as_df=True): """Print stored data assets in a table format. If limit is set to None there will be only first 50 records shown. :param limit: limit number of fetched records :type limit: int :param return_as_df: determine if table should be returned as pandas.DataFrame object, default: True :type return_as_df: bool, optional **Example** .. code-block:: python client.data_assets.list() """ Assets._validate_type(limit, u'limit', int, False) href = self._client.service_instance._href_definitions.get_search_asset_href() data = { "query": "*:*" } if limit is not None: data.update({"limit": limit}) response = requests.post(href, params=self._client._params(), headers=self._client._get_headers(), json=data) self._handle_response(200, u'list assets', response) asset_details = self._handle_response(200, u'list assets', response)["results"] space_values = [ (m[u'metadata'][u'name'], m[u'metadata'][u'asset_type'], m[u'metadata'][u'size'], m["metadata"]["asset_id"]) for m in asset_details] table = self._list(space_values, [u'NAME', u'ASSET_TYPE', u'SIZE', u'ASSET_ID'], limit, _DEFAULT_LIST_LENGTH) if return_as_df: return table
[docs] def download(self, asset_uid, filename): """Download and store the content of a data asset. :param asset_uid: the Unique Id of the data asset to be downloaded :type asset_uid: str :param filename: filename to be used for the downloaded file :type filename: str :return: normalized path to the downloaded asset content :rtype: str **Example** .. code-block:: python client.data_assets.download(asset_uid,"sample_asset.csv") """ content = self.get_content(asset_uid) try: with open(filename, 'wb') as f: f.write(content) print(Messages.get_message(filename, message_id="successfully_saved_data_asset_content_to_file")) return os.path.abspath(filename) except IOError as e: raise WMLClientError( Messages.get_message(filename, message_id="saving_data_asset_to_local_file_failed"), e)
[docs] def get_content(self, asset_uid): """Download the content of a data asset. :param asset_uid: the Unique Id of the data asset to be downloaded :type asset_uid: str :return: the asset content :rtype: binary **Example** .. code-block:: python content = client.data_assets.get_content(asset_uid).decode('ascii') """ Assets._validate_type(asset_uid, u'asset_uid', str, True) import urllib asset_response = requests.get(self._client.service_instance._href_definitions.get_data_asset_href(asset_uid), params=self._client._params(), headers=self._client._get_headers()) asset_details = self._handle_response(200, u'get assets', asset_response) if self._WSD: attachment_url = asset_details['attachments'][0]['object_key'] artifact_content_url = self._client.service_instance._href_definitions.get_wsd_model_attachment_href() + \ urllib.parse.quote('data_asset/' + attachment_url, safe='') r = requests.get(artifact_content_url, params=self._client._params(), headers=self._client._get_headers(), stream=True) if r.status_code != 200: raise ApiRequestFailure(Messages.get_message(message_id="failure_during_downloading_data_asset"), r) return r.content else: attachment_id = asset_details["attachments"][0]["id"] response = requests.get( self._client.service_instance._href_definitions.get_attachment_href(asset_uid, attachment_id), params=self._client._params(), headers=self._client._get_headers()) if response.status_code == 200: if 'connection_id' in asset_details["attachments"][0] and \ asset_details["attachments"][0]['connection_id'] is not None: conn_details = self._client.connections.get_details( asset_details["attachments"][0]['connection_id']) attachment_data_source_type = conn_details['entity'].get('datasource_type') cos_conn_data_source_id = self._client.connections.get_datasource_type_uid_by_name( 'cloudobjectstorage') if attachment_data_source_type == cos_conn_data_source_id: attachment_signed_url = response.json()["url"] att_response = requests.get(attachment_signed_url) else: raise WMLClientError( Messages.get_message(message_id="download_api_not_supported_for_this_connection_type")) else: attachment_signed_url = response.json()["url"] if not self._ICP and not self._client.WSD: if self._client.CLOUD_PLATFORM_SPACES: att_response = requests.get(attachment_signed_url) else: att_response = requests.get(self._wml_credentials["url"] + attachment_signed_url) else: att_response = requests.get(self._wml_credentials["url"] + attachment_signed_url) if att_response.status_code != 200: raise ApiRequestFailure(Messages.get_message(message_id="failure_during_downloading_data_asset"), att_response) return att_response.content else: raise WMLClientError(Messages.get_message(message_id="failure_during_downloading_data_asset"))
[docs] @staticmethod def get_uid(asset_details): """Get Unique Id of stored data asset. *Deprecated:* Use ``get_id(details)`` instead. :param asset_details: metadata of the stored data asset :type asset_details: dict :return: Unique Id of stored asset :rtype**: str **Example** .. code-block:: python asset_uid = client.data_assets.get_uid(asset_details) """ Assets._validate_type(asset_details, u'asset_details', object, True) Assets._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE) return WMLResource._get_required_element_from_dict(asset_details, u'data_assets_details', [u'metadata', u'guid'])
[docs] @staticmethod def get_id(asset_details): """Get Unique Id of stored data asset. :param asset_details: details of the stored data asset :type asset_details: dict :return: Unique Id of stored data asset :rtype: str **Example** .. code-block:: python asset_id = client.data_assets.get_id(asset_details) """ return Assets.get_uid(asset_details)
[docs] @staticmethod def get_href(asset_details): """Get url of stored data asset. :param asset_details: stored data asset details :type asset_details: dict :return: href of stored data asset :rtype: str **Example** .. code-block:: python asset_details = client.data_assets.get_details(asset_uid) asset_href = client.data_assets.get_href(asset_details) """ Assets._validate_type(asset_details, u'asset_details', object, True) Assets._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE) return WMLResource._get_required_element_from_dict(asset_details, u'asset_details', [u'metadata', u'href'])
[docs] def delete(self, asset_uid): """Delete a stored data asset. :param asset_uid: Unique Id of data asset :type asset_uid: str :return: status ("SUCCESS" or "FAILED") :rtype: str **Example** .. code-block:: python client.data_assets.delete(asset_uid) """ Assets._validate_type(asset_uid, u'asset_uid', str, True) response = requests.delete(self._client.service_instance._href_definitions.get_asset_href(asset_uid), params=self._client._params(), headers=self._client._get_headers()) if response.status_code == 200: return self._get_required_element_from_response(response.json()) else: return self._handle_response(204, u'delete assets', response)
def _delete(self, asset_uid): Assets._validate_type(asset_uid, u'asset_uid', str, True) response = requests.delete(self._client.service_instance._href_definitions.get_asset_href(asset_uid), params=self._client._params(), headers=self._client._get_headers()) # if response.status_code == 200: # return self._get_required_element_from_response(response.json()) # else: # return self._handle_response(204, u'delete assets', response) # def get_href(self, asset_uid): # """ # Get metadata of stored space(s). If space UID is not specified, it returns all the spaces metadata. # # **Parameters** # # .. important:: # #. **space_uid**: Space UID (optional)\n # **type**: str\n # #. **limit**: limit number of fetched records (optional)\n # **type**: int\n # # **Output** # # .. important:: # **returns**: metadata of stored space(s)\n # **return type**: dict # dict (if UID is not None) or {"resources": [dict]} (if UID is None)\n # # .. note:: # If UID is not specified, all spaces metadata is fetched\n # # **Example** # # >>> space_details = client.spaces.get_details(space_uid) # >>> space_details = client.spaces.get_details() # """ # # # Assets._validate_type(asset_uid, u'asset_uid', str, True) # # response = requests.get(self._client.service_instance._href_definitions.get_data_asset_href(asset_uid), params=self._client._params(), # headers=self._client._get_headers()) # # if response.status_code == 200: # return response.json()["href"] # else: # return self._handle_response(200, u'spaces assets', response) def _get_required_element_from_response(self, response_data): WMLResource._validate_type(response_data, u'data assets response', dict) import copy new_el = {'metadata': copy.copy(response_data['metadata'])} try: new_el['metadata']['guid'] = response_data['metadata']['asset_id'] new_el['metadata']['href'] = response_data['href'] if not ( self._client.WSD and self._client.default_project_id) else self._client.service_instance._href_definitions.get_base_asset_href( response_data['metadata']['asset_id']) + "?" + "project_id=" + response_data['metadata']['project_id'] new_el['metadata']['asset_type'] = response_data['metadata']['asset_type'] new_el['metadata']['created_at'] = response_data['metadata']['created_at'] new_el['metadata']['last_updated_at'] = response_data['metadata']['usage'].get('last_updated_at') if self._client.default_space_id is not None: new_el['metadata']['space_id'] = response_data['metadata']['space_id'] elif self._client.default_project_id is not None: new_el['metadata']['project_id'] = response_data['metadata']['project_id'] if 'entity' in response_data: new_el['entity'] = response_data['entity'] if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: if "attachments" in response_data and response_data[u'attachments']: new_el[u'metadata'].update({'attachment_id': response_data[u'attachments'][0][u'id']}) if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: href_without_host = response_data['href'].split('.com')[-1] new_el[u'metadata'].update({'href': href_without_host}) return new_el except Exception as e: raise WMLClientError( Messages.get_message(response_data, message_id="failed_to_read_response_from_down_stream_service"))