Source code for ibm_watsonx_ai.foundation_models.extractions.text_extractions
# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2024-2025.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ibm_watsonx_ai.wml_client_error import (
InvalidMultipleArguments,
WMLClientError,
UnexpectedType,
InvalidValue,
)
from ibm_watsonx_ai.wml_resource import WMLResource
from ibm_watsonx_ai._wrappers import requests
from ibm_watsonx_ai.helpers import DataConnection
if TYPE_CHECKING:
from ibm_watsonx_ai import APIClient, Credentials
import pandas
[docs]
class TextExtractions(WMLResource):
"""Instantiate the Text Extraction service.
:param credentials: credentials to the Watson Machine Learning instance
:type credentials: Credentials, optional
:param project_id: ID of the Watson Studio project, defaults to None
:type project_id: str, optional
:param space_id: ID of the Watson Studio space, defaults to None
:type space_id: str, optional
:param api_client: initialized APIClient object with a set project ID or space ID. If passed, ``credentials`` and ``project_id``/``space_id`` are not required, defaults to None
:type api_client: APIClient, optional
:raises InvalidMultipleArguments: raised if `space_id` and `project_id` or `credentials` and `api_client` are provided simultaneously
:raises WMLClientError: raised if the CPD version is less than 5.0
.. code-block:: python
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.foundation_models.extractions import TextExtractions
extraction = TextExtractions(
credentials=Credentials(
api_key = IAM_API_KEY,
url = "https://us-south.ml.cloud.ibm.com"),
project_id="*****"
)
"""
def __init__(
self,
credentials: Credentials | None = None,
project_id: str | None = None,
space_id: str | None = None,
api_client: APIClient | None = None,
) -> None:
if credentials is not None:
from ibm_watsonx_ai import APIClient
self._client = APIClient(credentials)
elif api_client is not None:
self._client = api_client
else:
raise InvalidMultipleArguments(
params_names_list=["credentials", "api_client"],
reason="None of the arguments were provided.",
)
if space_id is not None:
self._client.set.default_space(space_id)
elif project_id is not None:
self._client.set.default_project(project_id)
elif not api_client:
raise InvalidMultipleArguments(
params_names_list=["space_id", "project_id"],
reason="None of the arguments were provided.",
)
if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 5.0:
raise WMLClientError(error_msg="Operation is unsupported for this release.")
WMLResource.__init__(self, __name__, self._client)
[docs]
def run_job(
self,
document_reference: DataConnection,
results_reference: DataConnection,
steps: dict | None = None,
results_format: Literal["json", "markdown"] = "json",
) -> dict:
"""Start a request to extract text and metadata from a document.
:param document_reference: reference to the document in the bucket from which text will be extracted
:type document_reference: DataConnection
:param results_reference: reference to the location in the bucket where results will saved
:type results_reference: DataConnection
:param steps: steps for the text extraction pipeline, defaults to None
:type steps: dict | None, optional
:param results_format: results format for the text extraction, defaults to "json"
:type results_format: Literal["json", "markdown"], optional
:return: raw response from the server with the text extraction job details
:rtype: dict
**Example:**
.. code-block:: python
from ibm_watsonx_ai.metanames import TextExtractionsMetaNames
from ibm_watsonx_ai.helpers import DataConnection, S3Location
document_reference = DataConnection(
connection_asset_id="<connection_id>",
location=S3Location(bucket="<bucket_name>", path="path/to/file"),
)
results_reference = DataConnection(
connection_asset_id="<connection_id>",
location=S3Location(bucket="<bucket_name>", path="path/to/file"),
)
response = extraction.run_job(
document_reference=document_reference,
results_reference=results_reference,
steps={
TextExtractionsMetaNames.OCR: {"languages_list": ["en", "fr"]},
TextExtractionsMetaNames.TABLE_PROCESSING: {"enabled": True},
},
results_format="markdown"
)
"""
if not isinstance(document_reference, DataConnection):
raise UnexpectedType(
el_name="document_reference",
expected_type=DataConnection,
actual_type=type(document_reference),
)
elif not isinstance(results_reference, DataConnection):
raise UnexpectedType(
el_name="results_reference",
expected_type=DataConnection,
actual_type=type(results_reference),
)
TextExtractions._validate_type(steps, "steps", dict, False)
payload: dict = {}
if self._client.default_project_id is not None:
payload.update({"project_id": self._client.default_project_id})
elif self._client.default_space_id is not None:
payload.update({"space_id": self._client.default_space_id})
payload.update({"document_reference": document_reference._to_dict()})
payload.update({"results_reference": results_reference._to_dict()})
if steps is not None:
payload.update({"steps": steps})
if results_format == "json":
payload.update({"assembly_json": {}})
elif results_format == "markdown":
payload.update({"assembly_md": {}})
else:
raise ValueError(
"Incorrect results format provided. Only 'json' and 'markdown' are supported."
)
response = requests.post(
url=self._client.service_instance._href_definitions.get_text_extraction_href(),
json=payload,
params=self._client._params(skip_for_create=True, skip_userfs=True),
headers=self._client._get_headers(),
)
return self._handle_response(201, "run_job", response)
[docs]
def list_jobs(self, limit: int | None = None) -> pandas.DataFrame:
"""List text extraction jobs. If limit is None, all jobs will be listed.
:param limit: limit number of fetched records, defaults to None
:type limit: int | None, optional
:return: job information of a pandas DataFrame with text extraction
:rtype: pandas.DataFrame
**Example:**
.. code-block:: python
extraction.list_jobs()
"""
import pandas
columns = ["metadata.id", "metadata.created_at", "entity.results.status"]
details = self.get_job_details(limit=limit)
resources = details["resources"]
data_normalize = pandas.json_normalize(resources)
extraction_data = data_normalize.reindex(columns=columns)
df_details: pandas.DataFrame = pandas.DataFrame(
extraction_data, columns=columns
)
df_details.rename(
columns={
"metadata.id": "EXTRACTION_ID",
"metadata.created_at": "CREATED",
"entity.results.status": "STATUS",
},
inplace=True,
)
return df_details
[docs]
def get_job_details(
self, extraction_id: str | None = None, limit: int | None = None
) -> dict:
"""Return text extraction job details. If `extraction_id` is None, returns the details of all text extraction jobs.
:param extraction_id: ID of the text extraction job, defaults to None
:type extraction_id: str | None, optional
:param limit: limit number of fetched records, defaults to None
:type limit: int | None, optional
:return: details of the text extraction job
:rtype: dict
**Example:**
.. code-block:: python
extraction.get_job_details(extraction_id="<extraction_id>")
"""
TextExtractions._validate_type(extraction_id, "extraction_id", str, False)
if extraction_id is not None:
response = requests.get(
url=self._client.service_instance._href_definitions.get_text_extraction_href()
+ f"/{extraction_id}",
params=self._client._params(skip_userfs=True),
headers=self._client._get_headers(),
)
else:
_params: dict | None = None
if limit is not None:
if limit < 1 or limit > 200:
raise InvalidValue(
value_name="limit",
reason=f"The given value {limit} is not in the range <1, 200>",
)
else:
_params = {"limit": limit}
# TODO: pagination is not yet implemented
response = requests.get(
url=self._client.service_instance._href_definitions.get_text_extraction_href(),
params=(self._client._params(skip_userfs=True) | (_params or {})),
headers=self._client._get_headers(),
)
return self._handle_response(200, "get_job_details", response)
[docs]
def delete_job(self, extraction_id: str) -> Literal["SUCCESS"]:
"""Delete a text extraction job.
:return: return "SUCCESS" if the deletion succeeds
:rtype: str
**Example:**
.. code-block:: python
extraction.delete_job(extraction_id="<extraction_id>")
"""
TextExtractions._validate_type(extraction_id, "extraction_id", str, True)
params = self._client._params(skip_userfs=True)
params.update({"hard_delete": True})
response = requests.delete(
url=self._client.service_instance._href_definitions.get_text_extraction_href()
+ f"/{extraction_id}",
params=params,
headers=self._client._get_headers(),
)
return self._handle_response(204, "delete_job", response) # type: ignore[return-value]
[docs]
def cancel_job(self, extraction_id: str) -> Literal["SUCCESS"]:
"""Cancel a text extraction job.
:return: return "SUCCESS" if the cancellation succeeds
:rtype: str
**Example:**
.. code-block:: python
extraction.cancel_job(extraction_id="<extraction_id>")
"""
TextExtractions._validate_type(extraction_id, "extraction_id", str, True)
response = requests.delete(
url=self._client.service_instance._href_definitions.get_text_extraction_href()
+ f"/{extraction_id}",
params=self._client._params(skip_userfs=True),
headers=self._client._get_headers(),
)
return self._handle_response(204, "cancel_job", response) # type: ignore[return-value]
[docs]
def get_results_reference(self, extraction_id: str) -> DataConnection:
"""Get a `DataConnection` instance that is a reference to the results stored on COS.
:param extraction_id: ID of text extraction job
:type extraction_id: str
:return: location of the Data Connection to text extraction job results
:rtype: DataConnection
**Example:**
.. code-block:: python
results_reference = extraction.get_results_reference(extraction_id="<extraction_id>")
"""
TextExtractions._validate_type(extraction_id, "extraction_id", str, True)
job_details = self.get_job_details(extraction_id=extraction_id)
results_reference = job_details.get("entity", {}).get("results_reference")
data_conn = DataConnection._from_dict(results_reference)
data_conn.set_client(self._client)
return data_conn
[docs]
@staticmethod
def get_id(extraction_details: dict) -> str:
"""Get the unique ID of a stored extraction request.
:param extraction_details: metadata of the stored extraction
:type extraction_details: dict
:return: unique ID of the stored extraction request
:rtype: str
**Example:**
.. code-block:: python
extraction_details = extraction.get_job_details(extraction_id)
extraction_id = extraction.get_id(extraction_details)
"""
TextExtractions._validate_type(
extraction_details, "extraction_details", dict, True
)
return WMLResource._get_required_element_from_dict(
extraction_details, "extraction_details", ["metadata", "id"]
)