Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 93 additions & 3 deletions cueapi/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,70 @@ def _generate_device_code() -> str:
return f"{part1}-{part2}"


def _resolve_key_via_session(client, poll_data: dict) -> Optional[str]:
"""Exchange a one-time session_token for a JWT, then call
GET /v1/auth/key to reveal the user's stored api_key plaintext.

Used for the existing-user login path where poll_data omits
``api_key`` (the server couldn't decrypt it, or the record
predates encrypted storage). Returns the plaintext api_key on
success, None on failure β€” errors are echoed to the user with
actionable next steps.
"""
session_token = poll_data.get("session_token")
if not session_token:
# No key AND no session token β€” poll response is malformed or
# this user hasn't been upgraded to the session-token flow.
# Shouldn't happen for any production code path as of 2026-04-19
# (commit adbfe77) but we still want an actionable message.
click.echo()
echo_error(
"Login approved but the server didn't return an api_key or "
"session_token. Try `cueapi login` again, or contact support."
)
return None

# Exchange session_token (single-use) for a JWT bearer.
exchange = client.post("/auth/session", json={"token": session_token})
if exchange.status_code != 200:
click.echo()
echo_error(
f"Could not finalize login (session exchange HTTP "
f"{exchange.status_code}). Run `cueapi login` to try again."
)
return None
jwt = exchange.json().get("session_token")
if not jwt:
click.echo()
echo_error("Login response missing session_token. Try again.")
return None

# Use the JWT as a bearer to reveal the decrypted api_key. /auth/key
# returns 410 plaintext_unavailable if the encrypted column is empty
# (no reversible copy exists) β€” in that case the only remedy is a
# key rotation, so we surface that guidance directly.
reveal = client.get(
"/auth/key",
headers={"Authorization": f"Bearer {jwt}"},
)
if reveal.status_code == 200:
return reveal.json().get("api_key")
if reveal.status_code == 410:
click.echo()
echo_error(
"Your API key can't be recovered on this device β€” the stored "
"plaintext is no longer available. Run `cueapi key regenerate` "
"to mint a fresh key, then `cueapi login` again."
)
return None
click.echo()
echo_error(
f"Could not retrieve your api_key (HTTP {reveal.status_code}). "
"Run `cueapi login` to try again or contact support."
)
return None


def do_login(api_base: Optional[str] = None, profile: str = "default") -> None:
"""Run the device code login flow."""
base = api_base or resolve_api_base(profile=profile)
Expand Down Expand Up @@ -68,9 +132,28 @@ def do_login(api_base: Optional[str] = None, profile: str = "default") -> None:
status = poll_data.get("status")

if status == "approved":
api_key = poll_data["api_key"]
email = poll_data["email"]

# Resolve the plaintext api_key from the response.
# Shape varies by user type (server-side logic lives in
# app/services/device_code_service.py::poll_device_code):
# - New user: poll_data contains "api_key" directly.
# - Existing user whose api_key_encrypted decrypts:
# poll_data ALSO contains "api_key".
# - Existing user whose decryption failed or whose
# record predates encrypted-storage: poll_data has
# NO "api_key", only "session_token" +
# "existing_user": true. The CLI must exchange the
# session token for a JWT and then call
# GET /v1/auth/key to reveal the stored plaintext.
api_key = poll_data.get("api_key")
if not api_key:
api_key = _resolve_key_via_session(client, poll_data)
if not api_key:
# _resolve_key_via_session already printed a
# user-facing error + next-step guidance.
return

# Save credentials
save_credentials(
profile=profile,
Expand All @@ -84,8 +167,15 @@ def do_login(api_base: Optional[str] = None, profile: str = "default") -> None:
click.echo()
echo_success(f"Authenticated as {email}")
click.echo(f"API key stored in credentials file.\n")
click.echo(f"Your API key: {api_key}")
click.echo("(This is the only time your full key will be shown. Save it if you need it elsewhere.)\n")
# Only show the key plaintext for new users. For existing
# users it's already on their record from first signup β€”
# reprinting it here is a pointless exfil risk (their
# terminal scrollback, screen-share, etc.).
if poll_data.get("existing_user"):
click.echo(f"Welcome back, {email}.")
else:
click.echo(f"Your API key: {api_key}")
click.echo("(This is the only time your full key will be shown. Save it if you need it elsewhere.)\n")
click.echo('Run `cueapi quickstart` to create your first cue.')
return

Expand Down
229 changes: 229 additions & 0 deletions tests/test_login_existing_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
"""Tests for the existing-user login path in cueapi.auth.do_login.

Backend commit adbfe77 changed POST /v1/auth/device-code/poll so the
response can approve a login WITHOUT returning an api_key (for
existing users whose encrypted record couldn't be decrypted). Before
this fix, do_login did `poll_data["api_key"]` and crashed with
KeyError on every existing-user login. These tests pin the new flow
so the regression can't return silently.
"""
from __future__ import annotations

from unittest.mock import MagicMock, patch

import pytest

from cueapi import auth


@pytest.fixture
def poll_responses(monkeypatch):
"""Feed a scripted series of UnauthClient.get / .post responses.

Returns (script, used). `script` is a list the caller mutates
to specify response shape; `used` captures what the test code
actually called, for assertion.
"""
script = {"post": [], "get": []}
used = {"post": [], "get": []}

def _make_response(status_code: int, body: dict | None = None):
r = MagicMock()
r.status_code = status_code
r.json.return_value = body or {}
return r

class FakeUnauth:
def __init__(self, *args, **kwargs):
pass
def __enter__(self):
return self
def __exit__(self, *a):
pass
def post(self, path, **kwargs):
used["post"].append({"path": path, **kwargs})
sc, body = script["post"].pop(0)
return _make_response(sc, body)
def get(self, path, **kwargs):
used["get"].append({"path": path, **kwargs})
sc, body = script["get"].pop(0)
return _make_response(sc, body)

monkeypatch.setattr(auth, "UnauthClient", FakeUnauth)
# Skip the browser open and sleep so tests run instantly.
monkeypatch.setattr(auth, "webbrowser", MagicMock())
monkeypatch.setattr(auth.time, "sleep", lambda _s: None)

return script, used


@pytest.fixture
def captured_saves(monkeypatch):
"""Capture save_credentials calls so we can assert what got stored."""
calls = []

def _capture(**kwargs):
calls.append(kwargs)

monkeypatch.setattr(auth, "save_credentials", _capture)
monkeypatch.setattr(auth, "resolve_api_base", lambda profile=None: "https://api.cueapi.ai/v1")
return calls


class TestNewUserLogin:
"""New-user poll responses include api_key directly β€” unchanged
behavior, pinned so the refactor didn't break the happy path."""

def test_new_user_saves_api_key_from_poll(
self, poll_responses, captured_saves
):
script, used = poll_responses
# Sequence: device-code create β†’ poll approved with api_key
script["post"] = [
(201, {"verification_url": "https://example/verify", "expires_in": 60}),
(200, {"status": "approved", "api_key": "cue_sk_new_user_123", "email": "new@example.com"}),
]

auth.do_login(api_base="https://api.cueapi.ai/v1", profile="default")

assert len(captured_saves) == 1
assert captured_saves[0]["data"]["api_key"] == "cue_sk_new_user_123"
assert captured_saves[0]["data"]["email"] == "new@example.com"
# Session exchange must NOT have been called β€” new user path
# has the key inline.
assert not any(c["path"] == "/auth/session" for c in used["post"])
assert not any(c["path"] == "/auth/key" for c in used["get"])


class TestExistingUserDecryptSucceeded:
"""Server was able to decrypt the stored api_key_encrypted β€”
poll returns api_key + existing_user=True."""

def test_existing_user_with_inline_key_skips_session_exchange(
self, poll_responses, captured_saves
):
script, used = poll_responses
script["post"] = [
(201, {"verification_url": "https://example/verify", "expires_in": 60}),
(200, {
"status": "approved",
"api_key": "cue_sk_existing_decrypted",
"email": "old@example.com",
"existing_user": True,
"session_token": "stk_abc",
}),
]

auth.do_login(api_base="https://api.cueapi.ai/v1", profile="default")

assert captured_saves[0]["data"]["api_key"] == "cue_sk_existing_decrypted"
# session_token is present in the poll response but we have the
# key already β€” no need to hit /auth/session or /auth/key.
assert not any(c["path"] == "/auth/session" for c in used["post"])
assert not any(c["path"] == "/auth/key" for c in used["get"])


class TestExistingUserNeedsSessionExchange:
"""Server couldn't decrypt β€” poll omits api_key. CLI must exchange
session_token for JWT, then call GET /v1/auth/key."""

def test_falls_back_to_session_exchange_then_reveal(
self, poll_responses, captured_saves
):
script, used = poll_responses
script["post"] = [
# create device code
(201, {"verification_url": "https://example/verify", "expires_in": 60}),
# poll approved β€” no api_key, but session_token present
(200, {
"status": "approved",
"email": "existing@example.com",
"session_token": "stk_one_time",
"existing_user": True,
}),
# /auth/session β†’ JWT
(200, {"session_token": "jwt_payload_here", "email": "existing@example.com"}),
]
script["get"] = [
# /auth/key with Bearer jwt β†’ api_key
(200, {"api_key": "cue_sk_revealed_via_jwt"}),
]

auth.do_login(api_base="https://api.cueapi.ai/v1", profile="default")

# Session exchange happened with the single-use token.
session_calls = [c for c in used["post"] if c["path"] == "/auth/session"]
assert len(session_calls) == 1
assert session_calls[0]["json"]["token"] == "stk_one_time"

# /auth/key was called with the JWT as Bearer.
reveal_calls = [c for c in used["get"] if c["path"] == "/auth/key"]
assert len(reveal_calls) == 1
assert reveal_calls[0]["headers"]["Authorization"] == "Bearer jwt_payload_here"

# Saved the revealed key, not the session_token.
assert captured_saves[0]["data"]["api_key"] == "cue_sk_revealed_via_jwt"


class TestFailureModes:
"""echo_error in cueapi.formatting raises SystemExit(1), so these
failure-path tests wrap do_login in pytest.raises(SystemExit) and
read the actionable message from stderr (not stdout)."""

def test_reveal_returns_410_shows_regenerate_hint(
self, poll_responses, captured_saves, capsys
):
"""plaintext_unavailable (410) must be translated into a clear
"run cueapi key regenerate" message, not a cryptic stacktrace."""
script, used = poll_responses
script["post"] = [
(201, {"verification_url": "https://example/verify", "expires_in": 60}),
(200, {"status": "approved", "email": "gone@example.com",
"session_token": "stk_x", "existing_user": True}),
(200, {"session_token": "jwt_x", "email": "gone@example.com"}),
]
script["get"] = [(410, {"error": {"code": "plaintext_unavailable"}})]

with pytest.raises(SystemExit):
auth.do_login(api_base="https://api.cueapi.ai/v1", profile="default")

assert captured_saves == []
err = capsys.readouterr().err
assert "cueapi key regenerate" in err

def test_session_exchange_failure_surfaces_error(
self, poll_responses, captured_saves, capsys
):
script, used = poll_responses
script["post"] = [
(201, {"verification_url": "https://example/verify", "expires_in": 60}),
(200, {"status": "approved", "email": "x@example.com",
"session_token": "stk_x", "existing_user": True}),
# /auth/session fails
(500, {"error": {"code": "session_unavailable"}}),
]

with pytest.raises(SystemExit):
auth.do_login(api_base="https://api.cueapi.ai/v1", profile="default")

assert captured_saves == []
err = capsys.readouterr().err
assert "500" in err or "try again" in err.lower()

def test_approved_without_any_key_or_token_shows_actionable_error(
self, poll_responses, captured_saves, capsys
):
"""Defensive: if the server returns neither api_key nor
session_token we don't crash β€” we tell the user what to do."""
script, used = poll_responses
script["post"] = [
(201, {"verification_url": "https://example/verify", "expires_in": 60}),
(200, {"status": "approved", "email": "x@example.com"}),
]

with pytest.raises(SystemExit):
auth.do_login(api_base="https://api.cueapi.ai/v1", profile="default")

assert captured_saves == []
err = capsys.readouterr().err
assert "api_key" in err or "session_token" in err or "support" in err
Loading