Semantic Schema
Note
SemanticSchema is supported since ibm-watsonx-ai 1.5.8 and IBM Cloud Pak® for Data 5.4.x.
- class ibm_watsonx_ai.foundation_models.semantic_schema.SemanticSchema(*, credentials=None, project_id=None, space_id=None, api_client=None)
Bases: WMLResource
Manage semantic schema operations for watsonx.ai.
This class provides a unified interface for managing semantic schemas through specialized operation handlers. Each operation type (create, improve, merge, cluster) is exposed through a dedicated attribute that holds its handler.
- Parameters:
credentials (Credentials or dict, optional) – credentials for the watsonx.ai instance
project_id (str, optional) – ID of the project
space_id (str, optional) – ID of the space
api_client (APIClient, optional) – initialized APIClient object with a project ID or space ID set
- Attributes:
create (CreateSchemas) – Handler for schema creation operations from documents
improve (ImproveSchemas) – Handler for schema improvement operations
merge (MergeSchemas) – Handler for schema merging operations
cluster (ClusterSchemas) – Handler for schema clustering operations
Note
You must provide either 'credentials' or 'api_client'.
When 'credentials' is passed, you must also provide either 'project_id' or 'space_id'.
Example:
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.foundation_models.semantic_schema import SemanticSchema

semantic_schema = SemanticSchema(
    credentials=Credentials(
        url="https://us-south.ml.cloud.ibm.com",
        api_key="your_api_key",
    ),
    project_id="your_project_id",
)
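The argument rule in the note above can be written as a small pre-flight check. This is a hypothetical sketch: check_semantic_schema_args is not part of ibm-watsonx-ai (the library performs its own validation), and reading "one of" as "at least one of" is an assumption.

```python
def check_semantic_schema_args(credentials=None, project_id=None,
                               space_id=None, api_client=None):
    """Hypothetical check mirroring the constructor note; not the library's own code."""
    # Either credentials or an initialized APIClient must be supplied.
    if credentials is None and api_client is None:
        raise ValueError("Provide one of: 'credentials', 'api_client'")
    # With raw credentials, a target project or space is also required.
    if credentials is not None and project_id is None and space_id is None:
        raise ValueError(
            "When 'credentials' is passed, provide one of: 'project_id', 'space_id'"
        )


# Passes: an APIClient already carries its project or space.
check_semantic_schema_args(api_client=object())
```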
Schema Operations
The SemanticSchema class provides access to four specialized operation classes through its attributes:
- create – create new schemas from documents
- improve – improve existing schemas
- merge – merge multiple schemas into one
- cluster – cluster similar schemas
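Because all four handlers expose the same job methods documented below (list_jobs, get_status, get_results, and so on), they can be addressed uniformly by attribute name. A minimal sketch, assuming an initialized semantic_schema object; list_all_jobs is a hypothetical helper, not a library function.

```python
OPERATIONS = ("create", "improve", "merge", "cluster")


def list_all_jobs(semantic_schema, limit=None):
    """Hypothetical helper: call list_jobs() on every operation handler.

    Returns a dict keyed by operation name; with the real client each value
    is the pandas DataFrame returned by that handler's list_jobs().
    """
    return {
        op: getattr(semantic_schema, op).list_jobs(limit=limit)
        for op in OPERATIONS
    }


# Example usage:
# jobs = list_all_jobs(semantic_schema, limit=5)
# jobs["merge"]
```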
- class ibm_watsonx_ai.foundation_models.semantic_schema.create_schemas.CreateSchemas(api_client)
Bases: BaseCustomSchemas
Handle schema creation operations.
This class provides methods to create new schemas from documents through job-based operations. Schema creation analyzes document structure and generates appropriate schema definitions automatically.
- delete_job(job_id)
Delete a specific job.
- Parameters:
job_id (str) – ID of the job to delete
- Returns:
“SUCCESS” if the deletion succeeds
- Return type:
str
- Raises:
WMLClientError – if deletion fails or job not found
Example:
job_id = "<create_schemas_job_id>"
semantic_schema.create.delete_job(job_id)
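To clean up several jobs at once, delete_job can be called in a loop. The sketch below is a hypothetical helper that works with any of the four handlers and records "SUCCESS" or the error message per job ID. It catches Exception broadly as a simplification; the documented error type is WMLClientError.

```python
def delete_jobs(handler, job_ids):
    """Hypothetical helper: delete each job and record the outcome per job ID.

    handler is any of semantic_schema.create, .improve, .merge or .cluster.
    """
    outcome = {}
    for job_id in job_ids:
        try:
            # delete_job() returns "SUCCESS" when the deletion succeeds.
            outcome[job_id] = handler.delete_job(job_id)
        except Exception as exc:  # e.g. WMLClientError for an unknown job ID
            outcome[job_id] = f"FAILED: {exc}"
    return outcome


# Example usage:
# delete_jobs(semantic_schema.create, ["<job_id_1>", "<job_id_2>"])
```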
- get_job_details(job_id=None, limit=None)
Retrieve details of a specific job or all jobs.
- Parameters:
job_id (str | None) – ID of the job to retrieve. If None, returns all jobs
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
job details or list of all jobs
- Return type:
dict
- Raises:
WMLClientError – if job_id is provided but job not found
Example:
# Get details of a specific create job
job_id = "<create_schemas_job_id>"
job_details = semantic_schema.create.get_job_details(job_id)

# Get all create jobs with limit
all_jobs_details = semantic_schema.create.get_job_details(limit=50)
- classmethod get_job_id(job_details)
Extract job ID from job details dictionary.
- Parameters:
job_details (dict) – dictionary containing job details
- Returns:
job ID
- Return type:
str
- Raises:
WMLClientError – if job_details is invalid or job ID not found
Example:
# job_details as returned by run_job() or get_job_details(job_id)
job_id = semantic_schema.create.get_job_id(job_details)
print(f"Job ID: {job_id}")
- get_results(job_id)
Retrieve results of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve results for
- Returns:
job results
- Return type:
dict
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<create_schemas_job_id>"
results = semantic_schema.create.get_results(job_id)
results
- get_status(job_id)
Retrieve status of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve status for
- Returns:
job status
- Return type:
str
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<create_schemas_job_id>"
status = semantic_schema.create.get_status(job_id)
print(f"Job status: {status}")
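Since run_job returns only the initial status, a client typically polls get_status until the job ends before calling get_results. A minimal polling sketch: wait_for_job is hypothetical, and the terminal status names ("completed", "failed", "canceled") are assumptions to adjust to the values your service actually reports.

```python
import time


def wait_for_job(handler, job_id, terminal=("completed", "failed", "canceled"),
                 interval=5.0, timeout=600.0, sleep=time.sleep):
    """Hypothetical helper: poll handler.get_status(job_id) until a terminal status.

    The terminal names are assumed and compared case-insensitively, so list
    them in lowercase. Raises TimeoutError if the job does not finish in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = handler.get_status(job_id)
        if str(status).lower() in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still '{status}' after {timeout} s")
        sleep(interval)


# Example usage:
# final_status = wait_for_job(semantic_schema.create, "<create_schemas_job_id>")
```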
- list_jobs(limit=None)
List all jobs.
- Parameters:
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
DataFrame containing all jobs with their status
- Return type:
pd.DataFrame
Example:
jobs_list = semantic_schema.create.list_jobs(limit=10)
jobs_list
- run_job(document_reference, parameters=None)
Execute a schema creation job from documents.
- Parameters:
document_reference (DataConnection) – data connection reference to documents for schema creation
parameters (CreateSchemasParameters, dict, optional) – schema creation parameters including schema name and options
- Returns:
job details including job_id and initial status
- Return type:
dict
- Raises:
WMLClientError – if job creation fails
ApiRequestFailure – if API request fails
Example:
from ibm_watsonx_ai.helpers import DataConnection, ContainerLocation

document_reference = DataConnection(
    location=ContainerLocation(path="files/document.pdf")
)
job_details = semantic_schema.create.run_job(
    document_reference=document_reference,
    parameters={
        "mode": "high_quality",
        "ocr_mode": "enabled",
        "enable_grounding": False,
        "auto_rotation_correction": False,
        "languages": ["en", "latn"],
    },
)
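To create schemas for several documents, run_job can be called once per document. The sketch below assumes each document is addressed by a path and that make_reference builds the corresponding DataConnection; submit_create_jobs and the example paths are hypothetical.

```python
def submit_create_jobs(handler, paths, make_reference, parameters=None):
    """Hypothetical helper: start one schema-creation job per document path.

    make_reference(path) must return the DataConnection that run_job() expects.
    Returns {path: job_id}.
    """
    job_ids = {}
    for path in paths:
        details = handler.run_job(
            document_reference=make_reference(path),
            parameters=parameters,
        )
        job_ids[path] = handler.get_job_id(details)
    return job_ids


# Example usage (hypothetical paths):
# from ibm_watsonx_ai.helpers import DataConnection, ContainerLocation
# job_ids = submit_create_jobs(
#     semantic_schema.create,
#     ["files/passport.pdf", "files/id_card.pdf"],
#     make_reference=lambda p: DataConnection(location=ContainerLocation(path=p)),
# )
```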
- class ibm_watsonx_ai.foundation_models.semantic_schema.improve_schemas.ImproveSchemas(api_client)
Bases: BaseCustomSchemas
Handle schema improvement operations.
This class provides methods to improve existing schemas through job-based operations. Schema improvement can include refining field definitions, adding metadata, optimizing structure, and enhancing schema quality based on data analysis.
- delete_job(job_id)
Delete a specific job.
- Parameters:
job_id (str) – ID of the job to delete
- Returns:
“SUCCESS” if the deletion succeeds
- Return type:
str
- Raises:
WMLClientError – if deletion fails or job not found
Example:
job_id = "<improve_schemas_job_id>"
semantic_schema.improve.delete_job(job_id)
- get_job_details(job_id=None, limit=None)
Retrieve details of a specific job or all jobs.
- Parameters:
job_id (str | None) – ID of the job to retrieve. If None, returns all jobs
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
job details or list of all jobs
- Return type:
dict
- Raises:
WMLClientError – if job_id is provided but job not found
Example:
# Get details of a specific improve job
job_id = "<improve_schemas_job_id>"
job_details = semantic_schema.improve.get_job_details(job_id)

# Get all improve jobs with limit
all_jobs_details = semantic_schema.improve.get_job_details(limit=50)
- classmethod get_job_id(job_details)
Extract job ID from job details dictionary.
- Parameters:
job_details (dict) – dictionary containing job details
- Returns:
job ID
- Return type:
str
- Raises:
WMLClientError – if job_details is invalid or job ID not found
Example:
# job_details as returned by run_job() or get_job_details(job_id)
job_id = semantic_schema.improve.get_job_id(job_details)
print(f"Job ID: {job_id}")
- get_results(job_id)
Retrieve results of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve results for
- Returns:
job results
- Return type:
dict
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<improve_schemas_job_id>"
results = semantic_schema.improve.get_results(job_id)
results
- get_status(job_id)
Retrieve status of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve status for
- Returns:
job status
- Return type:
str
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<improve_schemas_job_id>"
status = semantic_schema.improve.get_status(job_id)
print(f"Job status: {status}")
- list_jobs(limit=None)
List all jobs.
- Parameters:
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
DataFrame containing all jobs with their status
- Return type:
pd.DataFrame
Example:
jobs_list = semantic_schema.improve.list_jobs(limit=10)
jobs_list
- run_job(parameters)
Execute a schema improvement job.
- Parameters:
parameters (ImproveSchemasParameters, dict) – improvement parameters and options
- Returns:
job details including job_id and initial status
- Return type:
dict
- Raises:
WMLClientError – if job creation fails
ApiRequestFailure – if API request fails
Example:
job_details = semantic_schema.improve.run_job(
    parameters={
        "schema": {
            "document_type": "Passport",
            "document_description": "Passport document to get the schema",
            "fields": {
                "description": "Name",
                "example": "name of the user",
            },
        }
    }
)
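The improvement workflow can be chained into one call: submit the job, extract its ID, wait, then fetch the results. In this sketch improve_and_fetch is hypothetical and the wait callable is supplied by the caller (for example, a loop over get_status); only run_job, get_job_id, and get_results come from the handler API documented above.

```python
def improve_and_fetch(handler, schema, wait):
    """Hypothetical helper: run one improvement job for schema and return its results.

    handler is semantic_schema.improve; wait(handler, job_id) must block until
    the job has finished.
    """
    # Same payload shape as the run_job example: {"schema": {...}}.
    details = handler.run_job(parameters={"schema": schema})
    job_id = handler.get_job_id(details)
    wait(handler, job_id)
    return handler.get_results(job_id)
```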
- class ibm_watsonx_ai.foundation_models.semantic_schema.merge_schemas.MergeSchemas(api_client)
Bases: BaseCustomSchemas
Handle schema merging operations.
This class provides methods to merge multiple schemas into a unified schema through job-based operations. Schema merging combines fields from multiple source schemas, resolves conflicts, and creates a consolidated schema structure.
- delete_job(job_id)
Delete a specific job.
- Parameters:
job_id (str) – ID of the job to delete
- Returns:
“SUCCESS” if the deletion succeeds
- Return type:
str
- Raises:
WMLClientError – if deletion fails or job not found
Example:
job_id = "<merge_schemas_job_id>"
semantic_schema.merge.delete_job(job_id)
- get_job_details(job_id=None, limit=None)
Retrieve details of a specific job or all jobs.
- Parameters:
job_id (str | None) – ID of the job to retrieve. If None, returns all jobs
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
job details or list of all jobs
- Return type:
dict
- Raises:
WMLClientError – if job_id is provided but job not found
Example:
# Get details of a specific merge job
job_id = "<merge_schemas_job_id>"
job_details = semantic_schema.merge.get_job_details(job_id)

# Get all merge jobs with limit
all_jobs_details = semantic_schema.merge.get_job_details(limit=50)
- classmethod get_job_id(job_details)
Extract job ID from job details dictionary.
- Parameters:
job_details (dict) – dictionary containing job details
- Returns:
job ID
- Return type:
str
- Raises:
WMLClientError – if job_details is invalid or job ID not found
Example:
# job_details as returned by run_job() or get_job_details(job_id)
job_id = semantic_schema.merge.get_job_id(job_details)
print(f"Job ID: {job_id}")
- get_results(job_id)
Retrieve results of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve results for
- Returns:
job results
- Return type:
dict
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<merge_schemas_job_id>"
results = semantic_schema.merge.get_results(job_id)
results
- get_status(job_id)
Retrieve status of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve status for
- Returns:
job status
- Return type:
str
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<merge_schemas_job_id>"
status = semantic_schema.merge.get_status(job_id)
print(f"Job status: {status}")
- list_jobs(limit=None)
List all jobs.
- Parameters:
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
DataFrame containing all jobs with their status
- Return type:
pd.DataFrame
Example:
jobs_list = semantic_schema.merge.list_jobs(limit=10)
jobs_list
- run_job(parameters)
Execute a schema merging job.
- Parameters:
parameters (MergeSchemasParameters, dict) – merge parameters and options
- Returns:
job details including job_id and initial status
- Return type:
dict
- Raises:
WMLClientError – if job creation fails
ApiRequestFailure – if API request fails
Example:
job_details = semantic_schema.merge.run_job(
    parameters={
        "schemas": [
            {
                "document_type": "Passport",
                "document_description": "Passport document to get the schema",
                "fields": {
                    "description": "Name",
                    "example": "name of the user",
                },
            },
            {
                "document_type": "National ID Card",
                "document_description": "National ID Cards are government-issued identification documents",
                "fields": {
                    "description": "Name",
                    "example": "Holder legal name as shown on the ID",
                },
            },
        ]
    }
)
- class ibm_watsonx_ai.foundation_models.semantic_schema.cluster_schemas.ClusterSchemas(api_client)
Bases: BaseCustomSchemas
Handle schema clustering operations.
This class provides methods to cluster and group schemas based on similarity through job-based operations. Schema clustering analyzes multiple schemas to identify patterns, group similar schemas together, and discover schema relationships.
- delete_job(job_id)
Delete a specific job.
- Parameters:
job_id (str) – ID of the job to delete
- Returns:
“SUCCESS” if the deletion succeeds
- Return type:
str
- Raises:
WMLClientError – if deletion fails or job not found
Example:
job_id = "<cluster_schemas_job_id>"
semantic_schema.cluster.delete_job(job_id)
- get_job_details(job_id=None, limit=None)
Retrieve details of a specific job or all jobs.
- Parameters:
job_id (str | None) – ID of the job to retrieve. If None, returns all jobs
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
job details or list of all jobs
- Return type:
dict
- Raises:
WMLClientError – if job_id is provided but job not found
Example:
# Get details of a specific cluster job
job_id = "<cluster_schemas_job_id>"
job_details = semantic_schema.cluster.get_job_details(job_id)

# Get all cluster jobs with limit
all_jobs_details = semantic_schema.cluster.get_job_details(limit=50)
- classmethod get_job_id(job_details)
Extract job ID from job details dictionary.
- Parameters:
job_details (dict) – dictionary containing job details
- Returns:
job ID
- Return type:
str
- Raises:
WMLClientError – if job_details is invalid or job ID not found
Example:
# job_details as returned by run_job() or get_job_details(job_id)
job_id = semantic_schema.cluster.get_job_id(job_details)
print(f"Job ID: {job_id}")
- get_results(job_id)
Retrieve results of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve results for
- Returns:
job results
- Return type:
dict
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<cluster_schemas_job_id>"
results = semantic_schema.cluster.get_results(job_id)
results
- get_status(job_id)
Retrieve status of a specific job.
- Parameters:
job_id (str) – ID of the job to retrieve status for
- Returns:
job status
- Return type:
str
- Raises:
WMLClientError – if job_id is invalid or job not found
Example:
job_id = "<cluster_schemas_job_id>"
status = semantic_schema.cluster.get_status(job_id)
print(f"Job status: {status}")
- list_jobs(limit=None)
List all jobs.
- Parameters:
limit (int, optional) – limit number of fetched records, defaults to None
- Returns:
DataFrame containing all jobs with their status
- Return type:
pd.DataFrame
Example:
jobs_list = semantic_schema.cluster.list_jobs(limit=10)
jobs_list
- run_job(parameters)
Execute a schema clustering job.
- Parameters:
parameters (ClusterSchemasParameters, dict) – clustering parameters and options
- Returns:
job details including job_id and initial status
- Return type:
dict
- Raises:
WMLClientError – if job creation fails
ApiRequestFailure – if API request fails
Example:
job_details = semantic_schema.cluster.run_job(
    parameters={
        "schemas": [
            {
                "document_name": "Passport",
                "schema": {
                    "document_type": "Passport",
                    "document_description": "Passport document to get the schema",
                    "fields": {
                        "description": "Name",
                        "example": "name of the user",
                    },
                },
            },
            {
                "document_name": "National_ID_Card",
                "schema": {
                    "document_type": "National ID Card",
                    "document_description": "National ID Cards are government-issued identification documents",
                    "fields": {
                        "description": "Holder legal name as shown on the ID",
                        "example": "Alice Marie Smith",
                    },
                },
            },
        ]
    }
)
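The merge and cluster payloads differ in shape: merge takes bare schemas under "schemas", while cluster wraps each schema together with a document_name. A small hypothetical converter from a {document_name: schema} mapping to the cluster payload, based only on the examples above; to_cluster_parameters is not a library function.

```python
def to_cluster_parameters(named_schemas):
    """Hypothetical helper: build a cluster run_job() payload.

    named_schemas maps a document name to a schema dict; the output wraps each
    entry as {"document_name": ..., "schema": ...}, preserving insertion order.
    """
    return {
        "schemas": [
            {"document_name": name, "schema": schema}
            for name, schema in named_schemas.items()
        ]
    }


# Example usage:
# params = to_cluster_parameters({"Passport": passport_schema, "National_ID_Card": id_schema})
# job_details = semantic_schema.cluster.run_job(parameters=params)
```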