Coverage for mcpgateway/toolops/toolops_altk_service.py: 97%

136 statements  

# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/toolops/toolops_altk_service.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Jay Bandlamudi

MCP Gateway - Main module for toolops services.

This module defines the different toolops services.

Features and Responsibilities:
- Automated test case generation for a tool
- Tool metadata enrichment
- Execution of a tool's NL test cases against an MCP server using an agent

Structure:
- Import the required toolops modules from the ALTK package
- Override the ALTK LLM inference modules with MCP-CF inference modules
- Create services for the toolops functionalities
"""

# Standard
import os
from typing import Any

# Third-Party
import orjson

try:
    # Third-Party
    from altk.build_time.test_case_generation_toolkit.src.toolops.enrichment.mcp_cf_tool_enrichment import prompt_utils
    from altk.build_time.test_case_generation_toolkit.src.toolops.enrichment.mcp_cf_tool_enrichment.enrichment import ToolOpsMCPCFToolEnrichment
    from altk.build_time.test_case_generation_toolkit.src.toolops.generation.nl_utterance_generation.nl_utterance_generation import NlUtteranceGeneration
    from altk.build_time.test_case_generation_toolkit.src.toolops.generation.nl_utterance_generation.nl_utterance_generation_utils import nlg_util
    from altk.build_time.test_case_generation_toolkit.src.toolops.generation.test_case_generation.test_case_generation import TestcaseGeneration
    from altk.build_time.test_case_generation_toolkit.src.toolops.generation.test_case_generation.test_case_generation_utils import prompt_execution
    from altk.build_time.test_case_generation_toolkit.src.toolops.utils import llm_util
except ImportError:
    prompt_utils = None
    ToolOpsMCPCFToolEnrichment = None
    NlUtteranceGeneration = None
    nlg_util = None
    TestcaseGeneration = None
    prompt_execution = None
    llm_util = None

# Third-Party
from sqlalchemy.orm import Session

# First-Party
from mcpgateway.schemas import ToolRead, ToolUpdate
from mcpgateway.services.logging_service import LoggingService
from mcpgateway.services.mcp_client_chat_service import LLMConfig, MCPChatService, MCPClientConfig, MCPServerConfig
from mcpgateway.services.tool_service import ToolService
from mcpgateway.toolops.utils.db_util import populate_testcases_table, query_testcases_table, query_tool_auth
from mcpgateway.toolops.utils.format_conversion import convert_to_toolops_spec, post_process_nl_test_cases
from mcpgateway.toolops.utils.llm_util import get_llm_instance

logging_service = LoggingService()
logger = logging_service.get_logger(__name__)


toolops_llm_provider = os.getenv("LLM_PROVIDER")
toolops_llm, toolops_llm_provider_config = get_llm_instance()
if toolops_llm is not None and toolops_llm_provider_config is not None:
    TOOLOPS_LLM_CONFIG = LLMConfig(provider=toolops_llm_provider, config=toolops_llm_provider_config)
else:
    logger.error("Error in obtaining LLM instance for Toolops services")
    TOOLOPS_LLM_CONFIG = None

LLM_MODEL_ID = os.getenv("OPENAI_MODEL", "")
provider = os.getenv("OPENAI_BASE_URL", "")
LLM_PLATFORM = "OpenAIProvider - " + provider


# ---------------
# IMPORTANT NOTE:
# ---------------
# ALTK (agent life cycle toolkit) does not support all of the LLM providers that MCP Context Forge supports.
# To use all MCP-CF supported LLM providers, we need to override the ALTK modules related to LLM inferencing;
# i.e., the `execute_prompt` method used in the various ALTK toolops modules is overridden with a custom
# execute_prompt that uses the MCP Context Forge LLM inferencing modules.


# Custom execute_prompt to support MCP-CF LLM providers
def custom_mcp_cf_execute_prompt(prompt, client=None, gen_mode=None, parameters=None, max_new_tokens=600, stop_sequences=None):
    """
    Custom execute_prompt method that supports MCP-CF LLM providers; it is used to override the
    'execute_prompt' method of several toolops modules for LLM inferencing. Because this method is an
    override, the ALTK-specific inputs 'client', 'gen_mode', 'parameters', 'max_new_tokens', and
    'stop_sequences' are retained in the signature but are not used here.

    Args:
        prompt: User-provided prompt/input for LLM inferencing
        client: ALTK-specific LLM client, default is None
        gen_mode: ALTK-specific LLM client generation mode, default is None
        parameters: ALTK-specific LLM inferencing parameters, default is None
        max_new_tokens: ALTK-specific LLM parameter, default is 600
        stop_sequences: List of stop sequences to be used in LLM inferencing, default is None

    Returns:
        response: LLM output for the given prompt
    """
    try:
        logger.info("LLM Inference call using MCP-CF LLM provider")
        # Log the unused ALTK-specific parameters so pylint does not flag them as unused arguments
        altk_dummy_params = {"client": client, "gen_mode": gen_mode, "parameters": parameters, "max_new_tokens": max_new_tokens, "stop_sequences": stop_sequences}
        logger.debug(altk_dummy_params)
        chat_llm_instance, _ = get_llm_instance(model_type="chat")
        llm_response = chat_llm_instance.invoke(prompt)
        response = llm_response.content
        return response
    except Exception as e:
        logger.error("Error in LLM Inference call using MCP-CF LLM provider - " + orjson.dumps({"Error": str(e)}).decode())
        return ""


# Overriding methods (replace ALTK LLM inferencing methods with MCP-CF methods)
if llm_util:
    llm_util.execute_prompt = custom_mcp_cf_execute_prompt

if prompt_execution:
    prompt_execution.execute_prompt = custom_mcp_cf_execute_prompt

if nlg_util:
    nlg_util.execute_prompt = custom_mcp_cf_execute_prompt

if prompt_utils:
    prompt_utils.execute_prompt = custom_mcp_cf_execute_prompt
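
# A minimal sketch of the override in action (assuming the ALTK imports above succeeded): any ALTK
# toolops code path that calls `execute_prompt` on these modules now routes through the MCP-CF
# provider configured via `get_llm_instance`, e.g.:
#
# if llm_util:
#     sample_output = llm_util.execute_prompt("Summarize what this tool does in one sentence.")
#     logger.debug("Sample override output: " + str(sample_output))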


# Test case generation service method
async def validation_generate_test_cases(tool_id, tool_service: ToolService, db: Session, number_of_test_cases=2, number_of_nl_variations=1, mode="generate"):
    """
    Method for the service to generate tool test cases using toolops modules.

    Args:
        tool_id: Unique tool id in MCP-CF
        tool_service (ToolService): Tool service to obtain the tool from the database
        db (Session): DB session to connect with the database
        number_of_test_cases: Maximum number of tool test cases to be generated, default is 2
        number_of_nl_variations: Number of natural language variations (paraphrases) to be generated for each test case, default is 1
        mode: Service execution mode; supported values - 'generate', 'query', 'status'
            - in 'generate' mode, test case generation is triggered: test cases are generated afresh and stored in the database
            - in 'query' mode, the test cases related to the tool are retrieved from the database after test case generation is completed
            - in 'status' mode, the test case generation status is returned, i.e. 'in-progress', 'failed', 'completed'

    Returns:
        test_cases: List of tool test cases
    """
    test_cases = []
    try:
        tool_schema: ToolRead = await tool_service.get_tool(db, tool_id)
        # check if test case generation is required
        if mode == "generate":
            logger.info(
                "Generating test cases for tool - " + str(tool_id) + "," + orjson.dumps({"number_of_test_cases": number_of_test_cases, "number_of_nl_variations": number_of_nl_variations}).decode()
            )
            mcp_cf_tool = tool_schema.to_dict(use_alias=True)
            if mcp_cf_tool is not None:
                wxo_tool_spec = convert_to_toolops_spec(mcp_cf_tool)
                populate_testcases_table(tool_id, test_cases, "in-progress", db)
                tc_generator = TestcaseGeneration(client=None, gen_mode=None, max_number_testcases_to_generate=number_of_test_cases)
                ip_test_cases, _ = tc_generator.testcase_generation_full_pipeline(wxo_tool_spec)
                nl_generator = NlUtteranceGeneration(client=None, gen_mode=None, max_nl_utterances=number_of_nl_variations)
                nl_test_cases = nl_generator.generate_nl(ip_test_cases)
                test_cases = post_process_nl_test_cases(nl_test_cases)
                populate_testcases_table(tool_id, test_cases, "completed", db)
        elif mode == "query":
            # check if tool test case generation is complete and get the test cases
            tool_record = query_testcases_table(tool_id, db)
            if tool_record:
                if tool_record.run_status == "completed":
                    test_cases = tool_record.test_cases
                    logger.info("Obtained existing test cases from the table for tool " + str(tool_id))
        elif mode == "status":
            # check the test case generation status
            tool_record = query_testcases_table(tool_id, db)
            if tool_record:
                status = tool_record.run_status
                test_cases = [{"status": status, "tool_id": tool_id}]
                logger.info("Test case generation status for the tool -" + str(tool_id) + ", status -" + str(status))
            else:
                test_cases = [{"status": "not-initiated", "tool_id": tool_id}]
                logger.info("Test case generation is not initiated for the tool " + str(tool_id))
    except Exception as e:
        error_message = "Error in generating test cases for tool - " + str(tool_id) + " , details - " + str(e)
        logger.error(error_message)
        test_cases = [{"status": "error", "error_message": error_message, "tool_id": tool_id}]
        populate_testcases_table(tool_id, test_cases, "failed", db)
    return test_cases
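
# A minimal usage sketch (hypothetical tool id, assuming a DB session and a ToolService instance as
# in the commented demo at the bottom of this file): trigger generation, poll the status, then
# query the stored test cases once the run has completed.
#
# import asyncio
# asyncio.run(validation_generate_test_cases(tool_id, tool_service, db, mode="generate"))
# status = asyncio.run(validation_generate_test_cases(tool_id, tool_service, db, mode="status"))
# if status and status[0].get("status") == "completed":
#     test_cases = asyncio.run(validation_generate_test_cases(tool_id, tool_service, db, mode="query"))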


async def execute_tool_nl_test_cases(tool_id, tool_nl_test_cases, tool_service: ToolService, db: Session):
    """
    Method for the service to execute tool NL test cases against an MCP server using an agent.

    Args:
        tool_id: Unique tool id in MCP-CF
        tool_nl_test_cases: List of tool-invoking utterances for testing the tool with the agent
        tool_service: Tool service to obtain the tool from the database
        db: DB session to connect with the database

    Returns:
        tool_test_case_outputs: List of tool outputs after the tool test cases are executed with the agent
    """
    tool_schema: ToolRead = await tool_service.get_tool(db, tool_id)
    mcp_cf_tool = tool_schema.to_dict(use_alias=True)
    tool_url = mcp_cf_tool.get("url")
    tool_auth = query_tool_auth(tool_id, db)
    # select the transport based on the protocol implied by the tool URL
    if "/mcp" in tool_url:
        config = MCPClientConfig(mcp_server=MCPServerConfig(url=tool_url, transport="streamable_http", headers=tool_auth), llm=TOOLOPS_LLM_CONFIG)
    elif "/sse" in tool_url:
        config = MCPClientConfig(mcp_server=MCPServerConfig(url=tool_url, transport="sse", headers=tool_auth), llm=TOOLOPS_LLM_CONFIG)
    else:
        config = MCPClientConfig(mcp_server=MCPServerConfig(url=tool_url, transport="stdio", headers=tool_auth), llm=TOOLOPS_LLM_CONFIG)

    service = MCPChatService(config)
    await service.initialize()
    logger.info("MCP tool server - " + str(tool_url) + " is ready for tool validation")

    tool_test_case_outputs = []
    # execute each NL test case; if a test case raises an error, record the error as its output
    for nl_utterance in tool_nl_test_cases:
        try:
            tool_output = await service.chat(message=nl_utterance)
            tool_test_case_outputs.append(tool_output)
        except Exception as e:
            logger.error("Error in executing tool validation test cases with MCP server - " + str(e))
            tool_test_case_outputs.append(str(e))
            continue
    return tool_test_case_outputs


async def enrich_tool(tool_id: str, tool_service: ToolService, db: Session) -> tuple[str, ToolRead]:
    """
    Method for the service to enrich tool metadata such as the tool description.

    Args:
        tool_id: Unique tool id in MCP-CF
        tool_service: Tool service to obtain the tool from the database
        db: DB session to connect with the database

    Returns:
        enriched_description: Enriched tool description
        tool_schema: Updated tool schema in MCP-CF ToolRead format

    Raises:
        Exception: If the tool cannot be retrieved or converted to schema.
    """
    try:
        tool_schema: ToolRead = await tool_service.get_tool(db, tool_id)
        mcp_cf_tool = tool_schema.to_dict(use_alias=True)
    except Exception as e:
        logger.error(f"Failed to convert tool {tool_id} to schema: {e}")
        raise

    toolops_enrichment = ToolOpsMCPCFToolEnrichment(llm_client=None, gen_mode=None)
    enriched_description = await toolops_enrichment.enrich_mc_cf_tool(mcp_cf_toolspec=mcp_cf_tool)

    if enriched_description:
        try:
            update_data: dict[str, Any] = {
                "name": tool_schema.name,
                "description": enriched_description,
            }
            updated_tool: ToolUpdate = ToolUpdate(**update_data)
            await tool_service.update_tool(db, tool_id, updated_tool)
        except Exception as e:
            logger.error(f"Failed to update tool {tool_id} with enriched description: {e}")

    return enriched_description, tool_schema
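
# A minimal usage sketch (hypothetical tool id, assuming a DB session and a ToolService instance as
# in the commented demo below): enrich the tool description and persist it via the tool service.
#
# import asyncio
# enriched_description, tool_schema = asyncio.run(enrich_tool(tool_id, tool_service, db))
# print(enriched_description)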


# if __name__ == "__main__":
#     # Standard
#     import asyncio

#     # First-Party
#     from mcpgateway.db import SessionLocal
#     from mcpgateway.services.tool_service import ToolService

#     tool_id = "69df98bcab6b4895a0345a20aeb038b2"
#     tool_service = ToolService()
#     db = SessionLocal()
#     # tool_test_cases = asyncio.run(validation_generate_test_cases(tool_id, tool_service, db, number_of_test_cases=2, number_of_nl_variations=2, mode="generate"))
#     # print("#" * 30)
#     # print("tool_test_cases")
#     # print(tool_test_cases)
#     # enrich_output = asyncio.run(enrich_tool(tool_id, tool_service, db))
#     # print("#" * 30)
#     # print("enrich_output")
#     # print(enrich_output)
#     tool_nl_test_cases = ["add 3 and 10", "please add 4 and 6"]
#     tool_outputs = asyncio.run(execute_tool_nl_test_cases(tool_id, tool_nl_test_cases, tool_service, db))
#     print("#" * 30)
#     print("len - tool_outputs", len(tool_outputs))
#     print(tool_outputs)