Source code for ibm_watsonx_ai.foundation_models.extractions.text_extractions_v2
# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2025.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ibm_watsonx_ai.foundation_models.extractions.text_extractions_v2_result_formats import (
TextExtractionsV2ResultFormats,
)
from ibm_watsonx_ai.helpers import DataConnection
from ibm_watsonx_ai.wml_client_error import (
InvalidMultipleArguments,
InvalidValue,
UnexpectedType,
WMLClientError,
)
from ibm_watsonx_ai.wml_resource import WMLResource
if TYPE_CHECKING:
import pandas
from ibm_watsonx_ai import APIClient, Credentials
[docs]
class TextExtractionsV2(WMLResource):
    """Instantiate the Text Extraction service.

    :param credentials: credentials to the watsonx.ai instance
    :type credentials: Credentials, optional

    :param project_id: ID of the project, defaults to None
    :type project_id: str, optional

    :param space_id: ID of the space, defaults to None
    :type space_id: str, optional

    :param api_client: initialized APIClient object with a set project ID or space ID. If passed, ``credentials`` and ``project_id``/``space_id`` are not required, defaults to None
    :type api_client: APIClient, optional

    :raises InvalidMultipleArguments: raised when neither `api_client` nor `credentials` alongside `space_id` or `project_id` are provided
    :raises WMLClientError: raised on non-cloud deployments when the CPD version is 5.1 or lower

    .. code-block:: python

        from ibm_watsonx_ai import Credentials
        from ibm_watsonx_ai.foundation_models.extractions import TextExtractionsV2

        extraction = TextExtractionsV2(
            credentials=Credentials(
                api_key = IAM_API_KEY,
                url = "https://us-south.ml.cloud.ibm.com"),
            project_id="*****"
            )

    """
def __init__(
    self,
    credentials: Credentials | None = None,
    project_id: str | None = None,
    space_id: str | None = None,
    api_client: APIClient | None = None,
) -> None:
    """Resolve the API client and its scope, then verify release support.

    ``credentials`` take precedence over ``api_client`` when both are given.
    ``space_id`` takes precedence over ``project_id`` when both are given.

    :raises InvalidMultipleArguments: when neither ``credentials`` nor
        ``api_client`` is provided, or when no ``space_id``/``project_id``
        is provided and no ``api_client`` was passed
    :raises WMLClientError: on non-cloud deployments when the CPD version
        is 5.1 or lower
    """
    # Step 1: obtain a client, either built from credentials or reused.
    if credentials is None and api_client is None:
        raise InvalidMultipleArguments(
            params_names_list=["credentials", "api_client"],
            reason="None of the arguments were provided.",
        )
    if credentials is not None:
        # Imported lazily; at module level the name is only available
        # under TYPE_CHECKING.
        from ibm_watsonx_ai import APIClient

        self._client = APIClient(credentials)
    else:
        self._client = api_client

    # Step 2: pick the scope. A pre-configured ``api_client`` may omit both IDs.
    no_scope_given = space_id is None and project_id is None
    if no_scope_given and not api_client:
        raise InvalidMultipleArguments(
            params_names_list=["space_id", "project_id"],
            reason="None of the arguments were provided.",
        )
    if space_id is not None:
        self._client.set.default_space(space_id)
    elif project_id is not None:
        self._client.set.default_project(project_id)

    # Step 3: on non-cloud deployments, reject releases handled elsewhere.
    if not self._client.CLOUD_PLATFORM_SPACES:
        cpd_version = self._client.CPD_version
        if cpd_version < 5.0:
            raise WMLClientError("Operation is unsupported for this release.")
        if cpd_version <= 5.1:
            raise WMLClientError(
                f"For watsonx.ai software {cpd_version} release, please use "
                "`ibm_watsonx_ai.foundation_models.extractions.TextExtractions` class."
            )

    super().__init__(__name__, self._client)
[docs]
def run_job(
    self,
    document_reference: DataConnection,
    results_reference: DataConnection,
    result_formats: (
        TextExtractionsV2ResultFormats
        | list[TextExtractionsV2ResultFormats]
        | list[str]
        | str
        | None
    ) = None,
    parameters: dict | None = None,
) -> dict:
    """Start a request to extract text and metadata from a document.

    :param document_reference: reference to the document in the bucket from which text will be extracted
    :type document_reference: DataConnection

    :param results_reference: reference to the location in the bucket where results will be saved
    :type results_reference: DataConnection

    :param result_formats: result formats for the text extraction, can be passed as a single
        enum member, a single string or a list of either; defaults to None, in which case
        ``TextExtractionsV2ResultFormats.PLAIN_TEXT`` is requested
    :type result_formats: TextExtractionsV2ResultFormats | list[TextExtractionsV2ResultFormats] | list[str] | str, optional

    :param parameters: the parameters for the text extraction, merged into the request
        ``parameters`` section after ``requested_outputs``, defaults to None
    :type parameters: dict | None, optional

    :raises UnexpectedType: when ``document_reference`` or ``results_reference`` is not a ``DataConnection``

    :return: raw response from the server with the text extraction job details
    :rtype: dict

    **Example:**

    .. code-block:: python

        from ibm_watsonx_ai.foundation_models.extractions import TextExtractionsV2ResultFormats
        from ibm_watsonx_ai.metanames import TextExtractionsV2ParametersMetaNames
        from ibm_watsonx_ai.helpers import DataConnection, S3Location

        document_reference = DataConnection(
            connection_asset_id="<connection_id>",
            location=S3Location(bucket="<bucket_name>", path="path/to/file"),
            )

        results_reference = DataConnection(
            connection_asset_id="<connection_id>",
            location=S3Location(bucket="<bucket_name>", path="path/to/directory/"), # Path must end with /
            )

        response = extraction.run_job(
            document_reference=document_reference,
            results_reference=results_reference,
            parameters={
                TextExtractionsV2ParametersMetaNames.MODE: "high_quality",
                TextExtractionsV2ParametersMetaNames.OCR_MODE: "enabled",
                TextExtractionsV2ParametersMetaNames.LANGUAGES: ["en", "fr"],
                TextExtractionsV2ParametersMetaNames.AUTO_ROTATION_CORRECTION: True,
                TextExtractionsV2ParametersMetaNames.CREATE_EMBEDDED_IMAGES: "enabled_placeholder",
                TextExtractionsV2ParametersMetaNames.OUTPUT_DPI: 72,
                TextExtractionsV2ParametersMetaNames.KVP_MODE: "invoice",
            },
            result_formats=[
                TextExtractionsV2ResultFormats.PLAIN_TEXT,
                TextExtractionsV2ResultFormats.MARKDOWN,
                TextExtractionsV2ResultFormats.ASSEMBLY_JSON,
            ]
        )

    """
    # Both references get the same type check; the document is checked first.
    for el_name, reference in (
        ("document_reference", document_reference),
        ("results_reference", results_reference),
    ):
        if not isinstance(reference, DataConnection):
            raise UnexpectedType(
                el_name=el_name,
                expected_type=DataConnection,
                actual_type=type(reference),
            )

    self._validate_type(parameters, "parameters", dict, False)

    # ``requested_outputs`` is always sent as a list: a lone enum member or
    # a lone string is wrapped, anything else is passed through unchanged.
    if result_formats is None:
        requested_outputs = [TextExtractionsV2ResultFormats.PLAIN_TEXT]
    elif isinstance(result_formats, (TextExtractionsV2ResultFormats, str)):
        requested_outputs = [result_formats]
    else:
        requested_outputs = result_formats

    extraction_parameters: dict = {"requested_outputs": requested_outputs}
    if parameters is not None:
        # Caller-supplied parameters are applied last, so they win on key clashes.
        extraction_parameters.update(parameters)

    payload: dict = {
        "document_reference": document_reference.to_dict(),
        "results_reference": results_reference.to_dict(),
        "parameters": extraction_parameters,
    }

    # The project scope takes precedence over the space scope.
    if self._client.default_project_id is not None:
        payload["project_id"] = self._client.default_project_id
    elif self._client.default_space_id is not None:
        payload["space_id"] = self._client.default_space_id

    response = self._client.httpx_client.post(
        url=self._client._href_definitions.get_text_extractions_href(),
        json=payload,
        params=self._client._params(skip_for_create=True, skip_userfs=True),
        headers=self._client._get_headers(),
    )

    return self._handle_response(201, "run_job", response)
[docs]
def list_jobs(self, limit: int | None = None) -> pandas.DataFrame:
    """List text extraction jobs. If limit is None, all jobs will be listed.

    :param limit: limit number of fetched records, defaults to None
    :type limit: int | None, optional

    :return: text extraction jobs information as a pandas DataFrame with the
        columns ``EXTRACTION_JOB_ID``, ``CREATED`` and ``STATUS``
    :rtype: pandas.DataFrame

    **Example:**

    .. code-block:: python

        extraction.list_jobs()

    """
    import pandas

    # Flattened job-detail fields mapped to the column titles shown to the user.
    column_titles = {
        "metadata.id": "EXTRACTION_JOB_ID",
        "metadata.created_at": "CREATED",
        "entity.results.status": "STATUS",
    }
    source_columns = list(column_titles)

    jobs = self.get_job_details(limit=limit)["resources"]

    # ``reindex`` keeps all three columns even if a field is absent from every job.
    selected = pandas.json_normalize(jobs).reindex(columns=source_columns)
    table = pandas.DataFrame(selected, columns=source_columns)
    return table.rename(columns=column_titles)
[docs]
def get_job_details(
    self, extraction_job_id: str | None = None, limit: int | None = None
) -> dict:
    """Return text extraction job details. If `extraction_job_id` is None, return the details of all text extraction jobs.

    :param extraction_job_id: ID of the text extraction job, defaults to None
    :type extraction_job_id: str | None, optional

    :param limit: limit number of fetched records, must be between 1 and 200;
        only used when `extraction_job_id` is None, defaults to None
    :type limit: int | None, optional

    :raises InvalidValue: when listing jobs with a `limit` outside of 1..200

    :return: details of the text extraction job
    :rtype: dict

    **Example:**

    .. code-block:: python

        extraction.get_job_details(extraction_job_id="<extraction_job_id>")

    """
    self._validate_type(extraction_job_id, "extraction_job_id", str, False)
    client = self._client

    if extraction_job_id is None:
        # Listing mode: reject an out-of-range limit before any request is made.
        if limit is not None and not 1 <= limit <= 200:
            raise InvalidValue(
                value_name="limit",
                reason=f"The given value {limit} is not in between 1 and 200",
            )
        query = client._params(skip_userfs=True)
        if limit is not None:
            query["limit"] = limit
        # TODO: pagination is not yet implemented
        target_url = client._href_definitions.get_text_extractions_href()
    else:
        # Single-job mode: ``limit`` is not consulted here.
        target_url = client._href_definitions.get_text_extraction_href(
            extraction_job_id
        )
        query = client._params(skip_userfs=True)

    response = client.httpx_client.get(
        url=target_url,
        params=query,
        headers=client._get_headers(),
    )
    return self._handle_response(200, "get_job_details", response)
[docs]
def delete_job(self, extraction_job_id: str) -> Literal["SUCCESS"]:
    """Delete a text extraction job.

    Sends a DELETE request for the job with the ``hard_delete`` query
    parameter set to ``True``.

    :param extraction_job_id: ID of text extraction job
    :type extraction_job_id: str

    :return: "SUCCESS" if the deletion succeeds
    :rtype: str

    **Example:**

    .. code-block:: python

        extraction.delete_job(extraction_job_id="<extraction_job_id>")

    """
    self._validate_type(extraction_job_id, "extraction_job_id", str, True)
    client = self._client

    query = client._params(skip_userfs=True)
    query["hard_delete"] = True

    job_url = client._href_definitions.get_text_extraction_href(extraction_job_id)
    response = client.httpx_client.delete(
        url=job_url,
        params=query,
        headers=client._get_headers(),
    )
    return self._handle_response(204, "delete_job", response)  # type: ignore[return-value]
[docs]
def cancel_job(self, extraction_job_id: str) -> Literal["SUCCESS"]:
    """Cancel a text extraction job.

    Sends a DELETE request for the job without the ``hard_delete`` query
    parameter.

    :param extraction_job_id: ID of text extraction job
    :type extraction_job_id: str

    :return: "SUCCESS" if the cancellation succeeds
    :rtype: str

    **Example:**

    .. code-block:: python

        extraction.cancel_job(extraction_job_id="<extraction_job_id>")

    """
    self._validate_type(extraction_job_id, "extraction_job_id", str, True)
    client = self._client

    job_url = client._href_definitions.get_text_extraction_href(extraction_job_id)
    query = client._params(skip_userfs=True)
    response = client.httpx_client.delete(
        url=job_url,
        params=query,
        headers=client._get_headers(),
    )
    return self._handle_response(204, "cancel_job", response)  # type: ignore[return-value]
[docs]
def get_results_reference(self, extraction_job_id: str) -> DataConnection:
    """Get a `DataConnection` instance that is a reference to the results stored on COS.

    The reference is read from ``entity.results_reference`` in the job
    details and bound to this object's API client.

    :param extraction_job_id: ID of text extraction job
    :type extraction_job_id: str

    :return: location of the Data Connection to text extraction job results
    :rtype: DataConnection

    **Example:**

    .. code-block:: python

        results_reference = extraction.get_results_reference(extraction_job_id="<extraction_job_id>")

    """
    self._validate_type(extraction_job_id, "extraction_job_id", str, True)

    entity = self.get_job_details(extraction_job_id).get("entity", {})
    connection = DataConnection._from_dict(entity.get("results_reference"))

    # Attach the client to the returned connection.
    connection.set_client(self._client)
    return connection
[docs]
@classmethod
def get_job_id(cls, extraction_details: dict) -> str:
    """Get the unique ID of a stored extraction request.

    The ID is read from ``extraction_details["metadata"]["id"]``.

    :param extraction_details: metadata of the stored extraction
    :type extraction_details: dict

    :return: unique ID of the stored extraction request
    :rtype: str

    **Example:**

    .. code-block:: python

        extraction_details = extraction.run_job(...)
        extraction_job_id = extraction.get_job_id(extraction_details)

    """
    cls._validate_type(extraction_details, "extraction_details", dict, True)

    id_path = ["metadata", "id"]
    job_id = cls._get_required_element_from_dict(
        extraction_details, "extraction_details", id_path
    )
    return job_id