diff --git a/CHANGELOG.md b/CHANGELOG.md index 89728ff..94fa054 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,15 @@ All notable changes to cueapi-core will be documented here. - **Verification modes** for cue outcomes. A new `verification: {mode: ...}` field on `CueCreate` / `CueUpdate` with five values: `none` (default), `require_external_id`, `require_result_url`, `require_artifacts`, `manual`. The outcome service computes `outcome_state` from (success, mode, evidence): missing required evidence lands in `verification_failed`, satisfied requirements land in `verified_success`, manual mode parks in `verification_pending`. - **Inline evidence on `POST /v1/executions/{id}/outcome`.** `OutcomeRequest` now accepts `external_id`, `result_url`, `result_ref`, `result_type`, `summary`, `artifacts` alongside the existing `success` / `result` / `error` / `metadata`. Fully backward compatible — the legacy shape still works. The separate `PATCH /v1/executions/{id}/evidence` endpoint remains for two-step flows. - **Migration 017** — `verification_mode` column on `cues` (String(50), nullable, CHECK-constrained enum). NULL and `none` are equivalent. +- **Alerts** — persisted alerts for `consecutive_failures`, `verification_failed`, and `outcome_timeout`. Three alert types are storage-ready; `consecutive_failures` and `verification_failed` fire automatically from `outcome_service.record_outcome`. `outcome_timeout` requires the deadline-checking poller (not yet in OSS) to activate; the CHECK constraint and router accept the type already. +- **Alert webhook delivery** — optional `alert_webhook_url` on the user. When set, each alert POSTs an HMAC-SHA256-signed payload to that URL. Fire-and-forget delivery; best-effort, never blocks outcome reporting. `X-CueAPI-Signature`, `X-CueAPI-Timestamp`, `X-CueAPI-Alert-Id`, `X-CueAPI-Alert-Type` headers. SSRF-protected at delivery time. 
+- **`GET /v1/alerts`** — list alerts for the authenticated user, with `alert_type` / `since` / `limit` / `offset` filters and per-user scoping. +- **`PATCH /v1/auth/me`** accepts `alert_webhook_url` (empty string clears; SSRF-validated at set time). +- **`GET /v1/auth/alert-webhook-secret`** — lazily generate + return the HMAC signing secret (64 hex chars). +- **`POST /v1/auth/alert-webhook-secret/regenerate`** — rotate the secret (requires `X-Confirm-Destructive: true`). +- **Dedup** — alerts collapse on `(user_id, alert_type, execution_id)` inside a 5-minute window. +- **Migrations 018 + 019** — alerts table with indexes and CHECK constraints; two columns on users. +- `examples/alert_webhook_receiver.py` — 30-line Flask receiver demonstrating signature verification. ### Changed - **`POST /v1/executions/{id}/verify`** now accepts `{valid: bool, reason: str?}`. `valid=true` (default, preserving legacy behavior) transitions to `verified_success`; `valid=false` transitions to `verification_failed` and records the reason onto `evidence_summary` (truncated to 500 chars). Accepted starting states expanded to include `reported_failure`. diff --git a/README.md b/README.md index e246297..eba5139 100644 --- a/README.md +++ b/README.md @@ -248,6 +248,46 @@ Backward-compat paths still work: `POST /outcome` with just `{success: true}` be > Worker-transport cues can currently use `none` or `manual` only. Evidence-requiring modes (`require_external_id`, `require_result_url`, `require_artifacts`) are rejected at create/update time with `400 unsupported_verification_for_transport`. This restriction will be lifted once cueapi-worker 0.3.0 ships to PyPI with evidence reporting via `CUEAPI_OUTCOME_FILE`. +## Alerts + +cueapi-core persists alerts when outcomes go wrong and — if you configure a webhook URL — POSTs them to you with an HMAC signature. 
Three alert types today: + +| Type | When it fires | +|------|--------------| +| `consecutive_failures` | Same cue reports `success=false` three runs in a row | +| `verification_failed` | Outcome is missing evidence required by the cue's verification mode (see "Verification modes") | +| `outcome_timeout` | Handler never reports an outcome before the deadline (not yet wired in OSS — coming with the deadline-checking poller) | + +Alerts are deduplicated per `(user, alert_type, execution_id)` within a 5-minute window so flapping executions don't flood your inbox. + +### Query alerts directly + +```bash +curl http://localhost:8000/v1/alerts \ + -H "Authorization: Bearer YOUR_API_KEY" +# Optional filters: ?alert_type=consecutive_failures&since=2026-01-01T00:00:00Z&limit=20 +``` + +### Receive alerts via webhook + +1. **Set your webhook URL:** + ```bash + curl -X PATCH http://localhost:8000/v1/auth/me \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -d '{"alert_webhook_url": "https://your-server.example.com/cueapi-alerts"}' + ``` + +2. **Retrieve your signing secret** (generated lazily on first call, 64 hex chars): + ```bash + curl http://localhost:8000/v1/auth/alert-webhook-secret \ + -H "Authorization: Bearer YOUR_API_KEY" + ``` + Rotate with `POST /v1/auth/alert-webhook-secret/regenerate` (requires `X-Confirm-Destructive: true`). + +3. **Verify incoming alerts** on your end. See [`examples/alert_webhook_receiver.py`](examples/alert_webhook_receiver.py) for a 30-line Flask receiver. Each POST carries `X-CueAPI-Signature: v1=`, `X-CueAPI-Timestamp`, `X-CueAPI-Alert-Id`, and `X-CueAPI-Alert-Type`. + +> **Delivery path is HTTP only.** cueapi-core ships alert persistence + webhook delivery and nothing else. For email / SMS / Slack, point your `alert_webhook_url` at a forwarder you control, or use hosted cueapi.ai which includes managed email delivery via SendGrid. See [HOSTED_ONLY.md](HOSTED_ONLY.md) for the full open-core policy. 
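Step 3's verification check can be sketched in a few lines of Python. This sketch assumes the scheme used by the bundled example receiver: the `X-CueAPI-Signature` header is `v1=` followed by a hex HMAC-SHA256 digest computed over `"{timestamp}." + raw_body`, keyed with your webhook secret. The helper name `verify_cueapi_alert` is illustrative, not part of any shipped API.

```python
import hashlib
import hmac


def verify_cueapi_alert(secret: str, timestamp: str, raw_body: bytes, signature_header: str) -> bool:
    """Recompute the HMAC over "{timestamp}.{raw body}" and compare in constant time."""
    digest = hmac.new(secret.encode(), f"{timestamp}.".encode() + raw_body, hashlib.sha256).hexdigest()
    # hmac.compare_digest avoids leaking the match position via timing.
    return hmac.compare_digest(signature_header, f"v1={digest}")
```

Always verify against the raw request bytes, never a re-serialized copy of the parsed JSON: any whitespace or key-order difference changes the digest. Binding the timestamp into the signed string also lets you reject stale deliveries as replay protection.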
+ ## What CueAPI is not - Not a workflow orchestrator diff --git a/alembic/versions/018_add_alerts_table.py b/alembic/versions/018_add_alerts_table.py new file mode 100644 index 0000000..1163b06 --- /dev/null +++ b/alembic/versions/018_add_alerts_table.py @@ -0,0 +1,51 @@ +"""Add alerts table. + +Revision ID: 018 +Revises: 017 + +Chained off 017 (verification_mode column, PR #18). PR #18 landed +first on main so 018 now hangs directly off it. +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, UUID + +revision = "018" +down_revision = "017" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "alerts", + sa.Column("id", UUID(as_uuid=True), primary_key=True), + sa.Column("user_id", UUID(as_uuid=True), nullable=False), + sa.Column("cue_id", sa.String(length=20), nullable=True), + sa.Column("execution_id", UUID(as_uuid=True), nullable=True), + sa.Column("alert_type", sa.String(length=50), nullable=False), + sa.Column("severity", sa.String(length=20), nullable=False, server_default="warning"), + sa.Column("message", sa.Text, nullable=False), + sa.Column("metadata", JSONB, nullable=True), + sa.Column("acknowledged", sa.Boolean, nullable=False, server_default=sa.text("false")), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.CheckConstraint( + "alert_type IN (" + "'outcome_timeout', 'verification_failed', 'consecutive_failures')", + name="valid_alert_type", + ), + sa.CheckConstraint( + "severity IN ('info', 'warning', 'critical')", + name="valid_alert_severity", + ), + ) + op.create_index("ix_alerts_user_id", "alerts", ["user_id"]) + op.create_index("ix_alerts_user_created", "alerts", ["user_id", "created_at"]) + op.create_index("ix_alerts_execution_id", "alerts", ["execution_id"]) + + +def downgrade() -> None: + op.drop_index("ix_alerts_execution_id", table_name="alerts") + op.drop_index("ix_alerts_user_created", 
table_name="alerts") + op.drop_index("ix_alerts_user_id", table_name="alerts") + op.drop_table("alerts") diff --git a/alembic/versions/019_add_user_alert_webhook.py b/alembic/versions/019_add_user_alert_webhook.py new file mode 100644 index 0000000..6b27fa8 --- /dev/null +++ b/alembic/versions/019_add_user_alert_webhook.py @@ -0,0 +1,22 @@ +"""Add alert_webhook_url + alert_webhook_secret to users. + +Revision ID: 019 +Revises: 018 +""" +from alembic import op +import sqlalchemy as sa + +revision = "019" +down_revision = "018" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column("users", sa.Column("alert_webhook_url", sa.String(length=2048), nullable=True)) + op.add_column("users", sa.Column("alert_webhook_secret", sa.String(length=64), nullable=True)) + + +def downgrade() -> None: + op.drop_column("users", "alert_webhook_secret") + op.drop_column("users", "alert_webhook_url") diff --git a/app/main.py b/app/main.py index 96aaa5c..4df7079 100644 --- a/app/main.py +++ b/app/main.py @@ -10,7 +10,7 @@ from app.middleware.rate_limit import RateLimitMiddleware from app.middleware.request_id import RequestIdMiddleware from app.redis import close_redis -from app.routers import auth_routes, cues, device_code, echo, executions, health, usage, webhook_secret, workers +from app.routers import alerts, auth_routes, cues, device_code, echo, executions, health, usage, webhook_secret, workers from app.utils.logging import setup_logging @@ -43,6 +43,7 @@ async def lifespan(app: FastAPI): {"name": "worker", "description": "Worker transport: heartbeat registration for pull-based execution delivery"}, {"name": "usage", "description": "Usage stats and plan information"}, {"name": "echo", "description": "Echo endpoint for testing webhook delivery"}, + {"name": "alerts", "description": "Persisted alerts fired by outcome service; optional signed webhook delivery via alert_webhook_url"}, {"name": "auth-pages", "description": "HTML pages for device code 
verification flow"}, ] @@ -158,3 +159,4 @@ async def generic_error_handler(request: Request, exc: Exception): app.include_router(workers.router) app.include_router(workers.workers_list_router) app.include_router(webhook_secret.router) +app.include_router(alerts.router) diff --git a/app/models/__init__.py b/app/models/__init__.py index 34b15e8..f87d7be 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -5,8 +5,9 @@ from app.models.usage_monthly import UsageMonthly from app.models.device_code import DeviceCode from app.models.worker import Worker +from app.models.alert import Alert __all__ = [ "User", "Cue", "Execution", "DispatchOutbox", "UsageMonthly", "DeviceCode", - "Worker", + "Worker", "Alert", ] diff --git a/app/models/alert.py b/app/models/alert.py new file mode 100644 index 0000000..69cf70b --- /dev/null +++ b/app/models/alert.py @@ -0,0 +1,47 @@ +"""Alert model — persisted, user-scoped events fired by outcome +service / poller and optionally delivered to the user's configured +alert webhook.""" + +from __future__ import annotations + +import uuid + +from sqlalchemy import Boolean, CheckConstraint, Column, DateTime, Index, String, Text, func +from sqlalchemy.dialects.postgresql import JSONB, UUID + +from app.database import Base + + +class Alert(Base): + __tablename__ = "alerts" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + user_id = Column(UUID(as_uuid=True), nullable=False, index=True) + # Both are nullable — an alert can be about a user-level event + # (e.g. API key rotation) with no execution or cue attached. + cue_id = Column(String(20), nullable=True) + execution_id = Column(UUID(as_uuid=True), nullable=True) + alert_type = Column(String(50), nullable=False) + # Kept at warning by default — the UI / webhook receiver decides + # how to render. ``critical`` is reserved for future use. 
+ severity = Column(String(20), nullable=False, server_default="warning") + message = Column(Text, nullable=False) + # DB column is named ``metadata`` (SQLAlchemy reserves the + # ``metadata`` attr on Base), Python attr is ``alert_metadata``. + alert_metadata = Column("metadata", JSONB, nullable=True) + acknowledged = Column(Boolean, nullable=False, server_default="false") + created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now()) + + __table_args__ = ( + CheckConstraint( + "alert_type IN (" + "'outcome_timeout', 'verification_failed', 'consecutive_failures')", + name="valid_alert_type", + ), + CheckConstraint( + "severity IN ('info', 'warning', 'critical')", + name="valid_alert_severity", + ), + Index("ix_alerts_user_created", "user_id", "created_at"), + Index("ix_alerts_execution_id", "execution_id"), + ) diff --git a/app/models/user.py b/app/models/user.py index 6d5b8dc..dd26151 100644 --- a/app/models/user.py +++ b/app/models/user.py @@ -19,5 +19,13 @@ class User(Base): rate_limit_per_minute = Column(Integer, nullable=False, default=60) webhook_secret = Column(String(80), nullable=False) api_key_encrypted = Column(String(256), nullable=True) + # Optional HTTPS endpoint that receives alert webhooks (signed). + # If NULL, alerts are persisted but not delivered — users poll + # ``GET /v1/alerts``. + alert_webhook_url = Column(String(2048), nullable=True) + # HMAC-SHA256 signing key for alert webhook payloads. Generated + # lazily on first ``GET /v1/auth/alert-webhook-secret`` and rotatable + # via ``POST /v1/auth/alert-webhook-secret/regenerate``. 
+ alert_webhook_secret = Column(String(64), nullable=True) created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now()) updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()) diff --git a/app/routers/alerts.py b/app/routers/alerts.py new file mode 100644 index 0000000..a6a3cce --- /dev/null +++ b/app/routers/alerts.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession + +from app.auth import AuthenticatedUser, get_current_user +from app.database import get_db +from app.schemas.alert import AlertListResponse, AlertResponse +from app.services.alert_service import list_alerts + +router = APIRouter(prefix="/v1/alerts", tags=["alerts"]) + +_VALID_TYPES = {"outcome_timeout", "verification_failed", "consecutive_failures"} + + +@router.get("", response_model=AlertListResponse) +async def get_alerts( + alert_type: Optional[str] = Query(None, description="Filter by alert type"), + since: Optional[datetime] = Query(None, description="Return alerts created at or after this ISO timestamp"), + limit: int = Query(20, ge=1, le=100), + offset: int = Query(0, ge=0), + user: AuthenticatedUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """List alerts for the authenticated user.""" + if alert_type and alert_type not in _VALID_TYPES: + raise HTTPException( + status_code=400, + detail={ + "error": { + "code": "invalid_filter", + "message": f"alert_type must be one of: {', '.join(sorted(_VALID_TYPES))}", + "status": 400, + } + }, + ) + result = await list_alerts( + db, user.id, alert_type=alert_type, since=since, limit=limit, offset=offset + ) + return AlertListResponse( + alerts=[ + AlertResponse( + id=str(a.id), + cue_id=a.cue_id, + execution_id=str(a.execution_id) if a.execution_id else None, + 
alert_type=a.alert_type, + severity=a.severity, + message=a.message, + metadata=a.alert_metadata, + acknowledged=a.acknowledged, + created_at=a.created_at, + ) + for a in result["alerts"] + ], + total=result["total"], + limit=result["limit"], + offset=result["offset"], + ) diff --git a/app/routers/auth_routes.py b/app/routers/auth_routes.py index 64c484c..8caece1 100644 --- a/app/routers/auth_routes.py +++ b/app/routers/auth_routes.py @@ -170,6 +170,9 @@ async def get_me( class PatchMeRequest(BaseModel): email: Optional[str] = None + # Alert webhook config. Pass an empty string to clear; pass a URL + # to set. SSRF-validated at set time. + alert_webhook_url: Optional[str] = None @router.patch("/me") @@ -179,17 +182,83 @@ async def patch_me( db: AsyncSession = Depends(get_db), ): """Update user profile settings.""" + from app.utils.url_validation import validate_callback_url + now = datetime.now(timezone.utc) updates = {"updated_at": now} if body.email is not None: updates["email"] = body.email - if not updates or len(updates) == 1: # only updated_at + if body.alert_webhook_url is not None: + if body.alert_webhook_url == "": + # Explicit clear + updates["alert_webhook_url"] = None + else: + is_valid, err = validate_callback_url(body.alert_webhook_url, settings.ENV) + if not is_valid: + raise HTTPException( + status_code=400, + detail={"error": {"code": "invalid_alert_webhook_url", "message": err, "status": 400}}, + ) + updates["alert_webhook_url"] = body.alert_webhook_url + if len(updates) == 1: # only updated_at raise HTTPException(status_code=422, detail={"error": {"code": "no_fields", "message": "No fields to update", "status": 422}}) await db.execute(update(User).where(User.id == user.id).values(**updates)) await db.commit() return {"updated_at": now.isoformat()} +@router.get("/alert-webhook-secret") +async def get_alert_webhook_secret( + user: AuthenticatedUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Return the HMAC signing 
secret for alert webhooks. + + Lazily generated on first call — a fresh secret is persisted to + the user row and returned. Subsequent calls return the same value. + Rotate via ``POST /v1/auth/alert-webhook-secret/regenerate``. + """ + import secrets as _secrets + + row = await db.execute( + select(User.alert_webhook_secret).where(User.id == user.id) + ) + existing = row.scalar_one_or_none() + if existing: + return {"alert_webhook_secret": existing} + + new_secret = _secrets.token_hex(32) # 64 hex chars + await db.execute( + update(User).where(User.id == user.id).values(alert_webhook_secret=new_secret) + ) + await db.commit() + return {"alert_webhook_secret": new_secret} + + +@router.post("/alert-webhook-secret/regenerate") +async def regenerate_alert_webhook_secret( + request: Request, + user: AuthenticatedUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Rotate the alert webhook signing secret. Old secret is + immediately invalidated. Requires ``X-Confirm-Destructive: true`` + — rotation breaks receivers that haven't been updated.""" + import secrets as _secrets + + if request.headers.get("x-confirm-destructive") != "true": + raise HTTPException( + status_code=400, + detail={"error": {"code": "confirmation_required", "message": "This action is destructive. 
Set X-Confirm-Destructive: true header to confirm.", "status": 400}}, + ) + new_secret = _secrets.token_hex(32) + await db.execute( + update(User).where(User.id == user.id).values(alert_webhook_secret=new_secret) + ) + await db.commit() + return {"alert_webhook_secret": new_secret, "previous_secret_revoked": True} + + class SessionRequest(BaseModel): token: str diff --git a/app/schemas/alert.py b/app/schemas/alert.py new file mode 100644 index 0000000..cf81ca5 --- /dev/null +++ b/app/schemas/alert.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel + + +class AlertResponse(BaseModel): + id: str + cue_id: Optional[str] = None + execution_id: Optional[str] = None + alert_type: str + severity: str + message: str + metadata: Optional[Dict[str, Any]] = None + acknowledged: bool + created_at: datetime + + +class AlertListResponse(BaseModel): + alerts: List[AlertResponse] + total: int + limit: int + offset: int diff --git a/app/services/alert_service.py b/app/services/alert_service.py new file mode 100644 index 0000000..b0b26fe --- /dev/null +++ b/app/services/alert_service.py @@ -0,0 +1,192 @@ +"""Alert service — persist alerts and trigger delivery. + +Dedup window: 5 minutes. Two alerts of the same +``(user_id, alert_type, execution_id)`` within that window collapse to +one row. Prevents alert storms on flapping executions. + +Consecutive failures: when an outcome reports ``success=false``, count +the most recent N (default 3) completed executions on the same cue. +If all N failed, fire ``consecutive_failures``. + +Webhook delivery: fire-and-forget. ``create_alert`` returns as soon as +the row is committed; ``deliver_alert`` is scheduled as a detached +task so slow/failing user webhooks never block the outcome-report +transaction. Any delivery exception is swallowed and logged inside +``deliver_alert`` itself — see ``alert_webhook.py``. 
+""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from datetime import datetime, timedelta, timezone +from typing import Optional + +from sqlalchemy import desc, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.alert import Alert +from app.models.execution import Execution +from app.models.user import User +from app.services.alert_webhook import deliver_alert + +logger = logging.getLogger(__name__) + +DEDUP_WINDOW_SECONDS = 300 # 5 minutes +CONSECUTIVE_FAILURE_THRESHOLD = 3 + + +async def _recent_duplicate_exists( + db: AsyncSession, + user_id, + alert_type: str, + execution_id=None, + cue_id: Optional[str] = None, +) -> bool: + """Return True if a matching alert was already created inside the + dedup window. Matching on (user_id, alert_type) plus whichever of + (execution_id, cue_id) is non-null.""" + cutoff = datetime.now(timezone.utc) - timedelta(seconds=DEDUP_WINDOW_SECONDS) + stmt = ( + select(Alert.id) + .where( + Alert.user_id == user_id, + Alert.alert_type == alert_type, + Alert.created_at > cutoff, + ) + .limit(1) + ) + if execution_id is not None: + stmt = stmt.where(Alert.execution_id == execution_id) + elif cue_id is not None: + stmt = stmt.where(Alert.cue_id == cue_id) + existing = await db.scalar(stmt) + return existing is not None + + +async def create_alert( + db: AsyncSession, + user_id, + alert_type: str, + message: str, + severity: str = "warning", + cue_id: Optional[str] = None, + execution_id=None, + metadata: Optional[dict] = None, + schedule_delivery: bool = True, +) -> Optional[Alert]: + """Persist an alert and schedule webhook delivery (if configured). + + Returns the created ``Alert`` or ``None`` if deduplicated. Writes + in the caller's transaction (assumes the caller commits). + + ``schedule_delivery=False`` is a test hook — skips the detached + delivery task so tests can assert on the DB row without racing a + background coroutine. 
+ """ + if await _recent_duplicate_exists( + db, user_id, alert_type, execution_id=execution_id, cue_id=cue_id + ): + logger.info( + "Alert dedup: skipped within %ds window. type=%s user_id=%s execution_id=%s", + DEDUP_WINDOW_SECONDS, alert_type, user_id, execution_id, + ) + return None + + alert = Alert( + id=uuid.uuid4(), + user_id=user_id, + cue_id=cue_id, + execution_id=execution_id, + alert_type=alert_type, + severity=severity, + message=message, + alert_metadata=metadata, + ) + db.add(alert) + await db.flush() + # Caller is expected to commit. We need ``created_at`` populated + # for the webhook payload; ``flush`` populates server defaults + # after commit only. Refresh below is done post-commit by caller + # or by webhook's own retrieval. For fire-and-forget we capture + # a snapshot now. + await db.refresh(alert) + + if schedule_delivery: + # Snapshot the URL + secret so the background task doesn't + # need to reopen the session. + user_row = await db.execute( + select(User.alert_webhook_url, User.alert_webhook_secret) + .where(User.id == user_id) + ) + row = user_row.first() + url = row.alert_webhook_url if row else None + secret = row.alert_webhook_secret if row else None + if url: + # Fire-and-forget. The task is detached from the request + # lifecycle on purpose — blocking here would couple outcome + # latency to the user's webhook responsiveness. + try: + asyncio.create_task(deliver_alert(alert, url, secret)) + except RuntimeError: + # No running event loop (shouldn't happen in the API + # request path, but defensive for sync-context callers). + logger.debug("No event loop to schedule alert delivery; skipping.") + + return alert + + +async def count_consecutive_failures(db: AsyncSession, cue_id: str) -> int: + """Count the most recent run of consecutive failed executions on a + cue. 
Walks the history backward and stops at the first non-failed + row.""" + stmt = ( + select(Execution.status) + .where( + Execution.cue_id == cue_id, + Execution.status.in_(["success", "failed"]), + ) + .order_by(desc(Execution.created_at)) + .limit(CONSECUTIVE_FAILURE_THRESHOLD + 5) # small over-read for safety + ) + result = await db.execute(stmt) + streak = 0 + for (status,) in result.all(): + if status == "failed": + streak += 1 + else: + break + return streak + + +async def list_alerts( + db: AsyncSession, + user_id, + alert_type: Optional[str] = None, + since: Optional[datetime] = None, + limit: int = 20, + offset: int = 0, +) -> dict: + from sqlalchemy import func as sa_func + + query = select(Alert).where(Alert.user_id == user_id) + count_query = select(sa_func.count(Alert.id)).where(Alert.user_id == user_id) + + if alert_type: + query = query.where(Alert.alert_type == alert_type) + count_query = count_query.where(Alert.alert_type == alert_type) + if since is not None: + query = query.where(Alert.created_at >= since) + count_query = count_query.where(Alert.created_at >= since) + + total = await db.scalar(count_query) or 0 + rows = await db.execute( + query.order_by(desc(Alert.created_at)).limit(limit).offset(offset) + ) + return { + "alerts": rows.scalars().all(), + "total": total, + "limit": limit, + "offset": offset, + } diff --git a/app/services/alert_webhook.py b/app/services/alert_webhook.py new file mode 100644 index 0000000..e1087b4 --- /dev/null +++ b/app/services/alert_webhook.py @@ -0,0 +1,114 @@ +"""Alert webhook delivery — HMAC-signed, SSRF-protected, best-effort. + +This module ships the OSS alert-delivery path. Self-hosters configure +``alert_webhook_url`` on their user; alerts fire a signed POST. If no +URL is set, this is a no-op — alerts remain queryable via +``GET /v1/alerts``. Hosted cueapi.ai layers SendGrid on top; that +integration is not shipped in OSS. 
+""" + +from __future__ import annotations + +import json +import logging +from typing import Optional + +import httpx + +from app.config import settings +from app.models.alert import Alert +from app.utils.signing import sign_payload +from app.utils.url_validation import validate_url_at_delivery + +logger = logging.getLogger(__name__) + +# Shorter than regular webhook delivery (30s) — alert delivery must +# not block outcome reporting. If a user's alert endpoint is slow, we +# give up and log rather than waiting. +ALERT_WEBHOOK_TIMEOUT_SECONDS = 10.0 + + +def _alert_payload(alert: Alert) -> dict: + return { + "alert_id": str(alert.id), + "alert_type": alert.alert_type, + "severity": alert.severity, + "message": alert.message, + "execution_id": str(alert.execution_id) if alert.execution_id else None, + "cue_id": alert.cue_id, + "created_at": alert.created_at.isoformat() if alert.created_at else None, + "metadata": alert.alert_metadata or {}, + } + + +async def deliver_alert( + alert: Alert, + alert_webhook_url: Optional[str], + alert_webhook_secret: Optional[str], +) -> bool: + """Fire an HMAC-signed POST to the user's alert webhook. + + Returns True on 2xx, False otherwise (including all errors). Never + raises — alert delivery is best-effort and must not propagate into + the outcome-reporting transaction. + + If ``alert_webhook_url`` is empty/None, returns False silently — + the alert row is already persisted and queryable via + ``GET /v1/alerts``. + """ + if not alert_webhook_url: + return False + if not alert_webhook_secret: + # A URL without a secret is a misconfiguration — log once, + # don't deliver unsigned. + logger.warning( + "Alert webhook configured without signing secret; skipping delivery. user_id=%s alert_id=%s", + alert.user_id, alert.id, + ) + return False + + # SSRF: re-resolve at delivery time (DNS rebind protection). 
+ is_valid, ssrf_error = validate_url_at_delivery(alert_webhook_url, settings.ENV) + if not is_valid: + logger.warning( + "Alert webhook SSRF-blocked at delivery: url=%s error=%s user_id=%s alert_id=%s", + alert_webhook_url, ssrf_error, alert.user_id, alert.id, + ) + return False + + payload = _alert_payload(alert) + timestamp, signature = sign_payload(payload, alert_webhook_secret) + headers = { + "Content-Type": "application/json", + "X-CueAPI-Signature": signature, + "X-CueAPI-Timestamp": timestamp, + "X-CueAPI-Alert-Id": str(alert.id), + "X-CueAPI-Alert-Type": alert.alert_type, + "User-Agent": "CueAPI/1.0", + } + + try: + async with httpx.AsyncClient( + timeout=ALERT_WEBHOOK_TIMEOUT_SECONDS, + follow_redirects=False, + ) as client: + # sort_keys=True matches sign_payload's serialization so + # receivers can verify by recomputing over request.body(). + content = json.dumps(payload, sort_keys=True, default=str) + response = await client.post(alert_webhook_url, headers=headers, content=content) + if 200 <= response.status_code < 300: + return True + logger.warning( + "Alert webhook non-2xx: status=%d url=%s alert_id=%s", + response.status_code, alert_webhook_url, alert.id, + ) + return False + except httpx.TimeoutException: + logger.warning("Alert webhook timed out: url=%s alert_id=%s", alert_webhook_url, alert.id) + return False + except httpx.ConnectError as e: + logger.warning("Alert webhook connect error: url=%s alert_id=%s err=%s", alert_webhook_url, alert.id, e) + return False + except Exception as e: + logger.warning("Alert webhook unexpected error: url=%s alert_id=%s err=%s", alert_webhook_url, alert.id, e) + return False diff --git a/app/services/outcome_service.py b/app/services/outcome_service.py index 202a226..4664a25 100644 --- a/app/services/outcome_service.py +++ b/app/services/outcome_service.py @@ -187,6 +187,67 @@ async def record_outcome( await db.commit() + # ── Alert firing (best-effort, post-commit) ── + # Each branch uses create_alert's 
dedup window (5 min) to collapse + # storms. Webhook delivery is fire-and-forget inside create_alert. + try: + from app.services.alert_service import ( + CONSECUTIVE_FAILURE_THRESHOLD, + count_consecutive_failures, + create_alert, + ) + + # verification_failed: set by the verification rule engine from + # PR #18 when required evidence is missing. PR #18 has already + # landed on main (see migration 018's note), so this hook is + # live: record_outcome fires the alert whenever an outcome parks + # in 'verification_failed'. + if getattr(execution, "outcome_state", None) == "verification_failed": + await create_alert( + db, + user_id=user.id, + alert_type="verification_failed", + severity="warning", + message=( + f"Execution {execution_id} reported success but failed " + f"verification (required evidence missing)." + ), + execution_id=execution_id, + cue_id=execution.cue_id, + metadata={ + "outcome_state": "verification_failed", + "transport": transport, + }, + ) + await db.commit() + + # consecutive_failures: on a failed outcome, walk recent + # executions on this cue and fire once the threshold is reached. + # Dedup is keyed per execution, so retries of the same execution + # collapse; each new failing execution past the threshold raises + # a fresh alert. + if not body.success: + streak = await count_consecutive_failures(db, execution.cue_id) + if streak >= CONSECUTIVE_FAILURE_THRESHOLD: + await create_alert( + db, + user_id=user.id, + alert_type="consecutive_failures", + severity="warning", + message=( + f"Cue {execution.cue_id} has {streak} consecutive " + f"failed executions." + ), + execution_id=execution_id, + cue_id=execution.cue_id, + metadata={"consecutive_failures": streak}, + ) + await db.commit() + except Exception: + # Alert firing must never break outcome reporting.
+ logger.exception( + "Alert firing failed for execution %s (outcome was still recorded)", + execution_id, + ) + logger.info( "Outcome recorded", extra={ diff --git a/examples/alert_webhook_receiver.py b/examples/alert_webhook_receiver.py new file mode 100644 index 0000000..6daebe7 --- /dev/null +++ b/examples/alert_webhook_receiver.py @@ -0,0 +1,37 @@ +"""Minimal CueAPI alert-webhook receiver. + +Verifies the HMAC signature and prints the alert. Drop this behind a +reverse proxy with HTTPS and point your user's ``alert_webhook_url`` +at the ``/cueapi-alerts`` path. Replace ``print`` with a forwarder to +your channel of choice (Slack, Discord, ntfy, SMTP relay, etc). + +Retrieve your signing secret via ``GET /v1/auth/alert-webhook-secret``. +""" + +import hashlib +import hmac +import json +import os + +from flask import Flask, abort, request + +SECRET = os.environ["CUEAPI_ALERT_WEBHOOK_SECRET"].encode() + +app = Flask(__name__) + + +@app.post("/cueapi-alerts") +def receive() -> tuple[str, int]: + ts = request.headers.get("X-CueAPI-Timestamp", "") + sig = request.headers.get("X-CueAPI-Signature", "") + body = request.get_data() # raw bytes — sorted-keys JSON + expected = hmac.new(SECRET, f"{ts}.".encode() + body, hashlib.sha256).hexdigest() + if not hmac.compare_digest(sig, f"v1={expected}"): + abort(401) + alert = json.loads(body) + print(f"[{alert['severity']}] {alert['alert_type']}: {alert['message']}") + return "", 204 + + +if __name__ == "__main__": + app.run(port=8080) diff --git a/tests/conftest.py b/tests/conftest.py index 1dabb46..0d71cb6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,7 @@ from app.database import Base, get_db from app.main import app -from app.models import Cue, DispatchOutbox, Execution, UsageMonthly, User, Worker, DeviceCode # noqa: F401 +from app.models import Alert, Cue, DispatchOutbox, Execution, UsageMonthly, User, Worker, DeviceCode # noqa: F401 # Use the same database but create/drop tables for isolation 
TEST_DATABASE_URL = settings.DATABASE_URL diff --git a/tests/test_alert_model.py b/tests/test_alert_model.py new file mode 100644 index 0000000..3e647be --- /dev/null +++ b/tests/test_alert_model.py @@ -0,0 +1,101 @@ +"""Alert model: CRUD, CHECK constraints, indexes.""" +from __future__ import annotations + +import uuid + +import pytest +import pytest_asyncio +from sqlalchemy import select, text +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.alert import Alert +from app.models.user import User + + +async def _make_user(session: AsyncSession): + u = User( + email=f"a-{uuid.uuid4().hex[:8]}@test.com", + api_key_hash=uuid.uuid4().hex, + api_key_prefix="cue_sk_test", + webhook_secret="x" * 64, + ) + session.add(u) + await session.commit() + return u + + +class TestAlertCRUD: + @pytest.mark.asyncio + async def test_create_and_read(self, db_session): + u = await _make_user(db_session) + a = Alert( + id=uuid.uuid4(), + user_id=u.id, + alert_type="verification_failed", + message="test", + alert_metadata={"k": "v"}, + ) + db_session.add(a) + await db_session.commit() + + row = await db_session.execute(select(Alert).where(Alert.id == a.id)) + got = row.scalar_one() + assert got.alert_type == "verification_failed" + assert got.severity == "warning" # server default + assert got.acknowledged is False + assert got.alert_metadata == {"k": "v"} + + +class TestAlertConstraints: + @pytest.mark.asyncio + async def test_invalid_alert_type_rejected(self, db_session): + u = await _make_user(db_session) + a = Alert( + id=uuid.uuid4(), + user_id=u.id, + alert_type="not_a_real_type", + message="x", + ) + db_session.add(a) + with pytest.raises(IntegrityError): + await db_session.commit() + await db_session.rollback() + + @pytest.mark.asyncio + async def test_invalid_severity_rejected(self, db_session): + u = await _make_user(db_session) + a = Alert( + id=uuid.uuid4(), + user_id=u.id, + alert_type="outcome_timeout", + 
severity="cosmic",
+            message="x",
+        )
+        db_session.add(a)
+        with pytest.raises(IntegrityError):
+            await db_session.commit()
+        await db_session.rollback()
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "t", ["outcome_timeout", "verification_failed", "consecutive_failures"]
+    )
+    async def test_valid_alert_types_accepted(self, db_session, t):
+        u = await _make_user(db_session)
+        a = Alert(id=uuid.uuid4(), user_id=u.id, alert_type=t, message="ok")
+        db_session.add(a)
+        await db_session.commit()
+
+
+class TestAlertIndexes:
+    @pytest.mark.asyncio
+    async def test_indexes_exist(self, db_session):
+        # Sanity: the expected named indexes are created. Use pg_indexes.
+        rows = await db_session.execute(text(
+            "SELECT indexname FROM pg_indexes WHERE tablename='alerts'"
+        ))
+        names = {r[0] for r in rows.all()}
+        # index names from model definition
+        assert "ix_alerts_user_created" in names
+        assert "ix_alerts_execution_id" in names
diff --git a/tests/test_alert_service.py b/tests/test_alert_service.py
new file mode 100644
index 0000000..df9025d
--- /dev/null
+++ b/tests/test_alert_service.py
@@ -0,0 +1,167 @@
+"""alert_service: create_alert + dedup + consecutive_failures counter."""
+from __future__ import annotations
+
+import uuid
+from datetime import datetime, timedelta, timezone
+
+import pytest
+from sqlalchemy import select, update
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.models.alert import Alert
+from app.models.cue import Cue
+from app.models.execution import Execution
+from app.models.user import User
+from app.services.alert_service import (
+    CONSECUTIVE_FAILURE_THRESHOLD,
+    count_consecutive_failures,
+    create_alert,
+)
+
+
+async def _user(session: AsyncSession):
+    u = User(
+        email=f"s-{uuid.uuid4().hex[:8]}@test.com",
+        api_key_hash=uuid.uuid4().hex,
+        api_key_prefix="cue_sk_test",
+        webhook_secret="x" * 64,
+    )
+    session.add(u)
+    await session.commit()
+    return u
+
+
+async def _cue(session, user_id):
+    c = Cue(
id=f"cue_{uuid.uuid4().hex[:12]}", + user_id=user_id, + name=f"s-{uuid.uuid4().hex[:6]}", + schedule_type="once", + schedule_at=datetime.now(timezone.utc) + timedelta(hours=1), + next_run=datetime.now(timezone.utc) + timedelta(hours=1), + callback_url="https://example.com/h", + callback_method="POST", + callback_transport="webhook", + status="active", + payload={}, + retry_max_attempts=3, + retry_backoff_minutes=[1, 5, 15], + on_failure={"email": False, "webhook": None, "pause": False}, + ) + session.add(c) + await session.commit() + return c + + +async def _exec(session, cue_id, *, status="pending", minutes_ago=0): + created = datetime.now(timezone.utc) - timedelta(minutes=minutes_ago) + ex = Execution( + id=uuid.uuid4(), + cue_id=cue_id, + scheduled_for=created, + status=status, + ) + session.add(ex) + await session.commit() + # Stamp created_at so ordering in count_consecutive_failures is + # deterministic when rows are created in a tight loop. + if minutes_ago: + await session.execute( + update(Execution).where(Execution.id == ex.id).values(created_at=created) + ) + await session.commit() + return ex + + +class TestCreateAlert: + @pytest.mark.asyncio + async def test_persists(self, db_session): + u = await _user(db_session) + a = await create_alert( + db_session, + user_id=u.id, + alert_type="outcome_timeout", + message="timed out", + schedule_delivery=False, + ) + await db_session.commit() + assert a is not None + assert a.alert_type == "outcome_timeout" + + @pytest.mark.asyncio + async def test_dedup_within_window(self, db_session): + u = await _user(db_session) + ex_id = uuid.uuid4() + first = await create_alert( + db_session, + user_id=u.id, + alert_type="verification_failed", + message="first", + execution_id=ex_id, + schedule_delivery=False, + ) + await db_session.commit() + second = await create_alert( + db_session, + user_id=u.id, + alert_type="verification_failed", + message="second", + execution_id=ex_id, + schedule_delivery=False, + ) + await 
db_session.commit() + assert first is not None + assert second is None, "expected dedup to skip the second alert" + + # Only one row persists + rows = await db_session.execute( + select(Alert).where(Alert.execution_id == ex_id) + ) + all_rows = rows.scalars().all() + assert len(all_rows) == 1 + + @pytest.mark.asyncio + async def test_dedup_does_not_block_different_types(self, db_session): + u = await _user(db_session) + ex_id = uuid.uuid4() + a1 = await create_alert( + db_session, user_id=u.id, alert_type="verification_failed", + message="v", execution_id=ex_id, schedule_delivery=False, + ) + await db_session.commit() + a2 = await create_alert( + db_session, user_id=u.id, alert_type="outcome_timeout", + message="t", execution_id=ex_id, schedule_delivery=False, + ) + await db_session.commit() + assert a1 is not None and a2 is not None + + +class TestConsecutiveFailures: + @pytest.mark.asyncio + async def test_streak_counts_contiguous_failures(self, db_session): + u = await _user(db_session) + c = await _cue(db_session, u.id) + # Create in newest-first order so the streak walks backward correctly. 
+ await _exec(db_session, c.id, status="failed", minutes_ago=1) + await _exec(db_session, c.id, status="failed", minutes_ago=2) + await _exec(db_session, c.id, status="failed", minutes_ago=3) + await _exec(db_session, c.id, status="success", minutes_ago=4) + + streak = await count_consecutive_failures(db_session, c.id) + assert streak == 3 + + @pytest.mark.asyncio + async def test_streak_breaks_on_success(self, db_session): + u = await _user(db_session) + c = await _cue(db_session, u.id) + await _exec(db_session, c.id, status="failed", minutes_ago=1) + await _exec(db_session, c.id, status="success", minutes_ago=2) + await _exec(db_session, c.id, status="failed", minutes_ago=3) + + streak = await count_consecutive_failures(db_session, c.id) + assert streak == 1 + + @pytest.mark.asyncio + async def test_threshold_constant(self): + assert CONSECUTIVE_FAILURE_THRESHOLD == 3 diff --git a/tests/test_alert_webhook_config.py b/tests/test_alert_webhook_config.py new file mode 100644 index 0000000..5c771a3 --- /dev/null +++ b/tests/test_alert_webhook_config.py @@ -0,0 +1,95 @@ +"""User-facing webhook config endpoints: PATCH /me + secret mgmt.""" +from __future__ import annotations + +import pytest +from sqlalchemy import select + +from app.models.user import User + + +class TestPatchMeWebhookUrl: + @pytest.mark.asyncio + async def test_set_valid_url(self, client, auth_headers, db_session, registered_user): + resp = await client.patch( + "/v1/auth/me", + headers=auth_headers, + json={"alert_webhook_url": "https://example.com/alerts"}, + ) + assert resp.status_code == 200, resp.text + row = await db_session.execute( + select(User.alert_webhook_url).where(User.email == registered_user["email"]) + ) + assert row.scalar_one() == "https://example.com/alerts" + + @pytest.mark.asyncio + async def test_empty_string_clears(self, client, auth_headers, db_session, registered_user): + await client.patch( + "/v1/auth/me", + headers=auth_headers, + json={"alert_webhook_url": 
"https://example.com/alerts"}, + ) + resp = await client.patch( + "/v1/auth/me", + headers=auth_headers, + json={"alert_webhook_url": ""}, + ) + assert resp.status_code == 200 + + @pytest.mark.asyncio + async def test_ssrf_url_rejected(self, client, auth_headers): + # 169.254.169.254 is cloud metadata — always blocked + resp = await client.patch( + "/v1/auth/me", + headers=auth_headers, + json={"alert_webhook_url": "http://169.254.169.254/latest/meta-data"}, + ) + assert resp.status_code == 400 + body = resp.json() + err = body["detail"]["error"] if "detail" in body else body["error"] + assert err["code"] == "invalid_alert_webhook_url" + + +class TestGetWebhookSecret: + @pytest.mark.asyncio + async def test_lazy_generate_on_first_call( + self, client, auth_headers, db_session, registered_user + ): + row = await db_session.execute( + select(User.alert_webhook_secret).where( + User.email == registered_user["email"] + ) + ) + assert row.scalar_one() is None # not yet generated + + resp = await client.get("/v1/auth/alert-webhook-secret", headers=auth_headers) + assert resp.status_code == 200 + secret = resp.json()["alert_webhook_secret"] + assert secret and len(secret) == 64 + + # Second call returns the same value + resp2 = await client.get("/v1/auth/alert-webhook-secret", headers=auth_headers) + assert resp2.json()["alert_webhook_secret"] == secret + + +class TestRegenerateWebhookSecret: + @pytest.mark.asyncio + async def test_requires_confirmation(self, client, auth_headers): + resp = await client.post( + "/v1/auth/alert-webhook-secret/regenerate", headers=auth_headers + ) + assert resp.status_code == 400 + + @pytest.mark.asyncio + async def test_rotates(self, client, auth_headers): + # Establish initial secret + r1 = await client.get("/v1/auth/alert-webhook-secret", headers=auth_headers) + old = r1.json()["alert_webhook_secret"] + + headers = {**auth_headers, "X-Confirm-Destructive": "true"} + r2 = await client.post( + "/v1/auth/alert-webhook-secret/regenerate", 
headers=headers + ) + assert r2.status_code == 200 + new = r2.json()["alert_webhook_secret"] + assert new != old + assert r2.json()["previous_secret_revoked"] is True diff --git a/tests/test_alert_webhook_delivery.py b/tests/test_alert_webhook_delivery.py new file mode 100644 index 0000000..7e5ce8d --- /dev/null +++ b/tests/test_alert_webhook_delivery.py @@ -0,0 +1,162 @@ +"""Alert webhook delivery: HMAC signing, SSRF, timeouts, failure +handling, no-URL short-circuit.""" +from __future__ import annotations + +import hashlib +import hmac +import json +import uuid +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.alert import Alert +from app.services.alert_webhook import deliver_alert + + +def _fake_alert() -> Alert: + a = Alert( + id=uuid.uuid4(), + user_id=uuid.uuid4(), + cue_id="cue_x", + execution_id=uuid.uuid4(), + alert_type="verification_failed", + severity="warning", + message="m", + alert_metadata={"k": "v"}, + acknowledged=False, + ) + a.created_at = datetime.now(timezone.utc) + return a + + +class TestDeliverAlertShortCircuits: + @pytest.mark.asyncio + async def test_no_url_returns_false_silently(self): + a = _fake_alert() + ok = await deliver_alert(a, alert_webhook_url=None, alert_webhook_secret="x" * 64) + assert ok is False + + @pytest.mark.asyncio + async def test_url_without_secret_skipped(self): + a = _fake_alert() + ok = await deliver_alert(a, alert_webhook_url="https://example.com", alert_webhook_secret=None) + assert ok is False + + +class TestSSRF: + @pytest.mark.asyncio + async def test_ssrf_blocks_private_ip(self): + a = _fake_alert() + # 127.0.0.1 is blocked in production-style checks. Use the + # production path explicitly. 
+ with patch("app.services.alert_webhook.settings.ENV", "production"): + ok = await deliver_alert( + a, + alert_webhook_url="http://127.0.0.1/hook", + alert_webhook_secret="s" * 64, + ) + assert ok is False + + +class TestHMACSignature: + @pytest.mark.asyncio + async def test_signature_header_present_and_correct(self): + a = _fake_alert() + secret = "a" * 64 + + captured = {} + + async def _fake_post(self, url, headers=None, content=None, **kw): + captured["url"] = url + captured["headers"] = dict(headers or {}) + captured["content"] = content + resp = MagicMock() + resp.status_code = 200 + return resp + + # Bypass SSRF for this test + with patch( + "app.services.alert_webhook.validate_url_at_delivery", + return_value=(True, ""), + ), patch("httpx.AsyncClient.post", new=_fake_post): + ok = await deliver_alert( + a, + alert_webhook_url="https://example.com/hook", + alert_webhook_secret=secret, + ) + assert ok is True + + sig_header = captured["headers"].get("X-CueAPI-Signature") + ts_header = captured["headers"].get("X-CueAPI-Timestamp") + assert sig_header and sig_header.startswith("v1=") + assert ts_header and ts_header.isdigit() + assert captured["headers"].get("X-CueAPI-Alert-Id") == str(a.id) + assert captured["headers"].get("X-CueAPI-Alert-Type") == "verification_failed" + + # Recompute signature over "{ts}.{sorted_payload}" and compare. 
+ signed = f"{ts_header}.".encode() + captured["content"].encode() + expected = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest() + assert sig_header == f"v1={expected}" + + +class TestFailureModes: + @pytest.mark.asyncio + async def test_timeout_returns_false(self): + import httpx + + a = _fake_alert() + + async def _boom(self, url, **kw): + raise httpx.TimeoutException("too slow") + + with patch( + "app.services.alert_webhook.validate_url_at_delivery", + return_value=(True, ""), + ), patch("httpx.AsyncClient.post", new=_boom): + ok = await deliver_alert( + a, + alert_webhook_url="https://example.com", + alert_webhook_secret="s" * 64, + ) + assert ok is False # did not raise + + @pytest.mark.asyncio + async def test_non_2xx_returns_false(self): + a = _fake_alert() + + async def _fail(self, url, **kw): + resp = MagicMock() + resp.status_code = 500 + return resp + + with patch( + "app.services.alert_webhook.validate_url_at_delivery", + return_value=(True, ""), + ), patch("httpx.AsyncClient.post", new=_fail): + ok = await deliver_alert( + a, + alert_webhook_url="https://example.com", + alert_webhook_secret="s" * 64, + ) + assert ok is False + + @pytest.mark.asyncio + async def test_unexpected_exception_swallowed(self): + a = _fake_alert() + + async def _boom(self, url, **kw): + raise RuntimeError("surprise") + + with patch( + "app.services.alert_webhook.validate_url_at_delivery", + return_value=(True, ""), + ), patch("httpx.AsyncClient.post", new=_boom): + ok = await deliver_alert( + a, + alert_webhook_url="https://example.com", + alert_webhook_secret="s" * 64, + ) + assert ok is False diff --git a/tests/test_alerts_api.py b/tests/test_alerts_api.py new file mode 100644 index 0000000..5c9c052 --- /dev/null +++ b/tests/test_alerts_api.py @@ -0,0 +1,106 @@ +"""GET /v1/alerts: filters, pagination, auth scoping.""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import 
select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.alert import Alert +from app.models.user import User + + +async def _uid(session: AsyncSession, user: dict) -> str: + r = await session.execute(select(User.id).where(User.email == user["email"])) + return str(r.scalar_one()) + + +async def _seed(session, user_id, alert_type="verification_failed", n=1, minutes_ago=0): + for _ in range(n): + a = Alert( + id=uuid.uuid4(), + user_id=user_id, + alert_type=alert_type, + message="seeded", + ) + session.add(a) + await session.commit() + + +class TestListAlerts: + @pytest.mark.asyncio + async def test_empty(self, client, auth_headers): + resp = await client.get("/v1/alerts", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["alerts"] == [] + + @pytest.mark.asyncio + async def test_list_own_alerts(self, client, auth_headers, db_session, registered_user): + uid = await _uid(db_session, registered_user) + await _seed(db_session, uid, n=3) + resp = await client.get("/v1/alerts", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 3 + assert len(data["alerts"]) == 3 + + @pytest.mark.asyncio + async def test_filter_by_type(self, client, auth_headers, db_session, registered_user): + uid = await _uid(db_session, registered_user) + await _seed(db_session, uid, alert_type="verification_failed", n=2) + await _seed(db_session, uid, alert_type="consecutive_failures", n=1) + + resp = await client.get( + "/v1/alerts?alert_type=verification_failed", headers=auth_headers + ) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 2 + for a in data["alerts"]: + assert a["alert_type"] == "verification_failed" + + @pytest.mark.asyncio + async def test_invalid_type_rejected(self, client, auth_headers): + resp = await client.get("/v1/alerts?alert_type=bogus", headers=auth_headers) + assert resp.status_code == 400 + body = 
resp.json() + err = body["detail"]["error"] if "detail" in body else body["error"] + assert err["code"] == "invalid_filter" + + @pytest.mark.asyncio + async def test_pagination(self, client, auth_headers, db_session, registered_user): + uid = await _uid(db_session, registered_user) + await _seed(db_session, uid, n=5) + resp = await client.get("/v1/alerts?limit=2&offset=0", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 5 + assert len(data["alerts"]) == 2 + + resp2 = await client.get("/v1/alerts?limit=2&offset=4", headers=auth_headers) + assert resp2.status_code == 200 + assert len(resp2.json()["alerts"]) == 1 + + +class TestAuthScoping: + @pytest.mark.asyncio + async def test_user_a_cannot_see_user_b_alerts( + self, client, auth_headers, other_auth_headers, db_session, registered_user + ): + # Seed alerts for user A + uid_a = await _uid(db_session, registered_user) + await _seed(db_session, uid_a, n=2) + + # User B fetches their alerts — should see zero + resp = await client.get("/v1/alerts", headers=other_auth_headers) + assert resp.status_code == 200 + assert resp.json()["total"] == 0 + + @pytest.mark.asyncio + async def test_unauthenticated_rejected(self, client): + resp = await client.get("/v1/alerts") + assert resp.status_code == 401 diff --git a/tests/test_outcome_triggers_alert.py b/tests/test_outcome_triggers_alert.py new file mode 100644 index 0000000..ec6dcdb --- /dev/null +++ b/tests/test_outcome_triggers_alert.py @@ -0,0 +1,164 @@ +"""End-to-end: outcome reports fire alerts for the right conditions. + +Covers: +- verification_failed alert fires when the rule engine sets + ``outcome_state = 'verification_failed'`` during ``record_outcome`` + (rule engine landed in PR #18, merged ahead of this PR — we trigger + it via the real path: a cue configured with a ``require_*`` + verification mode whose outcome report arrives without the required + evidence field). 
+- consecutive_failures alert fires after the 3rd consecutive failure + on the same cue. +- Dedup prevents repeat firing within the window. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.alert import Alert +from app.models.cue import Cue +from app.models.execution import Execution +from app.models.user import User + + +async def _uid(session: AsyncSession, user: dict) -> str: + r = await session.execute(select(User.id).where(User.email == user["email"])) + return str(r.scalar_one()) + + +async def _cue(session, user_id, transport="webhook", verification_mode=None): + c = Cue( + id=f"cue_{uuid.uuid4().hex[:12]}", + user_id=user_id, + name=f"t-{uuid.uuid4().hex[:6]}", + schedule_type="once", + schedule_at=datetime.now(timezone.utc) + timedelta(hours=1), + next_run=datetime.now(timezone.utc) + timedelta(hours=1), + callback_url="https://example.com/h" if transport == "webhook" else None, + callback_method="POST", + callback_transport=transport, + status="active", + payload={}, + retry_max_attempts=3, + retry_backoff_minutes=[1, 5, 15], + on_failure={"email": False, "webhook": None, "pause": False}, + verification_mode=verification_mode, + ) + session.add(c) + await session.commit() + return c + + +async def _exec(session, cue_id, *, status="delivering", outcome_state=None): + ex = Execution( + id=uuid.uuid4(), + cue_id=cue_id, + scheduled_for=datetime.now(timezone.utc), + status=status, + outcome_state=outcome_state, + ) + session.add(ex) + await session.commit() + return ex + + +class TestVerificationFailedAlert: + @pytest.mark.asyncio + async def test_fires_when_outcome_state_is_verification_failed( + self, client, auth_headers, db_session, registered_user + ): + # Configure a cue with require_external_id so the rule engine + # (PR #18, now on main) will transition to verification_failed + # 
when the outcome report omits external_id. This is the real + # production trigger; no pre-seeding needed. + uid = await _uid(db_session, registered_user) + cue = await _cue( + db_session, uid, verification_mode="require_external_id" + ) + ex = await _exec(db_session, cue.id) + + # Report outcome with success=True but NO external_id. The + # rule engine sets outcome_state='verification_failed'; the + # alert hook in outcome_service fires immediately after commit. + resp = await client.post( + f"/v1/executions/{ex.id}/outcome", + headers=auth_headers, + json={"success": True}, + ) + assert resp.status_code == 200 + + rows = await db_session.execute( + select(Alert).where( + Alert.alert_type == "verification_failed", + Alert.execution_id == ex.id, + ) + ) + alerts = rows.scalars().all() + assert len(alerts) == 1 + assert alerts[0].severity == "warning" + assert "verification_failed" in alerts[0].alert_metadata["outcome_state"] + + +class TestConsecutiveFailuresAlert: + @pytest.mark.asyncio + async def test_fires_after_three_consecutive_failures( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid, transport="worker") + + # Two prior failed executions already in history + for _ in range(2): + prior = Execution( + id=uuid.uuid4(), + cue_id=cue.id, + scheduled_for=datetime.now(timezone.utc), + status="failed", + outcome_recorded_at=datetime.now(timezone.utc), + ) + db_session.add(prior) + await db_session.commit() + + # Third failure via the API + ex = await _exec(db_session, cue.id) + resp = await client.post( + f"/v1/executions/{ex.id}/outcome", + headers=auth_headers, + json={"success": False, "error": "bang"}, + ) + assert resp.status_code == 200 + + rows = await db_session.execute( + select(Alert).where( + Alert.user_id == uid, + Alert.alert_type == "consecutive_failures", + ) + ) + alerts = rows.scalars().all() + assert len(alerts) == 1 + assert 
alerts[0].alert_metadata["consecutive_failures"] >= 3 + + @pytest.mark.asyncio + async def test_does_not_fire_on_isolated_failure( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid, transport="worker") + ex = await _exec(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/outcome", + headers=auth_headers, + json={"success": False, "error": "one-off"}, + ) + assert resp.status_code == 200 + + rows = await db_session.execute( + select(Alert).where(Alert.user_id == uid) + ) + assert len(rows.scalars().all()) == 0