|
| 1 | +"""JWKS key management scripts. |
| 2 | +
|
| 3 | +See https://datatracker.ietf.org/doc/html/rfc7517 for further details. |
| 4 | +""" |
| 5 | + |
| 6 | +from __future__ import annotations |
| 7 | + |
| 8 | +import argparse |
| 9 | +import asyncio |
| 10 | +import json |
| 11 | +import logging |
| 12 | +from pathlib import Path |
| 13 | + |
| 14 | +from joserfc.jwk import JWKRegistry, Key, KeySet |
| 15 | +from uuid_utils import uuid7 |
| 16 | + |
| 17 | +logger = logging.getLogger(__name__) |
| 18 | + |
| 19 | +# ---------- Helpers ---------------------------------------------------------- |
| 20 | + |
| 21 | + |
| 22 | +def load_jwks(path: Path) -> KeySet: |
| 23 | + """Return a (possibly empty) JWKSet.""" |
| 24 | + if path.exists(): |
| 25 | + return KeySet.import_key_set(json.loads(path.read_text())) |
| 26 | + logger.warning("JWKS file %s not found – creating a new one", path) |
| 27 | + return KeySet(keys=[]) |
| 28 | + |
| 29 | + |
| 30 | +def save_jwks(path: Path, jwks: KeySet) -> None: |
| 31 | + """Write JWKSet to disk *including* private parts.""" |
| 32 | + path.write_text(json.dumps(jwks.as_dict(private=True), indent=2)) |
| 33 | + logger.info("JWKS written to %s", path) |
| 34 | + |
| 35 | + |
| 36 | +def new_key( |
| 37 | + kty: str = "OKP", |
| 38 | + crv_or_size: str | int = "Ed25519", |
| 39 | +) -> Key: |
| 40 | + """Create a fresh private signing key.""" |
| 41 | + parameters = { |
| 42 | + "key_ops": ["sign", "verify"], |
| 43 | + "alg": "EdDSA", |
| 44 | + "kid": uuid7().hex, |
| 45 | + } |
| 46 | + return JWKRegistry.generate_key( |
| 47 | + key_type=kty, crv_or_size=crv_or_size, private=True, parameters=parameters # type: ignore[arg-type] |
| 48 | + ) |
| 49 | + |
| 50 | + |
| 51 | +# ---------- CLI -------------------------------------------------------------- |
| 52 | + |
| 53 | + |
| 54 | +async def rotate_jwk(args): |
| 55 | + """Rotate keys in a JWKS file by inserting a new key at index 0 (active).""" |
| 56 | + logger.info("Rotating JWKs...") |
| 57 | + |
| 58 | + crv_or_size = args.crv_or_size |
| 59 | + if isinstance(crv_or_size, str) and crv_or_size.isdigit(): |
| 60 | + crv_or_size = int(crv_or_size) |
| 61 | + |
| 62 | + jwks_path = Path(args.jwks_path) |
| 63 | + jwks = load_jwks(jwks_path) |
| 64 | + |
| 65 | + # Current key (at index 0) is set to "verify" only |
| 66 | + if len(jwks.keys) > 0: |
| 67 | + active_key = jwks.keys[0] |
| 68 | + active_key_dict = active_key.as_dict(private=True) |
| 69 | + active_key_dict["key_ops"] = sorted( |
| 70 | + set(active_key_dict.get("key_ops", [])) - {"sign"} |
| 71 | + ) |
| 72 | + jwks.keys[0] = JWKRegistry.import_key(active_key_dict) |
| 73 | + |
| 74 | + jwk = new_key(args.kty, crv_or_size) |
| 75 | + jwks.keys.insert(0, jwk) |
| 76 | + |
| 77 | + save_jwks(jwks_path, jwks) |
| 78 | + |
| 79 | + |
| 80 | +async def delete_jwk(args): |
| 81 | + """Delete a JWK from a JWKS file.""" |
| 82 | + logger.info("Deleting JWK...") |
| 83 | + |
| 84 | + path = Path(args.jwks_path) |
| 85 | + jwks = load_jwks(path) |
| 86 | + jwks.keys = [k for k in jwks.keys if k.get("kid") != args.kid] |
| 87 | + save_jwks(path, jwks) |
| 88 | + |
| 89 | + |
| 90 | +def parse_args(): |
| 91 | + parser = argparse.ArgumentParser() |
| 92 | + subparsers = parser.add_subparsers(dest="command", required=True) |
| 93 | + |
| 94 | + rotate_jwk_parser = subparsers.add_parser( |
| 95 | + "rotate-jwk", help="Rotate JWK keys in a JWKS file" |
| 96 | + ) |
| 97 | + rotate_jwk_parser.add_argument( |
| 98 | + "--jwks-path", required=True, help="Path to the existing (old) JWKS JSON file." |
| 99 | + ) |
| 100 | + |
| 101 | + rotate_jwk_parser.add_argument( |
| 102 | + "--kty", default="OKP", help="Key type for the new key." |
| 103 | + ) |
| 104 | + rotate_jwk_parser.add_argument( |
| 105 | + "--crv-or-size", default="Ed25519", help="Curve or size for the new key." |
| 106 | + ) |
| 107 | + rotate_jwk_parser.set_defaults(func=rotate_jwk) |
| 108 | + |
| 109 | + delete_jwk_parser = subparsers.add_parser( |
| 110 | + "delete-jwk", help="Delete a JWK key from a JWKS file" |
| 111 | + ) |
| 112 | + delete_jwk_parser.add_argument( |
| 113 | + "--jwks-path", required=True, help="Path to the JWKS JSON file." |
| 114 | + ) |
| 115 | + delete_jwk_parser.add_argument( |
| 116 | + "--kid", required=True, help="Key ID (kid) of the key to delete." |
| 117 | + ) |
| 118 | + delete_jwk_parser.set_defaults(func=delete_jwk) |
| 119 | + |
| 120 | + args = parser.parse_args() |
| 121 | + logger.setLevel(logging.INFO) |
| 122 | + asyncio.run(args.func(args)) |
| 123 | + |
| 124 | + |
| 125 | +if __name__ == "__main__": |
| 126 | + parse_args() |
0 commit comments