# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2020-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import print_function
import json
import os
import ibm_watson_machine_learning._wrappers.requests as requests
from ibm_watson_machine_learning.metanames import PkgExtnMetaNames
from ibm_watson_machine_learning.utils import PKG_EXTN_DETAILS_TYPE
from ibm_watson_machine_learning.wml_client_error import WMLClientError, ApiRequestFailure
from ibm_watson_machine_learning.wml_resource import WMLResource
_DEFAULT_LIST_LENGTH = 50
[docs]
class PkgExtn(WMLResource):
"""Store and manage software Packages Extension specs."""
ConfigurationMetaNames = PkgExtnMetaNames()
"""MetaNames for Package Extensions creation."""
def __init__(self, client):
WMLResource.__init__(self, __name__, client)
self._ICP = client.ICP
[docs]
def get_details(self, pkg_extn_id):
"""Get package extensions details.
:param pkg_extn_id: Unique Id of package extension
:type pkg_extn_id: str
:return: details of the package extensions
:rtype: dict
**Example**
.. code-block:: python
pkg_extn_details = client.pkg_extn.get_details(pkg_extn_id)
"""
PkgExtn._validate_type(pkg_extn_id, u'pkg_extn_id', str, True)
response = requests.get(self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_id),
params=self._client._params(),
headers=self._client._get_headers())
if response.status_code == 200:
return self._get_required_element_from_response(self._handle_response(200, u'get hw spec details', response))
else:
return self._handle_response(200, u'get hw spec details', response)
[docs]
def store(self, meta_props, file_path):
"""Create a package extensions.
:param meta_props: meta data of the package extension. To see available meta names use:
.. code-block:: python
client.package_extensions.ConfigurationMetaNames.get()
:type meta_props: dict
:param file_path: path to file which will be uploaded as package extension
:type file_path: str
:return: metadata of the package extensions
:rtype: dict
**Example**
.. code-block:: python
meta_props = {
client.package_extensions.ConfigurationMetaNames.NAME: "skl_pipeline_heart_problem_prediction",
client.package_extensions.ConfigurationMetaNames.DESCRIPTION: "description scikit-learn_0.20",
client.package_extensions.ConfigurationMetaNames.TYPE: "conda_yml"
}
pkg_extn_details = client.package_extensions.store(meta_props=meta_props, file_path="/path/to/file")
"""
WMLResource._chk_and_block_create_update_for_python36(self)
# quick support for COS credentials instead of local path
# TODO add error handling and cleaning (remove the file)
PkgExtn._validate_type(meta_props, u'meta_props', dict, True)
pkg_extn_meta = self.ConfigurationMetaNames._generate_resource_metadata(
meta_props,
with_validation=True,
client=self._client)
pkg_extn_meta_json = json.dumps(pkg_extn_meta)
PkgExtn._validate_type(file_path, u'file_path', str, True)
#Step1 : Create an asset
print("Creating package extensions")
href = self._client.service_instance._href_definitions.get_pkg_extns_href()
creation_response = requests.post(href, params=self._client._params(), headers=self._client._get_headers(), data=pkg_extn_meta_json)
pkg_extn_details = self._handle_response(201, u'creating new package_extensions', creation_response)
# Step2: upload pkg extension file to presigned url
if creation_response.status_code == 201:
pkg_extn_asset_id = pkg_extn_details["metadata"]["asset_id"]
pkg_extn_presigned_url = pkg_extn_details["entity"]["package_extension"]["href"]
FILE_SIZE_LIMIT = int(0.3 * 1073741824)
def read_in_chunks(chunk_size=FILE_SIZE_LIMIT):
with open(file_path, 'rb') as file_object:
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
def send_multipart_request(file, url):
content_path = os.path.abspath(file)
content_size = os.stat(content_path).st_size
index = 0
headers = {}
try:
for chunk in read_in_chunks(FILE_SIZE_LIMIT):
offset = index + len(chunk)
headers['Content-Range'] = 'bytes %s-%s/%s' % (index, offset - 1, content_size)
index = offset
response = requests.put(
url,
files={"file": (file, chunk, 'application/octet-stream')}
)
except Exception as e:
deletion_response = requests.delete(
self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_asset_id),
params=self._client._params(),
headers=self._client._get_headers()
)
print(deletion_response.status_code)
raise WMLClientError("Failed while reading a file.", e)
return response
# chunking, when finally it will work on service side. to confirm it's working, the downloaded file needs to be the same as 400MB+ upladed file
#
# put_response = send_multipart_request(file_path,
# (self._client.service_instance._wml_credentials['url'] if self._client.ICP else "") + pkg_extn_presigned_url)
href = (self._client.service_instance._wml_credentials['url'] if self._client.ICP else "") + pkg_extn_presigned_url
# if pkg_extn_details["entity"]["package_extension"]["type"] in ["conda_yml", "pip_zip"]:
try:
if os.stat(file_path).st_size == 0:
raise WMLClientError('Package extension file cannot be empty')
with open(file_path, 'rb') as file_object:
if not self._client.ICP:
put_response = requests.put(
href,
data=file_object.read()
)
else:
put_response = requests.put(
href,
files={'file': (file_path, file_object.read(), 'application/octet-stream')}
)
except Exception as e:
deletion_response = requests.delete(
self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_asset_id),
params=self._client._params(),
headers=self._client._get_headers()
)
print(deletion_response.status_code)
raise WMLClientError("Failed while reading a file.", e)
if put_response.status_code == 201 or put_response.status_code == 200:
# Step3: Mark the upload complete
complete_response = requests.post(
self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_asset_id) + "/upload_complete",
headers=self._client._get_headers(),
params=self._client._params()
)
if complete_response.status_code == 204:
print("SUCCESS")
return self._get_required_element_from_response(pkg_extn_details)
else:
#print(complete_response.text) # remove print later
self._delete(pkg_extn_asset_id)
raise WMLClientError("Failed while creating a package extensions " + complete_response.text)
else:
self._delete(pkg_extn_asset_id)
raise WMLClientError("Failed while creating a package extensions " + put_response.text)
else:
raise WMLClientError("Failed while creating a package extensions " + creation_response.text)
[docs]
def list(self, return_as_df=True):
"""List package extensions in a table format.
:param return_as_df: determinate if table should be returned as pandas.DataFrame object, default: True
:type return_as_df: bool, optional
:return: pandas.DataFrame with listed package extensions or None if return_as_df is False
:rtype: pandas.DataFrame or None
.. code-block:: python
client.package_extensions.list()
"""
href = self._client.service_instance._href_definitions.get_pkg_extns_href()
response = requests.get(href, params=self._client._params(), headers=self._client._get_headers())
self._handle_response(200, u'list pkg_extn', response)
asset_details = self._handle_response(200, u'list assets', response)["resources"]
pkg_extn_values = [
(m[u'metadata'][u'name'],
m[u'metadata'][u'asset_id'],
m[u'entity'][u'package_extension'][u'type'],
m[u'metadata'][u'created_at']) for
m in asset_details]
table = self._list(pkg_extn_values, [u'NAME', u'ASSET_ID', u'TYPE', u'CREATED_AT'], None, _DEFAULT_LIST_LENGTH)
if return_as_df:
return table
[docs]
@staticmethod
def get_uid(pkg_extn_details):
"""Get Unique Id of package extensions.
*Deprecated:* Use ``get_id(pkg_extn_details)`` instead.
:param pkg_extn_details: details of the package extensions
:type pkg_extn_details: dict
:return: Unique Id of package extension
:rtype: str
**Example**
.. code-block:: python
asset_uid = client.package_extensions.get_uid(pkg_extn_details)
"""
PkgExtn._validate_type(pkg_extn_details, u'pkg_extn_details', object, True)
#print(pkg_extn_details)
PkgExtn._validate_type_of_details(pkg_extn_details, PKG_EXTN_DETAILS_TYPE)
return WMLResource._get_required_element_from_dict(pkg_extn_details, u'pkg_extn_details',
[u'metadata', u'asset_id'])
[docs]
@staticmethod
def get_id(pkg_extn_details):
"""Get Unique Id of package extensions.
:param pkg_extn_details: details of the package extensions
:type pkg_extn_details: dict
:return: Unique Id of package extension
:rtype: str
**Example**
.. code-block:: python
asset_id = client.package_extensions.get_id(pkg_extn_details)
"""
return PkgExtn.get_uid(pkg_extn_details)
[docs]
def get_uid_by_name(self, pkg_extn_name):
"""Get UID of package extensions.
*Deprecated:* Use ``get_id_by_name(pkg_extn_name)`` instead.
:param pkg_extn_name: name of the package extension
:type pkg_extn_name: str
:return: Unique Id of package extension
:rtype: str
**Example**
.. code-block:: python
asset_uid = client.package_extensions.get_uid_by_name(pkg_extn_name)
"""
PkgExtn._validate_type(pkg_extn_name, u'pkg_extn_name', str, True)
parameters = self._client._params()
parameters.update(name=pkg_extn_name)
response = requests.get(self._client.service_instance._href_definitions.get_pkg_extns_href(),
params=parameters,
headers=self._client._get_headers())
if response.status_code == 200:
total_values = self._handle_response(200, u'get pkg extn', response)["total_results"]
if total_values != 0:
pkg_extn_details = self._handle_response(200, u'get pkg extn', response)["resources"]
return pkg_extn_details[0][u'metadata'][u'asset_id']
else:
return "Not Found"
[docs]
def get_id_by_name(self, pkg_extn_name):
"""Get ID of package extensions.
:param pkg_extn_name: name of the package extension
:type pkg_extn_name: str
:return: Unique Id of package extension
:rtype: str
**Example**
.. code-block:: python
asset_id = client.package_extensions.get_id_by_name(pkg_extn_name)
"""
return PkgExtn.get_uid_by_name(self, pkg_extn_name)
[docs]
@staticmethod
def get_href(pkg_extn_details):
"""Get url of stored package extensions.
:param pkg_extn_details: details of the package extensions
:type pkg_extn_details: dict
:return: href of package extension
:rtype: str
**Example**
.. code-block:: python
pkg_extn_details = client.package_extensions.get_details(pkg_extn_uid)
pkg_extn_href = client.package_extensions.get_href(pkg_extn_details)
"""
PkgExtn._validate_type(pkg_extn_details, u'pkg_extn_details', object, True)
PkgExtn._validate_type_of_details(pkg_extn_details, PKG_EXTN_DETAILS_TYPE)
return WMLResource._get_required_element_from_dict(pkg_extn_details,
u'pkg_extn_details',
[u'entity', u'package_extension', u'href'])
[docs]
def delete(self, pkg_extn_id):
"""Delete a package extension.
:param pkg_extn_id: Unique Id of package extension
:type pkg_extn_id: str
:return: status ("SUCCESS" or "FAILED")
:rtype: str
**Example**
.. code-block:: python
client.package_extensions.delete(pkg_extn_id)
"""
PkgExtn._validate_type(pkg_extn_id, u'pkg_extn_uid', str, True)
response = requests.delete(self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_id),
params=self._client._params(),
headers=self._client._get_headers())
if response.status_code == 200:
return self._get_required_element_from_response(response.json())
else:
return self._handle_response(204, u'delete pkg extn specification', response)
def _delete(self, pkg_extn_uid):
PkgExtn._validate_type(pkg_extn_uid, u'pkg_extn_uid', str, True)
response = requests.delete(self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_uid), params=self._client._params(),
headers=self._client._get_headers())
def _get_required_element_from_response(self, response_data):
WMLResource._validate_type(response_data, u'pkg_extn_response', dict)
try:
if self._client.default_space_id is not None:
new_el = {'metadata': {'space_id': response_data['metadata']['space_id'],
'name': response_data['metadata']['name'],
'asset_id': response_data['metadata']['asset_id'],
'asset_type': response_data['metadata']['asset_type'],
'created_at': response_data['metadata']['created_at']
#'updated_at': response_data['metadata']['updated_at']
},
'entity': response_data['entity']
}
elif self._client.default_project_id is not None:
if self._client.WSD:
href = self._client.service_instance._href_definitions.get_base_asset_href(response_data['metadata']['asset_id']) + "?" + "project_id=" + response_data['metadata']['project_id']
new_el = {'metadata': {'project_id': response_data['metadata']['project_id'],
'name': response_data['metadata']['name'],
'asset_id': response_data['metadata']['asset_id'],
'asset_type': response_data['metadata']['asset_type'],
'created_at': response_data['metadata']['created_at']
},
'entity': response_data['entity']
}
else:
new_el = {'metadata': {'project_id': response_data['metadata']['project_id'],
'name': response_data['metadata']['name'],
'asset_id': response_data['metadata']['asset_id'],
'asset_type': response_data['metadata']['asset_type'],
'created_at': response_data['metadata']['created_at']
},
'entity': response_data['entity']
}
if (self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES) and 'href' in response_data['metadata']:
href_without_host = response_data['metadata']['href'].split('.com')[-1]
new_el[u'metadata'].update({'href': href_without_host})
return new_el
except Exception as e:
raise WMLClientError("Failed to read Response from down-stream service: " + response_data.text)
[docs]
def download(self, pkg_extn_id, filename):
"""Download a package extension.
:param pkg_extn_id: Unique Id of the package extension to be downloaded
:type pkg_extn_id: str
:param filename: filename to be used for the downloaded file
:type filename: str
:return: path to the downloaded package extension content
:rtype: str
**Example**
.. code-block:: python
client.package_extensions.download(pkg_extn_id,"sample_conda.yml/custom_library.zip")
"""
PkgExtn._validate_type(pkg_extn_id, u'pkg_extn_id', str, True)
if self._WSD:
pkg_extn_response = requests.get(self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_id),
params=self._client._params(),
headers=self._client._get_headers())
pkg_extn_details = self._handle_response(200, u'get assets', pkg_extn_response)
artifact_content_url = pkg_extn_details['entity']['package_extension']['href']
r = requests.get(artifact_content_url, params=self._client._params(), headers=self._client._get_headers(),
stream=True)
if r.status_code != 200:
raise ApiRequestFailure(u'Failure during {}.'.format("downloading model"), r)
downloaded_asset = r.content
try:
with open(filename, 'wb') as f:
f.write(downloaded_asset)
print(u'Successfully saved asset content to file: \'{}\''.format(filename))
return os.getcwd() + "/" + filename
except IOError as e:
raise WMLClientError(u'Saving asset with artifact_url: \'{}\' failed.'.format(filename), e)
else:
pkg_extn_response = requests.get(self._client.service_instance._href_definitions.get_pkg_extn_href(pkg_extn_id),
params=self._client._params(),
headers=self._client._get_headers())
pkg_extn_details = self._handle_response(200, u'get assets', pkg_extn_response)
artifact_content_url = pkg_extn_details['entity']['package_extension']['href']
if pkg_extn_response.status_code == 200:
if not self._ICP:
# att_response = requests.get(self._wml_credentials["url"]+artifact_content_url)
att_response = requests.get(artifact_content_url)
else:
att_response = requests.get(self._wml_credentials["url"]+artifact_content_url)
if att_response.status_code != 200:
raise ApiRequestFailure(u'Failure during {}.'.format("downloading package extension"),
att_response)
downloaded_asset = att_response.content
try:
with open(filename, 'wb') as f:
f.write(downloaded_asset)
print(u'Successfully saved package extension content to file: \'{}\''.format(filename))
return os.getcwd() + "/" + filename
except IOError as e:
raise WMLClientError(u'Saving asset with artifact_url: \'{}\' failed.'.format(filename), e)
else:
raise WMLClientError("Failed while downloading the package extension "+ pkg_extn_id)