Source code for ibm_watsonx_ai.import_assets

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2023-2024.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

from __future__ import annotations
from typing import TYPE_CHECKING, Any

import os

import ibm_watsonx_ai._wrappers.requests as requests
from ibm_watsonx_ai.wml_client_error import WMLClientError
from ibm_watsonx_ai.wml_resource import WMLResource

if TYPE_CHECKING:
    from ibm_watsonx_ai import APIClient
    import pandas as pd

_DEFAULT_LIST_LENGTH = 50


class Import(WMLResource):
    """Start, inspect, cancel and delete asset import jobs.

    Every public operation that talks to the service is scoped to exactly one
    space or one project: the caller must pass either ``space_id`` or
    ``project_id``, never both and never neither.
    """

    def __init__(self, client: APIClient):
        WMLResource.__init__(self, __name__, client)
        self._client = client

    @staticmethod
    def _build_scope_params(
        space_id: str | None,
        project_id: str | None,
        missing_message: str = "Its mandatory to provide space_id or project_id",
    ) -> dict[str, Any]:
        """Validate the space/project scope and return it as query parameters.

        :param space_id: space identifier
        :type space_id: str, optional

        :param project_id: project identifier
        :type project_id: str, optional

        :param missing_message: error text used when neither identifier is given
        :type missing_message: str, optional

        :raises WMLClientError: if neither or both identifiers are provided

        :return: ``{"space_id": ...}`` or ``{"project_id": ...}``
        :rtype: dict
        """
        if space_id is None and project_id is None:
            raise WMLClientError(missing_message)

        if space_id is not None and project_id is not None:
            raise WMLClientError(
                "Either 'space_id' or 'project_id' can be provided, not both"
            )

        if space_id is not None:
            return {"space_id": space_id}
        return {"project_id": project_id}

    def start(
        self,
        file_path: str,
        space_id: str | None = None,
        project_id: str | None = None,
    ) -> dict:
        """Start the import. You must provide the space_id or the project_id.

        :param file_path: file path to the zip file with exported assets
        :type file_path: str

        :param space_id: space identifier
        :type space_id: str, optional

        :param project_id: project identifier
        :type project_id: str, optional

        :raises WMLClientError: if the scope is invalid or `file_path` is not an existing file

        :return: response json
        :rtype: dict

        **Example:**

        .. code-block:: python

            details = client.import_assets.start(space_id="98a53931-a8c0-4c2f-8319-c793155e4598",
                                                 file_path="/home/user/data_to_be_imported.zip")

        """
        # `start` historically reports a missing scope with its own wording;
        # keep that message stable for callers that match on it.
        params = self._build_scope_params(
            space_id,
            project_id,
            missing_message="Either 'space_id' or 'project_id' has to be provided",
        )

        if not os.path.isfile(file_path):
            raise WMLClientError(f"File with name: '{file_path}' does not exist")

        # The archive is sent as the raw request body, so it is read in full.
        with open(file_path, "rb") as file:
            data = file.read()

        href = self._client.service_instance._href_definitions.imports_href()

        creation_response = requests.post(
            href,
            params=params,
            headers=self._client._get_headers(content_type="application/zip"),
            data=data,
        )

        details = self._handle_response(
            expected_status_code=202,
            operationName="import start",
            response=creation_response,
        )

        import_id = details["metadata"]["id"]

        print(
            f"import job with id {import_id} has started. Monitor status using client.import_assets.get_details api. "
            "Check 'help(client.import_assets.get_details)' for details on the api usage"
        )

        return details

    def _validate_input(self, meta_props: dict) -> None:
        """Check that `meta_props` carries a name and an asset selection.

        Not called by any other method of this class.

        :param meta_props: configuration dictionary to check
        :type meta_props: dict

        :raises WMLClientError: if ``"name"`` is missing, or if neither
            ``"all_assets"`` nor ``"asset_ids"`` is present
        """
        if "name" not in meta_props:
            raise WMLClientError(
                "Its mandatory to provide 'NAME' in meta_props. Example: "
                "client.import_assets.ConfigurationMetaNames.NAME: 'name'"
            )

        if "all_assets" not in meta_props and "asset_ids" not in meta_props:
            raise WMLClientError(
                "Its mandatory to provide either 'ALL_ASSETS' or 'ASSET_IDS' in meta_props. Example: "
                "client.import_assets.ConfigurationMetaNames.ALL_ASSETS: True"
            )

    def cancel(
        self, import_id: str, space_id: str | None = None, project_id: str | None = None
    ) -> None:
        """Cancel an import job. You must provide the space_id or the project_id.

        .. note::
            To delete an import_id job, use delete() api

        :param import_id: import job identifier
        :type import_id: str

        :param space_id: space identifier
        :type space_id: str, optional

        :param project_id: project identifier
        :type project_id: str, optional

        :raises WMLClientError: if neither or both of `space_id` and `project_id` are provided

        **Example:**

        .. code-block:: python

            client.import_assets.cancel(import_id='6213cf1-252f-424b-b52d-5cdd9814956c',
                                        space_id='3421cf1-252f-424b-b52d-5cdd981495fe')

        """
        Import._validate_type(import_id, "import_id", str, True)
        params = self._build_scope_params(space_id, project_id)

        href = self._client.service_instance._href_definitions.import_href(import_id)

        # Cancellation is a DELETE without the `hard_delete` flag that
        # `delete()` sends.
        cancel_response = requests.delete(
            href, params=params, headers=self._client._get_headers()
        )

        details = self._handle_response(
            expected_status_code=204,
            operationName="cancel import",
            response=cancel_response,
        )

        if details == "SUCCESS":
            print("Import job cancelled")

    def delete(
        self, import_id: str, space_id: str | None = None, project_id: str | None = None
    ) -> None:
        """Delete the given import_id job. You must provide the space_id or the project_id.

        :param import_id: import job identifier
        :type import_id: str

        :param space_id: space identifier
        :type space_id: str, optional

        :param project_id: project identifier
        :type project_id: str, optional

        :raises WMLClientError: if neither or both of `space_id` and `project_id` are provided

        **Example:**

        .. code-block:: python

            client.import_assets.delete(import_id='6213cf1-252f-424b-b52d-5cdd9814956c',
                                        space_id='98a53931-a8c0-4c2f-8319-c793155e4598')

        """
        scope = self._build_scope_params(space_id, project_id)
        Import._validate_type(import_id, "import_id", str, True)

        href = self._client.service_instance._href_definitions.import_href(import_id)

        # `hard_delete` stays first so the query string is built in the same
        # order as before.
        params: dict[str, bool | str] = {"hard_delete": True, **scope}

        delete_response = requests.delete(
            href, params=params, headers=self._client._get_headers()
        )

        details = self._handle_response(
            expected_status_code=204,
            operationName="delete import job",
            response=delete_response,
        )

        if details == "SUCCESS":
            print("Import job deleted")

    def get_details(
        self,
        import_id: str | None = None,
        space_id: str | None = None,
        project_id: str | None = None,
        limit: int | None = None,
        asynchronous: bool = False,
        get_all: bool = False,
    ) -> dict:
        """Get metadata of the given import job. If no `import_id` is specified,
        all import metadata is returned. You must provide the space_id or the project_id.

        :param import_id: import job identifier
        :type import_id: str, optional

        :param space_id: space identifier
        :type space_id: str, optional

        :param project_id: project identifier
        :type project_id: str, optional

        :param limit: limit number of fetched records
        :type limit: int, optional

        :param asynchronous: if `True`, it will work as a generator
            (only used when `import_id` is not given)
        :type asynchronous: bool, optional

        :param get_all: if `True`, it will get all entries in 'limited' chunks
            (only used when `import_id` is not given)
        :type get_all: bool, optional

        :raises WMLClientError: if neither or both of `space_id` and `project_id` are provided

        :return: import(s) metadata
        :rtype: dict (if import_id is not None) or {"resources": [dict]} (if import_id is None)

        **Example:**

        .. code-block:: python

            details = client.import_assets.get_details(import_id, space_id=space_id)
            details = client.import_assets.get_details(space_id=space_id)
            details = client.import_assets.get_details(space_id=space_id, limit=100)
            details = client.import_assets.get_details(space_id=space_id, limit=100, get_all=True)
            details = []
            for entry in client.import_assets.get_details(space_id=space_id, limit=100,
                                                          asynchronous=True, get_all=True):
                details.extend(entry)

        """
        params = self._build_scope_params(space_id, project_id)

        Import._validate_type(import_id, "import_id", str, False)
        Import._validate_type(limit, "limit", int, False)

        href = self._client.service_instance._href_definitions.imports_href()

        if import_id is None:
            # Listing: the paging options are forwarded only on this path.
            return self._get_artifact_details(
                href,
                import_id,
                limit,
                "import job",
                query_params=params,
                _async=asynchronous,
                _all=get_all,
            )

        return self._get_artifact_details(
            href, import_id, limit, "import job", query_params=params
        )

    def list(
        self,
        space_id: str | None = None,
        project_id: str | None = None,
        limit: int | None = None,
    ) -> pd.DataFrame:
        """Return import jobs in a table format. If limit is set to None,
        only the first 50 records are shown. You must provide the space_id or the project_id.

        :param space_id: space identifier
        :type space_id: str, optional

        :param project_id: project identifier
        :type project_id: str, optional

        :param limit: limit number of fetched records
        :type limit: int, optional

        :raises WMLClientError: if neither or both of `space_id` and `project_id` are provided

        :return: pandas.DataFrame with listed assets
        :rtype: pandas.DataFrame

        **Example:**

        .. code-block:: python

            client.import_assets.list(space_id='98a53931-a8c0-4c2f-8319-c793155e4598')

        """
        # get_details() performs the space/project validation (same errors,
        # same messages), so it is not repeated here.
        resources = self.get_details(space_id=space_id, project_id=project_id)[
            "resources"
        ]

        values = [
            (
                m["metadata"]["id"],
                m["metadata"]["created_at"],
                m["entity"]["status"]["state"],
            )
            for m in resources
        ]

        return self._list(
            values, ["ID", "CREATED", "STATUS"], limit, _DEFAULT_LIST_LENGTH
        )

    @staticmethod
    def get_id(import_details: dict) -> str:
        """Get ID of the import job from import details.

        :param import_details: metadata of the import job
        :type import_details: dict

        :return: ID of the import job
        :rtype: str

        **Example:**

        .. code-block:: python

            id = client.import_assets.get_id(import_details)

        """
        Import._validate_type(import_details, "import_details", object, True)

        return WMLResource._get_required_element_from_dict(
            import_details, "import_details", ["metadata", "id"]
        )