Source code for ibm_watson_machine_learning.platform_spaces

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2020-2024.
#  -----------------------------------------------------------------------------------------

from __future__ import print_function

import time

import ibm_watson_machine_learning._wrappers.requests as requests
from ibm_watson_machine_learning.instance_new_plan import ServiceInstanceNewPlan
from ibm_watson_machine_learning.messages.messages import Messages
from ibm_watson_machine_learning.metanames import SpacesPlatformMetaNames, SpacesPlatformMemberMetaNames
from ibm_watson_machine_learning.utils import StatusLogger, print_text_header_h1, print_text_header_h2
from ibm_watson_machine_learning.utils.deployment.errors import PromotionFailed
from ibm_watson_machine_learning.wml_client_error import WMLClientError
from ibm_watson_machine_learning.wml_resource import WMLResource


[docs] class PlatformSpaces(WMLResource): """Store and manage spaces.""" ConfigurationMetaNames = SpacesPlatformMetaNames() """MetaNames for spaces creation.""" MemberMetaNames = SpacesPlatformMemberMetaNames() """MetaNames for space members creation.""" def __init__(self, client): WMLResource.__init__(self, __name__, client) self._client = client self._connection_validation() def _get_resources(self, url, op_name, params=None): if params is not None and 'limit' in params.keys(): if params[u'limit'] < 1: raise WMLClientError('Limit cannot be lower than 1.') elif params[u'limit'] > 1000: raise WMLClientError('Limit cannot be larger than 1000.') if len(params) > 0: response_get = requests.get( url, headers=self._client._get_headers(), params=params ) return self._handle_response(200, op_name, response_get) else: resources = [] while True: response_get = requests.get(url, headers=self._client._get_headers()) result = self._handle_response(200, op_name, response_get) resources.extend(result['resources']) if 'next' not in result: break else: url = self._wml_credentials["url"]+result['next']['href'] if('start=invalid' in url): break return { "resources": resources } def _connection_validation(self): """Trial connection to validate authorization. :raises WMLClientError: raises when connection is unauthorized """ href = self._client.service_instance._href_definitions.get_platform_spaces_href() response_get = requests.get(href, headers=self._client._get_headers(), params={"limit": 1}) if response_get.status_code == 401: raise WMLClientError(Messages.get_message(response_get.json()['errors'][0]['message'], message_id="invalid_authorization"))
[docs] def store(self, meta_props, background_mode=True): """Create a space. The instance associated with the space via COMPUTE will be used for billing purposes on cloud. Note that STORAGE and COMPUTE are applicable only for cloud. :param meta_props: meta data of the space configuration. To see available meta names use: .. code-block:: python client.spaces.ConfigurationMetaNames.get() :type meta_props: dict :param background_mode: indicator if store() method will run in background (async) or (sync) :type background_mode: bool, optional :return: metadata of the stored space :rtype: dict **Example** .. code-block:: python metadata = { client.spaces.ConfigurationMetaNames.NAME: "my_space", client.spaces.ConfigurationMetaNames.DESCRIPTION: "spaces", client.spaces.ConfigurationMetaNames.STORAGE: {"resource_crn": "provide crn of the COS storage"}, client.spaces.ConfigurationMetaNames.COMPUTE: {"name": "test_instance", "crn": "provide crn of the instance"}, client.spaces.ConfigurationMetaNames.STAGE: {"production": True, "name": "stage_name"}, client.spaces.ConfigurationMetaNames.TAGS: ["sample_tag_1", "sample_tag_2"], client.spaces.ConfigurationMetaNames.TYPE: "cpd", } spaces_details = """ WMLResource._chk_and_block_create_update_for_python36(self) # quick support for COS credentials instead of local path # TODO add error handling and cleaning (remove the file) PlatformSpaces._validate_type(meta_props, u'meta_props', dict, True) if ('compute' in meta_props or 'storage' in meta_props) and self._client.ICP_PLATFORM_SPACES: raise WMLClientError("'STORAGE' and 'COMPUTE' meta props are not applicable on " "IBM Cloud PakĀ® for Data. If using any of these, remove and retry") if 'storage' not in meta_props and self._client.CLOUD_PLATFORM_SPACES: raise WMLClientError("'STORAGE' is mandatory for cloud") if 'compute' in meta_props and self._client.CLOUD_PLATFORM_SPACES: if 'name' not in meta_props[u'compute'] or 'crn' not in meta_props[u'compute']: raise WMLClientError("'name' and 'crn' is mandatory for 'COMPUTE'") temp_meta = meta_props[u'compute'] temp_meta.update({'type': 'machine_learning'}) meta_props[u'compute'] = temp_meta if 'stage' in meta_props and self._client.CLOUD_PLATFORM_SPACES: if not type(meta_props['stage']['production']) == bool: raise WMLClientError("'production' for 'STAGE' must be boolean") space_meta = self.ConfigurationMetaNames._generate_resource_metadata( meta_props, with_validation=True, client=self._client ) if 'compute' in meta_props and self._client.CLOUD_PLATFORM_SPACES: payload_compute = [] payload_compute.append(space_meta[u'compute']) space_meta[u'compute'] = payload_compute creation_response = self._client.service_instance._href_definitions.get_platform_spaces_href(), headers=self._client._get_headers(), json=space_meta) spaces_details = self._handle_response(202, u'creating new spaces', creation_response) # Cloud Convergence: Set self._client.wml_credentials['instance_id'] to instance_id # during client.set.default_space since that's where space is associated with client # and also in client.set.default_project # if 'compute' in spaces_details['entity'].keys() and self._client.CLOUD_PLATFORM_SPACES: instance_id = spaces_details['entity']['compute'][0]['guid'] self._client.wml_credentials[u'instance_id'] = instance_id self._client.service_instance = ServiceInstanceNewPlan(self._client) self._client.service_instance.details = self._client.service_instance.get_details() if background_mode: print("Space has been created. However some background setup activities might still be on-going. " "Check for 'status' field in the response. It has to show 'active' before space can be used. " "If it's not 'active', you can monitor the state with a call to spaces.get_details(space_id). " "Alternatively, use background_mode=False when calling") return spaces_details else: # note: monitor space status space_id = self.get_id(spaces_details) print_text_header_h1(u'Synchronous space creation with id: \'{}\' started'.format(space_id)) status = spaces_details['entity']['status'].get('state') with StatusLogger(status) as status_logger: while status not in ['failed', 'error', 'completed', 'canceled', 'active']: time.sleep(10) spaces_details = self.get_details(space_id) status = spaces_details['entity']['status'].get('state') status_logger.log_state(status) # --- end note if u'active' in status: print_text_header_h2(u'\nCreating space \'{}\' finished successfully.'.format(space_id)) else: raise WMLClientError( f"Space {space_id} creation failed with status: {spaces_details['entity']['status']}") return spaces_details
[docs] @staticmethod def get_id(space_details): """Get space_id from space details. :param space_details: metadata of the stored space :type space_details: dict :return: space ID :rtype: str **Example** .. code-block:: python space_details = space_id = client.spaces.get_id(space_details) """ PlatformSpaces._validate_type(space_details, u'space_details', object, True) return WMLResource._get_required_element_from_dict(space_details, u'space_details', [u'metadata', u'id'])
[docs] @staticmethod def get_uid(space_details): """Get Unique Id of the space. *Deprecated:* Use ``get_id(space_details)`` instead. :param space_details: metadata of the space :type space_details: dict :return: Unique Id of space :rtype: str **Example** .. code-block:: python space_details = space_uid = client.spaces.get_uid(space_details) """ PlatformSpaces._validate_type(space_details, u'space_details', object, True) return WMLResource._get_required_element_from_dict(space_details, u'space_details', [u'metadata', u'id'])
[docs] def delete(self, space_id): """Delete a stored space. :param space_id: space ID :type space_id: str :return: status ("SUCCESS" or "FAILED") :rtype: str **Example** .. code-block:: python client.spaces.delete(space_id) """ PlatformSpaces._validate_type(space_id, u'space_id', str, True) space_endpoint = self._client.service_instance._href_definitions.get_platform_space_href(space_id) response_delete = requests.delete(space_endpoint, headers=self._client._get_headers()) response = self._handle_response(202, u'space deletion', response_delete, False) print('DELETED') return response
[docs] def get_details(self, space_id=None, limit=None, asynchronous=False, get_all=False): """Get metadata of stored space(s). :param space_id: space ID :type space_id: str, optional :param limit: applicable when `space_id` is not provided, otherwise `limit` will be ignored :type limit: str, optional :param asynchronous: if `True`, it will work as a generator :type asynchronous: bool, optional :param get_all: if `True`, it will get all entries in 'limited' chunks :type get_all: bool, optional :return: metadata of stored space(s) :rtype: dict **Example** .. code-block:: python space_details = client.spaces.get_details(space_uid) space_details = client.spaces.get_details(limit=100) space_details = client.spaces.get_details(limit=100, get_all=True) space_details = [] for entry in client.spaces.get_details(limit=100, asynchronous=True, get_all=True): space_details.extend(entry) """ PlatformSpaces._validate_type(space_id, u'space_id', str, False) href = self._client.service_instance._href_definitions.get_platform_space_href(space_id) if space_id is not None: response_get = requests.get(href, headers=self._client._get_headers()) return self._handle_response(200, 'Get space', response_get) else: return self._get_with_or_without_limit(self._client.service_instance._href_definitions.get_platform_spaces_href(), limit, 'spaces', summary=False, pre_defined=False, skip_space_project_chk=True, _async=asynchronous, _all=get_all)
[docs] def list(self, limit=None, member=None, roles=None, return_as_df=True, space_type=None): """Print stored spaces in a table format. If limit is set to None there will be only first 50 records shown. :param limit: limit number of fetched records :type limit: int, optional :param member: filters the result list to only include spaces where the user with a matching user id is a member :type member: str, optional :param roles: limit number of fetched records :type roles: str, optional :param return_as_df: determinate if table should be returned as pandas.DataFrame object, default: True :type return_as_df: bool, optional :param space_type: filter spaces by their type; available types: 'wx', 'cpd', 'wca' :type space_type: str, optional :return: pandas.DataFrame with listed spaces or None if return_as_df is False :rtype: pandas.DataFrame or None **Example** .. code-block:: python client.spaces.list() """ PlatformSpaces._validate_type(limit, u'limit', int, False) href = self._client.service_instance._href_definitions.get_platform_spaces_href() params = {} if limit is not None: params.update({'limit': limit}) if limit is None: params.update({'limit': 50}) if member is not None: params.update({'member': member}) if roles is not None: params.update({'roles': roles}) if space_type is not None: params.update({'type': space_type}) space_resources = [m for r in self._get_with_or_without_limit(href, None, 'spaces', summary=False, pre_defined=False, skip_space_project_chk=True, query_params=params, _async=True, _all=True) for m in r['resources']] if limit is not None: space_resources = space_resources[:limit] # space_resources = self._get_no_space_artifact_details(href, None, limit, 'spaces')[u'resources'] space_values = [(m[u'metadata'][u'id'], m[u'entity'][u'name'], m[u'metadata'][u'created_at']) for m in space_resources] if limit is None: print("Note: 'limit' is not provided. Only first 50 records will be displayed if the number of records " "exceed 50") table = self._list(space_values, [u'ID', u'NAME', u'CREATED'], limit, _DEFAULT_LIST_LENGTH) if return_as_df: return table
[docs] def update(self, space_id, changes): """Updates existing space metadata. 'STORAGE' cannot be updated. STORAGE and COMPUTE are applicable only for cloud. :param space_id: ID of space which definition should be updated :type space_id: str :param changes: elements which should be changed, where keys are ConfigurationMetaNames :type changes: dict :return: metadata of updated space :rtype: dict **Example** .. code-block:: python metadata = { client.spaces.ConfigurationMetaNames.NAME:"updated_space", client.spaces.ConfigurationMetaNames.COMPUTE: {"name": "test_instance", "crn": "v1:staging:public:pm-20-dev:us-south:a/09796a1b4cddfcc9f7fe17824a68a0f8:f1026e4b-77cf-4703-843d-c9984eac7272::" } } space_details = client.spaces.update(space_id, changes=metadata) """ WMLResource._chk_and_block_create_update_for_python36(self) if ('compute' in changes or 'storage' in changes) and self._client.ICP_PLATFORM_SPACES: raise WMLClientError("'STORAGE' and 'COMPUTE' meta props are not applicable on" "IBM Cloud PakĀ® for Data. If using any of these, remove and retry") if 'storage' in changes: raise WMLClientError("STORAGE cannot be updated") self._validate_type(space_id, u'space_id', str, True) self._validate_type(changes, u'changes', dict, True) details = self.get_details(space_id) if 'compute' in changes and self._client.CLOUD_PLATFORM_SPACES: changes[u'compute'][u'type'] = 'machine_learning' payload_compute = [] payload_compute.append(changes[u'compute']) changes[u'compute'] = payload_compute print("changes in update: ", changes) patch_payload = self.ConfigurationMetaNames._generate_patch_payload(details['entity'], changes) print("patch payload: ", patch_payload) href = self._client.service_instance._href_definitions.get_platform_space_href(space_id) response = requests.patch(href, json=patch_payload, headers=self._client._get_headers()) updated_details = self._handle_response(200, u'spaces patch', response) # Cloud Convergence if 'compute' in updated_details['entity'].keys() and self._client.CLOUD_PLATFORM_SPACES: instance_id = updated_details['entity']['compute'][0]['guid'] self._client.wml_credentials[u'instance_id'] = instance_id self._client.service_instance = ServiceInstanceNewPlan(self._client) self._client.service_instance.details = self._client.service_instance.get_details() return updated_details
[docs] def create_member(self, space_id, meta_props): """Create a member within a space. :param space_id: ID of space which definition should be updated :type space_id: str :param meta_props: metadata of the member configuration. To see available meta names use: .. code-block:: python client.spaces.MemberMetaNames.get() :type meta_props: dict :return: metadata of the stored member :rtype: dict .. note:: * `role` can be any one of the following: "viewer", "editor", "admin" * `type` can be any one of the following: "user", "service" * `id` can be either service-ID or IAM-userID **Examples** .. code-block:: python metadata = { client.spaces.MemberMetaNames.MEMBERS: [{"id":"IBMid-100000DK0B", "type": "user", "role": "admin" }] } members_details = client.spaces.create_member(space_id=space_id, meta_props=metadata) .. code-block:: python metadata = { client.spaces.MemberMetaNames.MEMBERS: [{"id":"iam-ServiceId-5a216e59-6592-43b9-8669-625d341aca71", "type": "service", "role": "admin" }] } members_details = client.spaces.create_member(space_id=space_id, meta_props=metadata) """ self._validate_type(space_id, u'space_id', str, True) PlatformSpaces._validate_type(meta_props, u'meta_props', dict, True) meta = {} if 'members' in meta_props: meta = meta_props elif 'member' in meta_props: dictionary = meta_props['member'] payload = [] payload.append(dictionary) meta['members'] = payload space_meta = self.MemberMetaNames._generate_resource_metadata( meta, with_validation=True, client=self._client ) creation_response = self._client.service_instance._href_definitions.get_platform_spaces_members_href(space_id), headers=self._client._get_headers(), json=space_meta) # TODO: Change response code one they change it to 201 members_details = self._handle_response(200, u'creating new members', creation_response) return members_details
[docs] def get_member_details(self, space_id, member_id): """Get metadata of member associated with a space. :param space_id: ID of space which definition should be updated :type space_id: str :param member_id: member ID :type member_id: str :return: metadata of member of a space :rtype: dict **Example** .. code-block:: python member_details = client.spaces.get_member_details(space_uid,member_id) """ PlatformSpaces._validate_type(space_id, u'space_id', str, True) PlatformSpaces._validate_type(member_id, u'member_id', str, True) href = self._client.service_instance._href_definitions.get_platform_spaces_member_href(space_id, member_id) response_get = requests.get(href, headers=self._client._get_headers()) return self._handle_response(200, 'Get space member', response_get)
[docs] def delete_member(self, space_id, member_id): """Delete a member associated with a space. :param space_id: space UID :type space_id: str :param member_id: member UID :type member_id: str :return: status ("SUCCESS" or "FAILED") :rtype: str **Example** .. code-block:: python client.spaces.delete_member(space_id,member_id) """ PlatformSpaces._validate_type(space_id, u'space_id', str, True) PlatformSpaces._validate_type(member_id, u'member_id', str, True) member_endpoint = self._client.service_instance._href_definitions.get_platform_spaces_member_href(space_id, member_id) response_delete = requests.delete(member_endpoint, headers=self._client._get_headers()) print('DELETED') return self._handle_response(204, u'space member deletion', response_delete, False)
[docs] def update_member(self, space_id, member_id, changes): """Updates existing member metadata. :param space_id: ID of space :type space_id: str :param member_id: ID of member that needs to be updated :type member_id: str :param changes: elements which should be changed, where keys are ConfigurationMetaNames :type changes: dict :return: metadata of updated member :rtype: dict **Example** .. code-block:: python metadata = { client.spaces.MemberMetaNames.MEMBER: {"role": "editor"} } member_details = client.spaces.update_member(space_id, member_id, changes=metadata) """ self._validate_type(space_id, u'space_id', str, True) self._validate_type(member_id, u'member_id', str, True) self._validate_type(changes, u'changes', dict, True) details = self.get_member_details(space_id, member_id) # The member record is a bit different than most other type of records we deal w.r.t patch # There is no encapsulating object for the fields. We need to be consistent with the way we # provide the meta in create/patch. When we give with .MEMBER, _generate_patch_payload # will generate with /member patch. So, separate logic for member patch inline here changes1 = changes['member'] # Union of two dictionaries. The one in changes1 will override existent ones in current meta details.update(changes1) id_str = {} role_str = {} type_str = {} state_str = {} # if 'id' in details: # id_str["op"] = "replace" # id_str["path"] = "/id" # id_str["value"] = details[u'id'] if 'role' in details: role_str["op"] = "replace" role_str["path"] = "/role" role_str["value"] = details[u'role'] # if 'type' in details: # type_str["op"] = "replace" # type_str["path"] = "/type" # type_str["value"] = details[u'type'] if 'state' in details: state_str["op"] = "replace" state_str["path"] = "/state" state_str["value"] = details[u'state'] patch_payload = [] # if id_str: # patch_payload.append(id_str) if role_str: patch_payload.append(role_str) # if type_str: # patch_payload.append(type_str) if state_str: patch_payload.append(state_str) # patch_payload = self.MemberMetaNames._generate_patch_payload(details, changes, with_validation=True) href = self._client.service_instance._href_definitions.get_platform_spaces_member_href(space_id,member_id) response = requests.patch(href, json=patch_payload, headers=self._client._get_headers()) updated_details = self._handle_response(200, u'members patch', response) return updated_details
[docs] def list_members(self, space_id, limit=None, identity_type=None, role=None, state=None, return_as_df=True): """Print stored members of a space in a table format. If limit is set to None there will be only first 50 records shown. :param space_id: ID of space :type space_id: str :param limit: limit number of fetched records :type limit: int, optional :param identity_type: filter the members by type :type identity_type: str, optional :param role: filter the members by role :type role: str, optional :param state: filter the members by state :type state: str, optional :param return_as_df: determinate if table should be returned as pandas.DataFrame object, default: True :type return_as_df: bool, optional :return: pandas.DataFrame with listed members or None if return_as_df is False :rtype: pandas.DataFrame or None **Example** .. code-block:: python client.spaces.list_members(space_id) """ self._validate_type(space_id, u'space_id', str, True) params = {} if limit is not None: params.update({'limit': limit}) if limit is None: params.update({'limit': 50}) if identity_type is not None: params.update({'type': identity_type}) if role is not None: params.update({'role': role}) if state is not None: params.update({'state': state}) href = self._client.service_instance._href_definitions.get_platform_spaces_members_href(space_id) member_resources = self._get_resources(href, 'space members', params)[u'resources'] # space_values = [(m[u'metadata'][u'id'], # m[u'entity'][u'id'], # m[u'entity'][u'type'], # m[u'entity'][u'role'], # m[u'entity'][u'state'], # m[u'metadata'][u'created_at']) for m in member_resources] # self._list(space_values, [u'ID', u'IDENTITY', # u'IDENTITY_TYPE', # u'ROLE', # u'STATE', # u'CREATED'], limit, _DEFAULT_LIST_LENGTH) space_values = [(m[u'id'], m[u'type'], m[u'role'], m[u'state']) if 'state' in m else (m[u'id'], m[u'type'], m[u'role'], None) for m in member_resources] if limit is None: print("Note: 'limit' is not provided. Only first 50 records will be displayed if the number of records " "exceed 50") table = self._list(space_values, [u'ID', u'TYPE', u'ROLE', u'STATE'], limit, _DEFAULT_LIST_LENGTH) if return_as_df: return table
[docs] def promote(self, asset_id: str, source_project_id: str, target_space_id: str, rev_id: str = None) -> str: """Promote asset from project to space. :param asset_id: stored asset :type asset_id: str :param source_project_id: source project, from which asset is promoted :type source_project_id: str :param target_space_id: target space, where asset is promoted :type target_space_id: str :param rev_id: revision ID of the promoted asset :type rev_id: str, optional :return: promoted asset id :rtype: str **Examples** .. code-block:: python promoted_asset_id = client.spaces.promote(asset_id, source_project_id=project_id, target_space_id=space_id) promoted_model_id = client.spaces.promote(model_id, source_project_id=project_id, target_space_id=space_id) promoted_function_id = client.spaces.promote(function_id, source_project_id=project_id, target_space_id=space_id) promoted_data_asset_id = client.spaces.promote(data_asset_id, source_project_id=project_id, target_space_id=space_id) promoted_connection_asset_id = client.spaces.promote(connection_id, source_project_id=project_id, target_space_id=space_id) """ promote_payload = {"spaceId": target_space_id, "projectId": source_project_id, "assetDescription": "Asset promoted by ibm_wml client"} if rev_id: promote_payload['revisionId'] = rev_id promote_href = self._client.service_instance._href_definitions.promote_asset_href(asset_id) response = promote_href, headers=self._client._get_headers(), json=promote_payload ) promotion_details = self._client.repository._handle_response(200, f'promote asset', response) try: promoted_asset_id = promotion_details['promotedAsset']['asset_id'] except KeyError as key_err: raise PromotionFailed(source_project_id, target_space_id, promotion_details, reason=key_err) return promoted_asset_id