# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2023-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
from typing import TYPE_CHECKING, cast
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.wml_resource import WMLResource
from ibm_watsonx_ai.wml_client_error import WMLClientError
from ibm_watsonx_ai.utils.autoai.errors import ContainerTypeNotSupported
from ibm_watsonx_ai.helpers.connections import (
DataConnection,
ContainerLocation,
S3Connection,
S3Location,
FSLocation,
AssetLocation,
)
from ibm_watsonx_ai.utils.autoai.utils import is_ipython
from ibm_watsonx_ai.foundation_models.utils import PromptTuningParams
import datetime
import numpy as np
if TYPE_CHECKING:
from ibm_watsonx_ai import APIClient
from pandas import DataFrame
class PromptTuner:
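    """Tune a foundation model with prompt tuning.

    Instances are typically created with ``TuneExperiment.prompt_tuner(...)``
    rather than constructed directly.
    """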
id: str | None = None
_client: APIClient = None # type: ignore[assignment]
_training_metadata: dict | None = None
def __init__(
self,
name: str,
task_id: str,
*,
description: str | None = None,
base_model: str | None = None,
accumulate_steps: int | None = None,
batch_size: int | None = None,
init_method: str | None = None,
init_text: str | None = None,
learning_rate: float | None = None,
max_input_tokens: int | None = None,
max_output_tokens: int | None = None,
num_epochs: int | None = None,
verbalizer: str | None = None,
tuning_type: str | None = None,
auto_update_model: bool = True,
group_by_name: bool | None = None,
):
self.name = name
self.description = description if description else "Prompt tuning with SDK"
self.auto_update_model = auto_update_model
self.group_by_name = group_by_name
base_model_red: dict = {"model_id": base_model}
self.prompt_tuning_params = PromptTuningParams(
base_model=base_model_red,
accumulate_steps=accumulate_steps,
batch_size=batch_size,
init_method=init_method,
init_text=init_text,
learning_rate=learning_rate,
max_input_tokens=max_input_tokens,
max_output_tokens=max_output_tokens,
num_epochs=num_epochs,
task_id=task_id,
tuning_type=tuning_type,
verbalizer=verbalizer,
)
if not isinstance(self.name, str):
raise WMLClientError(
f"'name' param expected string, but got {type(self.name)}: {self.name}"
)
if self.description and (not isinstance(self.description, str)):
raise WMLClientError(
f"'description' param expected string, but got {type(self.description)}: "
f"{self.description}"
)
if self.auto_update_model and (not isinstance(self.auto_update_model, bool)):
raise WMLClientError(
f"'auto_update_model' param expected bool, but got {type(self.auto_update_model)}: "
f"{self.auto_update_model}"
)
if self.group_by_name and (not isinstance(self.group_by_name, bool)):
raise WMLClientError(
f"'group_by_name' param expected bool, but got {type(self.group_by_name)}: "
f"{self.group_by_name}"
)
def run(
self,
training_data_references: list[DataConnection],
training_results_reference: DataConnection | None = None,
background_mode: bool = False,
) -> dict:
"""Run a prompt tuning process of a foundation model on top of the training data referenced by DataConnection.
:param training_data_references: data storage connection details to inform where the training data is stored
:type training_data_references: list[DataConnection]
:param training_results_reference: data storage connection details to store pipeline training results
:type training_results_reference: DataConnection, optional
        :param background_mode: indicates whether the run() method executes in the background (async) or waits for the tuning to finish (sync)
:type background_mode: bool, optional
:return: run details
:rtype: dict
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
from ibm_watsonx_ai.helpers import DataConnection, S3Location
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
            prompt_tuner.run(
                training_data_references=[DataConnection(
                    connection_asset_id=connection_id,
                    location=S3Location(
                        bucket='prompt_tuning_data',
                        path='pt_train_data.json'
                    )
                )],
                background_mode=False)
"""
WMLResource._validate_type(
training_data_references, "training_data_references", list, mandatory=True
)
WMLResource._validate_type(
training_results_reference,
"training_results_reference",
object,
mandatory=False,
)
        if training_data_references:
            self._validate_source_data_connections(training_data_references)
training_results_reference = self._determine_result_reference(
results_reference=training_results_reference,
data_references=training_data_references,
)
self._initialize_training_metadata(
training_data_references,
test_data_references=None,
training_results_reference=training_results_reference,
)
self._training_metadata = cast(dict, self._training_metadata)
tuning_details = self._client.training.run(
meta_props=self._training_metadata, asynchronous=background_mode
)
self.id = self._client.training.get_id(tuning_details)
return self._client.training.get_details(
self.id
) # TODO improve the background_mode = False option
def _initialize_training_metadata(
self,
training_data_references: list[DataConnection],
test_data_references: list[DataConnection] | None = None,
training_results_reference: DataConnection | None = None,
) -> None:
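        """Build ``self._training_metadata`` for the trainings API.

        Tags, training data references, the (truncated) name and the prompt tuning
        parameters are always set; test data references, the results reference,
        the description and ``auto_update_model`` are added only when provided.
        """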
self._training_metadata = {
self._client.training.ConfigurationMetaNames.TAGS: self._get_tags(),
self._client.training.ConfigurationMetaNames.TRAINING_DATA_REFERENCES: [
connection._to_dict() for connection in training_data_references
],
self._client.training.ConfigurationMetaNames.NAME: f"{self.name[:100]}",
self._client.training.ConfigurationMetaNames.PROMPT_TUNING: self.prompt_tuning_params.to_dict(),
}
if test_data_references:
self._training_metadata[
self._client.training.ConfigurationMetaNames.TEST_DATA_REFERENCES
] = [connection._to_dict() for connection in test_data_references]
if training_results_reference:
self._training_metadata[
self._client.training.ConfigurationMetaNames.TRAINING_RESULTS_REFERENCE
] = training_results_reference._to_dict()
if self.description:
self._training_metadata[
self._client.training.ConfigurationMetaNames.DESCRIPTION
] = f"{self.description}"
if self.auto_update_model is not None:
self._training_metadata[
self._client.training.ConfigurationMetaNames.AUTO_UPDATE_MODEL
] = self.auto_update_model
def _validate_source_data_connections(
self, source_data_connections: list[DataConnection]
) -> list[DataConnection]:
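        """Check and normalize the source data connections before training.

        ``ContainerLocation`` references are rejected on CPD; inline S3 credentials
        and S3-specific location attributes are stripped from container- and
        asset-based references.
        """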
for data_connection in source_data_connections:
if isinstance(data_connection.location, ContainerLocation):
if self._client.ICP_PLATFORM_SPACES:
raise ContainerTypeNotSupported() # block Container type on CPD
elif isinstance(data_connection.connection, S3Connection):
# note: remove S3 inline credential from data asset before training
data_connection.connection = None
if hasattr(data_connection.location, "bucket"):
delattr(data_connection.location, "bucket")
# --- end note
if isinstance(data_connection.connection, S3Connection) and isinstance(
data_connection.location, AssetLocation
):
# note: remove S3 inline credential from data asset before training
data_connection.connection = None
for s3_attr in ["bucket", "path"]:
if hasattr(data_connection.location, s3_attr):
delattr(data_connection.location, s3_attr)
# --- end note
return source_data_connections
def _determine_result_reference(
self,
results_reference: DataConnection | None,
data_references: list[DataConnection],
result_path: str = "default_tuning_output",
) -> DataConnection:
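        """Resolve where the training results are stored.

        When the user does not pass a results reference, a default location is derived:
        ``FSLocation`` on CPD, otherwise an ``S3Location`` or ``ContainerLocation`` based
        on the first training data reference. The resulting location type is then
        validated against the platform.
        """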
# note: if user did not provide results storage information, use default ones
if results_reference is None:
if self._client.ICP_PLATFORM_SPACES:
location = FSLocation(path="/{option}/{id}/assets/wx_prompt_tune")
if self._client.default_project_id is None:
location.path = location.path.format(
option="spaces", id=self._client.default_space_id
)
else:
location.path = location.path.format(
option="projects", id=self._client.default_project_id
)
results_reference = DataConnection(connection=None, location=location)
else:
if isinstance(data_references[0].location, S3Location):
results_reference = DataConnection(
connection=data_references[0].connection,
location=S3Location(
bucket=data_references[0].location.bucket, path="."
),
)
elif isinstance(data_references[0].location, AssetLocation):
connection_id = data_references[0].location._get_connection_id(
self._client
)
if connection_id is not None:
results_reference = DataConnection(
connection_asset_id=connection_id,
location=S3Location(
bucket=data_references[0].location._get_bucket(
self._client
),
path=result_path,
),
)
                    else:  # set container output location when a default Data Asset is used as a training reference
results_reference = DataConnection(
location=ContainerLocation(path=result_path)
)
else:
results_reference = DataConnection(
location=ContainerLocation(path=result_path)
)
# -- end note
# note: validate location types:
if self._client.ICP_PLATFORM_SPACES:
if not isinstance(results_reference.location, FSLocation):
raise TypeError(
"Unsupported results location type. Results reference can be stored on FSLocation."
)
else:
if not isinstance(
results_reference.location, (S3Location, ContainerLocation)
):
raise TypeError(
"Unsupported results location type. Results reference can be stored"
" only on S3Location or ContainerLocation."
)
# -- end note
return results_reference
def _get_tags(self) -> list:
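        """Return the tags for the training run.

        With ``group_by_name`` enabled, tags of an earlier run with the same name are
        merged in and ``generate_ux_tag`` is disabled on the client.
        """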
tags = ["prompt_tuning"]
if self.group_by_name is not None and self.group_by_name:
for training in self._client.training.get_details(
tag_value="prompt_tuning"
)["resources"]:
if training["metadata"].get("name") == self.name:
# Find recent tags related to 'name'
tags = list(set(tags) | set(training["metadata"].get("tags")))
break
if tags != ["prompt_tuning"]:
self._client.generate_ux_tag = False
return tags
@staticmethod
def _get_last_iteration_metrics_for_each_epoch(tuning_details: dict) -> list:
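        """Return only the last metrics entry recorded for each epoch (based on ``ml_metrics.epoch``)."""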
last_iteration_metrics_for_each_epoch = []
for ind in range(len(tuning_details["entity"]["status"]["metrics"])):
if ind == 0:
last_iteration_metrics_for_each_epoch.append(
tuning_details["entity"]["status"]["metrics"][0]
)
else:
if (
tuning_details["entity"]["status"]["metrics"][ind]["ml_metrics"][
"epoch"
]
== tuning_details["entity"]["status"]["metrics"][ind - 1][
"ml_metrics"
]["epoch"]
):
last_iteration_metrics_for_each_epoch.pop()
last_iteration_metrics_for_each_epoch.append(
tuning_details["entity"]["status"]["metrics"][ind]
)
else:
last_iteration_metrics_for_each_epoch.append(
tuning_details["entity"]["status"]["metrics"][ind]
)
return last_iteration_metrics_for_each_epoch
@staticmethod
def _get_average_loss_score_for_each_epoch(tuning_details: dict) -> list:
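        """Return the average loss for each epoch.

        Both metric formats are supported: ``data`` entries (``value``) and
        ``ml_metrics`` entries (``loss``).
        """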
scores = []
temp_score = []
epoch = 0
if "data" in tuning_details["entity"]["status"]["metrics"][0]:
for ind, metric in enumerate(tuning_details["entity"]["status"]["metrics"]):
if int(metric["data"]["epoch"]) == epoch:
temp_score.append(metric["data"]["value"])
else:
epoch += 1
scores.append(np.average(temp_score))
temp_score = [metric["data"]["value"]]
scores.append(np.average(temp_score))
else:
for ind, metric in enumerate(tuning_details["entity"]["status"]["metrics"]):
if int(metric["ml_metrics"]["epoch"]) == epoch:
temp_score.append(metric["ml_metrics"]["loss"])
else:
epoch += 1
scores.append(np.average(temp_score))
temp_score = [metric["ml_metrics"]["loss"]]
scores.append(np.average(temp_score))
return scores
@staticmethod
def _get_first_and_last_iteration_metrics_for_each_epoch(
tuning_details: dict,
) -> list:
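        """Return the first and last metrics entry recorded for each epoch.

        Used by ``plot_learning_curve`` to derive the elapsed time of each epoch
        from the entry timestamps.
        """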
first_and_last_iteration_metrics_for_each_epoch = []
first_iteration = True
tuning_metrics = tuning_details["entity"]["status"]["metrics"]
for ind in range(len(tuning_metrics)):
if ind == 0:
first_and_last_iteration_metrics_for_each_epoch.append(
tuning_metrics[ind]
)
first_and_last_iteration_metrics_for_each_epoch.append(
tuning_metrics[ind]
)
first_iteration = False
elif first_iteration:
first_and_last_iteration_metrics_for_each_epoch.append(
tuning_metrics[ind]
)
first_iteration = False
else:
if (
tuning_metrics[ind].get(
"data", tuning_metrics[ind].get("ml_metrics")
)["epoch"]
== tuning_metrics[ind - 1].get(
"data", tuning_metrics[ind - 1].get("ml_metrics")
)["epoch"]
):
first_and_last_iteration_metrics_for_each_epoch.pop()
first_and_last_iteration_metrics_for_each_epoch.append(
tuning_metrics[ind]
)
else:
first_and_last_iteration_metrics_for_each_epoch.append(
tuning_metrics[ind]
)
first_iteration = True
return first_and_last_iteration_metrics_for_each_epoch
def get_params(self) -> dict:
"""Get configuration parameters of PromptTuner.
:return: PromptTuner parameters
:rtype: dict
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
prompt_tuner.get_params()
# Result:
#
# {'base_model': {'name': 'google/flan-t5-xl'},
# 'task_id': 'summarization',
# 'name': 'Prompt Tuning of Flan T5 model',
# 'auto_update_model': False,
# 'group_by_name': False}
"""
params = self.prompt_tuning_params.to_dict()
params["name"] = self.name
params["description"] = self.description
params["auto_update_model"] = self.auto_update_model
params["group_by_name"] = self.group_by_name
return params
#####################
# Run operations #
#####################
def get_run_status(self) -> str:
"""Check the status/state of an initialized prompt tuning run if it was run in background mode.
:return: status of the Prompt Tuning run
:rtype: str
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
prompt_tuner.run(...)
            prompt_tuner.get_run_status()
# Result:
# 'completed'
"""
if self.id is None:
raise WMLClientError(
Messages.get_message(message_id="fm_prompt_tuning_not_scheduled")
)
return self._client.training.get_status(training_id=self.id).get("state") # type: ignore[return-value]
def get_run_details(self, include_metrics: bool = False) -> dict:
"""Get details of a prompt tuning run.
        :param include_metrics: indicates whether to include metrics in the training details output
:type include_metrics: bool, optional
:return: details of the prompt tuning
:rtype: dict
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
prompt_tuner.run(...)
prompt_tuner.get_run_details()
"""
if self.id is None:
raise WMLClientError(
Messages.get_message(message_id="fm_prompt_tuning_not_scheduled")
)
details = self._client.training.get_details(training_id=self.id)
if include_metrics:
try:
details["entity"]["status"]["metrics"] = (
self._get_metrics_data_from_property_or_file(details)
)
except KeyError:
pass
finally:
return details
if details["entity"]["status"].get("metrics", False):
del details["entity"]["status"]["metrics"]
return details
def _get_metrics_data_from_property_or_file(self, details: dict) -> dict:
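        """Download the full metrics data referenced by the run details.

        The metrics file path is taken from ``metrics_location`` in the run details and
        fetched through a :class:`DataConnection` built from the results reference.
        """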
path = details["entity"]["status"]["metrics"][0]["context"]["prompt_tuning"][
"metrics_location"
]
results_reference = details["entity"]["results_reference"]
conn = DataConnection._from_dict(results_reference)
conn._api_client = self._client
metrics_data = conn._download_json_file(path, tuning_type="prompt_tuning")
return metrics_data
def plot_learning_curve(self) -> None:
"""Plot learning curves.
.. note ::
Available only for Jupyter notebooks.
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
prompt_tuner.run(...)
prompt_tuner.plot_learning_curve()
"""
if not is_ipython():
raise WMLClientError(
"Function `plot_learning_curve` is available only for Jupyter notebooks."
)
from ibm_watsonx_ai.utils.autoai.incremental import plot_learning_curve
import matplotlib.pyplot as plt
tuning_details = self.get_run_details(include_metrics=True)
if "metrics" in tuning_details["entity"]["status"]:
# average loss score for each epoch
scores = self._get_average_loss_score_for_each_epoch(
tuning_details=tuning_details
)
            # timestamps from the first and last iteration of each epoch
if "data" in tuning_details["entity"]["status"]["metrics"][0]:
date_times = [
datetime.datetime.strptime(
m_obj["data"]["timestamp"], "%Y-%m-%dT%H:%M:%S.%f"
)
for m_obj in self._get_first_and_last_iteration_metrics_for_each_epoch(
tuning_details=tuning_details
)
]
else:
date_times = [
datetime.datetime.strptime(
m_obj["timestamp"], "%Y-%m-%dT%H:%M:%S.%f%z"
)
for m_obj in self._get_first_and_last_iteration_metrics_for_each_epoch(
tuning_details=tuning_details
)
]
elapsed_time = []
for i in range(1, len(date_times), 2):
elapsed_time.append((date_times[i] - date_times[i - 1]).total_seconds())
fig, axes = plt.subplots(1, 3, figsize=(18, 4))
if scores:
plot_learning_curve(
fig=fig,
axes=axes,
scores=scores,
fit_times=elapsed_time,
xlabels={"first_xlabel": "Epochs", "second_xlabel": "Epochs"},
titles={"first_plot": "Loss function"},
)
else:
raise WMLClientError(
Messages.get_message(message_id="fm_prompt_tuning_no_metrics")
)
def summary(self, scoring: str = "loss") -> DataFrame:
"""Print the details of PromptTuner models (prompt-tuned models).
        :param scoring: scoring metric used to sort the results; ``loss`` is used when not provided
:type scoring: string, optional
:return: computed models and metrics
:rtype: pandas.DataFrame
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
prompt_tuner.run(...)
prompt_tuner.summary()
# Result:
# Enhancements Base model ... loss
# Model Name
# Prompt_tuned_M_1 [prompt_tuning] google/flan-t5-xl ... 0.449197
"""
if self.id is None:
raise WMLClientError(
Messages.get_message(message_id="fm_prompt_tuning_not_scheduled")
)
from pandas import DataFrame
details = self.get_run_details(include_metrics=True)
metrics = details["entity"]["status"].get("metrics", [{}])[0]
is_ml_metrics = "data" in metrics or "ml_metrics" in metrics
if not is_ml_metrics:
raise WMLClientError(
Messages.get_message(message_id="fm_prompt_tuning_no_metrics")
)
columns = [
"Model Name",
"Enhancements",
"Base model",
"Auto store",
"Epochs",
scoring,
]
values = []
model_name = "model_" + self.id
base_model_name = None
epochs = None
enhancements = []
if scoring == "loss":
model_metrics = [
self._get_average_loss_score_for_each_epoch(tuning_details=details)[-1]
]
else:
if "data" in details["entity"]["status"]["metrics"][0]:
model_metrics = [
details["entity"]["status"]
.get("metrics", [{}])[-1]
.get("data", {})[scoring]
]
else:
model_metrics = [
details["entity"]["status"]
.get("metrics", [{}])[-1]
.get("ml_metrics", {})[scoring]
]
if "prompt_tuning" in details["entity"]:
enhancements = [details["entity"]["prompt_tuning"]["tuning_type"]]
base_model_name = details["entity"]["prompt_tuning"]["base_model"][
"model_id"
]
epochs = details["entity"]["prompt_tuning"]["num_epochs"]
values.append(
(
[model_name]
+ [enhancements]
+ [base_model_name]
+ [details["entity"]["auto_update_model"]]
+ [epochs]
+ model_metrics
)
)
summary = DataFrame(data=values, columns=columns)
summary.set_index("Model Name", inplace=True)
return summary
def get_model_id(self) -> str:
"""Get the model ID.
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
prompt_tuner.run(...)
prompt_tuner.get_model_id()
"""
run_details = self.get_run_details()
if run_details["entity"]["auto_update_model"]:
return run_details["entity"]["model_id"]
else:
raise WMLClientError(
Messages.get_message(message_id="fm_prompt_tuning_no_model_id")
)
def cancel_run(self, hard_delete: bool = False) -> None:
"""Cancel or delete a Prompt Tuning run.
        :param hard_delete: if True, delete the completed or canceled prompt tuning run;
            if False, cancel the currently running prompt tuning run. Defaults to False
:type hard_delete: bool, optional
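
        **Example:**

        .. code-block:: python

            from ibm_watsonx_ai.experiment import TuneExperiment

            experiment = TuneExperiment(credentials, ...)
            prompt_tuner = experiment.prompt_tuner(...)
            prompt_tuner.run(...)

            prompt_tuner.cancel_run()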
"""
if self.id is None:
raise WMLClientError(
Messages.get_message(message_id="fm_prompt_tuning_not_scheduled")
)
self._client.training.cancel(training_id=self.id, hard_delete=hard_delete)
def get_data_connections(self) -> list[DataConnection]:
"""Create DataConnection objects for further usage
        (e.g. to handle a data storage connection).
:return: list of DataConnections
:rtype: list['DataConnection']
**Example:**
.. code-block:: python
from ibm_watsonx_ai.experiment import TuneExperiment
experiment = TuneExperiment(credentials, ...)
prompt_tuner = experiment.prompt_tuner(...)
prompt_tuner.run(...)
data_connections = prompt_tuner.get_data_connections()
"""
training_data_references = self.get_run_details()["entity"][
"training_data_references"
]
data_connections = [
DataConnection._from_dict(_dict=data_connection)
for data_connection in training_data_references
]
for data_connection in data_connections:
data_connection.set_client(self._client)
data_connection._run_id = self.id
return data_connections