Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ All notable changes to cueapi-core will be documented here.
- **Verification modes** for cue outcomes. A new `verification: {mode: ...}` field on `CueCreate` / `CueUpdate` with five values: `none` (default), `require_external_id`, `require_result_url`, `require_artifacts`, `manual`. The outcome service computes `outcome_state` from (success, mode, evidence): missing required evidence lands in `verification_failed`, satisfied requirements land in `verified_success`, manual mode parks in `verification_pending`.
- **Inline evidence on `POST /v1/executions/{id}/outcome`.** `OutcomeRequest` now accepts `external_id`, `result_url`, `result_ref`, `result_type`, `summary`, `artifacts` alongside the existing `success` / `result` / `error` / `metadata`. Fully backward compatible — the legacy shape still works. The separate `PATCH /v1/executions/{id}/evidence` endpoint remains for two-step flows.
- **Migration 017** — `verification_mode` column on `cues` (String(50), nullable, CHECK-constrained enum). NULL and `none` are equivalent.
- **Alerts** — persisted alerts for `consecutive_failures`, `verification_failed`, and `outcome_timeout`. Three alert types are storage-ready; `consecutive_failures` and `verification_failed` fire automatically from `outcome_service.record_outcome`. `outcome_timeout` requires the deadline-checking poller (not yet in OSS) to activate; the CHECK constraint and router accept the type already.
- **Alert webhook delivery** — optional `alert_webhook_url` on the user. When set, each alert POSTs an HMAC-SHA256-signed payload to that URL. Fire-and-forget delivery; best-effort, never blocks outcome reporting. `X-CueAPI-Signature`, `X-CueAPI-Timestamp`, `X-CueAPI-Alert-Id`, `X-CueAPI-Alert-Type` headers. SSRF-protected at delivery time.
- **`GET /v1/alerts`** — list alerts for the authenticated user, with `alert_type` / `since` / `limit` / `offset` filters and per-user scoping.
- **`PATCH /v1/auth/me`** accepts `alert_webhook_url` (empty string clears; SSRF-validated at set time).
- **`GET /v1/auth/alert-webhook-secret`** — lazily generate + return the HMAC signing secret (64 hex chars).
- **`POST /v1/auth/alert-webhook-secret/regenerate`** — rotate the secret (requires `X-Confirm-Destructive: true`).
- **Dedup** — alerts collapse on `(user_id, alert_type, execution_id)` inside a 5-minute window.
- **Migrations 018 + 019** — alerts table with indexes and CHECK constraints; two columns on users.
- `examples/alert_webhook_receiver.py` — 30-line Flask receiver demonstrating signature verification.

### Changed
- **`POST /v1/executions/{id}/verify`** now accepts `{valid: bool, reason: str?}`. `valid=true` (default, preserving legacy behavior) transitions to `verified_success`; `valid=false` transitions to `verification_failed` and records the reason onto `evidence_summary` (truncated to 500 chars). Accepted starting states expanded to include `reported_failure`.
Expand Down
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,46 @@ Backward-compat paths still work: `POST /outcome` with just `{success: true}` be

> Worker-transport cues can currently use `none` or `manual` only. Evidence-requiring modes (`require_external_id`, `require_result_url`, `require_artifacts`) are rejected at create/update time with `400 unsupported_verification_for_transport`. This restriction will be lifted once cueapi-worker 0.3.0 ships to PyPI with evidence reporting via `CUEAPI_OUTCOME_FILE`.

## Alerts

cueapi-core persists alerts when outcomes go wrong and — if you configure a webhook URL — POSTs them to you with an HMAC signature. Three alert types today:

| Type | When it fires |
|------|--------------|
| `consecutive_failures` | Same cue reports `success=false` three runs in a row |
| `verification_failed` | Outcome is missing evidence required by the cue's verification mode (see "Verification modes") |
| `outcome_timeout` | Handler never reports an outcome before the deadline (not yet wired in OSS — coming with the deadline-checking poller) |

Alerts are deduplicated per `(user, alert_type, execution_id)` within a 5-minute window so flapping executions don't flood your inbox.

### Query alerts directly

```bash
curl http://localhost:8000/v1/alerts \
-H "Authorization: Bearer YOUR_API_KEY"
# Optional filters: ?alert_type=consecutive_failures&since=2026-01-01T00:00:00Z&limit=20
```

### Receive alerts via webhook

1. **Set your webhook URL:**
```bash
curl -X PATCH http://localhost:8000/v1/auth/me \
-H "Authorization: Bearer YOUR_API_KEY" \
-d '{"alert_webhook_url": "https://your-server.example.com/cueapi-alerts"}'
```

2. **Retrieve your signing secret** (generated lazily on first call, 64 hex chars):
```bash
curl http://localhost:8000/v1/auth/alert-webhook-secret \
-H "Authorization: Bearer YOUR_API_KEY"
```
Rotate with `POST /v1/auth/alert-webhook-secret/regenerate` (requires `X-Confirm-Destructive: true`).

3. **Verify incoming alerts** on your end. See [`examples/alert_webhook_receiver.py`](examples/alert_webhook_receiver.py) for a 30-line Flask receiver. Each POST carries `X-CueAPI-Signature: v1=<hex>`, `X-CueAPI-Timestamp`, `X-CueAPI-Alert-Id`, and `X-CueAPI-Alert-Type`.

> **Delivery path is HTTP only.** cueapi-core ships alert persistence + webhook delivery and nothing else. For email / SMS / Slack, point your `alert_webhook_url` at a forwarder you control, or use hosted cueapi.ai which includes managed email delivery via SendGrid. See [HOSTED_ONLY.md](HOSTED_ONLY.md) for the full open-core policy.

## What CueAPI is not

- Not a workflow orchestrator
Expand Down
51 changes: 51 additions & 0 deletions alembic/versions/018_add_alerts_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Add alerts table.

Revision ID: 018
Revises: 017

Chained off 017 (verification_mode column, PR #18). PR #18 landed
first on main so 018 now hangs directly off it.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID

revision = "018"
down_revision = "017"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.create_table(
"alerts",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("user_id", UUID(as_uuid=True), nullable=False),
sa.Column("cue_id", sa.String(length=20), nullable=True),
sa.Column("execution_id", UUID(as_uuid=True), nullable=True),
sa.Column("alert_type", sa.String(length=50), nullable=False),
sa.Column("severity", sa.String(length=20), nullable=False, server_default="warning"),
sa.Column("message", sa.Text, nullable=False),
sa.Column("metadata", JSONB, nullable=True),
sa.Column("acknowledged", sa.Boolean, nullable=False, server_default=sa.text("false")),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.CheckConstraint(
"alert_type IN ("
"'outcome_timeout', 'verification_failed', 'consecutive_failures')",
name="valid_alert_type",
),
sa.CheckConstraint(
"severity IN ('info', 'warning', 'critical')",
name="valid_alert_severity",
),
)
op.create_index("ix_alerts_user_id", "alerts", ["user_id"])
op.create_index("ix_alerts_user_created", "alerts", ["user_id", "created_at"])
op.create_index("ix_alerts_execution_id", "alerts", ["execution_id"])


def downgrade() -> None:
op.drop_index("ix_alerts_execution_id", table_name="alerts")
op.drop_index("ix_alerts_user_created", table_name="alerts")
op.drop_index("ix_alerts_user_id", table_name="alerts")
op.drop_table("alerts")
22 changes: 22 additions & 0 deletions alembic/versions/019_add_user_alert_webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Add alert_webhook_url + alert_webhook_secret to users.

Revision ID: 019
Revises: 018
"""
from alembic import op
import sqlalchemy as sa

revision = "019"
down_revision = "018"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column("users", sa.Column("alert_webhook_url", sa.String(length=2048), nullable=True))
op.add_column("users", sa.Column("alert_webhook_secret", sa.String(length=64), nullable=True))


def downgrade() -> None:
op.drop_column("users", "alert_webhook_secret")
op.drop_column("users", "alert_webhook_url")
4 changes: 3 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from app.middleware.rate_limit import RateLimitMiddleware
from app.middleware.request_id import RequestIdMiddleware
from app.redis import close_redis
from app.routers import auth_routes, cues, device_code, echo, executions, health, usage, webhook_secret, workers
from app.routers import alerts, auth_routes, cues, device_code, echo, executions, health, usage, webhook_secret, workers
from app.utils.logging import setup_logging


Expand Down Expand Up @@ -43,6 +43,7 @@ async def lifespan(app: FastAPI):
{"name": "worker", "description": "Worker transport: heartbeat registration for pull-based execution delivery"},
{"name": "usage", "description": "Usage stats and plan information"},
{"name": "echo", "description": "Echo endpoint for testing webhook delivery"},
{"name": "alerts", "description": "Persisted alerts fired by outcome service; optional signed webhook delivery via alert_webhook_url"},
{"name": "auth-pages", "description": "HTML pages for device code verification flow"},
]

Expand Down Expand Up @@ -158,3 +159,4 @@ async def generic_error_handler(request: Request, exc: Exception):
app.include_router(workers.router)
app.include_router(workers.workers_list_router)
app.include_router(webhook_secret.router)
app.include_router(alerts.router)
3 changes: 2 additions & 1 deletion app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from app.models.usage_monthly import UsageMonthly
from app.models.device_code import DeviceCode
from app.models.worker import Worker
from app.models.alert import Alert

__all__ = [
"User", "Cue", "Execution", "DispatchOutbox", "UsageMonthly", "DeviceCode",
"Worker",
"Worker", "Alert",
]
47 changes: 47 additions & 0 deletions app/models/alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Alert model — persisted, user-scoped events fired by outcome
service / poller and optionally delivered to the user's configured
alert webhook."""

from __future__ import annotations

import uuid

from sqlalchemy import Boolean, CheckConstraint, Column, DateTime, Index, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, UUID

from app.database import Base


class Alert(Base):
__tablename__ = "alerts"

id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(UUID(as_uuid=True), nullable=False, index=True)
# Both are nullable — an alert can be about a user-level event
# (e.g. API key rotation) with no execution or cue attached.
cue_id = Column(String(20), nullable=True)
execution_id = Column(UUID(as_uuid=True), nullable=True)
alert_type = Column(String(50), nullable=False)
# Kept at warning by default — the UI / webhook receiver decides
# how to render. ``critical`` is reserved for future use.
severity = Column(String(20), nullable=False, server_default="warning")
message = Column(Text, nullable=False)
# DB column is named ``metadata`` (SQLAlchemy reserves the
# ``metadata`` attr on Base), Python attr is ``alert_metadata``.
alert_metadata = Column("metadata", JSONB, nullable=True)
acknowledged = Column(Boolean, nullable=False, server_default="false")
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

__table_args__ = (
CheckConstraint(
"alert_type IN ("
"'outcome_timeout', 'verification_failed', 'consecutive_failures')",
name="valid_alert_type",
),
CheckConstraint(
"severity IN ('info', 'warning', 'critical')",
name="valid_alert_severity",
),
Index("ix_alerts_user_created", "user_id", "created_at"),
Index("ix_alerts_execution_id", "execution_id"),
)
8 changes: 8 additions & 0 deletions app/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,13 @@ class User(Base):
rate_limit_per_minute = Column(Integer, nullable=False, default=60)
webhook_secret = Column(String(80), nullable=False)
api_key_encrypted = Column(String(256), nullable=True)
# Optional HTTPS endpoint that receives alert webhooks (signed).
# If NULL, alerts are persisted but not delivered — users poll
# ``GET /v1/alerts``.
alert_webhook_url = Column(String(2048), nullable=True)
# HMAC-SHA256 signing key for alert webhook payloads. Generated
# lazily on first ``GET /v1/auth/alert-webhook-secret`` and rotatable
# via ``POST /v1/auth/alert-webhook-secret/regenerate``.
alert_webhook_secret = Column(String(64), nullable=True)
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())
61 changes: 61 additions & 0 deletions app/routers/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

from datetime import datetime
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth import AuthenticatedUser, get_current_user
from app.database import get_db
from app.schemas.alert import AlertListResponse, AlertResponse
from app.services.alert_service import list_alerts

router = APIRouter(prefix="/v1/alerts", tags=["alerts"])

_VALID_TYPES = {"outcome_timeout", "verification_failed", "consecutive_failures"}


@router.get("", response_model=AlertListResponse)
async def get_alerts(
alert_type: Optional[str] = Query(None, description="Filter by alert type"),
since: Optional[datetime] = Query(None, description="Return alerts created at or after this ISO timestamp"),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List alerts for the authenticated user."""
if alert_type and alert_type not in _VALID_TYPES:
raise HTTPException(
status_code=400,
detail={
"error": {
"code": "invalid_filter",
"message": f"alert_type must be one of: {', '.join(sorted(_VALID_TYPES))}",
"status": 400,
}
},
)
result = await list_alerts(
db, user.id, alert_type=alert_type, since=since, limit=limit, offset=offset
)
return AlertListResponse(
alerts=[
AlertResponse(
id=str(a.id),
cue_id=a.cue_id,
execution_id=str(a.execution_id) if a.execution_id else None,
alert_type=a.alert_type,
severity=a.severity,
message=a.message,
metadata=a.alert_metadata,
acknowledged=a.acknowledged,
created_at=a.created_at,
)
for a in result["alerts"]
],
total=result["total"],
limit=result["limit"],
offset=result["offset"],
)
71 changes: 70 additions & 1 deletion app/routers/auth_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ async def get_me(

class PatchMeRequest(BaseModel):
email: Optional[str] = None
# Alert webhook config. Pass an empty string to clear; pass a URL
# to set. SSRF-validated at set time.
alert_webhook_url: Optional[str] = None


@router.patch("/me")
Expand All @@ -179,17 +182,83 @@ async def patch_me(
db: AsyncSession = Depends(get_db),
):
"""Update user profile settings."""
from app.utils.url_validation import validate_callback_url

now = datetime.now(timezone.utc)
updates = {"updated_at": now}
if body.email is not None:
updates["email"] = body.email
if not updates or len(updates) == 1: # only updated_at
if body.alert_webhook_url is not None:
if body.alert_webhook_url == "":
# Explicit clear
updates["alert_webhook_url"] = None
else:
is_valid, err = validate_callback_url(body.alert_webhook_url, settings.ENV)
if not is_valid:
raise HTTPException(
status_code=400,
detail={"error": {"code": "invalid_alert_webhook_url", "message": err, "status": 400}},
)
updates["alert_webhook_url"] = body.alert_webhook_url
if len(updates) == 1: # only updated_at
raise HTTPException(status_code=422, detail={"error": {"code": "no_fields", "message": "No fields to update", "status": 422}})
await db.execute(update(User).where(User.id == user.id).values(**updates))
await db.commit()
return {"updated_at": now.isoformat()}


@router.get("/alert-webhook-secret")
async def get_alert_webhook_secret(
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Return the HMAC signing secret for alert webhooks.

Lazily generated on first call — a fresh secret is persisted to
the user row and returned. Subsequent calls return the same value.
Rotate via ``POST /v1/auth/alert-webhook-secret/regenerate``.
"""
import secrets as _secrets

row = await db.execute(
select(User.alert_webhook_secret).where(User.id == user.id)
)
existing = row.scalar_one_or_none()
if existing:
return {"alert_webhook_secret": existing}

new_secret = _secrets.token_hex(32) # 64 hex chars
await db.execute(
update(User).where(User.id == user.id).values(alert_webhook_secret=new_secret)
)
await db.commit()
return {"alert_webhook_secret": new_secret}


@router.post("/alert-webhook-secret/regenerate")
async def regenerate_alert_webhook_secret(
request: Request,
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Rotate the alert webhook signing secret. Old secret is
immediately invalidated. Requires ``X-Confirm-Destructive: true``
— rotation breaks receivers that haven't been updated."""
import secrets as _secrets

if request.headers.get("x-confirm-destructive") != "true":
raise HTTPException(
status_code=400,
detail={"error": {"code": "confirmation_required", "message": "This action is destructive. Set X-Confirm-Destructive: true header to confirm.", "status": 400}},
)
new_secret = _secrets.token_hex(32)
await db.execute(
update(User).where(User.id == user.id).values(alert_webhook_secret=new_secret)
)
await db.commit()
return {"alert_webhook_secret": new_secret, "previous_secret_revoked": True}


class SessionRequest(BaseModel):
token: str

Expand Down
Loading
Loading