# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2023-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
import json
import os
from typing import Generator, TYPE_CHECKING
from requests import Response
from ibm_watsonx_ai._wrappers import requests
from ibm_watsonx_ai.metanames import PkgExtnMetaNames
from ibm_watsonx_ai.utils import PKG_EXTN_DETAILS_TYPE
from ibm_watsonx_ai.wml_client_error import (
WMLClientError,
ApiRequestFailure,
ResourceIdByNameNotFound,
)
from ibm_watsonx_ai.wml_resource import WMLResource
if TYPE_CHECKING:
from pandas import DataFrame
from ibm_watsonx_ai import APIClient
_DEFAULT_LIST_LENGTH = 50
[docs]
class PkgExtn(WMLResource):
"""Store and manage software Packages Extension specs."""
ConfigurationMetaNames = PkgExtnMetaNames()
"""MetaNames for Package Extensions creation."""
def __init__(self, client: APIClient):
WMLResource.__init__(self, __name__, client)
[docs]
def get_details(self, pkg_extn_id: str) -> dict:
"""Get package extensions details.
:param pkg_extn_id: unique ID of the package extension
:type pkg_extn_id: str
:return: details of the package extension
:rtype: dict
**Example:**
.. code-block:: python
pkg_extn_details = client.pkg_extn.get_details(pkg_extn_id)
"""
PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True)
response = requests.get(
self._client.service_instance._href_definitions.get_pkg_extn_href(
pkg_extn_id
),
params=self._client._params(),
headers=self._client._get_headers(),
)
if response.status_code == 200:
return self._get_required_element_from_response(
self._handle_response(200, "get hw spec details", response)
)
else:
return self._handle_response(200, "get hw spec details", response)
[docs]
def store(self, meta_props: dict, file_path: str) -> dict:
"""Create a package extension.
:param meta_props: metadata of the package extension. To see available meta names, use:
.. code-block:: python
client.package_extensions.ConfigurationMetaNames.get()
:type meta_props: dict
:param file_path: path to the file to be uploaded as a package extension
:type file_path: str
:return: metadata of the package extension
:rtype: dict
**Example:**
.. code-block:: python
meta_props = {
client.package_extensions.ConfigurationMetaNames.NAME: "skl_pipeline_heart_problem_prediction",
client.package_extensions.ConfigurationMetaNames.DESCRIPTION: "description scikit-learn_0.20",
client.package_extensions.ConfigurationMetaNames.TYPE: "conda_yml"
}
pkg_extn_details = client.package_extensions.store(meta_props=meta_props, file_path="/path/to/file")
"""
# quick support for COS credentials instead of local path
# TODO add error handling and cleaning (remove the file)
PkgExtn._validate_type(meta_props, "meta_props", dict, True)
pkg_extn_meta = self.ConfigurationMetaNames._generate_resource_metadata(
meta_props, with_validation=True, client=self._client
)
pkg_extn_meta_json = json.dumps(pkg_extn_meta)
PkgExtn._validate_type(file_path, "file_path", str, True)
# Step1 : Create an asset
print("Creating package extensions")
href = self._client.service_instance._href_definitions.get_pkg_extns_href()
creation_response = requests.post(
href,
params=self._client._params(),
headers=self._client._get_headers(),
data=pkg_extn_meta_json,
)
pkg_extn_details = self._handle_response(
201, "creating new package_extensions", creation_response
)
# Step2: upload pkg extension file to presigned url
if creation_response.status_code == 201:
pkg_extn_asset_id = pkg_extn_details["metadata"]["asset_id"]
pkg_extn_presigned_url = pkg_extn_details["entity"]["package_extension"][
"href"
]
FILE_SIZE_LIMIT = int(0.3 * 1073741824)
def read_in_chunks(
chunk_size: int | None = FILE_SIZE_LIMIT,
) -> Generator[bytes, None, None]:
with open(file_path, "rb") as file_object:
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
def send_multipart_request(file: str, url: str) -> Response:
content_path = os.path.abspath(file)
content_size = os.stat(content_path).st_size
index = 0
headers = {}
try:
for chunk in read_in_chunks(FILE_SIZE_LIMIT):
offset = index + len(chunk)
headers["Content-Range"] = "bytes %s-%s/%s" % (
index,
offset - 1,
content_size,
)
index = offset
response = requests.put(
url,
files={"file": (file, chunk, "application/octet-stream")},
)
except Exception as e:
deletion_response = requests.delete(
self._client.service_instance._href_definitions.get_pkg_extn_href(
pkg_extn_asset_id
),
params=self._client._params(),
headers=self._client._get_headers(),
)
print(deletion_response.status_code)
raise WMLClientError("Failed while reading a file.", e)
return response
# chunking, when finally it will work on service side. to confirm it's working, the downloaded file needs to be the same as 400MB+ upladed file
#
# put_response = send_multipart_request(file_path,
# (self._client.service_instance._wml_credentials.url if self._client.ICP_PLATFORM_SPACES else "") + pkg_extn_presigned_url)
href = (
self._client.service_instance._credentials.url
if self._client.ICP_PLATFORM_SPACES
else ""
) + pkg_extn_presigned_url
try:
if os.stat(file_path).st_size == 0:
raise WMLClientError("Package extension file cannot be empty")
with open(file_path, "rb") as file_object:
if not self._client.ICP_PLATFORM_SPACES:
put_response = requests.put(href, data=file_object.read())
else:
put_response = requests.put(
href,
files={
"file": (
file_path,
file_object.read(),
"application/octet-stream",
)
},
)
except Exception as e:
deletion_response = requests.delete(
self._client.service_instance._href_definitions.get_pkg_extn_href(
pkg_extn_asset_id
),
params=self._client._params(),
headers=self._client._get_headers(),
)
print(deletion_response.status_code)
raise WMLClientError("Failed while reading a file.", e)
if put_response.status_code == 201 or put_response.status_code == 200:
# Step3: Mark the upload complete
complete_response = requests.post(
self._client.service_instance._href_definitions.get_pkg_extn_href(
pkg_extn_asset_id
)
+ "/upload_complete",
headers=self._client._get_headers(),
params=self._client._params(),
)
if complete_response.status_code == 204:
print("SUCCESS")
return self._get_required_element_from_response(pkg_extn_details)
else:
try:
self.delete(pkg_extn_asset_id)
except:
pass
raise WMLClientError(
"Failed while creating a package extensions "
+ complete_response.text
)
else:
try:
self.delete(pkg_extn_asset_id)
except:
pass
raise WMLClientError(
"Failed while creating a package extensions " + put_response.text
)
else:
raise WMLClientError(
"Failed while creating a package extensions " + creation_response.text
)
[docs]
def list(self) -> DataFrame:
"""List the package extensions in a table format.
:return: pandas.DataFrame with listed package extensions
:rtype: pandas.DataFrame
.. code-block:: python
client.package_extensions.list()
"""
href = self._client.service_instance._href_definitions.get_pkg_extns_href()
response = requests.get(
href, params=self._client._params(), headers=self._client._get_headers()
)
self._handle_response(200, "list pkg_extn", response)
asset_details = self._handle_response(200, "list assets", response)["resources"]
pkg_extn_values = [
(
m["metadata"]["name"],
m["metadata"]["asset_id"],
m["entity"]["package_extension"]["type"],
m["metadata"]["created_at"],
)
for m in asset_details
]
table = self._list(
pkg_extn_values,
["NAME", "ASSET_ID", "TYPE", "CREATED_AT"],
None,
_DEFAULT_LIST_LENGTH,
)
return table
[docs]
@staticmethod
def get_id(pkg_extn_details: dict) -> str:
"""Get the unique ID of a package extension.
:param pkg_extn_details: details of the package extension
:type pkg_extn_details: dict
:return: unique ID of the package extension
:rtype: str
**Example:**
.. code-block:: python
asset_id = client.package_extensions.get_id(pkg_extn_details)
"""
PkgExtn._validate_type(pkg_extn_details, "pkg_extn_details", object, True)
PkgExtn._validate_type_of_details(pkg_extn_details, PKG_EXTN_DETAILS_TYPE)
return WMLResource._get_required_element_from_dict(
pkg_extn_details, "pkg_extn_details", ["metadata", "asset_id"]
)
[docs]
def get_id_by_name(self, pkg_extn_name: str) -> str:
"""Get the ID of a package extension.
:param pkg_extn_name: name of the package extension
:type pkg_extn_name: str
:return: unique ID of the package extension
:rtype: str
**Example:**
.. code-block:: python
asset_id = client.package_extensions.get_id_by_name(pkg_extn_name)
"""
PkgExtn._validate_type(pkg_extn_name, "pkg_extn_name", str, True)
parameters = self._client._params()
parameters.update(name=pkg_extn_name)
response = requests.get(
self._client.service_instance._href_definitions.get_pkg_extns_href(),
params=parameters,
headers=self._client._get_headers(),
)
total_values = self._handle_response(200, "get pkg extn", response)[
"total_results"
]
if total_values != 0:
pkg_extn_details = self._handle_response(200, "get pkg extn", response)[
"resources"
]
return pkg_extn_details[0]["metadata"]["asset_id"]
else:
raise ResourceIdByNameNotFound(pkg_extn_name, "package extension")
[docs]
@staticmethod
def get_href(pkg_extn_details: dict) -> str:
"""Get the URL of a stored package extension.
:param pkg_extn_details: details of the package extension
:type pkg_extn_details: dict
:return: href of the package extension
:rtype: str
**Example:**
.. code-block:: python
pkg_extn_details = client.package_extensions.get_details(pkg_extn_id)
pkg_extn_href = client.package_extensions.get_href(pkg_extn_details)
"""
PkgExtn._validate_type(pkg_extn_details, "pkg_extn_details", object, True)
PkgExtn._validate_type_of_details(pkg_extn_details, PKG_EXTN_DETAILS_TYPE)
return WMLResource._get_required_element_from_dict(
pkg_extn_details,
"pkg_extn_details",
["entity", "package_extension", "href"],
)
[docs]
def delete(self, pkg_extn_id: str) -> dict | str:
"""Delete a package extension.
:param pkg_extn_id: unique ID of the package extension
:type pkg_extn_id: str
:return: status ("SUCCESS" or "FAILED") if deleted synchronously or dictionary with response
:rtype: str or dict
**Example:**
.. code-block:: python
client.package_extensions.delete(pkg_extn_id)
"""
PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True)
response = requests.delete(
self._client.service_instance._href_definitions.get_pkg_extn_href(
pkg_extn_id
),
params=self._client._params(),
headers=self._client._get_headers(),
)
if response.status_code == 200:
return self._get_required_element_from_response(response.json())
else:
return self._handle_response(204, "delete pkg extn specification", response)
def _get_required_element_from_response(self, response_data: dict) -> dict:
WMLResource._validate_type(response_data, "pkg_extn_response", dict)
if self._client.default_space_id is not None:
new_el = {
"metadata": {
"space_id": response_data["metadata"]["space_id"],
"name": response_data["metadata"]["name"],
"asset_id": response_data["metadata"]["asset_id"],
"asset_type": response_data["metadata"]["asset_type"],
"created_at": response_data["metadata"]["created_at"],
#'updated_at': response_data['metadata']['updated_at']
},
"entity": response_data["entity"],
}
elif self._client.default_project_id is not None:
new_el = {
"metadata": {
"project_id": response_data["metadata"]["project_id"],
"name": response_data["metadata"]["name"],
"asset_id": response_data["metadata"]["asset_id"],
"asset_type": response_data["metadata"]["asset_type"],
"created_at": response_data["metadata"]["created_at"],
},
"entity": response_data["entity"],
}
if "href" in response_data["metadata"]:
href_without_host = response_data["metadata"]["href"].split(".com")[-1]
new_el["metadata"].update({"href": href_without_host})
return new_el
[docs]
def download(self, pkg_extn_id: str, filename: str) -> str:
"""Download a package extension.
:param pkg_extn_id: unique ID of the package extension to be downloaded
:type pkg_extn_id: str
:param filename: filename to be used for the downloaded file
:type filename: str
:return: path to the downloaded package extension content
:rtype: str
**Example:**
.. code-block:: python
client.package_extensions.download(pkg_extn_id,"sample_conda.yml/custom_library.zip")
"""
PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True)
pkg_extn_response = requests.get(
self._client.service_instance._href_definitions.get_pkg_extn_href(
pkg_extn_id
),
params=self._client._params(),
headers=self._client._get_headers(),
)
pkg_extn_details = self._handle_response(200, "get assets", pkg_extn_response)
artifact_content_url = pkg_extn_details["entity"]["package_extension"]["href"]
if pkg_extn_response.status_code == 200:
if self._client.CLOUD_PLATFORM_SPACES:
att_response = requests.get(artifact_content_url)
else:
att_response = requests.get(
self._credentials.url + artifact_content_url
)
if att_response.status_code != 200:
raise ApiRequestFailure(
"Failure during {}.".format("downloading package extension"),
att_response,
)
downloaded_asset = att_response.content
try:
with open(filename, "wb") as f:
f.write(downloaded_asset)
print(
"Successfully saved package extension content to file: '{}'".format(
filename
)
)
return os.getcwd() + "/" + filename
except IOError as e:
raise WMLClientError(
"Saving asset with artifact_url: '{}' failed.".format(filename),
e,
)
else:
raise WMLClientError(
"Failed while downloading the package extension " + pkg_extn_id
)