# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2023-2025.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
import shlex
import subprocess
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, cast
from warnings import warn
import ibm_watsonx_ai._wrappers.requests as requests
from ibm_watsonx_ai.metanames import VolumeMetaNames
from ibm_watsonx_ai.utils.utils import raise_exception_about_unsupported_on_cloud
from ibm_watsonx_ai.wml_client_error import UnsupportedOperation, WMLClientError
from ibm_watsonx_ai.wml_resource import WMLResource
if TYPE_CHECKING:
import pandas
from ibm_watsonx_ai import APIClient
[docs]
class Volume(WMLResource):
"""Store and manage volume assets."""
ConfigurationMetaNames = VolumeMetaNames()
"""MetaNames for volume assets creation."""
def __init__(self, client: APIClient) -> None:
WMLResource.__init__(self, __name__, client)
[docs]
@raise_exception_about_unsupported_on_cloud
def get_details(self, volume_id: str) -> dict:
"""Get volume details.
:param volume_id: Unique ID of the volume
:type volume_id: str
:return: metadata of the volume details
:rtype: dict
**Example:**
.. code-block:: python
volume_details = client.volumes.get_details(volume_id)
"""
Volume._validate_type(volume_id, "volume_id", str, True)
response = requests.get(
self._client._href_definitions.volume_href(volume_id),
headers=self._client._get_headers(zen=True),
)
if response.status_code == 200:
return response.json()
else:
warn(f"{response.status_code} {response.text}")
raise WMLClientError("Failed to Get the volume details. Try again.")
[docs]
@raise_exception_about_unsupported_on_cloud
def create(self, meta_props: dict[str, Any]) -> dict:
"""Create a volume asset.
:param meta_props: metadata of the volume asset
:type meta_props: dict
:return: metadata of the created volume details
:rtype: dict
**Examples**
Provision new PVC volume:
.. code-block:: python
metadata = {
client.volumes.ConfigurationMetaNames.NAME: 'volume-for-wml-test',
client.volumes.ConfigurationMetaNames.NAMESPACE: 'wmldev2',
client.volumes.ConfigurationMetaNames.STORAGE_CLASS: 'nfs-client'
client.volumes.ConfigurationMetaNames.STORAGE_SIZE: "2G"
}
asset_details = client.volumes.store(meta_props=metadata)
Provision an existing PVC volume:
.. code-block:: python
metadata = {
client.volumes.ConfigurationMetaNames.NAME: "volume-for-wml-test",
client.volumes.ConfigurationMetaNames.NAMESPACE: "wmldev2",
client.volumes.ConfigurationMetaNames.EXISTING_PVC_NAME: "volume-for-wml-test",
}
asset_details = client.volumes.store(meta_props=metadata)
"""
create_meta = {}
if (
self.ConfigurationMetaNames.EXISTING_PVC_NAME in meta_props
and meta_props[self.ConfigurationMetaNames.EXISTING_PVC_NAME] is not None
):
if (
self.ConfigurationMetaNames.STORAGE_CLASS in meta_props
and meta_props[self.ConfigurationMetaNames.STORAGE_CLASS] is not None
):
raise WMLClientError(
"Failed while creating volume. Either provide EXISTING_PVC_NAME to create a volume using existing volume or"
"provide STORAGE_CLASS and STORAGE_SIZE for new volume creation"
)
else:
create_meta.update(
{
"existing_pvc_name": meta_props[
self.ConfigurationMetaNames.EXISTING_PVC_NAME
]
}
)
else:
if (
self.ConfigurationMetaNames.STORAGE_CLASS in meta_props
and meta_props[self.ConfigurationMetaNames.STORAGE_CLASS] is not None
):
if (
self.ConfigurationMetaNames.STORAGE_SIZE in meta_props
and meta_props[self.ConfigurationMetaNames.STORAGE_SIZE] is not None
):
create_meta.update(
{
"storageClass": meta_props[
self.ConfigurationMetaNames.STORAGE_CLASS
]
}
)
create_meta.update(
{
"storageSize": meta_props[
self.ConfigurationMetaNames.STORAGE_SIZE
]
}
)
else:
raise WMLClientError(
"Failed to create volume. Missing input STORAGE_SIZE"
)
if (
self.ConfigurationMetaNames.EXISTING_PVC_NAME in meta_props
and meta_props[self.ConfigurationMetaNames.EXISTING_PVC_NAME] is not None
):
input_meta = {
"addon_type": "volumes",
"addon_version": "-",
"create_arguments": {"metadata": create_meta},
"namespace": meta_props[self.ConfigurationMetaNames.NAMESPACE],
"display_name": meta_props[self.ConfigurationMetaNames.NAME],
}
else:
input_meta = {
"addon_type": "volumes",
"addon_version": "-",
"create_arguments": {"metadata": create_meta},
"namespace": meta_props[self.ConfigurationMetaNames.NAMESPACE],
"display_name": meta_props[self.ConfigurationMetaNames.NAME],
}
if self._client.CLOUD_PLATFORM_SPACES: # CLOUD
raise UnsupportedOperation("NFS Volume creation not supported for CLOUD!")
else: # CPD
creation_response = requests.post(
url=self._client._href_definitions.volumes_href(),
headers=self._client._get_headers(zen=True),
json=input_meta,
)
if creation_response.status_code == 200:
volume_id_details = (
creation_response.json()
) # messy details returned for backward compatibility
import copy
volume_details = copy.deepcopy(input_meta)
volume_details.update(volume_id_details)
actual_details = self.get_details(self.get_id(volume_id_details))
volume_details.update(actual_details)
return volume_details
else:
raise WMLClientError(
f"Failed to create a volume with message:\n"
f"{creation_response.text}\n"
f"and status_code:{creation_response.status_code}\n"
)
[docs]
@raise_exception_about_unsupported_on_cloud
def start(
self, name: str, wait_for_available: bool = False
) -> Literal["SUCCESS", "FAILED"]:
"""Start the volume service.
:param name: unique name of the volume to be started
:type name: str
:param wait_for_available: flag indicating if method should wait until volume service is available
:type wait_for_available: bool
:return: status ("SUCCESS" or "FAILED")
:rtype: str
**Example:**
.. code-block:: python
client.volumes.start(volume_name)
"""
if "::" not in name:
raise WMLClientError(
"Invalid name to start volume. Correct volume name format: `<namespace>::<name>`. Retrieve the correct name using `client.volumes.get_name(client.volumes.get_details(volume_id))` command."
)
start_url = self._client._href_definitions.volume_service_href(name)
# Start the volume service
start_data: dict = {}
try:
start_data = {}
creation_response = requests.post(
start_url, headers=self._client._get_headers(zen=True), json=start_data
)
if creation_response.status_code == 200:
print("Volume Service started")
if wait_for_available:
retries = 0
volume_status = False
while True and retries < 60 and not volume_status:
volume_status = self.get_volume_status(name)
time.sleep(5)
retries += 1
if volume_status:
return "SUCCESS"
volume_service_not_started_warning = (
"Volume Service has been started, but it is not available yet. "
"Check volume availability using get_volume_status method."
)
warn(volume_service_not_started_warning)
return "FAILED"
elif creation_response.status_code == 500:
failed_to_start_volume_warning = (
"Failed to start the volume. "
"Make sure volume is in running with status RUNNING or UNKNOWN and then re-try"
)
warn(failed_to_start_volume_warning)
return "FAILED"
else:
warn(f"{creation_response.status_code} {creation_response.text}")
raise WMLClientError("Failed to start the file to volume. Try again.")
except Exception as e:
warn(f"Exception: {e}")
raise WMLClientError("Failed to start the file to volume. Try again.")
[docs]
@raise_exception_about_unsupported_on_cloud
def get_volume_status(self, name: str) -> bool:
"""Monitor a volume's file server status.
:param name: name of the volume to retrieve status for
:type name: str
:return: status of the volume (True if volume is available, otherwise False)
:rtype: bool
**Example:**
.. code-block:: python
client.volumes.get_volume_status(volume_name)
"""
if "::" not in name:
raise WMLClientError(
"Invalid name to start volume. Correct volume name format: `<namespace>::<name>`. Retrieve the correct name using `client.volumes.get_name(client.volumes.get_details(volume_id))` command."
)
monitor_url = self._client._href_definitions.volume_monitor_href(name)
try:
monitor_response = requests.get(
monitor_url, headers=self._client._get_headers(zen=True)
)
if monitor_response.status_code == 200:
return True
elif monitor_response.status_code == 502:
return False
else:
warn(f"{monitor_response.status_code} {monitor_response.text}")
raise WMLClientError("Cannot retrieve status of the volume.")
except Exception as e:
warn(f"Exception: {e}")
raise WMLClientError("Cannot retrieve status of the volume.")
[docs]
@raise_exception_about_unsupported_on_cloud
def upload_file(
self, name: str, file_path: str | Path
) -> Literal["SUCCESS", "FAILED"]:
"""Upload the data file into stored volume.
:param name: unique name of the stored volume
:type name: str
:param file_path: file to be uploaded into the volume
:type file_path: str | Path
:return: status ("SUCCESS" or "FAILED")
:rtype: str
**Example:**
.. code-block:: python
client.volumes.upload_file("testA", "DRUG.csv")
"""
if isinstance(file_path, Path):
file_path = str(file_path)
header_input = self._client._get_headers(zen=True)
zen_token = header_input.get("Authorization", "")
filename_to_upload = file_path.split("/")[-1]
upload_url_file = (
self._client._href_definitions.volume_upload_href(name) + filename_to_upload
)
cmd_str = (
'curl -k -X PUT "'
+ upload_url_file
+ '"'
+ " -H 'Content-Type: multipart/form-data' -H 'Authorization: "
+ zen_token
+ "' -F upFile='@"
+ file_path
+ "'"
)
args = shlex.split(cmd_str)
upload_response = subprocess.run(args, capture_output=True, text=True)
if upload_response.returncode == 0:
import json
try:
cmd_output = json.loads(upload_response.stdout)
if cmd_output.get("_statusCode_") == 403:
insufficient_permissions_warning = (
"It seems that you don't have the necessary permissions to perform this action. "
"Please review your permissions and try again once they have been updated."
)
warn(insufficient_permissions_warning)
return "FAILED"
print(cmd_output.get("message"))
return "SUCCESS"
except Exception:
pass
upload_response_error_warning = f"{upload_response.returncode} {upload_response.stdout} {upload_response.stderr}"
warn(upload_response_error_warning)
warn("Failed to upload the file to volume. Try again.")
return "FAILED"
[docs]
@raise_exception_about_unsupported_on_cloud
def list(self) -> pandas.DataFrame:
"""Lists stored volumes in a table format.
:return: pandas.DataFrame with listed volumes
:rtype: pandas.DataFrame
**Example:**
.. code-block:: python
client.volumes.list()
"""
href = self._client._href_definitions.volumes_href()
params = {}
params.update({"addon_type": "volumes"})
response = requests.get(
href, params=params, headers=self._client._get_headers(zen=True)
)
asset_details = self._handle_response(200, "list volumes", response)
asset_list = asset_details.get("service_instances", [])
volume_values = [
(m["display_name"], m["id"], m["provision_status"]) for m in asset_list
]
table = self._list(
volume_values,
["NAME", "ID", "PROVISION_STATUS"],
None,
)
return table
[docs]
@staticmethod
def get_id(volume_details: dict) -> str:
"""Get unique Id of stored volume details.
:param volume_details: metadata of the stored volume details
:type volume_details: dict
:return: unique Id of stored volume asset
:rtype: str
**Example:**
.. code-block:: python
volume_id = client.volumes.get_id(volume_details)
"""
Volume._validate_type(volume_details, "volume_details", object, True)
if (
"service_instance" in volume_details
and volume_details.get("service_instance") is not None
):
vol_details = volume_details.get("service_instance")
if vol_details is None:
raise WMLClientError("Missing service_instance in volume details")
return cast(
str,
WMLResource._get_required_element_from_dict(
vol_details, "volume_assets_details", ["id"]
),
)
else:
return cast(
str,
WMLResource._get_required_element_from_dict(
volume_details, "volume_assets_details", ["id"]
),
)
[docs]
@staticmethod
def get_name(volume_details: dict) -> str:
"""Get unique name of stored volume asset.
:params volume_details: metadata of the stored volume asset
:type volume_details: dict
:return: unique name of stored volume asset
:rtype: str
**Example:**
.. code-block:: python
volume_name = client.volumes.get_name(asset_details)
"""
Volume._validate_type(volume_details, "asset_details", object, True)
if (
"service_instance" in volume_details
and volume_details.get("service_instance") is not None
):
vol_details = volume_details.get("service_instance")
if vol_details is None:
raise WMLClientError("Missing service_instance in volume details")
return cast(
str,
WMLResource._get_required_element_from_dict(
vol_details, "volume_assets_details", ["display_name"]
),
)
else:
return cast(
str,
WMLResource._get_required_element_from_dict(
volume_details, "volume_assets_details", ["display_name"]
),
)
[docs]
@raise_exception_about_unsupported_on_cloud
def delete(self, volume_id: str) -> Literal["SUCCESS", "FAILED"]:
"""Delete a volume.
:param volume_id: unique ID of the volume
:type volume_id: str
:return: status ("SUCCESS" or "FAILED")
:rtype: str
**Example:**
.. code-block:: python
client.volumes.delete(volume_id)
"""
Volume._validate_type(volume_id, "volume_id", str, True)
response = requests.delete(
self._client._href_definitions.volume_href(volume_id),
headers=self._client._get_headers(zen=True),
)
if response.status_code == 200 or response.status_code == 204:
print("Successfully deleted volume service.")
return "SUCCESS"
else:
warn("Failed to delete volume.")
warn(f"{response.status_code} {response.text}")
return "FAILED"
[docs]
@raise_exception_about_unsupported_on_cloud
def stop(self, volume_name: str) -> Literal["SUCCESS", "FAILED"]:
"""Stop the volume service.
:param volume_name: unique name of the volume
:type volume_name: str
:return: status ("SUCCESS" or "FAILED")
:rtype: str
**Example:**
.. code-block:: python
client.volumes.stop(volume_name)
"""
Volume._validate_type(volume_name, "volume_name", str, True)
response = requests.delete(
self._client._href_definitions.volume_service_href(volume_name),
headers=self._client._get_headers(zen=True),
)
if response.status_code == 200:
print("Successfully stopped volume service.")
return "SUCCESS"
else:
warn(f"{response.status_code} {response.text}")
return "FAILED"