diff --git a/CHANGELOG.md b/CHANGELOG.md index b3083cc..89728ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,20 @@ All notable changes to cueapi-core will be documented here. +## [Unreleased] + +### Added +- **Verification modes** for cue outcomes. A new `verification: {mode: ...}` field on `CueCreate` / `CueUpdate` with five values: `none` (default), `require_external_id`, `require_result_url`, `require_artifacts`, `manual`. The outcome service computes `outcome_state` from (success, mode, evidence): missing required evidence lands in `verification_failed`, satisfied requirements land in `verified_success`, manual mode parks in `verification_pending`. +- **Inline evidence on `POST /v1/executions/{id}/outcome`.** `OutcomeRequest` now accepts `external_id`, `result_url`, `result_ref`, `result_type`, `summary`, `artifacts` alongside the existing `success` / `result` / `error` / `metadata`. Fully backward compatible — the legacy shape still works. The separate `PATCH /v1/executions/{id}/evidence` endpoint remains for two-step flows. +- **Migration 017** — `verification_mode` column on `cues` (String(50), nullable, CHECK-constrained enum). NULL and `none` are equivalent. + +### Changed +- **`POST /v1/executions/{id}/verify`** now accepts `{valid: bool, reason: str?}`. `valid=true` (default, preserving legacy behavior) transitions to `verified_success`; `valid=false` transitions to `verification_failed` and records the reason onto `evidence_summary` (truncated to 500 chars). Accepted starting states expanded to include `reported_failure`. +- `OutcomeResponse` now surfaces `outcome_state` in the response body. + +### Restricted +- Worker-transport cues cannot currently combine with evidence-requiring verification modes (`require_external_id`, `require_result_url`, `require_artifacts`). Attempting to create or PATCH such a combination returns `400 unsupported_verification_for_transport`. `none` and `manual` are allowed for worker cues. This restriction will be lifted once cueapi-worker 0.3.0 (evidence reporting via `CUEAPI_OUTCOME_FILE`) is on PyPI. + ## [0.1.2] - 2026-03-28 ### Security diff --git a/README.md b/README.md index 9c99cb2..e246297 100644 --- a/README.md +++ b/README.md @@ -198,6 +198,56 @@ curl -X POST http://localhost:8000/v1/worker/heartbeat \ The handlers array tells CueAPI which cue names this worker can process. +### Verification modes + +Cues can require evidence on the outcome report. Configure a `verification` policy at create or update time: + +```bash +curl -X POST http://localhost:8000/v1/cues \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -d '{"name": "nightly-report", "schedule": {"type": "recurring", "cron": "0 9 * * *"}, + "callback": {"url": "https://your-handler.com"}, + "verification": {"mode": "require_external_id"}}' +``` + +Five modes: + +| Mode | Behavior | +|------|----------| +| `none` (default) | Reported `success` is final — `reported_success` / `reported_failure`. | +| `require_external_id` | Outcome must include `external_id`. Missing → `verification_failed`. Present → `verified_success`. | +| `require_result_url` | Outcome must include `result_url`. | +| `require_artifacts` | Outcome must include `artifacts` (non-empty). | +| `manual` | Every successful outcome parks in `verification_pending` until someone calls `POST /v1/executions/{id}/verify`. | + +Report outcomes with evidence inline on the existing endpoint: + +```bash +curl -X POST http://localhost:8000/v1/executions/EXEC_ID/outcome \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -d '{"success": true, "external_id": "stripe_ch_abc123", + "result_url": "https://dashboard.stripe.com/payments/ch_abc123", + "summary": "Charged customer 42"}' +``` + +Manually verify or reject a parked outcome: + +```bash +# Approve +curl -X POST http://localhost:8000/v1/executions/EXEC_ID/verify \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -d '{"valid": true}' + +# Reject (e.g. after audit) +curl -X POST http://localhost:8000/v1/executions/EXEC_ID/verify \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -d '{"valid": false, "reason": "invoice number does not match"}' +``` + +Backward-compat paths still work: `POST /outcome` with just `{success: true}` behaves identically to before, and `PATCH /v1/executions/{id}/evidence` remains available as a two-step alternative. + +> Worker-transport cues can currently use `none` or `manual` only. Evidence-requiring modes (`require_external_id`, `require_result_url`, `require_artifacts`) are rejected at create/update time with `400 unsupported_verification_for_transport`. This restriction will be lifted once cueapi-worker 0.3.0 ships to PyPI with evidence reporting via `CUEAPI_OUTCOME_FILE`. + ## What CueAPI is not - Not a workflow orchestrator diff --git a/alembic/versions/017_add_verification_mode.py b/alembic/versions/017_add_verification_mode.py new file mode 100644 index 0000000..55263dd --- /dev/null +++ b/alembic/versions/017_add_verification_mode.py @@ -0,0 +1,41 @@ +"""Add verification_mode column to cues table. + +Outcome-verification policy per cue. Stored as a plain string rather +than JSONB because the only structured field today is `mode`; keeping +it a string keeps queries simple (`WHERE verification_mode = ...`) and +lets Postgres enforce the enum via a CHECK constraint. If the policy +gains fields later, widen to JSONB with a separate migration. + +NULL means "no verification" — equivalent to mode=none but avoids a +row rewrite for the 100% of existing rows that have never configured +verification. Outcome service treats NULL and 'none' identically. + +Revision ID: 017 +Revises: 016 +""" +from alembic import op +import sqlalchemy as sa + +revision = "017" +down_revision = "016" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "cues", + sa.Column("verification_mode", sa.String(length=50), nullable=True), + ) + op.create_check_constraint( + "valid_verification_mode", + "cues", + "verification_mode IS NULL OR verification_mode IN (" + "'none', 'require_external_id', 'require_result_url', " + "'require_artifacts', 'manual')", + ) + + +def downgrade() -> None: + op.drop_constraint("valid_verification_mode", "cues", type_="check") + op.drop_column("cues", "verification_mode") diff --git a/app/models/cue.py b/app/models/cue.py index b17517e..ff1b48d 100644 --- a/app/models/cue.py +++ b/app/models/cue.py @@ -28,6 +28,8 @@ class Cue(Base): run_count = Column(Integer, nullable=False, default=0) fired_count = Column(Integer, nullable=False, default=0) on_failure = Column(JSONB, nullable=True, default={"email": True, "webhook": None, "pause": False}) + # Outcome-verification policy. NULL == no verification (same as 'none'). + verification_mode = Column(String(50), nullable=True, default=None) created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now()) updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()) @@ -36,5 +38,11 @@ class Cue(Base): CheckConstraint("schedule_type IN ('once', 'recurring')", name="valid_schedule_type"), CheckConstraint("callback_method IN ('POST', 'GET', 'PUT', 'PATCH')", name="valid_callback_method"), CheckConstraint("callback_transport IN ('webhook', 'worker')", name="valid_callback_transport"), + CheckConstraint( + "verification_mode IS NULL OR verification_mode IN (" + "'none', 'require_external_id', 'require_result_url', " + "'require_artifacts', 'manual')", + name="valid_verification_mode", + ), UniqueConstraint("user_id", "name", name="unique_user_cue_name"), ) diff --git a/app/routers/executions.py b/app/routers/executions.py index b01f90b..2dd8b73 100644 --- a/app/routers/executions.py +++ b/app/routers/executions.py @@ -3,7 +3,8 @@ from datetime import datetime, timedelta, timezone from typing import Optional -from fastapi import APIRouter, Depends, Header, HTTPException, Query +from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query +from pydantic import BaseModel from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession @@ -331,29 +332,86 @@ async def replay_execution( # ── Verify ── +class VerifyRequest(BaseModel): + """Body for ``POST /v1/executions/{id}/verify``. + + Body is optional — a request with no body (or ``{}``) defaults to + ``valid=true`` so the previous always-success behavior remains the + default. ``valid=false`` is the new branch: it transitions to + ``verification_failed`` and optionally persists a human-readable + ``reason`` onto ``evidence_summary``. + """ + + valid: bool = True + reason: Optional[str] = None + + @router.post("/{execution_id}/verify") async def verify_execution( execution_id: str, + body: Optional[VerifyRequest] = Body(None), user: AuthenticatedUser = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): - """Mark execution outcome as verified.""" + """Mark execution outcome as verified or verification-failed. + + Accepts ``{valid: bool, reason: str?}``. ``valid=true`` (default) + transitions to ``verified_success``; ``valid=false`` transitions to + ``verification_failed`` and records the reason on + ``evidence_summary`` (truncated to 500 chars). Accepted starting + states: ``reported_success``, ``reported_failure``, + ``verification_pending``. + """ result = await db.execute( select(Execution).join(Cue, Execution.cue_id == Cue.id) .where(Execution.id == execution_id, Cue.user_id == user.id) ) execution = result.scalar_one_or_none() if not execution: - raise HTTPException(status_code=404) + raise HTTPException(status_code=404, detail={"error": {"code": "execution_not_found", "message": "Execution not found", "status": 404}}) - if execution.outcome_state not in {"reported_success", "verification_pending"}: - raise HTTPException(status_code=409, detail={"error": {"code": "invalid_state", "message": f"Cannot verify from state '{execution.outcome_state}'"}}) + if execution.outcome_state not in { + "reported_success", + "reported_failure", + "verification_pending", + }: + raise HTTPException( + status_code=409, + detail={ + "error": { + "code": "invalid_state", + "message": f"Cannot verify from state '{execution.outcome_state}'", + "status": 409, + } + }, + ) - execution.outcome_state = "verified_success" - execution.evidence_validation_state = "valid" - execution.updated_at = datetime.now(timezone.utc) + payload = body or VerifyRequest() + now = datetime.now(timezone.utc) + if payload.valid: + execution.outcome_state = "verified_success" + execution.evidence_validation_state = "valid" + else: + execution.outcome_state = "verification_failed" + execution.evidence_validation_state = "invalid" + if payload.reason: + # Persist reason alongside any existing summary; truncate + # to the column cap. We prepend so operators who set a + # summary at outcome-report time still see it. + truncated = payload.reason[:500] + if execution.evidence_summary: + combined = f"{execution.evidence_summary} | verification rejected: {truncated}" + execution.evidence_summary = combined[:500] + else: + execution.evidence_summary = truncated + execution.updated_at = now await db.commit() - return {"execution_id": str(execution_id), "outcome_state": "verified_success"} + return { + "execution_id": str(execution_id), + "outcome_state": execution.outcome_state, + "valid": payload.valid, + "reason": payload.reason, + } # ── Verification pending ── diff --git a/app/schemas/cue.py b/app/schemas/cue.py index f573be7..f613031 100644 --- a/app/schemas/cue.py +++ b/app/schemas/cue.py @@ -1,6 +1,7 @@ from __future__ import annotations from datetime import datetime +from enum import Enum from typing import Dict, List, Optional from pydantic import BaseModel, ConfigDict, Field, HttpUrl, field_validator, model_validator @@ -8,6 +9,34 @@ from app.schemas.execution import ExecutionResponse +class VerificationMode(str, Enum): + """Outcome-verification policy for a cue. + + - ``none``: the reported ``success`` bool is final. Default. + - ``require_external_id``/``result_url``/``artifacts``: evidence + field must be present on the outcome report; if missing, the + execution is marked ``verification_failed``. If present and + ``success=True``, the execution is marked ``verified_success``. + - ``manual``: every successful outcome sits in + ``verification_pending`` until someone calls + ``POST /v1/executions/{id}/verify``. + """ + + none = "none" + require_external_id = "require_external_id" + require_result_url = "require_result_url" + require_artifacts = "require_artifacts" + manual = "manual" + + +class VerificationPolicy(BaseModel): + """Outcome-verification policy. Only ``mode`` today; kept as a + sub-object so future fields (e.g. ``auto_verify_after``) can be + added without breaking the API shape.""" + + mode: VerificationMode = Field(default=VerificationMode.none) + + class ScheduleCreate(BaseModel): type: str # "once" | "recurring" cron: Optional[str] = None @@ -43,6 +72,7 @@ class CueCreate(BaseModel): payload: Optional[dict] = Field(default={}) retry: Optional[RetryConfig] = Field(default_factory=RetryConfig) on_failure: Optional[OnFailureConfig] = Field(default_factory=OnFailureConfig) + verification: Optional[VerificationPolicy] = None @model_validator(mode="after") def validate_transport(self) -> "CueCreate": @@ -73,6 +103,7 @@ class CueUpdate(BaseModel): payload: Optional[dict] = None retry: Optional[RetryConfig] = None on_failure: Optional[OnFailureConfig] = None + verification: Optional[VerificationPolicy] = None @field_validator("status") @classmethod @@ -97,6 +128,7 @@ class CueResponse(BaseModel): run_count: int fired_count: int = 0 on_failure: Optional[dict] = None + verification: Optional[dict] = None warning: Optional[str] = None created_at: datetime updated_at: datetime diff --git a/app/schemas/outcome.py b/app/schemas/outcome.py index c929899..286a694 100644 --- a/app/schemas/outcome.py +++ b/app/schemas/outcome.py @@ -1,18 +1,35 @@ from __future__ import annotations -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, HttpUrl class OutcomeRequest(BaseModel): + """Outcome report. Evidence fields are optional and additive — a + caller that only sends {success, result, error, metadata} gets the + identical behavior it always did. Evidence fields feed the + verification-modes policy configured on the cue (see + ``VerificationMode``).""" + success: bool result: Optional[str] = Field(None, max_length=2000) error: Optional[str] = Field(None, max_length=2000) metadata: Optional[Dict[str, Any]] = None + # Evidence fields — recorded on the Execution's ``evidence_*`` + # columns. Any missing evidence required by the cue's verification + # mode causes the outcome to land in ``verification_failed`` rather + # than ``reported_success``. + external_id: Optional[str] = Field(None, max_length=500) + result_url: Optional[HttpUrl] = None + result_ref: Optional[str] = Field(None, max_length=500) + result_type: Optional[str] = Field(None, max_length=100) + summary: Optional[str] = Field(None, max_length=500) + artifacts: Optional[List[Any]] = None class OutcomeResponse(BaseModel): execution_id: str outcome_recorded: bool + outcome_state: Optional[str] = None reason: Optional[str] = None diff --git a/app/services/cue_service.py b/app/services/cue_service.py index 1635b47..000a77f 100644 --- a/app/services/cue_service.py +++ b/app/services/cue_service.py @@ -29,6 +29,49 @@ def validate_cron(expression: str) -> bool: return False +# Verification modes that require evidence on the outcome report. +# Worker transport (today) has no path to attach evidence on the single +# outcome POST, so these modes are rejected for worker cues at create / +# update time. Ref: cueapi-worker < 0.3.0. This rejection is lifted in +# a later PR once cueapi-worker 0.3.0 (CUEAPI_OUTCOME_FILE) is on PyPI. +_EVIDENCE_REQUIRING_MODES = frozenset( + {"require_external_id", "require_result_url", "require_artifacts"} +) +_WORKER_COMPATIBLE_MODES = ("none", "manual") + + +def _check_transport_verification_combo( + transport: str, mode: Optional[str] +) -> Optional[dict]: + """Reject worker transport paired with evidence-based verification. + + Returns an error dict (matching the service-layer error shape) when + the combination is invalid, or None when it's fine. Lives here + rather than as a Pydantic validator because the existing API shape + uses structured 400 errors (``{"error": {"code": ...}}``) and + Pydantic ValueErrors surface as 422 with a different schema. + """ + if transport != "worker" or not mode or mode in _WORKER_COMPATIBLE_MODES: + return None + if mode not in _EVIDENCE_REQUIRING_MODES: + return None + return { + "error": { + "code": "unsupported_verification_for_transport", + "message": ( + "Worker transport does not yet support evidence-based " + "verification modes. Use 'none' or 'manual' for worker " + "cues, or switch to webhook transport for evidence " + "verification." + ), + "status": 400, + "transport": "worker", + "verification_mode": mode, + "supported_worker_modes": list(_WORKER_COMPATIBLE_MODES), + } + } + + def _contains_null_byte(obj) -> bool: """Recursively check if any string in a dict/list contains a null byte.""" if isinstance(obj, str): @@ -81,6 +124,9 @@ def _cue_to_response(cue: Cue) -> CueResponse: "backoff_minutes": cue.retry_backoff_minutes, } + verification_mode = getattr(cue, "verification_mode", None) + verification = {"mode": verification_mode} if verification_mode else None + return CueResponse( id=cue.id, name=cue.name, @@ -96,6 +142,7 @@ def _cue_to_response(cue: Cue) -> CueResponse: run_count=cue.run_count, fired_count=getattr(cue, 'fired_count', 0) or 0, on_failure=getattr(cue, 'on_failure', None), + verification=verification, created_at=cue.created_at, updated_at=cue.updated_at, ) @@ -157,6 +204,17 @@ async def create_cue(db: AsyncSession, user: AuthenticatedUser, data: CueCreate) transport = data.transport or "webhook" warning = None + # Reject worker transport paired with evidence-based verification. + # See ``_check_transport_verification_combo`` for the rationale — + # this will be lifted once cueapi-worker 0.3.0 (evidence reporting + # via CUEAPI_OUTCOME_FILE) is on PyPI. + if data.verification is not None: + combo_err = _check_transport_verification_combo( + transport, data.verification.mode.value + ) + if combo_err is not None: + return combo_err + if transport == "webhook": is_valid, error_msg = validate_callback_url(str(data.callback.url), settings.ENV) if not is_valid: @@ -252,6 +310,10 @@ async def create_cue(db: AsyncSession, user: AuthenticatedUser, data: CueCreate) else: on_failure_dict = {"email": True, "webhook": None, "pause": False} + verification_mode = ( + data.verification.mode.value if data.verification is not None else None + ) + cue = Cue( id=generate_cue_id(), user_id=user.id, @@ -271,6 +333,7 @@ async def create_cue(db: AsyncSession, user: AuthenticatedUser, data: CueCreate) retry_backoff_minutes=retry.backoff_minutes, next_run=next_run, on_failure=on_failure_dict, + verification_mode=verification_mode, ) db.add(cue) @@ -393,6 +456,18 @@ async def update_cue(db: AsyncSession, user: AuthenticatedUser, cue_id: str, dat cue.retry_max_attempts = data.retry.max_attempts cue.retry_backoff_minutes = data.retry.backoff_minutes + # Verification policy update. Validate the *resulting* (transport, + # mode) combo — transport is effectively immutable via PATCH today, + # so the resulting transport is whatever the cue currently has. + if data.verification is not None: + resulting_transport = cue.callback_transport or "webhook" + combo_err = _check_transport_verification_combo( + resulting_transport, data.verification.mode.value + ) + if combo_err is not None: + return combo_err + cue.verification_mode = data.verification.mode.value + if data.on_failure is not None: if data.on_failure.webhook: is_valid, error_msg = validate_callback_url(data.on_failure.webhook, settings.ENV) diff --git a/app/services/outcome_service.py b/app/services/outcome_service.py index a67396e..202a226 100644 --- a/app/services/outcome_service.py +++ b/app/services/outcome_service.py @@ -26,10 +26,16 @@ async def record_outcome( """ # Find execution and verify ownership via cue -> user_id - # Also fetch cue transport and schedule_type for worker lifecycle + # Also fetch cue transport, schedule_type, and verification_mode + # for worker lifecycle + outcome-state computation. # Use FOR UPDATE to prevent concurrent outcome submissions (write-once) result = await db.execute( - select(Execution, Cue.callback_transport, Cue.schedule_type) + select( + Execution, + Cue.callback_transport, + Cue.schedule_type, + Cue.verification_mode, + ) .join(Cue, Execution.cue_id == Cue.id) .where(Execution.id == execution_id, Cue.user_id == user.id) .with_for_update(of=Execution) @@ -48,6 +54,7 @@ async def record_outcome( execution = row[0] transport = row[1] schedule_type = row[2] + verification_mode = row[3] or "none" # Write-once check (row is locked by FOR UPDATE, so this is race-safe) if execution.outcome_recorded_at is not None: @@ -80,6 +87,65 @@ async def record_outcome( execution.outcome_metadata = body.metadata execution.outcome_recorded_at = now + # Persist any evidence fields unconditionally — they're descriptive + # and don't affect the state decision on their own. + if body.external_id: + execution.evidence_external_id = body.external_id + if body.result_url is not None: + execution.evidence_result_url = str(body.result_url) + if body.result_ref: + execution.evidence_result_ref = body.result_ref + if body.result_type: + execution.evidence_result_type = body.result_type + if body.summary: + execution.evidence_summary = body.summary[:500] + if body.artifacts: + execution.evidence_artifacts = body.artifacts + if body.external_id or body.result_url is not None or body.artifacts: + execution.evidence_produced_at = now + + # Compute outcome_state from (success, verification_mode, evidence). + # Semantics, by mode: + # none : reported_success | reported_failure + # require_external_id : verified_success if external_id else verification_failed + # require_result_url : verified_success if result_url else verification_failed + # require_artifacts : verified_success if artifacts else verification_failed + # manual : verification_pending (regardless of evidence) + # Failure bypasses verification entirely — the report was already + # explicit that the work didn't succeed. + if not body.success: + execution.outcome_state = "reported_failure" + elif verification_mode in (None, "none"): + execution.outcome_state = "reported_success" + elif verification_mode == "manual": + execution.outcome_state = "verification_pending" + elif verification_mode == "require_external_id": + if body.external_id: + execution.outcome_state = "verified_success" + execution.evidence_validation_state = "valid" + else: + execution.outcome_state = "verification_failed" + execution.evidence_validation_state = "invalid" + elif verification_mode == "require_result_url": + if body.result_url is not None: + execution.outcome_state = "verified_success" + execution.evidence_validation_state = "valid" + else: + execution.outcome_state = "verification_failed" + execution.evidence_validation_state = "invalid" + elif verification_mode == "require_artifacts": + if body.artifacts: + execution.outcome_state = "verified_success" + execution.evidence_validation_state = "valid" + else: + execution.outcome_state = "verification_failed" + execution.evidence_validation_state = "invalid" + else: + # Unknown mode: treat defensively as 'none'. The CHECK + # constraint should prevent this, but don't corrupt state if + # someone adds a new mode and forgets to update this block. + execution.outcome_state = "reported_success" + # For worker transport, the outcome IS the completion signal. # Update execution status and cue lifecycle (mirroring _handle_success/_handle_failure). if transport == "worker": @@ -135,5 +201,6 @@ async def record_outcome( "outcome": OutcomeResponse( execution_id=execution_id, outcome_recorded=True, + outcome_state=execution.outcome_state, ) } diff --git a/tests/test_execution_parity.py b/tests/test_execution_parity.py index ee7b019..817b4a7 100644 --- a/tests/test_execution_parity.py +++ b/tests/test_execution_parity.py @@ -208,11 +208,11 @@ async def test_verify_success(self, client, auth_headers, db_session, registered @pytest.mark.asyncio async def test_verify_wrong_state(self, client, auth_headers, db_session, registered_user): + # reported_failure is now an accepted starting state (PR: verification + # modes parity). Use a pre-outcome state — still invalid. user_id = await _get_user_id(db_session, registered_user) cue = await _create_webhook_cue(db_session, user_id) - ex = await _create_execution(db_session, cue.id, status="success", - outcome_state="reported_failure", - outcome_recorded_at=datetime.now(timezone.utc)) + ex = await _create_execution(db_session, cue.id, status="pending") resp = await client.post(f"/v1/executions/{ex.id}/verify", headers=auth_headers) assert resp.status_code == 409 diff --git a/tests/test_outcome_evidence.py b/tests/test_outcome_evidence.py new file mode 100644 index 0000000..29a6d32 --- /dev/null +++ b/tests/test_outcome_evidence.py @@ -0,0 +1,161 @@ +"""OutcomeRequest accepts evidence fields — persistence + backward compat. + +Covers: +- Evidence fields on POST /outcome persist to the execution's + evidence_* columns. +- A request that sends only {success} (the legacy shape) still works + and leaves evidence_* columns NULL. +- PATCH /v1/executions/{id}/evidence (two-step flow) remains + functional. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.cue import Cue +from app.models.execution import Execution +from app.models.user import User + + +async def _user_id(session: AsyncSession, user: dict) -> str: + r = await session.execute(select(User.id).where(User.email == user["email"])) + return str(r.scalar_one()) + + +async def _webhook_cue(session, user_id): + cue = Cue( + id=f"cue_{uuid.uuid4().hex[:12]}", + user_id=user_id, + name=f"ev-{uuid.uuid4().hex[:6]}", + schedule_type="once", + schedule_at=datetime.now(timezone.utc) + timedelta(hours=1), + next_run=datetime.now(timezone.utc) + timedelta(hours=1), + callback_url="https://example.com/h", + callback_method="POST", + callback_transport="webhook", + status="active", + payload={}, + retry_max_attempts=3, + retry_backoff_minutes=[1, 5, 15], + on_failure={"email": False, "webhook": None, "pause": False}, + ) + session.add(cue) + await session.commit() + return cue + + +async def _exec(session, cue_id): + ex = Execution( + id=uuid.uuid4(), + cue_id=cue_id, + scheduled_for=datetime.now(timezone.utc), + status="delivering", + ) + session.add(ex) + await session.commit() + return ex + + +class TestInlineEvidenceOnOutcome: + @pytest.mark.asyncio + async def test_all_evidence_fields_persist( + self, client, auth_headers, db_session, registered_user + ): + uid = await _user_id(db_session, registered_user) + cue = await _webhook_cue(db_session, uid) + ex = await _exec(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/outcome", + headers=auth_headers, + json={ + "success": True, + "external_id": "ext-xyz", + "result_url": "https://example.com/r/1", + "result_ref": "ref-1", + "result_type": "document", + "summary": "Ran successfully", + "artifacts": [{"type": "log", "url": "https://e.com/l"}], + }, + ) + assert resp.status_code == 200 + + await db_session.refresh(ex) + assert ex.evidence_external_id == "ext-xyz" + assert ex.evidence_result_url == "https://example.com/r/1" + assert ex.evidence_result_ref == "ref-1" + assert ex.evidence_result_type == "document" + assert ex.evidence_summary == "Ran successfully" + assert ex.evidence_artifacts == [ + {"type": "log", "url": "https://e.com/l"} + ] + assert ex.evidence_produced_at is not None + + @pytest.mark.asyncio + async def test_legacy_shape_still_accepted( + self, client, auth_headers, db_session, registered_user + ): + uid = await _user_id(db_session, registered_user) + cue = await _webhook_cue(db_session, uid) + ex = await _exec(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/outcome", + headers=auth_headers, + json={"success": True, "result": "ok"}, + ) + assert resp.status_code == 200 + + await db_session.refresh(ex) + assert ex.evidence_external_id is None + assert ex.evidence_result_url is None + assert ex.evidence_artifacts is None + + @pytest.mark.asyncio + async def test_summary_truncated_to_500( + self, client, auth_headers, db_session, registered_user + ): + # Pydantic caps summary at 500 chars before we even see it — + # a caller that sends longer fails validation with 422. This + # pins that behavior. + uid = await _user_id(db_session, registered_user) + cue = await _webhook_cue(db_session, uid) + ex = await _exec(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/outcome", + headers=auth_headers, + json={"success": True, "summary": "x" * 501}, + ) + assert resp.status_code == 422 + + +class TestPatchEvidenceStillWorks: + @pytest.mark.asyncio + async def test_patch_evidence_after_outcome( + self, client, auth_headers, db_session, registered_user + ): + uid = await _user_id(db_session, registered_user) + cue = await _webhook_cue(db_session, uid) + ex = await _exec(db_session, cue.id) + + outcome = await client.post( + f"/v1/executions/{ex.id}/outcome", + headers=auth_headers, + json={"success": True}, + ) + assert outcome.status_code == 200 + + patch = await client.patch( + f"/v1/executions/{ex.id}/evidence", + headers=auth_headers, + json={"external_id": "after-the-fact"}, + ) + assert patch.status_code == 200 + await db_session.refresh(ex) + assert ex.evidence_external_id == "after-the-fact" diff --git a/tests/test_transport_verification_combo.py b/tests/test_transport_verification_combo.py new file mode 100644 index 0000000..a0e546a --- /dev/null +++ b/tests/test_transport_verification_combo.py @@ -0,0 +1,166 @@ +"""Worker + evidence-based verification rejection. + +This combo is rejected at cue create/update time because cueapi-worker +< 0.3.0 has no mechanism to attach evidence on the outcome report. The +rejection is lifted in a later PR once cueapi-worker 0.3.0 is on PyPI. + +Eight tests: 3 evidence-requiring modes × (create, update, webhook +allowed) + 2 worker-compatible modes confirming the combo is allowed. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from httpx import AsyncClient + + +def _cue_body(*, transport="worker", mode=None, name=None): + body = { + "name": name or f"combo-{uuid.uuid4().hex[:6]}", + "schedule": { + "type": "once", + "at": (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat(), + "timezone": "UTC", + }, + "transport": transport, + "payload": {"task": "t"}, + } + if transport == "webhook": + body["callback"] = {"url": "https://example.com/hook"} + if mode is not None: + body["verification"] = {"mode": mode} + return body + + +class TestWorkerEvidenceRejectedAtCreate: + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mode", + ["require_external_id", "require_result_url", "require_artifacts"], + ) + async def test_worker_plus_evidence_mode_rejected( + self, client: AsyncClient, auth_headers, mode + ): + resp = await client.post( + "/v1/cues", + headers=auth_headers, + json=_cue_body(transport="worker", mode=mode), + ) + assert resp.status_code == 400 + body = resp.json() + err = body["detail"]["error"] if "detail" in body else body["error"] + assert err["code"] == "unsupported_verification_for_transport" + assert err["transport"] == "worker" + assert err["verification_mode"] == mode + assert err["supported_worker_modes"] == ["none", "manual"] + + +class TestWorkerCompatibleModesAcceptedAtCreate: + @pytest.mark.asyncio + async def test_worker_none_accepted(self, client: AsyncClient, auth_headers): + resp = await client.post( + "/v1/cues", + headers=auth_headers, + json=_cue_body(transport="worker", mode="none"), + ) + assert resp.status_code == 201, resp.text + + @pytest.mark.asyncio + async def test_worker_manual_accepted(self, client: AsyncClient, auth_headers): + resp = await client.post( + "/v1/cues", + headers=auth_headers, + json=_cue_body(transport="worker", mode="manual"), + ) + assert resp.status_code == 201, resp.text + + +class TestWebhookAllModesAccepted: + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mode", + [ + "none", + "require_external_id", + "require_result_url", + "require_artifacts", + "manual", + ], + ) + async def test_webhook_any_mode_accepted( + self, client: AsyncClient, auth_headers, mode + ): + resp = await client.post( + "/v1/cues", + headers=auth_headers, + json=_cue_body(transport="webhook", mode=mode), + ) + assert resp.status_code == 201, resp.text + + +class TestPatchTransitions: + @pytest.mark.asyncio + async def test_patch_worker_to_evidence_mode_rejected( + self, client: AsyncClient, auth_headers + ): + # Create worker cue with no verification + create = await client.post( + "/v1/cues", + headers=auth_headers, + json=_cue_body(transport="worker"), + ) + assert create.status_code == 201 + cue_id = create.json()["id"] + + # Try to PATCH verification to an evidence-requiring mode + resp = await client.patch( + f"/v1/cues/{cue_id}", + headers=auth_headers, + json={"verification": {"mode": "require_external_id"}}, + ) + assert resp.status_code == 400 + body = resp.json() + err = body["detail"]["error"] if "detail" in body else body["error"] + assert err["code"] == "unsupported_verification_for_transport" + + @pytest.mark.asyncio + async def test_patch_webhook_to_evidence_mode_accepted( + self, client: AsyncClient, auth_headers + ): + create = await client.post( + "/v1/cues", + headers=auth_headers, + json=_cue_body(transport="webhook"), + ) + assert create.status_code == 201 + cue_id = create.json()["id"] + + resp = await client.patch( + f"/v1/cues/{cue_id}", + headers=auth_headers, + json={"verification": {"mode": "require_result_url"}}, + ) + assert resp.status_code == 200 + assert resp.json()["verification"] == {"mode": "require_result_url"} + + @pytest.mark.asyncio + async def test_patch_worker_to_manual_accepted( + self, client: AsyncClient, auth_headers + ): + create = await client.post( + "/v1/cues", + headers=auth_headers, + json=_cue_body(transport="worker"), + ) + assert create.status_code == 201 + cue_id = create.json()["id"] + + resp = await client.patch( + f"/v1/cues/{cue_id}", + headers=auth_headers, + json={"verification": {"mode": "manual"}}, + ) + assert resp.status_code == 200 + assert resp.json()["verification"] == {"mode": "manual"} diff --git a/tests/test_verification_modes.py b/tests/test_verification_modes.py new file mode 100644 index 0000000..f924a67 --- /dev/null +++ b/tests/test_verification_modes.py @@ -0,0 +1,239 @@ +"""Verification-mode behavior on outcome report. + +Ten tests: 5 modes × (satisfied / unsatisfied / inapplicable) shapes. +Each test creates a cue with a specific verification mode, claims a +pending execution, reports an outcome, and asserts the resulting +``outcome_state``. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from httpx import AsyncClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.cue import Cue +from app.models.execution import Execution +from app.models.user import User + + +def _cue_id() -> str: + return f"cue_{uuid.uuid4().hex[:12]}" + + +async def _get_user_id(session: AsyncSession, user: dict) -> str: + result = await session.execute(select(User.id).where(User.email == user["email"])) + return str(result.scalar_one()) + + +async def _make_cue(session, user_id, *, verification_mode=None, transport="webhook"): + cue = Cue( + id=_cue_id(), + user_id=user_id, + name=f"t-{uuid.uuid4().hex[:6]}", + schedule_type="once", + schedule_at=datetime.now(timezone.utc) + timedelta(hours=1), + next_run=datetime.now(timezone.utc) + timedelta(hours=1), + callback_url="https://example.com/hook" if transport == "webhook" else None, + callback_method="POST", + callback_transport=transport, + status="active", + payload={"task": "t"}, + retry_max_attempts=3, + retry_backoff_minutes=[1, 5, 15], + on_failure={"email": False, "webhook": None, "pause": False}, + verification_mode=verification_mode, + ) + session.add(cue) + await session.commit() + return cue + + +async def _make_execution(session, cue_id): + ex = Execution( + id=uuid.uuid4(), + cue_id=cue_id, + scheduled_for=datetime.now(timezone.utc), + status="delivering", + ) + session.add(ex) + await session.commit() + return ex + + +async def _post_outcome(client: AsyncClient, headers, exec_id, **body): + body.setdefault("success", True) + return await client.post( + f"/v1/executions/{exec_id}/outcome", headers=headers, json=body + ) + + +class TestModeNone: + @pytest.mark.asyncio + async def test_success_marks_reported_success( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue(db_session, user_id, verification_mode=None) + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome(client, auth_headers, ex.id, success=True) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "reported_success" + + @pytest.mark.asyncio + async def test_failure_marks_reported_failure( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue(db_session, user_id, verification_mode="none") + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome( + client, auth_headers, ex.id, success=False, error="boom" + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "reported_failure" + + +class TestModeRequireExternalId: + @pytest.mark.asyncio + async def test_satisfied_marks_verified_success( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue( + db_session, user_id, verification_mode="require_external_id" + ) + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome( + client, auth_headers, ex.id, success=True, external_id="ext-abc-123" + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verified_success" + + @pytest.mark.asyncio + async def test_missing_marks_verification_failed( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue( + db_session, user_id, verification_mode="require_external_id" + ) + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome(client, auth_headers, ex.id, success=True) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verification_failed" + + +class TestModeRequireResultUrl: + @pytest.mark.asyncio + async def test_satisfied_marks_verified_success( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue( + db_session, user_id, verification_mode="require_result_url" + ) + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome( + client, + auth_headers, + ex.id, + success=True, + result_url="https://example.com/receipts/42", + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verified_success" + + @pytest.mark.asyncio + async def test_missing_marks_verification_failed( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue( + db_session, user_id, verification_mode="require_result_url" + ) + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome(client, auth_headers, ex.id, success=True) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verification_failed" + + +class TestModeRequireArtifacts: + @pytest.mark.asyncio + async def test_satisfied_marks_verified_success( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue( + db_session, user_id, verification_mode="require_artifacts" + ) + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome( + client, + auth_headers, + ex.id, + success=True, + artifacts=[{"type": "file", "url": "https://x.com/a.pdf"}], + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verified_success" + + @pytest.mark.asyncio + async def test_missing_marks_verification_failed( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue( + db_session, user_id, verification_mode="require_artifacts" + ) + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome(client, auth_headers, ex.id, success=True) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verification_failed" + + +class TestModeManual: + @pytest.mark.asyncio + async def test_success_parks_in_verification_pending( + self, client, auth_headers, db_session, registered_user + ): + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue(db_session, user_id, verification_mode="manual") + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome( + client, + auth_headers, + ex.id, + success=True, + external_id="irrelevant", # evidence present but ignored under manual + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verification_pending" + + @pytest.mark.asyncio + async def test_failure_still_reported_failure( + self, client, auth_headers, db_session, registered_user + ): + # Failure bypasses verification — manual mode doesn't park + # failed outcomes. + user_id = await _get_user_id(db_session, registered_user) + cue = await _make_cue(db_session, user_id, verification_mode="manual") + ex = await _make_execution(db_session, cue.id) + + resp = await _post_outcome( + client, auth_headers, ex.id, success=False, error="nope" + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "reported_failure" diff --git a/tests/test_verify_endpoints.py b/tests/test_verify_endpoints.py new file mode 100644 index 0000000..95b1e91 --- /dev/null +++ b/tests/test_verify_endpoints.py @@ -0,0 +1,226 @@ +"""POST /verify + POST /verification-pending endpoint behavior. + +Explicitly pins the behavior-change in this PR: POST /verify now +accepts {valid: bool, reason: str?}. valid=true transitions to +verified_success (legacy default). valid=false transitions to +verification_failed and records reason on evidence_summary. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.cue import Cue +from app.models.execution import Execution +from app.models.user import User + + +async def _uid(session: AsyncSession, user: dict) -> str: + r = await session.execute(select(User.id).where(User.email == user["email"])) + return str(r.scalar_one()) + + +async def _cue(session, user_id, verification_mode=None): + c = Cue( + id=f"cue_{uuid.uuid4().hex[:12]}", + user_id=user_id, + name=f"v-{uuid.uuid4().hex[:6]}", + schedule_type="once", + schedule_at=datetime.now(timezone.utc) + timedelta(hours=1), + next_run=datetime.now(timezone.utc) + timedelta(hours=1), + callback_url="https://example.com/h", + callback_method="POST", + callback_transport="webhook", + status="active", + payload={}, + retry_max_attempts=3, + retry_backoff_minutes=[1, 5, 15], + on_failure={"email": False, "webhook": None, "pause": False}, + verification_mode=verification_mode, + ) + session.add(c) + await session.commit() + return c + + +async def _exec_reported(session, cue_id, *, success=True, state="reported_success"): + ex = Execution( + id=uuid.uuid4(), + cue_id=cue_id, + scheduled_for=datetime.now(timezone.utc), + status="success" if success else "failed", + outcome_recorded_at=datetime.now(timezone.utc), + outcome_success=success, + outcome_state=state, + ) + session.add(ex) + await session.commit() + return ex + + +class TestVerifyValid: + @pytest.mark.asyncio + async def test_valid_true_transitions_to_verified_success( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + ex = await _exec_reported(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/verify", + headers=auth_headers, + json={"valid": True}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["outcome_state"] == "verified_success" + assert data["valid"] is True + + @pytest.mark.asyncio + async def test_empty_body_defaults_to_valid_true( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + ex = await _exec_reported(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/verify", headers=auth_headers + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verified_success" + + @pytest.mark.asyncio + async def test_from_reported_failure_accepted( + self, client, auth_headers, db_session, registered_user + ): + # Newly accepted starting state — pre-PR, this was rejected. + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + ex = await _exec_reported( + db_session, cue.id, success=False, state="reported_failure" + ) + + resp = await client.post( + f"/v1/executions/{ex.id}/verify", + headers=auth_headers, + json={"valid": True}, + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verified_success" + + +class TestVerifyInvalid: + @pytest.mark.asyncio + async def test_valid_false_transitions_to_verification_failed( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + ex = await _exec_reported(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/verify", + headers=auth_headers, + json={"valid": False, "reason": "evidence fabricated"}, + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verification_failed" + assert resp.json()["valid"] is False + + await db_session.refresh(ex) + assert ex.evidence_validation_state == "invalid" + assert ex.evidence_summary is not None + assert "evidence fabricated" in ex.evidence_summary + + @pytest.mark.asyncio + async def test_reason_preserves_existing_summary( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + ex = await _exec_reported(db_session, cue.id) + ex.evidence_summary = "handler finished" + await db_session.commit() + + resp = await client.post( + f"/v1/executions/{ex.id}/verify", + headers=auth_headers, + json={"valid": False, "reason": "audit found discrepancy"}, + ) + assert resp.status_code == 200 + await db_session.refresh(ex) + assert ex.evidence_summary is not None + assert "handler finished" in ex.evidence_summary + assert "audit found discrepancy" in ex.evidence_summary + + +class TestVerifyInvalidState: + @pytest.mark.asyncio + async def test_unrecorded_outcome_rejected( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + # Execution with no outcome_state + ex = Execution( + id=uuid.uuid4(), + cue_id=cue.id, + scheduled_for=datetime.now(timezone.utc), + status="pending", + ) + db_session.add(ex) + await db_session.commit() + + resp = await client.post( + f"/v1/executions/{ex.id}/verify", + headers=auth_headers, + json={"valid": True}, + ) + assert resp.status_code == 409 + body = resp.json() + err = body["detail"]["error"] if "detail" in body else body["error"] + assert err["code"] == "invalid_state" + + +class TestVerificationPending: + @pytest.mark.asyncio + async def test_from_reported_success( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + ex = await _exec_reported(db_session, cue.id) + + resp = await client.post( + f"/v1/executions/{ex.id}/verification-pending", + headers=auth_headers, + ) + assert resp.status_code == 200 + assert resp.json()["outcome_state"] == "verification_pending" + + @pytest.mark.asyncio + async def test_rejects_when_no_outcome( + self, client, auth_headers, db_session, registered_user + ): + uid = await _uid(db_session, registered_user) + cue = await _cue(db_session, uid) + ex = Execution( + id=uuid.uuid4(), + cue_id=cue.id, + scheduled_for=datetime.now(timezone.utc), + status="pending", + ) + db_session.add(ex) + await db_session.commit() + + resp = await client.post( + f"/v1/executions/{ex.id}/verification-pending", + headers=auth_headers, + ) + assert resp.status_code == 409