# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2023-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
import os
from typing import Any, TYPE_CHECKING
from warnings import warn
from ibm_watsonx_ai._wrappers import requests
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.metanames import AssetsMetaNames
from ibm_watsonx_ai.utils import DATA_ASSETS_DETAILS_TYPE
from ibm_watsonx_ai.utils.utils import _get_id_from_deprecated_uid
from ibm_watsonx_ai.wml_client_error import WMLClientError, ApiRequestFailure
from ibm_watsonx_ai.wml_resource import WMLResource
if TYPE_CHECKING:
from pandas import DataFrame
from ibm_watsonx_ai import APIClient
_DEFAULT_LIST_LENGTH = 50
[docs]
class Assets(WMLResource):
    """Store and manage data assets."""

    # Shared ``AssetsMetaNames`` instance; ``store`` reads its NAME,
    # DATA_CONTENT_NAME, DESCRIPTION and CONNECTION_ID attributes as the keys
    # of the ``meta_props`` dictionary.
    ConfigurationMetaNames = AssetsMetaNames()
    """MetaNames for Data Assets creation."""

    def __init__(self, client: APIClient) -> None:
        """Initialise the resource for the given API client.

        :param client: client instance passed on to ``WMLResource.__init__``
        :type client: APIClient
        """
        # The module name is handed to the base class together with the client.
        WMLResource.__init__(self, __name__, client)
[docs]
def get_details(
    self,
    asset_id: str | None = None,
    get_all: bool | None = None,
    limit: int | None = None,
    **kwargs: Any,
) -> dict:
    """Get data asset details. If no asset_id is passed, details for all assets are returned.

    :param asset_id: unique ID of the asset
    :type asset_id: str, optional

    :param limit: limit number of fetched records
    :type limit: int, optional

    :param get_all: if True, it will get all entries in 'limited' chunks
    :type get_all: bool, optional

    :return: metadata of the stored data asset
    :rtype: dict

    **Example:**

    .. code-block:: python

        asset_details = client.data_assets.get_details(asset_id)

    """
    # The ID may arrive positionally or through ``kwargs``; the shared helper
    # picks the effective value and accepts ``None`` here.
    resolved_id = _get_id_from_deprecated_uid(
        kwargs, asset_id, "asset", can_be_none=True
    )
    paging_options = {"limit": limit, "get_all": get_all}
    return self._get_asset_based_resource(
        resolved_id,
        "data_asset",
        self._get_required_element_from_response,
        **paging_options,
    )
[docs]
def create(self, name: str, file_path: str) -> dict[str, Any]:
    """Create a data asset and upload content to it.

    :param name: name to be given to the data asset
    :type name: str

    :param file_path: path to the content file to be uploaded
    :type file_path: str

    :return: metadata of the stored data asset
    :rtype: dict

    **Example:**

    .. code-block:: python

        asset_details = client.data_assets.create(name="sample_asset", file_path="/path/to/file")

    """
    # Both arguments are mandatory strings; validate them in declaration order.
    for value, label in ((name, "name"), (file_path, "file_path")):
        Assets._validate_type(value, label, str, True)

    return self._create_asset(name, file_path)
[docs]
def store(self, meta_props: dict) -> dict[str, Any]:
    """Create a data asset and upload content to it.

    :param meta_props: metadata of the data asset. To see available meta names, use:

        .. code-block:: python

            client.data_assets.ConfigurationMetaNames.get()

    :type meta_props: dict

    :return: metadata of the stored data asset
    :rtype: dict

    **Example:**

    Example of data asset creation for files:

    .. code-block:: python

        metadata = {
            client.data_assets.ConfigurationMetaNames.NAME: 'my data assets',
            client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description',
            client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 'sample.csv'
        }
        asset_details = client.data_assets.store(meta_props=metadata)

    Example of data asset creation using a connection:

    .. code-block:: python

        metadata = {
            client.data_assets.ConfigurationMetaNames.NAME: 'my data assets',
            client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description',
            client.data_assets.ConfigurationMetaNames.CONNECTION_ID: '39eaa1ee-9aa4-4651-b8fe-95d3ddae',
            client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 't1/sample.csv'
        }
        asset_details = client.data_assets.store(meta_props=metadata)

    Example of data asset creation with a database sources type connection:

    .. code-block:: python

        metadata = {
            client.data_assets.ConfigurationMetaNames.NAME: 'my data assets',
            client.data_assets.ConfigurationMetaNames.DESCRIPTION: 'sample description',
            client.data_assets.ConfigurationMetaNames.CONNECTION_ID: '23eaf1ee-96a4-4651-b8fe-95d3dadfe',
            client.data_assets.ConfigurationMetaNames.DATA_CONTENT_NAME: 't1'
        }
        asset_details = client.data_assets.store(meta_props=metadata)

    """
    # For CP4D, check that either a space or a project ID is set.
    self._client._check_if_either_is_set()
    Assets._validate_type(meta_props, "meta_props", dict, True)

    meta_names = self.ConfigurationMetaNames
    name = meta_props[meta_names.NAME]
    file_path = meta_props[meta_names.DATA_CONTENT_NAME]
    connection_id = meta_props.get(meta_names.CONNECTION_ID)

    # Without a connection the content has to be a local file; only warn,
    # the creation is still attempted.
    if not (connection_id or os.path.isfile(file_path)):
        warn(f"No connection_id specified and file: {file_path} does not exist.")

    description = meta_props.get(meta_names.DESCRIPTION, "")

    return self._create_asset(
        name, file_path, connection_id=connection_id, description=description
    )
def _create_asset(
    self,
    name: str,
    file_path: str,
    connection_id: str | None = None,
    description: str | None = None,
) -> dict:
    """Create a data asset, register its attachment and upload the content.

    The work is done in up to four requests:

    1. create the asset record,
    2. create the attachment record,
    3. upload the local file to the attachment URL (local files only),
    4. mark the attachment as complete (local files only).

    When ``connection_id`` is given the attachment is registered as remote
    and steps 3 and 4 are skipped. If a step after the asset creation fails,
    removal of the just-created asset is attempted before the error is raised.

    :param name: name of the data asset
    :type name: str

    :param file_path: local path of the content file, or the path inside the
        connection when ``connection_id`` is given
    :type file_path: str

    :param connection_id: ID of the connection holding the data, defaults to None
    :type connection_id: str, optional

    :param description: description of the data asset, defaults to None
        (stored as an empty string)
    :type description: str, optional

    :return: metadata of the stored data asset
    :rtype: dict

    :raises WMLClientError: if any of the steps fails
    """

    def creation_error(*cause: Exception) -> WMLClientError:
        # Single place that builds the generic "creation failed" error.
        return WMLClientError(
            Messages.get_message(message_id="failed_while_creating_a_data_asset"),
            *cause,
        )

    def discard_asset(created_asset_id: str) -> None:
        # Best-effort rollback: a failing cleanup must not hide the error that
        # triggered it, but KeyboardInterrupt/SystemExit still propagate.
        try:
            self.delete(created_asset_id)
        except Exception:
            pass

    desc = "" if description is None else description

    try:
        import mimetypes
    except Exception as e:
        raise WMLClientError(
            Messages.get_message(message_id="module_mimetypes_not_found"), e
        ) from e

    mime_type = mimetypes.MimeTypes().guess_type(file_path)[0]
    if mime_type is None:
        mime_type = "application/octet-stream"

    asset_meta: dict[str, Any] = {
        "metadata": {
            "name": name,
            "description": desc,
            "asset_type": "data_asset",
            "origin_country": "us",
            "asset_category": "USER",
        },
        "entity": {"data_asset": {"mime_type": mime_type}},
    }
    if connection_id is not None:
        asset_meta["metadata"]["tags"] = ["connected-data"]

    # Step 1: create the asset record.
    # NOTE(review): `_handle_response` is defined outside this block; if it
    # already raises on an unexpected status code, the explicit status checks
    # below are only a second line of defence - confirm.
    print(Messages.get_message(message_id="creating_data_asset"))
    creation_response = requests.post(
        self._client.service_instance._href_definitions.get_data_assets_href(),
        headers=self._client._get_headers(),
        params=self._client._params(),
        json=asset_meta,
    )
    asset_details = self._handle_response(
        201, "creating new asset", creation_response
    )
    if creation_response.status_code != 201:
        raise creation_error()

    asset_id = asset_details["metadata"]["asset_id"]

    # Step 2: create the attachment record.
    # Connection paths always use "/"; local paths may use the platform
    # separator, so let os.path pick the file name for them.
    if connection_id is None:
        attachment_name = os.path.basename(file_path)
    else:
        attachment_name = file_path.split("/")[-1]

    attachment_meta: dict[str, Any] = {
        "asset_type": "data_asset",
        "name": attachment_name,
        "mime": mime_type,
    }
    if connection_id is not None:
        attachment_meta.update(
            {
                "connection_id": connection_id,
                "connection_path": file_path,
                "is_remote": True,
            }
        )

    attachment_response = requests.post(
        self._client.service_instance._href_definitions.get_attachments_href(
            asset_id
        ),
        headers=self._client._get_headers(),
        params=self._client._params(),
        json=attachment_meta,
    )
    attachment_details = self._handle_response(
        201, "creating new attachment", attachment_response
    )
    if attachment_response.status_code != 201:
        discard_asset(asset_id)
        raise creation_error()

    # Connected data stays where it is: nothing to upload or complete.
    if connection_id is not None:
        print(Messages.get_message(message_id="success"))
        return self._get_required_element_from_response(asset_details)

    attachment_id = attachment_details["attachment_id"]
    attachment_url = attachment_details["url1"]

    # Step 3: put the file content to the attachment URL.
    try:
        with open(file_path, "rb") as _file:
            if not self._client.ICP_PLATFORM_SPACES:
                put_response = requests.put(attachment_url, data=_file)
            else:
                # Here the attachment URL is prefixed with the configured
                # service URL and the file is sent as multipart form data.
                put_response = requests.put(
                    self._credentials.url + attachment_url,
                    files={"file": (name, _file, "file")},
                )
    except Exception as e:
        # Best-effort removal of the half-created asset; a failing cleanup
        # request must not replace the upload error reported to the caller.
        try:
            requests.delete(
                self._client.service_instance._href_definitions.get_data_asset_href(
                    asset_id
                ),
                params=self._client._params(),
                headers=self._client._get_headers(),
            )
        except Exception:
            pass
        raise creation_error(e) from e

    if put_response.status_code not in (200, 201):
        discard_asset(asset_id)
        raise creation_error()

    # Step 4: mark the attachment as complete.
    complete_response = requests.post(
        self._client.service_instance._href_definitions.get_attachment_complete_href(
            asset_id, attachment_id
        ),
        headers=self._client._get_headers(),
        params=self._client._params(),
    )
    if complete_response.status_code != 200:
        discard_asset(asset_id)
        raise creation_error()

    print(Messages.get_message(message_id="success"))
    return self._get_required_element_from_response(asset_details)
[docs]
def list(self, limit: int | None = None) -> DataFrame:
    """Lists stored data assets in a table format.

    If limit is set to none, only the first 50 records are shown.

    :param limit: limit number for fetched records
    :type limit: int, optional

    :rtype: DataFrame
    :return: listed elements

    **Example:**

    .. code-block:: python

        client.data_assets.list()

    """
    Assets._validate_type(limit, "limit", int, False)
    href = self._client.service_instance._href_definitions.get_search_asset_href()

    # Match-all search query; the limit is only sent when the caller set one.
    data: dict[str, Any] = {"query": "*:*"}
    if limit is not None:
        data["limit"] = limit

    response = requests.post(
        href,
        params=self._client._params(),
        headers=self._client._get_headers(),
        json=data,
    )

    # Check and parse the response once, then take the result list from it.
    asset_details = self._handle_response(200, "list assets", response)["results"]

    space_values = [
        (
            m["metadata"]["name"],
            m["metadata"]["asset_type"],
            m["metadata"]["size"],
            m["metadata"]["asset_id"],
        )
        for m in asset_details
    ]

    return self._list(
        space_values,
        ["NAME", "ASSET_TYPE", "SIZE", "ASSET_ID"],
        limit,
        _DEFAULT_LIST_LENGTH,
    )
[docs]
def download(
    self, asset_id: str | None = None, filename: str = "", **kwargs: Any
) -> str:  # asset_id is optional for backward compatibility,
    # filename should be not optional, however, as asset_id is, filename also must be
    """Download and store the content of a data asset.

    :param asset_id: unique ID of the data asset to be downloaded
    :type asset_id: str

    :param filename: filename to be used for the downloaded file
    :type filename: str

    :return: normalized path to the downloaded asset content
    :rtype: str

    :raises TypeError: if no filename is given
    :raises WMLClientError: if the content cannot be written to ``filename``

    **Example:**

    .. code-block:: python

        client.data_assets.download(asset_id,"sample_asset.csv")

    """
    asset_id = _get_id_from_deprecated_uid(kwargs, asset_id, "asset")

    # The default is an empty string, so test for "empty" rather than only
    # ``None``; this rejects a missing filename before anything is downloaded.
    if not filename:
        raise TypeError("Missing required positional argument 'filename'")

    content = self.get_content(asset_id)

    try:
        with open(filename, "wb") as f:
            f.write(content)
        print(
            Messages.get_message(
                filename, message_id="successfully_saved_data_asset_content_to_file"
            )
        )
        return os.path.abspath(filename)
    except IOError as e:
        raise WMLClientError(
            Messages.get_message(
                filename, message_id="saving_data_asset_to_local_file_failed"
            ),
            e,
        ) from e
[docs]
def get_content(
    self, asset_id: str | None = None, **kwargs: Any
) -> bytes:  # asset_id is optional for backward compatibility
    """Download the content of a data asset.

    The asset details are fetched first, then the details of its first
    attachment, and finally the content behind the attachment URL. For an
    attachment that references a connection, the download is only attempted
    when the connection's datasource type equals the ID looked up for
    ``"cloudobjectstorage"``.

    :param asset_id: unique ID of the data asset to be downloaded
    :type asset_id: str

    :return: the asset content
    :rtype: bytes

    :raises WMLClientError: if the asset has no attachment, the attachment
        details cannot be fetched, or the connection type is not supported
    :raises ApiRequestFailure: if the content request does not return 200

    **Example:**

    .. code-block:: python

        content = client.data_assets.get_content(asset_id).decode('ascii')

    """
    asset_id = _get_id_from_deprecated_uid(kwargs, asset_id, "asset")
    Assets._validate_type(asset_id, "asset_id", str, True)

    asset_response = requests.get(
        self._client.service_instance._href_definitions.get_data_asset_href(
            asset_id
        ),
        params=self._client._params(),
        headers=self._client._get_headers(),
    )
    asset_details = self._handle_response(200, "get assets", asset_response)

    # An asset without attachments has no content to download; report that as
    # a download failure instead of leaking a KeyError/IndexError.
    attachments = asset_details.get("attachments")
    if not attachments:
        raise WMLClientError(
            Messages.get_message(message_id="failure_during_downloading_data_asset")
        )
    attachment = attachments[0]
    attachment_id = attachment["id"]

    response = requests.get(
        self._client.service_instance._href_definitions.get_attachment_href(
            asset_id, attachment_id
        ),
        params=self._client._params(),
        headers=self._client._get_headers(),
    )
    if response.status_code != 200:
        raise WMLClientError(
            Messages.get_message(message_id="failure_during_downloading_data_asset")
        )

    connection_id = attachment.get("connection_id")
    if connection_id is not None:
        # Connected data: only the "cloudobjectstorage" datasource type is
        # supported by this download path.
        conn_details = self._client.connections.get_details(connection_id)
        attachment_data_source_type = conn_details["entity"].get("datasource_type")
        cos_conn_data_source_id = (
            self._client.connections.get_datasource_type_id_by_name(
                "cloudobjectstorage"
            )
        )
        if attachment_data_source_type != cos_conn_data_source_id:
            raise WMLClientError(
                Messages.get_message(
                    message_id="download_api_not_supported_for_this_connection_type"
                )
            )
        attachment_signed_url = response.json()["url"]
        att_response = requests.get(attachment_signed_url)
    else:
        attachment_signed_url = response.json()["url"]
        if self._client.CLOUD_PLATFORM_SPACES:
            att_response = requests.get(attachment_signed_url)
        else:
            # Outside the cloud platform the URL is prefixed with the
            # configured service URL before it is requested.
            att_response = requests.get(
                self._credentials.url + attachment_signed_url
            )

    if att_response.status_code != 200:
        raise ApiRequestFailure(
            Messages.get_message(message_id="failure_during_downloading_data_asset"),
            att_response,
        )

    return att_response.content
[docs]
@staticmethod
def get_id(asset_details: dict) -> str:
    """Get the unique ID of a stored data asset.

    :param asset_details: details of the stored data asset
    :type asset_details: dict

    :return: unique ID of the stored data asset
    :rtype: str

    **Example:**

    .. code-block:: python

        asset_id = client.data_assets.get_id(asset_details)

    """
    Assets._validate_type(asset_details, "asset_details", object, True)
    Assets._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE)

    # The ID is stored under metadata -> guid in the details dictionary.
    guid_path = ["metadata", "guid"]
    return WMLResource._get_required_element_from_dict(
        asset_details, "data_assets_details", guid_path
    )
[docs]
@staticmethod
def get_href(asset_details: dict) -> str:
    """Get the URL of a stored data asset.

    :param asset_details: details of the stored data asset
    :type asset_details: dict

    :return: href of the stored data asset
    :rtype: str

    **Example:**

    .. code-block:: python

        asset_details = client.data_assets.get_details(asset_id)
        asset_href = client.data_assets.get_href(asset_details)

    """
    Assets._validate_type(asset_details, "asset_details", object, True)
    Assets._validate_type_of_details(asset_details, DATA_ASSETS_DETAILS_TYPE)

    # The href is stored under metadata -> href in the details dictionary.
    href_path = ["metadata", "href"]
    return WMLResource._get_required_element_from_dict(
        asset_details, "asset_details", href_path
    )
[docs]
def delete(
    self, asset_id: str | None = None, **kwargs: Any
) -> dict | str:  # asset_id is optional for backward compatibility
    """Delete a stored data asset.

    :param asset_id: unique ID of the data asset
    :type asset_id: str

    :return: status ("SUCCESS" or "FAILED") or dictionary, if deleted asynchronously
    :rtype: str or dict

    **Example:**

    .. code-block:: python

        client.data_assets.delete(asset_id)

    """
    asset_id = _get_id_from_deprecated_uid(kwargs, asset_id, "asset")
    Assets._validate_type(asset_id, "asset_id", str, True)

    asset_href = self._client.service_instance._href_definitions.get_asset_href(
        asset_id
    )
    response = requests.delete(
        asset_href,
        params=self._client._params(),
        headers=self._client._get_headers(),
    )

    # Anything other than 200 is handed to the generic handler, which is told
    # to expect 204.
    if response.status_code != 200:
        return self._handle_response(204, "delete assets", response)

    # A 200 answer carries a body, which is returned in the reshaped form.
    return self._get_required_element_from_response(response.json())
def _get_required_element_from_response(self, response_data: dict) -> dict:
    """Reshape a raw data asset response into the details format of this class.

    The result holds a shallow copy of the response ``metadata`` extended with
    ``guid`` (the asset ID), ``href``, ``last_updated_at`` and, when the
    response lists attachments, the ``attachment_id`` of the first one. The
    response ``entity`` is passed through when present.

    :param response_data: parsed response describing a single data asset
    :type response_data: dict

    :return: dictionary with ``metadata`` and, if available, ``entity``
    :rtype: dict

    :raises WMLClientError: if the response lacks one of the required fields
    """
    WMLResource._validate_type(response_data, "data assets response", dict)

    import copy

    new_el = {"metadata": copy.copy(response_data["metadata"])}

    try:
        source_meta = response_data["metadata"]
        metadata = new_el["metadata"]

        metadata["guid"] = source_meta["asset_id"]
        # Keep only the part of the href after the host.
        # NOTE(review): splitting on ".com" only strips hosts that end in
        # ".com"; other hosts leave the href unchanged - confirm whether the
        # service ever returns absolute hrefs.
        metadata["href"] = response_data["href"].split(".com")[-1]
        # Re-reading these keys makes a missing one surface as WMLClientError.
        metadata["asset_type"] = source_meta["asset_type"]
        metadata["created_at"] = source_meta["created_at"]
        metadata["last_updated_at"] = source_meta["usage"].get("last_updated_at")

        if self._client.default_space_id is not None:
            metadata["space_id"] = source_meta["space_id"]
        elif self._client.default_project_id is not None:
            metadata["project_id"] = source_meta["project_id"]

        if "entity" in response_data:
            new_el["entity"] = response_data["entity"]

        if response_data.get("attachments"):
            metadata["attachment_id"] = response_data["attachments"][0]["id"]

        return new_el
    except Exception as e:
        # Chain the original error so the missing field stays visible in the
        # traceback.
        raise WMLClientError(
            Messages.get_message(
                response_data,
                message_id="failed_to_read_response_from_down_stream_service",
            )
        ) from e