# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2023-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Any, Literal
from ibm_watsonx_ai._wrappers import requests
from ibm_watsonx_ai.service_instance import ServiceInstance
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.metanames import (
SpacesMetaNames,
SpacesMemberMetaNames,
)
from ibm_watsonx_ai.utils import (
print_text_header_h1,
StatusLogger,
print_text_header_h2,
)
from ibm_watsonx_ai.utils.deployment.errors import PromotionFailed
from ibm_watsonx_ai.wml_client_error import (
WMLClientError,
ResourceIdByNameNotFound,
MultipleResourceIdByNameFound,
)
from ibm_watsonx_ai.wml_resource import WMLResource
if TYPE_CHECKING:
from ibm_watsonx_ai import APIClient
from pandas import DataFrame
_DEFAULT_LIST_LENGTH = 50
[docs]
class Spaces(WMLResource):
"""Store and manage spaces."""
ConfigurationMetaNames = SpacesMetaNames()
"""MetaNames for spaces creation."""
MemberMetaNames = SpacesMemberMetaNames()
"""MetaNames for space members creation."""
def __init__(self, client: APIClient):
WMLResource.__init__(self, __name__, client)
self._client = client
if not (self._client.default_project_id or self._client.default_space_id):
self._connection_validation()
def _get_resources(
self, url: str, op_name: str, params: dict | None = None
) -> dict:
if params is not None and "limit" in params.keys():
if params["limit"] < 1:
raise WMLClientError("Limit cannot be lower than 1.")
elif params["limit"] > 1000:
raise WMLClientError("Limit cannot be larger than 1000.")
if params is not None and len(params) > 0:
response_get = requests.get(
url, headers=self._client._get_headers(), params=params
)
return self._handle_response(200, op_name, response_get)
else:
resources = []
while True:
response_get = requests.get(url, headers=self._client._get_headers())
result = self._handle_response(200, op_name, response_get)
resources.extend(result["resources"])
if "next" not in result:
break
else:
url = self._credentials.url + result["next"]["href"]
if "start=invalid" in url:
break
return {"resources": resources}
def _connection_validation(self) -> None:
"""Trial connection to validate authorization.
:raises WMLClientError: raised when connection is unauthorized
"""
href = (
self._client.service_instance._href_definitions.get_platform_spaces_href()
)
response_get = self._client._session.get(
href, headers=self._client._get_headers(), params={"limit": 1}
)
if response_get.status_code == 401:
raise WMLClientError(
Messages.get_message(
response_get.json()["errors"][0]["message"],
message_id="invalid_authorization",
)
)
[docs]
def store(self, meta_props: dict, background_mode: bool = True) -> dict:
"""Create a space. The instance associated with the space via COMPUTE will be used for billing purposes on
the cloud. Note that STORAGE and COMPUTE are applicable only for cloud.
:param meta_props: metadata of the space configuration. To see available meta names, use:
.. code-block:: python
client.spaces.ConfigurationMetaNames.get()
:type meta_props: dict
:param background_mode: indicator if store() method will run in background (async) or (sync)
:type background_mode: bool, optional
:return: metadata of the stored space
:rtype: dict
**Example:**
.. code-block:: python
metadata = {
client.spaces.ConfigurationMetaNames.NAME: "my_space",
client.spaces.ConfigurationMetaNames.DESCRIPTION: "spaces",
client.spaces.ConfigurationMetaNames.STORAGE: {"resource_crn": "provide crn of the COS storage"},
client.spaces.ConfigurationMetaNames.COMPUTE: {"name": "test_instance",
"crn": "provide crn of the instance"},
client.spaces.ConfigurationMetaNames.STAGE: {"production": True,
"name": "stage_name"},
client.spaces.ConfigurationMetaNames.TAGS: ["sample_tag_1", "sample_tag_2"],
client.spaces.ConfigurationMetaNames.TYPE: "cpd",
}
spaces_details = client.spaces.store(meta_props=metadata)
"""
# quick support for COS credentials instead of local path
# TODO add error handling and cleaning (remove the file)
Spaces._validate_type(meta_props, "meta_props", dict, True)
if (
"compute" in meta_props or "storage" in meta_props
) and self._client.ICP_PLATFORM_SPACES:
raise WMLClientError(
"'STORAGE' and 'COMPUTE' meta props are not applicable on "
"IBM Cloud PakĀ® for Data. If using any of these, remove and retry"
)
if "storage" not in meta_props and self._client.CLOUD_PLATFORM_SPACES:
raise WMLClientError("'STORAGE' is mandatory for cloud")
if "compute" in meta_props and self._client.CLOUD_PLATFORM_SPACES:
if (
"name" not in meta_props["compute"]
or "crn" not in meta_props["compute"]
):
raise WMLClientError("'name' and 'crn' is mandatory for 'COMPUTE'")
temp_meta = meta_props["compute"]
temp_meta.update({"type": "machine_learning"})
meta_props["compute"] = temp_meta
if "stage" in meta_props and self._client.CLOUD_PLATFORM_SPACES:
if not type(meta_props["stage"]["production"]) == bool:
raise WMLClientError("'production' for 'STAGE' must be boolean")
space_meta = self.ConfigurationMetaNames._generate_resource_metadata(
meta_props, with_validation=True, client=self._client
)
if "compute" in meta_props and self._client.CLOUD_PLATFORM_SPACES:
payload_compute = []
payload_compute.append(space_meta["compute"])
space_meta["compute"] = payload_compute
creation_response = requests.post(
self._client.service_instance._href_definitions.get_platform_spaces_href(),
headers=self._client._get_headers(),
json=space_meta,
)
spaces_details = self._handle_response(
202, "creating new spaces", creation_response
)
# Cloud Convergence: Set self._client.credentials.instance_id to instance_id
# during client.set.default_space since that's where space is associated with client
# and also in client.set.default_project
#
if (
"compute" in spaces_details["entity"].keys()
and self._client.CLOUD_PLATFORM_SPACES
):
instance_id = spaces_details["entity"]["compute"][0]["guid"]
self._client.service_instance = ServiceInstance(self._client)
self._client.service_instance._instance_id = instance_id
if background_mode:
print(
"Space has been created. However some background setup activities might still be on-going. "
"Check for 'status' field in the response. It has to show 'active' before space can be used. "
"If it's not 'active', you can monitor the state with a call to spaces.get_details(space_id). "
"Alternatively, use background_mode=False when calling client.spaces.store()."
)
return spaces_details
else:
# note: monitor space status
space_id = self.get_id(spaces_details)
print_text_header_h1(
"Synchronous space creation with id: '{}' started".format(space_id)
)
status = spaces_details["entity"]["status"].get("state")
with StatusLogger(status) as status_logger:
while status not in [
"failed",
"error",
"completed",
"canceled",
"active",
]:
time.sleep(10)
spaces_details = self.get_details(space_id)
status = spaces_details["entity"]["status"].get("state")
status_logger.log_state(status)
# --- end note
if "active" in status:
print_text_header_h2(
"\nCreating space '{}' finished successfully.".format(space_id)
)
else:
raise WMLClientError(
f"Space {space_id} creation failed with status: {spaces_details['entity']['status']}"
)
return spaces_details
[docs]
@staticmethod
def get_id(space_details: dict) -> str:
"""Get the space_id from the space details.
:param space_details: metadata of the stored space
:type space_details: dict
:return: ID of the stored space
:rtype: str
**Example:**
.. code-block:: python
space_details = client.spaces.store(meta_props)
space_id = client.spaces.get_id(space_details)
"""
Spaces._validate_type(space_details, "space_details", object, True)
return WMLResource._get_required_element_from_dict(
space_details, "space_details", ["metadata", "id"]
)
[docs]
def get_id_by_name(self, space_name: str) -> str:
"""Get the ID of a stored space by name.
:param space_name: name of the stored space
:type space_name: str
:return: ID of the stored space
:rtype: str
**Example:**
.. code-block:: python
space_id = client.spaces.get_id_by_name(space_name)
"""
Spaces._validate_type(space_name, "space_name", str, True)
details = self.get_details(space_name=space_name)
if len(details["resources"]) > 1:
raise MultipleResourceIdByNameFound(space_name, "space")
elif len(details["resources"]) == 0:
raise ResourceIdByNameNotFound(space_name, "space")
return details["resources"][0]["metadata"]["id"]
[docs]
@staticmethod
def get_uid(space_details: dict) -> str:
"""Get the unique ID of the space.
*Deprecated:* Use ``get_id(space_details)`` instead.
:param space_details: metadata of the space
:type space_details: dict
:return: unique ID of the space
:rtype: str
**Example:**
.. code-block:: python
space_details = client.spaces.store(meta_props)
space_uid = client.spaces.get_uid(space_details)
"""
return Spaces.get_id(space_details)
[docs]
def delete(self, space_id: str) -> Literal["SUCCESS"]:
"""Delete a stored space.
:param space_id: ID of the space
:type space_id: str
:return: status "SUCCESS" if deletion is successful
:rtype: Literal["SUCCESS"]
**Example:**
.. code-block:: python
client.spaces.delete(space_id)
"""
Spaces._validate_type(space_id, "space_id", str, True)
space_endpoint = (
self._client.service_instance._href_definitions.get_platform_space_href(
space_id
)
)
response_delete = requests.delete(
space_endpoint, headers=self._client._get_headers()
)
response = self._handle_response(202, "space deletion", response_delete, False)
print("DELETED")
return response
[docs]
def get_details(
self,
space_id: str | None = None,
limit: int | None = None,
asynchronous: bool | None = False,
get_all: bool | None = False,
space_name: str | None = None,
) -> dict:
"""Get metadata of stored space(s).
:param space_id: ID of the space
:type space_id: str, optional
:param limit: applicable when `space_id` is not provided, otherwise `limit` will be ignored
:type limit: int, optional
:param asynchronous: if `True`, it will work as a generator
:type asynchronous: bool, optional
:param get_all: if `True`, it will get all entries in 'limited' chunks
:type get_all: bool, optional
:param space_name: name of the stored space, can be used only when `space_id` is None
:type space_name: str, optional
:return: metadata of stored space(s)
:rtype:
- **dict** - if space_id is not None
- **{"resources": [dict]}** - if space_id is None
**Example:**
.. code-block:: python
space_details = client.spaces.get_details(space_id)
space_details = client.spaces.get_details(space_name)
space_details = client.spaces.get_details(limit=100)
space_details = client.spaces.get_details(limit=100, get_all=True)
space_details = []
for entry in client.spaces.get_details(limit=100, asynchronous=True, get_all=True):
space_details.extend(entry)
"""
Spaces._validate_type(space_id, "space_id", str, False)
href = self._client.service_instance._href_definitions.get_platform_space_href(
space_id
)
if space_id is not None:
response_get = requests.get(href, headers=self._client._get_headers())
return self._handle_response(200, "Get space", response_get)
else:
query_params = {"name": space_name} if space_name else None
return self._get_with_or_without_limit(
self._client.service_instance._href_definitions.get_platform_spaces_href(),
limit,
"spaces",
summary=False,
pre_defined=False,
skip_space_project_chk=True,
query_params=query_params,
_async=asynchronous,
_all=get_all,
)
[docs]
def list(
self,
limit: int | None = None,
member: str | None = None,
roles: str | None = None,
space_type: str | None = None,
) -> DataFrame:
"""List stored spaces in a table format. If limit is set to None, only the first 50 records are shown.
:param limit: limit number of fetched records
:type limit: int, optional
:param member: filters the result list, only includes spaces where the user with a matching user ID
is a member
:type member: str, optional
:param roles: limit number of fetched records
:type roles: str, optional
:param space_type: filter spaces by their type, available types are 'wx', 'cpd', and 'wca'
:type space_type: str, optional
:return: pandas.DataFrame with listed spaces
:rtype: pandas.DataFrame
**Example:**
.. code-block:: python
client.spaces.list()
"""
Spaces._validate_type(limit, "limit", int, False)
href = (
self._client.service_instance._href_definitions.get_platform_spaces_href()
)
params: dict[str, Any] = {}
if limit is not None:
params.update({"limit": limit})
if limit is None:
params.update({"limit": 50})
if member is not None:
params.update({"member": member})
if roles is not None:
params.update({"roles": roles})
if space_type is not None:
params.update({"type": space_type})
space_resources = [
m
for r in self._get_with_or_without_limit(
href,
None,
"spaces",
summary=False,
pre_defined=False,
skip_space_project_chk=True,
query_params=params,
_async=True,
_all=True,
)
for m in r["resources"]
]
if limit is not None:
space_resources = space_resources[:limit]
space_values = [
(m["metadata"]["id"], m["entity"]["name"], m["metadata"]["created_at"])
for m in space_resources
]
if limit is None:
print(
"Note: 'limit' is not provided. Only first 50 records will be displayed if the number of records "
"exceed 50"
)
table = self._list(
space_values, ["ID", "NAME", "CREATED"], limit, _DEFAULT_LIST_LENGTH
)
return table
[docs]
def update(self, space_id: str, changes: dict) -> dict:
"""Update existing space metadata. 'STORAGE' cannot be updated.
STORAGE and COMPUTE are applicable only for cloud.
:param space_id: ID of the space with the definition to be updated
:type space_id: str
:param changes: elements to be changed, where keys are ConfigurationMetaNames
:type changes: dict
:return: metadata of the updated space
:rtype: dict
**Example:**
.. code-block:: python
metadata = {
client.spaces.ConfigurationMetaNames.NAME:"updated_space",
client.spaces.ConfigurationMetaNames.COMPUTE: {"name": "test_instance",
"crn": "v1:staging:public:pm-20-dev:us-south:a/09796a1b4cddfcc9f7fe17824a68a0f8:f1026e4b-77cf-4703-843d-c9984eac7272::"
}
}
space_details = client.spaces.update(space_id, changes=metadata)
"""
if (
"compute" in changes or "storage" in changes
) and self._client.ICP_PLATFORM_SPACES:
raise WMLClientError(
"'STORAGE' and 'COMPUTE' meta props are not applicable on"
"IBM Cloud PakĀ® for Data. If using any of these, remove and retry"
)
if "storage" in changes:
raise WMLClientError("STORAGE cannot be updated")
self._validate_type(space_id, "space_id", str, True)
self._validate_type(changes, "changes", dict, True)
details = self.get_details(space_id)
if "compute" in changes and self._client.CLOUD_PLATFORM_SPACES:
changes["compute"]["type"] = "machine_learning"
payload_compute = []
payload_compute.append(changes["compute"])
changes["compute"] = payload_compute
print("changes in update: ", changes)
patch_payload = self.ConfigurationMetaNames._generate_patch_payload(
details["entity"], changes
)
print("patch payload: ", patch_payload)
href = self._client.service_instance._href_definitions.get_platform_space_href(
space_id
)
response = requests.patch(
href, json=patch_payload, headers=self._client._get_headers()
)
updated_details = self._handle_response(200, "spaces patch", response)
# Cloud Convergence
if (
"compute" in updated_details["entity"].keys()
and self._client.CLOUD_PLATFORM_SPACES
):
instance_id = updated_details["entity"]["compute"][0]["guid"]
self._client.service_instance = ServiceInstance(self._client)
self._client.service_instance._instance_id = instance_id
return updated_details
#######SUPPORT FOR SPACE MEMBERS
[docs]
def create_member(self, space_id: str, meta_props: dict) -> dict:
"""Create a member within a space.
:param space_id: ID of the space with the definition to be updated
:type space_id: str
:param meta_props: metadata of the member configuration. To see available meta names, use:
.. code-block:: python
client.spaces.MemberMetaNames.get()
:type meta_props: dict
:return: metadata of the stored member
:rtype: dict
.. note::
* `role` can be any one of the following: "viewer", "editor", "admin"
* `type` can be any one of the following: "user", "service"
* `id` can be one of the following: service-ID or IAM-userID
**Examples**
.. code-block:: python
metadata = {
client.spaces.MemberMetaNames.MEMBERS: [{"id":"IBMid-100000DK0B",
"type": "user",
"role": "admin" }]
}
members_details = client.spaces.create_member(space_id=space_id, meta_props=metadata)
.. code-block:: python
metadata = {
client.spaces.MemberMetaNames.MEMBERS: [{"id":"iam-ServiceId-5a216e59-6592-43b9-8669-625d341aca71",
"type": "service",
"role": "admin" }]
}
members_details = client.spaces.create_member(space_id=space_id, meta_props=metadata)
"""
self._validate_type(space_id, "space_id", str, True)
Spaces._validate_type(meta_props, "meta_props", dict, True)
meta = {}
if "members" in meta_props:
meta = meta_props
elif "member" in meta_props:
dictionary = meta_props["member"]
payload = []
payload.append(dictionary)
meta["members"] = payload
space_meta = self.MemberMetaNames._generate_resource_metadata(
meta, with_validation=True, client=self._client
)
creation_response = requests.post(
self._client.service_instance._href_definitions.get_platform_spaces_members_href(
space_id
),
headers=self._client._get_headers(),
json=space_meta,
)
# TODO: Change response code one they change it to 201
members_details = self._handle_response(
200, "creating new members", creation_response
)
return members_details
[docs]
def get_member_details(self, space_id: str, member_id: str) -> dict:
"""Get metadata of a member associated with a space.
:param space_id: ID of that space with the definition to be updated
:type space_id: str
:param member_id: ID of the member
:type member_id: str
:return: metadata of the space member
:rtype: dict
**Example:**
.. code-block:: python
member_details = client.spaces.get_member_details(space_id,member_id)
"""
Spaces._validate_type(space_id, "space_id", str, True)
Spaces._validate_type(member_id, "member_id", str, True)
href = self._client.service_instance._href_definitions.get_platform_spaces_member_href(
space_id, member_id
)
response_get = requests.get(href, headers=self._client._get_headers())
return self._handle_response(200, "Get space member", response_get)
[docs]
def delete_member(self, space_id: str, member_id: str) -> str:
"""Delete a member associated with a space.
:param space_id: ID of the space
:type space_id: str
:param member_id: ID of the member
:type member_id: str
:return: status ("SUCCESS" or "FAILED")
:rtype: str
**Example:**
.. code-block:: python
client.spaces.delete_member(space_id,member_id)
"""
Spaces._validate_type(space_id, "space_id", str, True)
Spaces._validate_type(member_id, "member_id", str, True)
member_endpoint = self._client.service_instance._href_definitions.get_platform_spaces_member_href(
space_id, member_id
)
response_delete = requests.delete(
member_endpoint, headers=self._client._get_headers()
)
print("DELETED")
return self._handle_response(
204, "space member deletion", response_delete, False
)
[docs]
def update_member(self, space_id: str, member_id: str, changes: dict) -> dict:
"""Update the metadata of an existing member.
:param space_id: ID of the space
:type space_id: str
:param member_id: ID of the member to be updated
:type member_id: str
:param changes: elements to be changed, where keys are ConfigurationMetaNames
:type changes: dict
:return: metadata of the updated member
:rtype: dict
**Example:**
.. code-block:: python
metadata = {
client.spaces.MemberMetaNames.MEMBER: {"role": "editor"}
}
member_details = client.spaces.update_member(space_id, member_id, changes=metadata)
"""
self._validate_type(space_id, "space_id", str, True)
self._validate_type(member_id, "member_id", str, True)
self._validate_type(changes, "changes", dict, True)
details = self.get_member_details(space_id, member_id)
# The member record is a bit different than most other type of records we deal w.r.t patch
# There is no encapsulating object for the fields. We need to be consistent with the way we
# provide the meta in create/patch. When we give with .MEMBER, _generate_patch_payload
# will generate with /member patch. So, separate logic for member patch inline here
changes1 = changes["member"]
# Union of two dictionaries. The one in changes1 will override existent ones in current meta
details.update(changes1)
role_str = {}
state_str = {}
if "role" in details:
role_str["op"] = "replace"
role_str["path"] = "/role"
role_str["value"] = details["role"]
if "state" in details:
state_str["op"] = "replace"
state_str["path"] = "/state"
state_str["value"] = details["state"]
patch_payload = []
if role_str:
patch_payload.append(role_str)
if state_str:
patch_payload.append(state_str)
href = self._client.service_instance._href_definitions.get_platform_spaces_member_href(
space_id, member_id
)
response = requests.patch(
href, json=patch_payload, headers=self._client._get_headers()
)
updated_details = self._handle_response(200, "members patch", response)
return updated_details
[docs]
def list_members(
self,
space_id: str,
limit: int | None = None,
identity_type: str | None = None,
role: str | None = None,
state: str | None = None,
) -> DataFrame:
"""Print the stored members of a space in a table format.
If limit is set to None, only the first 50 records are shown.
:param space_id: ID of the space
:type space_id: str
:param limit: limit number of fetched records
:type limit: int, optional
:param identity_type: filter the members by type
:type identity_type: str, optional
:param role: filter the members by role
:type role: str, optional
:param state: filter the members by state
:type state: str, optional
:return: pandas.DataFrame with listed members
:rtype: pandas.DataFrame
**Example:**
.. code-block:: python
client.spaces.list_members(space_id)
"""
self._validate_type(space_id, "space_id", str, True)
params: dict[str, Any] = {}
if limit is not None:
params.update({"limit": limit})
if limit is None:
params.update({"limit": 50})
if identity_type is not None:
params.update({"type": identity_type})
if role is not None:
params.update({"role": role})
if state is not None:
params.update({"state": state})
href = self._client.service_instance._href_definitions.get_platform_spaces_members_href(
space_id
)
member_resources = self._get_resources(href, "space members", params)[
"resources"
]
space_values = [
(
(m["id"], m["type"], m["role"], m["state"])
if "state" in m
else (m["id"], m["type"], m["role"], None)
)
for m in member_resources
]
if limit is None:
print(
"Note: 'limit' is not provided. Only first 50 records will be displayed if the number of records "
"exceed 50"
)
table = self._list(
space_values, ["ID", "TYPE", "ROLE", "STATE"], limit, _DEFAULT_LIST_LENGTH
)
return table