import json
from collections import defaultdict
from functools import cached_property
from typing import Any, Iterator, NamedTuple, Optional, Type, cast

from genai import Client, Credentials
from genai.schema import (
from genai.text.generation import CreateExecutionOptions as TextGenerationExecutionOptions
from genai.text.tokenization import CreateExecutionOptions as TokenizationExecutionOptions

    import lm_eval.utils
    from lm_eval.api.instance import Instance
    from lm_eval.api.model import LM
    from lm_eval.api.registry import register_model
    from lm_eval.models.utils import Grouper
except ImportError:
    raise ImportError("Could not import lm_eval: Please install ibm-generative-ai[lm-eval] extension.")  # noqa: B904
    from tqdm import tqdm
except ImportError:
    raise ImportError("Could not import tqdm: Please install ibm-generative-ai[lm-eval] extension.")  # noqa: B904

[docs] class LogLikelihoodResult(NamedTuple): log_likelihood: float is_greedy: bool
[docs] def initialize_model(): pass # model is registered by importing this module
[docs] @register_model("ibm_genai") class IBMGenAILMEval(LM): """ Implementation of LM model interface for evaluating GenAI model with the lm_eval framework. See for reference. """ DEFAULT_TOKENIZATION_EXECUTION_OPTIONS = TokenizationExecutionOptions( batch_size=100, concurrency_limit=5, ) DEFAULT_GENERATION_EXECUTION_OPTIONS = TextGenerationExecutionOptions() DEFAULT_NUM_RETRIES = 6 # Increased number of retries for robustness, because evaluation typically runs for hours
[docs] @classmethod def create_from_arg_string( cls: Type["IBMGenAILMEval"], arg_string: str, additional_config: Optional[dict] = None, ) -> "IBMGenAILMEval": """Allow the user to specify model parameters (TextGenerationParameters) in CLI arguments.""" additional_config = {} if additional_config is None else additional_config args = lm_eval.utils.simple_parse_args_string(arg_string) model_id = args.pop("model_id", None) if model_id is None: raise ValueError("'model_id' is required, please pass it in 'model_args'") parameters = TextGenerationParameters.model_validate(args) return cls(client=Client(credentials=Credentials.from_env()), model_id=model_id, parameters=parameters)
[docs] def __init__( self, client: Optional[Client] = None, model_id: Optional[str] = None, parameters: Optional[TextGenerationParameters] = None, show_progressbar: Optional[bool] = True, tokenization_execution_options: Optional[TokenizationExecutionOptions] = None, generation_execution_options: Optional[TextGenerationExecutionOptions] = None, ): super().__init__() self._client = client or Client( credentials=Credentials.from_env(), config={"api_client_config": {"transport_options": {"retries": self.DEFAULT_NUM_RETRIES}}}, ) self._model_id = model_id self._parameters = parameters or TextGenerationParameters() self._show_progressbar = show_progressbar for opts, name in [ (tokenization_execution_options, "tokenization"), (generation_execution_options, "generation"), ]: if opts and opts.ordered is False: raise ValueError(f"Ordering is not configurable for evaluation ({name}_execution_options).") self._tokenization_execution_options = ( tokenization_execution_options or self.DEFAULT_TOKENIZATION_EXECUTION_OPTIONS ) self._generation_execution_options = generation_execution_options or self.DEFAULT_GENERATION_EXECUTION_OPTIONS
@cached_property def model_token_limit(self): return self._client.model.retrieve(id=self._model_id).result.token_limits[0].token_limit
[docs] def dump_parameters(self): return self._parameters.model_dump()
def _tokenize(self, inputs: list[str]) -> Iterator[list[str]]: pb = tqdm(desc="Tokenizing requests", total=len(inputs), disable=not self._show_progressbar) for response in self._client.text.tokenization.create( model_id=self._model_id, input=inputs, parameters=TextTokenizationParameters(return_options=TextTokenizationReturnOptions(tokens=True)), execution_options=self._tokenization_execution_options, ): pb.update(len(response.results)) for result in response.results: yield result.tokens pb.close() def _check_last_token_is_stop_token(self, response_tokens: list[str], context_tokens: list[str]) -> bool: """ Check whether tokens from context and response are the same. Only last token can differ, in case or stop sequence (</s>) Returns: True if only last token differs, False if all tokens are the same Raises: RuntimeError: if some other tokens differ than the last one RuntimeError: if last token differs but context token is substring of response token. Loglikelihood of second part of token is not defined """ context_length = len(context_tokens) if response_tokens[: context_length - 1] != context_tokens[: context_length - 1]: raise RuntimeError( f"There is an unexpected difference between tokenizer and model tokens:\n" f"context_tokens={context_tokens}\n" f"response_tokens={response_tokens[:context_length]}" ) last_context_token = context_tokens[context_length - 1] last_context_token_resp = response_tokens[context_length - 1] if last_context_token != last_context_token_resp and last_context_token_resp.startswith(last_context_token): raise RuntimeError( f"The context sent to loglikelihood evaluation ends with a token ({last_context_token}) " f"that is substring of the continuation token ({last_context_token_resp}).\n" f"context_tokens={context_tokens}\n" f"response_tokens={response_tokens[:context_length]}\n" "This is not allowed as it would skew the results. Please check your data." ) return last_context_token != last_context_token_resp def _check_model_logprobs_support(self): input_tokens = ( list( self._client.text.generation.create( model_id=self._model_id, inputs=["The best ice cream flavor is:"], parameters=self._log_likelihood_parameters, execution_options=self._generation_execution_options, ) )[0] .results[0] .input_tokens ) if all(token.logprob is None for token in input_tokens): raise RuntimeError(f"Model {self._model_id} is not supported: does not return logprobs for input tokens") def _get_log_likelihood(self, input_tokens: list[BaseTokens], context_tokens: list[str]) -> LogLikelihoodResult: response_tokens: list[str] = [token.text for token in input_tokens] context_length = len(context_tokens) if self._check_last_token_is_stop_token(response_tokens, context_tokens): context_length -= 1 return LogLikelihoodResult( log_likelihood=sum(token.logprob for token in input_tokens[context_length:]), is_greedy=all(token.rank == 1 for token in input_tokens[context_length:]), ) @property def _log_likelihood_parameters(self): return TextGenerationParameters.model_validate( { **self._parameters.model_dump(), "max_new_tokens": 1, # 0 is treated like "unlimited" "return_options": TextGenerationReturnOptions( input_tokens=True, token_logprobs=True, token_ranks=True, ), } )
[docs] def loglikelihood(self, requests: list[Instance]) -> list[tuple[float, bool]]: """ Args: requests: Each request contains Instance.args : Tuple[str, str] containing: 1. an input string to the LM and 2. a target string on which the loglikelihood of the LM producing this target, conditioned on the input, will be returned. Returns: tuple (loglikelihood, is_greedy) for each request according to the input order: loglikelihood: probability of generating the target string conditioned on the input is_greedy: True if and only if the target string would be generated by greedy sampling from the LM """ self._check_model_logprobs_support() requests = [request.args for request in requests] results: list[LogLikelihoodResult] = [] contexts_tokenized = list(self._tokenize([context for context, _ in requests])) generation_inputs = [context + continuation for context, continuation in requests] pb = tqdm(desc="Running text generation", total=len(contexts_tokenized), disable=not self._show_progressbar) for response, context_tokens in zip( self._client.text.generation.create( model_id=self._model_id, inputs=generation_inputs, parameters=self._log_likelihood_parameters, execution_options=self._generation_execution_options, ), contexts_tokenized, ): pb.update(len(response.results)) for result in response.results: results.append(self._get_log_likelihood(result.input_tokens, context_tokens)) pb.close() return cast(list[tuple[float, bool]], results)
[docs] def loglikelihood_rolling(self, requests: list[Instance]) -> list[tuple[float, bool]]: """ Used to evaluate perplexity on a data distribution. Args: requests: Each request contains Instance.args : tuple[str] containing an input string to the model whose entire loglikelihood, conditioned on purely the EOT token, will be calculated. Returns: tuple (loglikelihood,) for each request according to the input order: loglikelihood: solely the probability of producing each piece of text given no starting input. """ self._check_model_logprobs_support() generation_inputs = [request.args[0] for request in requests] results: list[LogLikelihoodResult] = [] for response in zip( self._client.text.generation.create( model_id=self._model_id, inputs=generation_inputs, parameters=self._log_likelihood_parameters, execution_options=self._generation_execution_options, ), ): for result in response.results: results.append(self._get_log_likelihood(result.input_tokens, [])) return cast(list[tuple[float, bool]], results)
[docs] def generate_until(self, requests: list[Instance]) -> list[str]: """ From official model_guide: Each request contains Instance.args : Tuple[str, dict] containing: 1. an input string to the LM and 2. a dictionary of keyword arguments used to control generation parameters. Using this input and these generation parameters, text will be sampled from the language model ( typically until a maximum output length or specific stopping string sequences--for example, {"until": ["\n\n", "."], "max_gen_toks": 128} ). The generated input+output text from the model will then be returned. """ # group requests by their args (e.g. temperature, do_sample, etc.) grouper = Grouper(requests, lambda request: json.dumps(request.args[1], sort_keys=True)) results: dict[str, list[str]] = defaultdict(list) pb = tqdm(desc="Running text generation", total=len(requests), disable=not self._show_progressbar) for key, requests_group in grouper.get_grouped().items(): generation_parameters: dict[str, Any] = requests_group[0].args[1] inputs = [request.args[0] for request in requests_group] # Process parameters do_sample = generation_parameters.pop("do_sample", False) decoding_method = DecodingMethod.SAMPLE if do_sample else DecodingMethod.GREEDY until = generation_parameters.pop("until") stop_sequences = [until] if isinstance(until, str) else until stop_sequences.append("<|endoftext|>") # Use same default 256 token limit as huggingface # max_new_tokens = generation_parameters.pop("max_gen_toks", 256) temperature = generation_parameters.pop("temperature", None) truncate_input_tokens = self.model_token_limit - max_new_tokens parameters = TextGenerationParameters.model_validate( { **self._parameters.model_dump(), "decoding_method": decoding_method, "stop_sequences": stop_sequences, "temperature": temperature, "max_new_tokens": max_new_tokens, "truncate_input_tokens": truncate_input_tokens, } ) for response in self._client.text.generation.create( model_id=self._model_id, inputs=inputs, parameters=parameters ): results[key].extend(result.generated_text for result in response.results) pb.update(len(response.results)) pb.close() return grouper.get_original(results)