Coverage for mcpgateway / services / email_notification_service.py: 100%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/email_notification_service.py 

3Copyright 2026 

4SPDX-License-Identifier: Apache-2.0 

5 

6Email notification service for authentication workflows. 

7""" 

8 

9# Standard 

10import asyncio 

11from email.message import EmailMessage 

12from email.utils import formataddr 

13import html 

14from pathlib import Path 

15import re 

16import smtplib 

17import ssl 

18from typing import Any, Dict, Optional 

19 

20# Third-Party 

21from jinja2 import Environment, FileSystemLoader, select_autoescape, TemplateNotFound 

22 

23# First-Party 

24from mcpgateway.config import settings 

25from mcpgateway.services.logging_service import LoggingService 

26 

# Module-wide logger, obtained through the gateway's logging service so this
# module's records go through the same service as the rest of the package.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

29 

30 

class AuthEmailNotificationService:
    """Send authentication-related email notifications.

    HTML bodies are rendered from Jinja templates located in the ``templates``
    directory one level above this module's package directory. When a template
    is missing or fails to render, a minimal escaped HTML fallback is used.
    Delivery happens over SMTP using values read from ``settings``; every
    public ``send_*`` coroutine returns a bool instead of raising on delivery
    problems.
    """

    def __init__(self) -> None:
        """Initialize template rendering for authentication emails."""
        template_dir = Path(__file__).resolve().parents[1] / "templates"
        self._jinja = Environment(loader=FileSystemLoader(str(template_dir)), autoescape=select_autoescape(["html", "xml"]))

    @staticmethod
    def _smtp_password() -> Optional[str]:
        """Resolve SMTP password from settings.

        Returns:
            Optional[str]: SMTP password string or None when unset. Values that
            expose ``get_secret_value()`` are unwrapped through it; anything
            else is converted with ``str()``.
        """
        raw = getattr(settings, "smtp_password", None)
        if raw is None:
            return None
        if hasattr(raw, "get_secret_value"):
            return raw.get_secret_value()
        return str(raw)

    @staticmethod
    def _smtp_ready() -> bool:
        """Check whether minimum SMTP settings are available.

        Returns:
            bool: True when ``smtp_enabled``, ``smtp_host`` and
            ``smtp_from_email`` are all truthy on ``settings``.
        """
        return bool(getattr(settings, "smtp_enabled", False) and getattr(settings, "smtp_host", None) and getattr(settings, "smtp_from_email", None))

    @staticmethod
    def _display_name(full_name: Optional[str], to_email: str) -> str:
        """Pick the salutation name for a recipient.

        Args:
            full_name: Optional display name supplied by the caller.
            to_email: Destination email address.

        Returns:
            str: ``full_name`` when it is truthy, otherwise the part of
            ``to_email`` before the first ``@`` (the whole value when it
            contains no ``@``).
        """
        return full_name or to_email.split("@", maxsplit=1)[0]

    def _render_template(self, template_name: str, context: Dict[str, Any], fallback_title: str, fallback_body: str) -> str:
        """Render an email template with graceful fallback.

        Args:
            template_name: Jinja template filename.
            context: Rendering context values.
            fallback_title: Fallback HTML title when template fails.
            fallback_body: Fallback body text when template fails.

        Returns:
            str: Rendered HTML email body. If the template is missing or
            rendering raises, a warning is logged and a minimal HTML document
            built from the escaped fallback title/body is returned instead.
        """
        try:
            template = self._jinja.get_template(template_name)
            return template.render(**context)
        except TemplateNotFound:
            logger.warning("Email template %s not found. Using fallback template.", template_name)
        except Exception as exc:
            # Deliberately broad: a broken template must not block the email.
            logger.warning("Failed to render email template %s: %s. Using fallback template.", template_name, exc)

        # Fallback values may embed caller-provided text (names, URLs), so they
        # are escaped before being placed into markup.
        safe_title = html.escape(fallback_title)
        safe_body = html.escape(fallback_body).replace("\n", "<br/>")
        return f"<html><body><h2>{safe_title}</h2><p>{safe_body}</p></body></html>"

    @staticmethod
    def _html_to_text(value: str) -> str:
        """Convert simple HTML content to plain text.

        ``<script>``/``<style>`` blocks are removed together with their
        content, remaining tags are replaced by spaces, HTML entities are
        decoded, and runs of whitespace are collapsed to single spaces.

        Args:
            value: HTML value.

        Returns:
            str: Text-only representation.
        """
        # CSS/JS payloads are not readable text; drop the whole element.
        without_blocks = re.sub(r"(?is)<(script|style)\b[^>]*>.*?</\1\s*>", " ", value)
        no_tags = re.sub(r"<[^>]+>", " ", without_blocks)
        # Decode entities only after tag removal so escaped text such as
        # "&lt;b&gt;" is kept as literal "<b>" instead of being stripped, and
        # so URLs escaped as "a=1&amp;b=2" are usable in the text/plain part.
        decoded = html.unescape(no_tags)
        return re.sub(r"\s+", " ", decoded).strip()

    async def _send_email(self, to_email: str, subject: str, html_body: str) -> bool:
        """Send an email asynchronously.

        The blocking SMTP work is run via ``asyncio.to_thread``.

        Args:
            to_email: Destination email address.
            subject: Message subject.
            html_body: HTML message body.

        Returns:
            bool: True when message is sent successfully; False when SMTP is
            not configured or delivery fails.
        """
        if not self._smtp_ready():
            logger.info("SMTP not configured. Skipping email to %s with subject '%s'.", to_email, subject)
            return False
        return await asyncio.to_thread(self._send_email_sync, to_email, subject, html_body)

    def _send_email_sync(self, to_email: str, subject: str, html_body: str) -> bool:
        """Send an email synchronously over SMTP.

        Builds a multipart message (plain-text part derived from the HTML plus
        the HTML alternative) and delivers it either over implicit TLS
        (``smtp_use_ssl``) or over plain SMTP with optional STARTTLS
        (``smtp_use_tls``). Login is attempted only when both a user and a
        password are configured.

        Args:
            to_email: Destination email address.
            subject: Message subject.
            html_body: HTML message body.

        Returns:
            bool: True when message is sent successfully. Any exception raised
            while reading settings, building the message or talking to the
            server is logged as a warning and reported as False.
        """
        try:
            # Settings coercion and header assignment can raise (for example
            # int() on a bad port, or EmailMessage rejecting CR/LF in a header
            # value), so they live inside the best-effort block as well.
            from_email = str(getattr(settings, "smtp_from_email", "") or "")
            from_name = str(getattr(settings, "smtp_from_name", "ContextForge") or "ContextForge")
            smtp_user = getattr(settings, "smtp_user", None)
            smtp_password = self._smtp_password()
            smtp_host = str(getattr(settings, "smtp_host", ""))
            smtp_port = int(getattr(settings, "smtp_port", 587))
            timeout_seconds = int(getattr(settings, "smtp_timeout_seconds", 15))
            use_ssl = bool(getattr(settings, "smtp_use_ssl", False))
            use_tls = bool(getattr(settings, "smtp_use_tls", True))

            message = EmailMessage()
            message["Subject"] = subject
            message["From"] = formataddr((from_name, from_email))
            message["To"] = to_email
            message.set_content(self._html_to_text(html_body))
            message.add_alternative(html_body, subtype="html")

            if use_ssl:
                # Pass an explicit default context so the implicit-TLS path
                # verifies the server certificate and hostname exactly like the
                # STARTTLS path below; without it smtplib creates its own
                # context that performs no verification.
                with smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=timeout_seconds, context=ssl.create_default_context()) as server:
                    if smtp_user and smtp_password:
                        server.login(str(smtp_user), smtp_password)
                    server.send_message(message)
            else:
                with smtplib.SMTP(smtp_host, smtp_port, timeout=timeout_seconds) as server:
                    server.ehlo()
                    if use_tls:
                        server.starttls(context=ssl.create_default_context())
                        # Re-identify after the TLS upgrade.
                        server.ehlo()
                    if smtp_user and smtp_password:
                        server.login(str(smtp_user), smtp_password)
                    server.send_message(message)

            logger.info("Auth notification email sent to %s", to_email)
            return True
        except Exception as exc:
            # Deliberately broad: notification delivery is best-effort and must
            # not break the calling authentication flow.
            logger.warning("Failed to send auth notification email to %s: %s", to_email, exc)
            return False

    async def send_password_reset_email(self, to_email: str, full_name: Optional[str], reset_url: str, expires_minutes: int) -> bool:
        """Send password-reset email containing a one-time reset link.

        Args:
            to_email: Destination email address.
            full_name: Optional display name for salutation.
            reset_url: Password-reset link.
            expires_minutes: Link validity duration.

        Returns:
            bool: True when message is sent successfully.
        """
        display_name = self._display_name(full_name, to_email)
        subject = "Reset your ContextForge password"
        body = self._render_template(
            template_name="password_reset_email.html",
            context={"display_name": display_name, "reset_url": reset_url, "expires_minutes": expires_minutes, "recipient_email": to_email},
            fallback_title="Password reset requested",
            fallback_body=f"Hi {display_name},\n\nUse this link to reset your password: {reset_url}\n\nThe link expires in {expires_minutes} minutes.",
        )
        return await self._send_email(to_email, subject, body)

    async def send_password_reset_confirmation_email(self, to_email: str, full_name: Optional[str]) -> bool:
        """Send post-reset confirmation email.

        Args:
            to_email: Destination email address.
            full_name: Optional display name for salutation.

        Returns:
            bool: True when message is sent successfully.
        """
        display_name = self._display_name(full_name, to_email)
        subject = "Your ContextForge password was changed"
        body = self._render_template(
            template_name="password_reset_confirmation_email.html",
            context={"display_name": display_name, "recipient_email": to_email},
            fallback_title="Password changed",
            fallback_body=f"Hi {display_name},\n\nYour password was changed successfully. If this was not you, contact an administrator immediately.",
        )
        return await self._send_email(to_email, subject, body)

    async def send_account_lockout_email(self, to_email: str, full_name: Optional[str], locked_until_iso: str, reset_url: str) -> bool:
        """Notify the user that login attempts triggered a temporary lockout.

        Args:
            to_email: Destination email address.
            full_name: Optional display name for salutation.
            locked_until_iso: ISO timestamp for lockout expiry.
            reset_url: Forgot-password URL for recovery.

        Returns:
            bool: True when message is sent successfully.
        """
        display_name = self._display_name(full_name, to_email)
        subject = "Your ContextForge account was temporarily locked"
        body = self._render_template(
            template_name="account_lockout_email.html",
            context={"display_name": display_name, "locked_until": locked_until_iso, "reset_url": reset_url, "recipient_email": to_email},
            fallback_title="Account temporarily locked",
            fallback_body=f"Hi {display_name},\n\nYour account is locked until {locked_until_iso} due to repeated failed sign-in attempts.\n\nIf this was not you, reset your password now: {reset_url}",
        )
        return await self._send_email(to_email, subject, body)