Coverage for mcpgateway / services / root_service.py: 97%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/root_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Root Service Implementation. 

8This module implements root directory management according to the MCP specification. 

9It handles root registration, validation, and change notifications. 

10""" 

11 

12# Standard 

13import asyncio 

14import os 

15from typing import AsyncGenerator, Dict, List, Optional 

16from urllib.parse import urlparse 

17 

18# First-Party 

19from mcpgateway.common.models import Root 

20from mcpgateway.config import settings 

21from mcpgateway.services.logging_service import LoggingService 

22 

23# Initialize logging service first 

24logging_service = LoggingService() 

25logger = logging_service.get_logger(__name__) 

26 

27 

28class RootServiceError(Exception): 

29 """Base class for root service errors.""" 

30 

31 

32class RootServiceNotFoundError(RootServiceError): 

33 """Raised when a requested root is not found. 

34 

35 Examples: 

36 >>> error = RootServiceNotFoundError("Root Service not found") 

37 >>> str(error) 

38 'Root Service not found' 

39 >>> isinstance(error, RootServiceError) 

40 True 

41 """ 

42 

43 

44class RootService: 

45 """MCP root service. 

46 

47 Manages roots that can be exposed to MCP clients. 

48 Handles: 

49 - Root registration and validation 

50 - Change notifications 

51 - Root permissions and access control 

52 """ 

53 

54 def __init__(self) -> None: 

55 """Initialize root service.""" 

56 self._roots: Dict[str, Root] = {} 

57 self._subscribers: List[asyncio.Queue] = [] 

58 

59 async def initialize(self) -> None: 

60 """Initialize root service. 

61 

62 Examples: 

63 >>> from mcpgateway.services.root_service import RootService 

64 >>> import asyncio 

65 >>> service = RootService() 

66 >>> asyncio.run(service.initialize()) 

67 

68 Test with default roots configured: 

69 >>> from unittest.mock import patch 

70 >>> service = RootService() 

71 >>> with patch('mcpgateway.config.settings.default_roots', ['file:///tmp', 'http://example.com']): 

72 ... asyncio.run(service.initialize()) 

73 >>> len(service._roots) 

74 2 

75 """ 

76 logger.info("Initializing root service") 

77 # Add any configured default roots 

78 for root_uri in settings.default_roots: 

79 try: 

80 await self.add_root(root_uri) 

81 except RootServiceError as e: 

82 logger.error(f"Failed to add default root {root_uri}: {e}") 

83 

84 async def shutdown(self) -> None: 

85 """Shutdown root service. 

86 

87 Examples: 

88 >>> from mcpgateway.services.root_service import RootService 

89 >>> import asyncio 

90 >>> service = RootService() 

91 >>> asyncio.run(service.shutdown()) 

92 

93 Test cleanup of roots and subscribers: 

94 >>> service = RootService() 

95 >>> _ = asyncio.run(service.add_root('file:///tmp')) 

96 >>> service._subscribers.append(asyncio.Queue()) 

97 >>> asyncio.run(service.shutdown()) 

98 >>> len(service._roots) 

99 0 

100 >>> len(service._subscribers) 

101 0 

102 """ 

103 logger.info("Shutting down root service") 

104 # Clear all roots and subscribers 

105 self._roots.clear() 

106 self._subscribers.clear() 

107 

108 async def list_roots(self) -> List[Root]: 

109 """List available roots. 

110 

111 Returns: 

112 List of registered roots 

113 

114 Examples: 

115 >>> from mcpgateway.services.root_service import RootService 

116 >>> import asyncio 

117 >>> service = RootService() 

118 >>> asyncio.run(service.list_roots()) 

119 [] 

120 

121 Test with multiple roots: 

122 >>> service = RootService() 

123 >>> _ = asyncio.run(service.add_root('file:///tmp')) 

124 >>> _ = asyncio.run(service.add_root('file:///home')) 

125 >>> roots = asyncio.run(service.list_roots()) 

126 >>> len(roots) 

127 2 

128 >>> sorted([str(r.uri) for r in roots]) 

129 ['file:///home', 'file:///tmp'] 

130 """ 

131 return list(self._roots.values()) 

132 

133 async def add_root(self, uri: str, name: Optional[str] = None) -> Root: 

134 """Add a new root. 

135 

136 Args: 

137 uri: Root URI 

138 name: Optional root name 

139 

140 Returns: 

141 Created root object 

142 

143 Raises: 

144 RootServiceError: If root is invalid or already exists 

145 

146 Examples: 

147 >>> from mcpgateway.services.root_service import RootService 

148 >>> import asyncio 

149 >>> service = RootService() 

150 >>> root = asyncio.run(service.add_root('file:///tmp')) 

151 >>> root.uri == 'file:///tmp' 

152 True 

153 

154 Test with custom name: 

155 >>> service = RootService() 

156 >>> root = asyncio.run(service.add_root('file:///home/user', 'MyHome')) 

157 >>> root.name 

158 'MyHome' 

159 

160 Test duplicate root error: 

161 >>> service = RootService() 

162 >>> _ = asyncio.run(service.add_root('file:///tmp')) 

163 >>> try: 

164 ... asyncio.run(service.add_root('file:///tmp')) 

165 ... except RootServiceError as e: 

166 ... str(e) 

167 'Root already exists: file:///tmp' 

168 

169 Test invalid URI error: 

170 >>> from unittest.mock import patch 

171 >>> service = RootService() 

172 >>> with patch.object(service, '_make_root_uri', side_effect=ValueError('Bad URI')): 

173 ... try: 

174 ... asyncio.run(service.add_root('bad_uri')) 

175 ... except RootServiceError as e: 

176 ... str(e) 

177 'Invalid root URI: Bad URI' 

178 """ 

179 try: 

180 root_uri = self._make_root_uri(uri) 

181 except ValueError as e: 

182 raise RootServiceError(f"Invalid root URI: {e}") 

183 

184 # Skip any access check; just store the key/value. 

185 root_obj = Root( 

186 uri=root_uri, 

187 name=name or os.path.basename(urlparse(root_uri).path) or root_uri, 

188 ) 

189 

190 # NORMALIZED URI from the Root object as the dictionary key 

191 normalized_key = str(root_obj.uri) 

192 

193 if normalized_key in self._roots: 

194 raise RootServiceError(f"Root already exists: {root_uri}") 

195 

196 self._roots[normalized_key] = root_obj 

197 

198 await self._notify_root_added(root_obj) 

199 logger.info(f"Added root: {root_uri}") 

200 return root_obj 

201 

202 async def get_root_by_uri(self, root_uri: str) -> Root: 

203 """Get a root by URI. 

204 

205 Args: 

206 root_uri: Root URI to retrieve 

207 

208 Returns: 

209 Root: The found root object 

210 

211 Raises: 

212 RootServiceNotFoundError: If root not found 

213 

214 Examples: 

215 >>> from mcpgateway.services.root_service import RootService 

216 >>> import asyncio 

217 >>> service = RootService() 

218 >>> _ = asyncio.run(service.add_root('file:///tmp')) 

219 >>> root = asyncio.run(service.get_root_by_uri('file:///tmp')) 

220 >>> root.uri == 'file:///tmp' 

221 True 

222 

223 Test root not found error: 

224 >>> service = RootService() 

225 >>> try: 

226 ... asyncio.run(service.get_root_by_uri('file:///nonexistent')) 

227 ... except RootServiceError as e: 

228 ... str(e) 

229 'Root not found: file:///nonexistent' 

230 """ 

231 # Normalize the URI to match how it was stored 

232 normalized_uri = self._make_root_uri(root_uri) 

233 if normalized_uri not in self._roots: 

234 raise RootServiceNotFoundError(f"Root not found: {root_uri}") 

235 return self._roots[normalized_uri] 

236 

237 async def update_root(self, root_uri: str, name: Optional[str] = None) -> Root: 

238 """Update an existing root. 

239 

240 Args: 

241 root_uri: Root URI to update 

242 name: New name for the root 

243 

244 Returns: 

245 Root: The updated root object 

246 

247 Raises: 

248 RootServiceNotFoundError: If root is not found 

249 

250 Examples: 

251 >>> from mcpgateway.services.root_service import RootService 

252 >>> import asyncio 

253 >>> service = RootService() 

254 >>> _ = asyncio.run(service.add_root('file:///tmp', 'Temp')) 

255 >>> updated = asyncio.run(service.update_root('file:///tmp', 'Updated Temp')) 

256 >>> updated.name 

257 'Updated Temp' 

258 

259 Test root not found error: 

260 >>> service = RootService() 

261 >>> try: 

262 ... asyncio.run(service.update_root('file:///nonexistent', 'New Name')) 

263 ... except RootServiceError as e: 

264 ... str(e) 

265 'Root not found: file:///nonexistent' 

266 """ 

267 # Normalize the URI to match how it was stored 

268 normalized_uri = self._make_root_uri(root_uri) 

269 if normalized_uri not in self._roots: 

270 raise RootServiceNotFoundError(f"Root not found: {root_uri}") 

271 

272 root_obj = self._roots[normalized_uri] 

273 

274 # Update name if provided 

275 if name is not None: 275 ↛ 279line 275 didn't jump to line 279 because the condition on line 275 was always true

276 root_obj.name = name 

277 

278 # Notify subscribers of the update 

279 event = {"type": "root_updated", "data": {"uri": root_obj.uri, "name": root_obj.name}} 

280 await self._notify_subscribers(event) 

281 

282 logger.info(f"Updated root: {root_uri}, name: {name}") 

283 return root_obj 

284 

285 async def remove_root(self, root_uri: str) -> None: 

286 """Remove a registered root. 

287 

288 Args: 

289 root_uri: Root URI to remove 

290 

291 Raises: 

292 RootServiceError: If root not found 

293 

294 Examples: 

295 >>> from mcpgateway.services.root_service import RootService 

296 >>> import asyncio 

297 >>> service = RootService() 

298 >>> _ = asyncio.run(service.add_root('file:///tmp')) 

299 >>> asyncio.run(service.remove_root('file:///tmp')) 

300 

301 Test root not found error: 

302 >>> service = RootService() 

303 >>> try: 

304 ... asyncio.run(service.remove_root('file:///nonexistent')) 

305 ... except RootServiceError as e: 

306 ... str(e) 

307 'Root not found: file:///nonexistent' 

308 """ 

309 # Normalize the URI to match how it was stored 

310 normalized_uri = self._make_root_uri(root_uri) 

311 if normalized_uri not in self._roots: 

312 raise RootServiceError(f"Root not found: {root_uri}") 

313 root_obj = self._roots.pop(normalized_uri) 

314 await self._notify_root_removed(root_obj) 

315 logger.info(f"Removed root: {root_uri}") 

316 

317 async def subscribe_changes(self) -> AsyncGenerator[Dict, None]: 

318 """Subscribe to root changes. 

319 

320 Yields: 

321 Root change events 

322 

323 Examples: 

324 This example demonstrates subscription mechanics: 

325 >>> import asyncio 

326 >>> from mcpgateway.services.root_service import RootService 

327 >>> async def test_subscribe(): 

328 ... service = RootService() 

329 ... events = [] 

330 ... async def collect_events(): 

331 ... async for event in service.subscribe_changes(): 

332 ... events.append(event) 

333 ... if event['type'] == 'root_removed': 

334 ... break 

335 ... task = asyncio.create_task(collect_events()) 

336 ... await asyncio.sleep(0) # Let subscription start 

337 ... await service.add_root('file:///tmp') 

338 ... await service.remove_root('file:///tmp') 

339 ... await task 

340 ... return events 

341 >>> events = asyncio.run(test_subscribe()) 

342 >>> len(events) 

343 2 

344 >>> events[0]['type'] 

345 'root_added' 

346 >>> events[1]['type'] 

347 'root_removed' 

348 """ 

349 queue: asyncio.Queue = asyncio.Queue() 

350 self._subscribers.append(queue) 

351 try: 

352 while True: 

353 event = await queue.get() 

354 yield event 

355 finally: 

356 self._subscribers.remove(queue) 

357 

358 def _make_root_uri(self, uri: str) -> str: 

359 """Convert input to a valid URI. 

360 

361 If no scheme is provided, assume a file URI and convert the path to an absolute path. 

362 

363 Args: 

364 uri: Input URI or filesystem path 

365 

366 Returns: 

367 A valid URI string 

368 

369 Examples: 

370 >>> service = RootService() 

371 >>> service._make_root_uri('/tmp') 

372 'file:///tmp' 

373 >>> service._make_root_uri('file:///home') 

374 'file:///home' 

375 >>> service._make_root_uri('http://example.com') 

376 'http://example.com' 

377 >>> service._make_root_uri('ftp://server/path') 

378 'ftp://server/path' 

379 """ 

380 parsed = urlparse(uri) 

381 if not parsed.scheme: 

382 # No scheme provided; assume a file URI and add file:// prefix 

383 return f"file://{uri}" 

384 # If a scheme is present (e.g., http, https, ftp, etc.), return the URI as-is. 

385 return uri 

386 

387 async def _notify_root_added(self, root: Root) -> None: 

388 """Notify subscribers of root addition. 

389 

390 Args: 

391 root: Added root 

392 

393 Note: 

394 The root.uri field returns a FileUrl object which is serialized 

395 as-is in the event data. 

396 

397 Examples: 

398 >>> import asyncio 

399 >>> from mcpgateway.services.root_service import RootService 

400 >>> from mcpgateway.common.models import Root 

401 >>> service = RootService() 

402 >>> queue = asyncio.Queue() 

403 >>> service._subscribers.append(queue) 

404 >>> root = Root(uri='file:///tmp', name='temp') 

405 >>> asyncio.run(service._notify_root_added(root)) 

406 >>> event = asyncio.run(queue.get()) 

407 >>> event['type'] 

408 'root_added' 

409 >>> event['data']['uri'] 

410 FileUrl('file:///tmp') 

411 """ 

412 event = {"type": "root_added", "data": {"uri": root.uri, "name": root.name}} 

413 await self._notify_subscribers(event) 

414 

415 async def _notify_root_removed(self, root: Root) -> None: 

416 """Notify subscribers of root removal. 

417 

418 Args: 

419 root: Removed root 

420 

421 Examples: 

422 >>> import asyncio 

423 >>> from mcpgateway.services.root_service import RootService 

424 >>> from mcpgateway.common.models import Root 

425 >>> service = RootService() 

426 >>> queue = asyncio.Queue() 

427 >>> service._subscribers.append(queue) 

428 >>> root = Root(uri='file:///tmp', name='temp') 

429 >>> asyncio.run(service._notify_root_removed(root)) 

430 >>> event = asyncio.run(queue.get()) 

431 >>> event['type'] 

432 'root_removed' 

433 >>> event['data']['uri'] 

434 FileUrl('file:///tmp') 

435 """ 

436 event = {"type": "root_removed", "data": {"uri": root.uri}} 

437 await self._notify_subscribers(event) 

438 

439 async def _notify_subscribers(self, event: Dict) -> None: 

440 """Send event to all subscribers. 

441 

442 Args: 

443 event: Event to send 

444 

445 Examples: 

446 >>> import asyncio 

447 >>> from mcpgateway.services.root_service import RootService 

448 >>> service = RootService() 

449 >>> queue1 = asyncio.Queue() 

450 >>> queue2 = asyncio.Queue() 

451 >>> service._subscribers.extend([queue1, queue2]) 

452 >>> event = {"type": "test", "data": {}} 

453 >>> asyncio.run(service._notify_subscribers(event)) 

454 >>> asyncio.run(queue1.get()) == event 

455 True 

456 >>> asyncio.run(queue2.get()) == event 

457 True 

458 

459 Test error handling with closed queue: 

460 >>> from unittest.mock import AsyncMock 

461 >>> service = RootService() 

462 >>> bad_queue = AsyncMock() 

463 >>> bad_queue.put.side_effect = Exception("Queue error") 

464 >>> service._subscribers.append(bad_queue) 

465 >>> asyncio.run(service._notify_subscribers({"type": "test"})) 

466 """ 

467 for queue in self._subscribers: 

468 try: 

469 await queue.put(event) 

470 except Exception as e: 

471 logger.error(f"Failed to notify subscriber: {e}") 

472 

473 

474# Lazy singleton - created on first access, not at module import time. 

475# This avoids instantiation when only exception classes are imported. 

476_root_service_instance = None # pylint: disable=invalid-name 

477 

478 

479def __getattr__(name: str): 

480 """Module-level __getattr__ for lazy singleton creation. 

481 

482 Args: 

483 name: The attribute name being accessed. 

484 

485 Returns: 

486 The root_service singleton instance if name is "root_service". 

487 

488 Raises: 

489 AttributeError: If the attribute name is not "root_service". 

490 """ 

491 global _root_service_instance # pylint: disable=global-statement 

492 if name == "root_service": 

493 if _root_service_instance is None: 

494 _root_service_instance = RootService() 

495 return _root_service_instance 

496 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")