Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions app/services/email_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Centralized email service for CueAPI.

All emails go through this module. Test cue emails are suppressed.
Auth emails (magic link, key rotation) always send.
"""

from __future__ import annotations

import logging

from app.config import settings

logger = logging.getLogger(__name__)

TEST_CUE_PREFIXES = [
"argus-",
"test-",
"verify-",
"verify-production-",
"verify-diag-",
"e2e-",
"test-cueapi-core-",
"cueapi-core-test-",
]


def is_test_cue(cue_name: str) -> bool:
"""Returns True if this is a test/ephemeral cue that should not trigger emails."""
if not cue_name:
return False
return any(cue_name.lower().startswith(prefix) for prefix in TEST_CUE_PREFIXES)


def send_email(to: str, subject: str, html: str, cue_name: str | None = None) -> bool:
"""Send an email via Resend.

Args:
to: Recipient email address.
subject: Email subject line.
html: Email body HTML.
cue_name: If provided and is a test cue, email is suppressed.

Returns:
True if sent, False if suppressed or failed.
"""
if cue_name and is_test_cue(cue_name):
logger.info("Email suppressed for test cue: %s | subject: %s", cue_name, subject)
return False

if not settings.RESEND_API_KEY:
logger.info("RESEND_API_KEY not set, skipping email: %s", subject)
return False

try:
import resend

resend.api_key = settings.RESEND_API_KEY
from_email = getattr(settings, "RESEND_FROM_EMAIL", "CueAPI <alerts@cueapi.ai>")
resend.Emails.send({
"from": from_email,
"to": to,
"subject": subject,
"html": html,
})
logger.info("Email sent | to: %s | subject: %s", to, subject)
return True
except ImportError:
logger.info("resend not installed, skipping email: %s", subject)
return False
except Exception as e:
logger.error("Email send failed | to: %s | subject: %s | error: %s", to, subject, e)
return False
69 changes: 69 additions & 0 deletions tests/test_email_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Tests for email_service.py — test cue detection and email suppression."""

from app.services.email_service import is_test_cue, send_email


class TestIsTestCue:
def test_argus_prefix(self):
assert is_test_cue("argus-daily-summary") is True

def test_test_prefix(self):
assert is_test_cue("test-cueapi-core-12345") is True

def test_verify_prefix(self):
assert is_test_cue("verify-production-1774636583") is True

def test_verify_diag_prefix(self):
assert is_test_cue("verify-diag-7a02a4") is True

def test_e2e_prefix(self):
assert is_test_cue("e2e-webhook-flow") is True

def test_real_cue_not_blocked(self):
assert is_test_cue("morning-briefing") is False
assert is_test_cue("daily-report") is False
assert is_test_cue("content-quality-check") is False

def test_case_insensitive(self):
assert is_test_cue("ARGUS-test") is True
assert is_test_cue("Test-my-cue") is True
assert is_test_cue("VERIFY-diag-123") is True

def test_none_returns_false(self):
assert is_test_cue(None) is False

def test_empty_returns_false(self):
assert is_test_cue("") is False


class TestSendEmail:
def test_suppressed_for_test_cue(self):
result = send_email(
to="user@test.com",
subject="Test failure",
html="<p>Failed</p>",
cue_name="argus-daily-test",
)
assert result is False

def test_no_cue_name_not_suppressed(self):
# Will fail to send (no RESEND_API_KEY in test) but should not be suppressed
result = send_email(
to="user@test.com",
subject="Key rotated",
html="<p>Rotated</p>",
cue_name=None,
)
# Returns False because RESEND_API_KEY is not set, not because suppressed
assert result is False

def test_real_cue_not_suppressed(self):
# Will fail (no API key) but the suppression check should pass
result = send_email(
to="user@test.com",
subject="Real failure",
html="<p>Failed</p>",
cue_name="morning-briefing",
)
# Returns False because no RESEND_API_KEY, not suppression
assert result is False
8 changes: 8 additions & 0 deletions worker/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,14 @@ async def check_worker_health(db_engine, redis, offline_threshold: int = 300) ->

minutes_offline = int((now - worker_row.last_heartbeat).total_seconds() / 60)

# Rate limit: max 1 worker offline email per worker per hour
rate_key = f"worker_offline_email:{worker_id}"
already_sent = await redis.get(rate_key)
if already_sent:
logger.info("Worker offline email suppressed (rate limited): %s", worker_id)
continue
await redis.setex(rate_key, 3600, "1")

# Send alert email
try:
import resend
Expand Down
6 changes: 6 additions & 0 deletions worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ async def _send_failure_email(
"""Send email notification when execution fails after all retries.

Rate limited: max 10 failure emails per hour per user via Redis counter.
Suppressed for test/ephemeral cues.
"""
from app.services.email_service import is_test_cue
if is_test_cue(cue_name):
logger.info("Failure email suppressed for test cue: %s", cue_name)
return

try:
redis_client = aioredis.from_url(settings.REDIS_URL)
rate_key = f"failure_email:{user_id}"
Expand Down
Loading