Source code for ibm_watsonx_ai.assets

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2023-2024.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------
from __future__ import annotations

import os
from typing import Any, TYPE_CHECKING
from warnings import warn

from ibm_watsonx_ai._wrappers import requests
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.metanames import AssetsMetaNames
from ibm_watsonx_ai.utils import DATA_ASSETS_DETAILS_TYPE
from ibm_watsonx_ai.utils.utils import _get_id_from_deprecated_uid
from ibm_watsonx_ai.wml_client_error import WMLClientError, ApiRequestFailure
from ibm_watsonx_ai.wml_resource import WMLResource

if TYPE_CHECKING:
    from pandas import DataFrame
    from ibm_watsonx_ai import APIClient

# Default row count handed to ``self._list`` by ``Assets.list``; per that
# method's docstring, only the first 50 records are shown when ``limit`` is None.
_DEFAULT_LIST_LENGTH = 50


class Assets(WMLResource):
    """Store and manage data assets."""

    ConfigurationMetaNames = AssetsMetaNames()
    """MetaNames for Data Assets creation."""

    def __init__(self, client: APIClient) -> None:
        WMLResource.__init__(self, __name__, client)

    def get_details(
        self,
        asset_id: str | None = None,
        get_all: bool | None = None,
        limit: int | None = None,
        **kwargs: Any,
    ) -> dict:
        """Get data asset details. If no asset_id is passed, details for all assets are returned.

        :param asset_id: unique ID of the asset
        :type asset_id: str, optional

        :param limit: limit number of fetched records
        :type limit: int, optional

        :param get_all: if True, it will get all entries in 'limited' chunks
        :type get_all: bool, optional

        :return: metadata of the stored data asset
        :rtype: dict

        **Example:**

        .. code-block:: python

            asset_details = client.data_assets.get_details(asset_id)

        """
        # Accept the deprecated ``asset_uid`` keyword passed through kwargs.
        asset_id = _get_id_from_deprecated_uid(
            kwargs, asset_id, "asset", can_be_none=True
        )

        return self._get_asset_based_resource(
            asset_id,
            "data_asset",
            self._get_required_element_from_response,
            limit=limit,
            get_all=get_all,
        )

    def create(self, name: str, file_path: str) -> dict[str, Any]:
        """Create a data asset and upload content to it.

        :param name: name to be given to the data asset
        :type name: str

        :param file_path: path to the content file to be uploaded
        :type file_path: str

        :return: metadata of the stored data asset
        :rtype: dict

        **Example:**

        .. code-block:: python

            asset_details = client.data_assets.create(name="sample_asset", file_path="/path/to/file")

        """
        Assets._validate_type(name, "name", str, True)
        Assets._validate_type(file_path, "file_path", str, True)

        return self._create_asset(name, file_path)

    def store(self, meta_props: dict) -> dict[str, Any]:
        """Create a data asset and upload content to it.

        :param meta_props: metadata of the data asset. To see available meta names, use:

            .. code-block:: python

                client.data_assets.ConfigurationMetaNames.get()

        :type meta_props: dict

        :return: metadata of the stored data asset
        :rtype: dict

        **Example:**

        Example of data asset creation for files:

        .. code-block:: python

            metadata = {
                client.data_assets.ConfigurationMetaNames.NAME: 'my data assets',
                client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description',
                client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 'sample.csv'
            }
            asset_details = client.data_assets.store(meta_props=metadata)

        Example of data asset creation using a connection:

        .. code-block:: python

            metadata = {
                client.data_assets.ConfigurationMetaNames.NAME: 'my data assets',
                client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description',
                client.data_assets.ConfigurationMetaNames.CONNECTION_ID: '39eaa1ee-9aa4-4651-b8fe-95d3ddae',
                client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 't1/sample.csv'
            }
            asset_details = client.data_assets.store(meta_props=metadata)

        Example of data asset creation with a database sources type connection:

        .. code-block:: python

            metadata = {
                client.data_assets.ConfigurationMetaNames.NAME: 'my data assets',
                client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description',
                client.data_assets.ConfigurationMetaNames.CONNECTION_ID: '23eaf1ee-96a4-4651-b8fe-95d3dadfe',
                client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 't1'
            }
            asset_details = client.data_assets.store(meta_props=metadata)

        """
        # For CP4D, check if either space or project ID is set
        self._client._check_if_either_is_set()

        Assets._validate_type(meta_props, "meta_props", dict, True)

        name = meta_props[self.ConfigurationMetaNames.NAME]
        file_path = meta_props[self.ConfigurationMetaNames.DATA_CONTENT_NAME]
        connection_id = meta_props.get(self.ConfigurationMetaNames.CONNECTION_ID)

        # Without a connection the content must come from a local file; only
        # warn here, the upload step reports the actual failure.
        if not connection_id and not os.path.isfile(file_path):
            warn(f"No connection_id specified and file: {file_path} does not exist.")

        description = meta_props.get(self.ConfigurationMetaNames.DESCRIPTION, "")

        return self._create_asset(
            name, file_path, connection_id=connection_id, description=description
        )

    def _discard_asset_quietly(self, asset_id: str) -> None:
        """Best-effort removal of a partially created asset; cleanup errors are ignored."""
        try:
            self.delete(asset_id)
        except Exception:
            # The creation failure raised by the caller is the error worth
            # reporting; a failed cleanup must not mask it.
            pass

    def _create_asset(
        self,
        name: str,
        file_path: str,
        connection_id: str | None = None,
        description: str | None = None,
    ) -> dict:
        """Create a data asset, register its attachment and upload local content.

        The steps are: create the asset, create the attachment, and - only when
        no ``connection_id`` is given - PUT the file content to the attachment
        URL and mark the attachment complete. When a later step fails, the
        already created asset is deleted before the error is raised.

        :param name: name of the data asset
        :type name: str

        :param file_path: local path of the content file, or the path within
            the connection when ``connection_id`` is given
        :type file_path: str

        :param connection_id: ID of the connection holding the data, defaults to None
        :type connection_id: str, optional

        :param description: description of the asset, defaults to None (stored as "")
        :type description: str, optional

        :return: metadata of the stored data asset
        :rtype: dict

        :raises WMLClientError: if any of the creation steps fails
        """
        desc = "" if description is None else description

        try:
            import mimetypes
        except Exception as e:
            raise WMLClientError(
                Messages.get_message(message_id="module_mimetypes_not_found"), e
            ) from e

        mime_type = mimetypes.MimeTypes().guess_type(file_path)[0]
        if mime_type is None:
            mime_type = "application/octet-stream"

        asset_meta: dict[str, Any] = {
            "metadata": {
                "name": name,
                "description": desc,
                "asset_type": "data_asset",
                "origin_country": "us",
                "asset_category": "USER",
            },
            "entity": {"data_asset": {"mime_type": mime_type}},
        }
        if connection_id is not None:
            asset_meta["metadata"].update({"tags": ["connected-data"]})

        # Step 1: create the asset
        print(Messages.get_message(message_id="creating_data_asset"))

        creation_response = requests.post(
            self._client.service_instance._href_definitions.get_data_assets_href(),
            headers=self._client._get_headers(),
            params=self._client._params(),
            json=asset_meta,
        )
        asset_details = self._handle_response(
            201, "creating new asset", creation_response
        )

        if creation_response.status_code != 201:
            # Nothing was created, so there is nothing to clean up.
            raise WMLClientError(
                Messages.get_message(message_id="failed_while_creating_a_data_asset")
            )

        # Step 2: create the attachment
        asset_id = asset_details["metadata"]["asset_id"]
        attachment_meta: dict[str, Any] = {
            "asset_type": "data_asset",
            "name": file_path.split("/")[-1],
            "mime": mime_type,
        }
        if connection_id is not None:
            attachment_meta.update(
                {
                    "connection_id": connection_id,
                    "connection_path": file_path,
                    "is_remote": True,
                }
            )

        attachment_response = requests.post(
            self._client.service_instance._href_definitions.get_attachments_href(
                asset_id
            ),
            headers=self._client._get_headers(),
            params=self._client._params(),
            json=attachment_meta,
        )
        attachment_details = self._handle_response(
            201, "creating new attachment", attachment_response
        )

        if attachment_response.status_code != 201:
            self._discard_asset_quietly(asset_id)
            raise WMLClientError(
                Messages.get_message(message_id="failed_while_creating_a_data_asset")
            )

        if connection_id is not None:
            # Connected data stays remote: no content upload is needed.
            print(Messages.get_message(message_id="success"))
            return self._get_required_element_from_response(asset_details)

        attachment_id = attachment_details["attachment_id"]
        attachment_url = attachment_details["url1"]

        # Step 3: put content to the attachment
        try:
            with open(file_path, "rb") as _file:
                if not self._client.ICP_PLATFORM_SPACES:
                    put_response = requests.put(attachment_url, data=_file)
                else:
                    put_response = requests.put(
                        self._credentials.url + attachment_url,
                        files={"file": (name, _file, "file")},
                    )
        except Exception as e:
            # Remove the asset created in step 1 before reporting the failure.
            requests.delete(
                self._client.service_instance._href_definitions.get_data_asset_href(
                    asset_id
                ),
                params=self._client._params(),
                headers=self._client._get_headers(),
            )
            raise WMLClientError(
                Messages.get_message(message_id="failed_while_creating_a_data_asset"),
                e,
            ) from e

        if put_response.status_code not in (200, 201):
            self._discard_asset_quietly(asset_id)
            raise WMLClientError(
                Messages.get_message(message_id="failed_while_creating_a_data_asset")
            )

        # Step 4: complete the attachment
        complete_response = requests.post(
            self._client.service_instance._href_definitions.get_attachment_complete_href(
                asset_id, attachment_id
            ),
            headers=self._client._get_headers(),
            params=self._client._params(),
        )

        if complete_response.status_code != 200:
            self._discard_asset_quietly(asset_id)
            raise WMLClientError(
                Messages.get_message(message_id="failed_while_creating_a_data_asset")
            )

        print(Messages.get_message(message_id="success"))
        return self._get_required_element_from_response(asset_details)

    def list(self, limit: int | None = None) -> DataFrame:
        """Lists stored data assets in a table format.
        If limit is set to none, only the first 50 records are shown.

        :param limit: limit number for fetched records
        :type limit: int, optional

        :rtype: DataFrame
        :return: listed elements

        **Example:**

        .. code-block:: python

            client.data_assets.list()

        """
        Assets._validate_type(limit, "limit", int, False)

        href = self._client.service_instance._href_definitions.get_search_asset_href()

        data: dict[str, Any] = {"query": "*:*"}
        if limit is not None:
            data["limit"] = limit

        response = requests.post(
            href,
            params=self._client._params(),
            headers=self._client._get_headers(),
            json=data,
        )

        # Validate and parse the response exactly once.
        asset_details = self._handle_response(200, "list assets", response)["results"]

        space_values = [
            (
                m["metadata"]["name"],
                m["metadata"]["asset_type"],
                m["metadata"]["size"],
                m["metadata"]["asset_id"],
            )
            for m in asset_details
        ]

        return self._list(
            space_values,
            ["NAME", "ASSET_TYPE", "SIZE", "ASSET_ID"],
            limit,
            _DEFAULT_LIST_LENGTH,
        )

    def download(
        self, asset_id: str | None = None, filename: str = "", **kwargs: Any
    ) -> str:
        """Download and store the content of a data asset.

        ``asset_id`` is optional only for backward compatibility (the deprecated
        ``asset_uid`` keyword); because of that ``filename`` needs a default as
        well, although it is effectively required.

        :param asset_id: unique ID of the data asset to be downloaded
        :type asset_id: str

        :param filename: filename to be used for the downloaded file
        :type filename: str

        :return: normalized path to the downloaded asset content
        :rtype: str

        :raises TypeError: if ``filename`` is missing or empty
        :raises WMLClientError: if the content cannot be written to ``filename``

        **Example:**

        .. code-block:: python

            client.data_assets.download(asset_id,"sample_asset.csv")

        """
        asset_id = _get_id_from_deprecated_uid(kwargs, asset_id, "asset")

        # The default is "", so test for emptiness rather than for None;
        # otherwise an omitted filename is only noticed after the download.
        if not filename:
            raise TypeError("Missing required positional argument 'filename'")

        content = self.get_content(asset_id)

        try:
            with open(filename, "wb") as f:
                f.write(content)
            print(
                Messages.get_message(
                    filename, message_id="successfully_saved_data_asset_content_to_file"
                )
            )
            return os.path.abspath(filename)
        except OSError as e:
            raise WMLClientError(
                Messages.get_message(
                    filename, message_id="saving_data_asset_to_local_file_failed"
                ),
                e,
            ) from e

    def get_content(self, asset_id: str | None = None, **kwargs: Any) -> bytes:
        """Download the content of a data asset.

        ``asset_id`` is optional only for backward compatibility (the deprecated
        ``asset_uid`` keyword).

        :param asset_id: unique ID of the data asset to be downloaded
        :type asset_id: str

        :return: the asset content
        :rtype: bytes

        :raises WMLClientError: if the attachment cannot be resolved or the
            asset uses a connection type other than Cloud Object Storage
        :raises ApiRequestFailure: if fetching the content itself fails

        **Example:**

        .. code-block:: python

            content = client.data_assets.get_content(asset_id).decode('ascii')

        """
        asset_id = _get_id_from_deprecated_uid(kwargs, asset_id, "asset")
        Assets._validate_type(asset_id, "asset_id", str, True)

        asset_response = requests.get(
            self._client.service_instance._href_definitions.get_data_asset_href(
                asset_id
            ),
            params=self._client._params(),
            headers=self._client._get_headers(),
        )
        asset_details = self._handle_response(200, "get assets", asset_response)

        # Only the first attachment of the asset is downloaded.
        attachment = asset_details["attachments"][0]
        attachment_id = attachment["id"]

        response = requests.get(
            self._client.service_instance._href_definitions.get_attachment_href(
                asset_id, attachment_id
            ),
            params=self._client._params(),
            headers=self._client._get_headers(),
        )

        if response.status_code != 200:
            raise WMLClientError(
                Messages.get_message(message_id="failure_during_downloading_data_asset")
            )

        if attachment.get("connection_id") is not None:
            conn_details = self._client.connections.get_details(
                attachment["connection_id"]
            )
            attachment_data_source_type = conn_details["entity"].get(
                "datasource_type"
            )
            cos_conn_data_source_id = (
                self._client.connections.get_datasource_type_id_by_name(
                    "cloudobjectstorage"
                )
            )

            # Connected assets can be downloaded only from Cloud Object Storage.
            if attachment_data_source_type != cos_conn_data_source_id:
                raise WMLClientError(
                    Messages.get_message(
                        message_id="download_api_not_supported_for_this_connection_type"
                    )
                )

            attachment_signed_url = response.json()["url"]
            att_response = requests.get(attachment_signed_url)
        else:
            attachment_signed_url = response.json()["url"]
            if self._client.CLOUD_PLATFORM_SPACES:
                att_response = requests.get(attachment_signed_url)
            else:
                # Outside of cloud the URL is prefixed with the instance URL.
                att_response = requests.get(
                    self._credentials.url + attachment_signed_url
                )

        if att_response.status_code != 200:
            raise ApiRequestFailure(
                Messages.get_message(
                    message_id="failure_during_downloading_data_asset"
                ),
                att_response,
            )

        return att_response.content

    @staticmethod
    def get_id(asset_details: dict) -> str:
        """Get the unique ID of a stored data asset.

        :param asset_details: details of the stored data asset
        :type asset_details: dict

        :return: unique ID of the stored data asset
        :rtype: str

        **Example:**

        .. code-block:: python

            asset_id = client.data_assets.get_id(asset_details)

        """
        Assets._validate_type(asset_details, "asset_details", object, True)
        Assets._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE)

        return WMLResource._get_required_element_from_dict(
            asset_details, "data_assets_details", ["metadata", "guid"]
        )

    @staticmethod
    def get_href(asset_details: dict) -> str:
        """Get the URL of a stored data asset.

        :param asset_details: details of the stored data asset
        :type asset_details: dict

        :return: href of the stored data asset
        :rtype: str

        **Example:**

        .. code-block:: python

            asset_details = client.data_assets.get_details(asset_id)
            asset_href = client.data_assets.get_href(asset_details)

        """
        Assets._validate_type(asset_details, "asset_details", object, True)
        Assets._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE)

        return WMLResource._get_required_element_from_dict(
            asset_details, "asset_details", ["metadata", "href"]
        )

    def delete(self, asset_id: str | None = None, **kwargs: Any) -> dict | str:
        """Delete a stored data asset.

        ``asset_id`` is optional only for backward compatibility (the deprecated
        ``asset_uid`` keyword).

        :param asset_id: unique ID of the data asset
        :type asset_id: str

        :return: status ("SUCCESS" or "FAILED") or dictionary, if deleted asynchronously
        :rtype: str or dict

        **Example:**

        .. code-block:: python

            client.data_assets.delete(asset_id)

        """
        asset_id = _get_id_from_deprecated_uid(kwargs, asset_id, "asset")
        Assets._validate_type(asset_id, "asset_id", str, True)

        response = requests.delete(
            self._client.service_instance._href_definitions.get_asset_href(asset_id),
            params=self._client._params(),
            headers=self._client._get_headers(),
        )

        # A 200 response carries a body with asset details; any other status
        # is handed to the generic handler, which expects 204.
        if response.status_code == 200:
            return self._get_required_element_from_response(response.json())

        return self._handle_response(204, "delete assets", response)

    def _get_required_element_from_response(self, response_data: dict) -> dict:
        """Build the ``{"metadata": ..., "entity": ...}`` view of a raw asset response.

        The result holds a shallow copy of ``response_data["metadata"]``
        extended with ``guid``, ``href`` and ``last_updated_at``, plus
        ``attachment_id`` when the response lists attachments.

        :param response_data: raw asset payload returned by the service
        :type response_data: dict

        :return: normalized asset details
        :rtype: dict

        :raises WMLClientError: if an expected field is missing from the response
        """
        WMLResource._validate_type(response_data, "data assets response", dict)

        import copy
        from urllib.parse import urlsplit, urlunsplit

        try:
            metadata = response_data["metadata"]
            new_el: dict[str, Any] = {"metadata": copy.copy(metadata)}

            new_el["metadata"]["guid"] = metadata["asset_id"]
            new_el["metadata"]["href"] = response_data["href"]
            new_el["metadata"]["asset_type"] = metadata["asset_type"]
            new_el["metadata"]["created_at"] = metadata["created_at"]
            new_el["metadata"]["last_updated_at"] = metadata["usage"].get(
                "last_updated_at"
            )

            # These lookups also verify that the scope ID is present.
            if self._client.default_space_id is not None:
                new_el["metadata"]["space_id"] = metadata["space_id"]
            elif self._client.default_project_id is not None:
                new_el["metadata"]["project_id"] = metadata["project_id"]

            if "entity" in response_data:
                new_el["entity"] = response_data["entity"]

            if "attachments" in response_data and response_data["attachments"]:
                new_el["metadata"].update(
                    {"attachment_id": response_data["attachments"][0]["id"]}
                )

                # Drop scheme and host from an absolute href. Parsing the URL
                # works for any host, not only those ending in ".com", and
                # leaves an already relative href untouched.
                href = response_data["href"]
                href_parts = urlsplit(href)
                if href_parts.netloc:
                    href = urlunsplit(
                        (
                            "",
                            "",
                            href_parts.path,
                            href_parts.query,
                            href_parts.fragment,
                        )
                    )
                new_el["metadata"].update({"href": href})

            return new_el
        except Exception as e:
            raise WMLClientError(
                Messages.get_message(
                    response_data,
                    message_id="failed_to_read_response_from_down_stream_service",
                )
            ) from e