Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/database/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,43 @@ def get_parameters(flow_id: int, expdb: Connection) -> Sequence[Row]:
)


def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the common tagging SQL into shared internal helper functions and have the specific tag/get_tag/delete_tag wrappers delegate to them to avoid duplication.

You can reduce repetition by extracting the shared tagging logic into a small internal helper and having the flow/task/run-specific functions delegate to it. That keeps behavior in one place while preserving the public API.

For example, in your data layer module:

# Internal helpers – table/column names are fixed constants, not user input.
def _tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` as id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )

Then your flow-specific functions become tiny wrappers:

def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )

You can apply the same pattern to the existing task/run tagging functions by just changing table/id_column. This centralizes the behavior so it’s less likely to diverge over time while keeping the public interface unchanged.

connection.execute(
text(
"""
INSERT INTO implementation_tag(`id`, `tag`, `uploader`)
VALUES (:flow_id, :tag, :user_id)
""",
),
parameters={"flow_id": id_, "tag": tag_, "user_id": user_id},
)


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    """Fetch the `implementation_tag` row for (flow, tag), or None if absent."""
    query = text(
        """
        SELECT `id`, `tag`, `uploader`
        FROM implementation_tag
        WHERE `id` = :flow_id AND `tag` = :tag
        """,
    )
    result = connection.execute(query, parameters={"flow_id": id_, "tag": tag_})
    return result.one_or_none()


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    """Remove the tag `tag_` from the flow identified by `id_` (no-op if absent)."""
    query = text(
        """
        DELETE FROM implementation_tag
        WHERE `id` = :flow_id AND `tag` = :tag
        """,
    )
    connection.execute(query, parameters={"flow_id": id_, "tag": tag_})


def get_by_name(name: str, external_version: str, expdb: Connection) -> Row | None:
"""Gets flow by name and external version."""
return expdb.execute(
Expand Down
52 changes: 52 additions & 0 deletions src/database/runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from sqlalchemy import Connection, Row, text


def get_tags(id_: int, expdb: Connection) -> list[str]:
    """Return all tag strings attached to the run identified by `id_`."""
    query = text(
        """
        SELECT `tag`
        FROM run_tag
        WHERE `id` = :run_id
        """,
    )
    rows = expdb.execute(query, parameters={"run_id": id_})
    return [r.tag for r in rows]


def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    """Attach tag `tag_` to run `id_`, recording `user_id` as the uploader."""
    insert = text(
        """
        INSERT INTO run_tag(`id`, `tag`, `uploader`)
        VALUES (:run_id, :tag, :user_id)
        """,
    )
    connection.execute(
        insert,
        parameters={"run_id": id_, "tag": tag_, "user_id": user_id},
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    """Fetch the `run_tag` row for (run, tag), or None when no such tag exists."""
    query = text(
        """
        SELECT `id`, `tag`, `uploader`
        FROM run_tag
        WHERE `id` = :run_id AND `tag` = :tag
        """,
    )
    result = connection.execute(query, parameters={"run_id": id_, "tag": tag_})
    return result.one_or_none()


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    """Remove tag `tag_` from run `id_` (no-op when the tag is not present)."""
    query = text(
        """
        DELETE FROM run_tag
        WHERE `id` = :run_id AND `tag` = :tag
        """,
    )
    connection.execute(query, parameters={"run_id": id_, "tag": tag_})
37 changes: 37 additions & 0 deletions src/database/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,40 @@ def get_tags(id_: int, expdb: Connection) -> list[str]:
parameters={"task_id": id_},
)
return [row.tag for row in tag_rows]


def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the common tag insert/get/delete logic into shared helpers parameterized by table and id column, then have the task-specific functions delegate to them.

You can keep the new tag/get_tag/delete_tag APIs but route them through a shared helper to avoid copy‑pasted SQL across tasks/flows/runs.

For example, introduce a small internal helper that only varies by table name (and optionally id column), then delegate from the public functions:

def _insert_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` AS id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )

Then your task‑specific functions become thin wrappers:

def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _insert_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )

You can then reuse the same _insert_tag/_get_tag/_delete_tag helpers for flows and runs by just changing table= (and id_column= if needed). This keeps the public API intact while centralizing tagging behavior and reducing future synchronization overhead.

connection.execute(
text(
"""
INSERT INTO task_tag(`id`, `tag`, `uploader`)
VALUES (:task_id, :tag, :user_id)
""",
),
parameters={"task_id": id_, "tag": tag_, "user_id": user_id},
)


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    """Fetch the `task_tag` row for (task, tag), or None when no such tag exists."""
    query = text(
        """
        SELECT `id`, `tag`, `uploader`
        FROM task_tag
        WHERE `id` = :task_id AND `tag` = :tag
        """,
    )
    result = connection.execute(query, parameters={"task_id": id_, "tag": tag_})
    return result.one_or_none()


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    """Remove tag `tag_` from task `id_` (no-op when the tag is not present)."""
    query = text(
        """
        DELETE FROM task_tag
        WHERE `id` = :task_id AND `tag` = :tag
        """,
    )
    connection.execute(query, parameters={"task_id": id_, "tag": tag_})
2 changes: 2 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from routers.openml.evaluations import router as evaluationmeasures_router
from routers.openml.flows import router as flows_router
from routers.openml.qualities import router as qualities_router
from routers.openml.runs import router as runs_router
from routers.openml.study import router as study_router
from routers.openml.tasks import router as task_router
from routers.openml.tasktype import router as ttype_router
Expand Down Expand Up @@ -54,6 +55,7 @@ def create_api() -> FastAPI:
app.include_router(estimationprocedure_router)
app.include_router(task_router)
app.include_router(flows_router)
app.include_router(runs_router)
app.include_router(study_router)
return app

Expand Down
71 changes: 68 additions & 3 deletions src/routers/openml/flows.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,82 @@
from http import HTTPStatus
from typing import Annotated, Literal
from typing import Annotated, Any, Literal

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Body, Depends, HTTPException
from sqlalchemy import Connection

import database.flows
from core.conversions import _str_to_num
from routers.dependencies import expdb_connection
from database.users import User, UserGroup
from routers.dependencies import expdb_connection, fetch_user
from routers.types import SystemString64
from schemas.flows import Flow, Parameter, Subflow

router = APIRouter(prefix="/flows", tags=["flows"])


@router.post(path="/tag")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the repeated auth and tag/untag logic into reusable helper functions so the individual endpoints just supply resource-specific behavior and response keys.

You can cut down duplication and make future tag changes safer by extracting the shared behavior into small helpers, then wiring them up in each router.

For example, the auth check and error payloads are identical across tag/untag endpoints and across routers:

def _require_user(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user

Then in your endpoints:

@router.post("/tag")
def tag_flow(
    flow_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)

    tags = database.flows.get_tags(flow_id, expdb)
    if tag.casefold() in (t.casefold() for t in tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )
    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}

You can also encapsulate the generic tag/untag logic so tasks/runs/flows only provide resource-specific functions and the response key:

# tagging_helpers.py
from collections.abc import Callable

def make_tag_endpoint(
    get_tags: Callable[[int, Connection], list[str]],
    tag_fn: Callable[..., None],
    response_key: str,
):
    def endpoint(
        entity_id: Annotated[int, Body()],
        tag: Annotated[str, SystemString64],
        user: Annotated[User | None, Depends(fetch_user)] = None,
        expdb: Annotated[Connection, Depends(expdb_connection)] = None,
    ) -> dict[str, dict[str, Any]]:
        user = _require_user(user)
        tags = get_tags(entity_id, expdb)
        if tag.casefold() in (t.casefold() for t in tags):
            raise HTTPException(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                detail={
                    "code": "473",
                    "message": "Entity already tagged by this tag.",
                    "additional_information": f"id={entity_id}; tag={tag}",
                },
            )
        tag_fn(entity_id, tag, user_id=user.user_id, connection=expdb)
        return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}
    return endpoint

Then in the flows router:

from tagging_helpers import make_tag_endpoint

tag_flow = router.post("/tag")(
    make_tag_endpoint(
        get_tags=database.flows.get_tags,
        tag_fn=database.flows.tag,
        response_key="flow_tag",
    )
)

A similar make_untag_endpoint can encapsulate the get_tag / ownership / delete logic, taking an owner_id_attr or a small predicate so that flows/tasks/runs don’t re-implement it. This keeps your new flow endpoints while reducing repetition across routers and making future tag behavior changes localized.

def tag_flow(
flow_id: Annotated[int, Body()],
tag: Annotated[str, SystemString64],
user: Annotated[User | None, Depends(fetch_user)] = None,
expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
if user is None:
raise HTTPException(
status_code=HTTPStatus.PRECONDITION_FAILED,
detail={"code": "103", "message": "Authentication failed"},
)
tags = database.flows.get_tags(flow_id, expdb)
if tag.casefold() in [t.casefold() for t in tags]:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={flow_id}; tag={tag}",
},
)
database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
Comment on lines +29 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Search for implementation_tag table definition and constraints
find . -type f \( -name "*.sql" -o -name "*.py" \) | head -30

Repository: openml/server-api

Length of output: 898


🏁 Script executed:

# Look for database models and schema definitions
rg "implementation_tag" -A 5 -B 2 --max-count=20

Repository: openml/server-api

Length of output: 1655


🏁 Script executed:

# Find the database.flows module to examine tag() and get_tags() implementations
fd "database" --type f -e py | head -20

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check the SQL schema files for implementation_tag constraints
cat ./docker/mysql/data/openml.sql | grep -A 10 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Also check the _expdb schema
cat ./docker/mysql/data/openml_expdb.sql | grep -A 10 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Read the full flows.py database module to see tag() and get_tags() functions
cat -n src/database/flows.py

Repository: openml/server-api

Length of output: 3895


🏁 Script executed:

# Try to read schema files with head/tail to see structure
head -100 ./docker/mysql/data/openml.sql | grep -A 15 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Read the router flows.py file to see the full context of lines 29-39
cat -n src/routers/openml/flows.py | head -60

Repository: openml/server-api

Length of output: 2676


🏁 Script executed:

# Search for implementation_tag table creation in SQL files
fd -e sql -x grep -l "implementation_tag" {}

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Try to read the SQL files line by line
wc -l ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sql

Repository: openml/server-api

Length of output: 143


🏁 Script executed:

# Search for UNIQUE constraint patterns in SQL files
rg "UNIQUE.*implementation_tag|implementation_tag.*UNIQUE" --include="*.sql"

Repository: openml/server-api

Length of output: 502


🏁 Script executed:

# Check for PRIMARY KEY or UNIQUE constraints on (id, tag)
rg -A 20 "CREATE TABLE.*implementation_tag" --include="*.sql"

Repository: openml/server-api

Length of output: 502


🏁 Script executed:

# Read the SQL files directly to find implementation_tag table definition
cat ./docker/mysql/data/openml.sql | head -5
cat ./docker/mysql/data/openml_expdb.sql | head -5

Repository: openml/server-api

Length of output: 158


🏁 Script executed:

# Search for implementation_tag CREATE TABLE in both files
grep -n "implementation_tag" ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sql | head -20

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Try using sed or awk to extract lines around implementation_tag
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml.sql | head -30
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml_expdb.sql | head -30

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Search for database migration or setup files
find . -type f \( -name "*migration*" -o -name "*schema*" -o -name "*setup*" \) | grep -E "\.(py|sql)$"

Repository: openml/server-api

Length of output: 380


🏁 Script executed:

# Search for IntegrityError handling in the codebase
rg "IntegrityError" -t py

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check database/__init__.py and database/setup.py for schema or constraint info
cat -n src/database/setup.py

Repository: openml/server-api

Length of output: 1145


🏁 Script executed:

# Check if there's any documentation about the schema in README files
find . -name "README*" -o -name "readme*" | head -5

Repository: openml/server-api

Length of output: 142


🏁 Script executed:

# Search for error handling around database.flows.tag() calls
rg "database\.flows\.tag|\.tag\(" -A 5 -B 5 src/routers/

Repository: openml/server-api

Length of output: 2753


🏁 Script executed:

# Check if there's try/except around tag operations in router
rg "try|except|IntegrityError|SQLAlchemy" -A 3 src/routers/openml/flows.py

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Look for ON DUPLICATE KEY UPDATE or similar patterns in database module
rg "ON DUPLICATE|INSERT IGNORE|INSERT OR" src/database/

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check the actual imports and error handling setup in database/flows.py
head -20 src/database/flows.py

Repository: openml/server-api

Length of output: 563


TOCTOU race condition in duplicate-tag check

The get_tags → casefold check → tag() insert sequence is not atomic. Two concurrent requests for the same (flow_id, tag) can both pass Line 30 and both reach the INSERT on Line 39, resulting in either a silent duplicate row or an unhandled database exception.

The tag() function (in src/database/flows.py) executes a plain INSERT with no error handling. No try/except wraps the call, and the query lacks ON DUPLICATE KEY UPDATE or similar protection. Even if a unique constraint exists on (id, tag) in the schema, the resulting IntegrityError would propagate uncaught rather than returning the proper 473 response.

Fix: Add a UNIQUE constraint on (id, tag) in implementation_tag and wrap the tag() call in a try/except to catch and translate IntegrityError to code 473, or use INSERT IGNORE/ON DUPLICATE KEY UPDATE with a post-insert existence check.

Note: The same pattern exists in tasks.py, runs.py, and datasets.py.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` around lines 29 - 39, The current get_tags ->
tag sequence has a TOCTOU race: add a UNIQUE constraint on implementation_tag
for (id, tag) at the schema level, and modify the call site around
database.flows.tag(flow_id, tag, user_id=...) to catch the DB integrity
exception (e.g., IntegrityError or the DB driver's unique-constraint error) and
translate it into the same HTTPException payload (status_code
HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473" with id and tag) instead of
letting the exception propagate; alternatively, change database.flows.tag to use
an idempotent INSERT (INSERT IGNORE / ON DUPLICATE KEY UPDATE) or perform an
atomic existence-check-and-insert in the DB and surface the 473 response on
duplicate—apply the same pattern to database.flows.tag usage in tasks.py,
runs.py, and datasets.py.

tags = database.flows.get_tags(flow_id, expdb)
return {"flow_tag": {"id": str(flow_id), "tag": tags}}


@router.post(path="/untag")
def untag_flow(
    flow_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    """Remove `tag` from the flow; only the tag's uploader or an admin may do so."""
    # Reject unauthenticated callers before touching the database.
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    tag_row = database.flows.get_tag(flow_id, tag, expdb)
    if tag_row is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )
    # Only the original uploader of the tag, or an admin, may remove it.
    is_owner = tag_row.uploader == user.user_id
    is_admin = UserGroup.ADMIN in user.groups
    if not (is_owner or is_admin):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )
    database.flows.delete_tag(flow_id, tag, expdb)
    remaining = database.flows.get_tags(flow_id, expdb)
    return {"flow_tag": {"id": str(flow_id), "tag": remaining}}


@router.get("/exists/{name}/{external_version}")
def flow_exists(
name: str,
Expand Down
75 changes: 75 additions & 0 deletions src/routers/openml/runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from http import HTTPStatus
from typing import Annotated, Any

from fastapi import APIRouter, Body, Depends, HTTPException
from sqlalchemy import Connection

import database.runs
from database.users import User, UserGroup
from routers.dependencies import expdb_connection, fetch_user
from routers.types import SystemString64

router = APIRouter(prefix="/runs", tags=["runs"])


@router.post(path="/tag")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the repeated auth and tagging checks into shared helper functions so the router endpoints only handle wiring to the database layer.

You can reduce the complexity here by extracting the duplicated tagging logic into shared helpers, and keeping the router functions as thin wiring to the DB layer.

For example, centralize the auth + duplicate‑check + error construction in a small tagging helper/service:

# tagging_service.py
from http import HTTPStatus
from fastapi import HTTPException
from database.users import User, UserGroup

def require_authenticated(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user

def ensure_not_tagged(tags: list[str], tag: str, entity_id: int, entity_name: str) -> None:
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": f"{entity_name} already tagged by this tag.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_exists(existing, entity_id: int, tag: str, entity_name: str) -> None:
    if existing is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_owner_or_admin(existing, user: User, entity_id: int, tag: str) -> None:
    if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

Then your new router becomes mostly plumbing, similar to flows/tasks routers:

from fastapi import APIRouter, Body, Depends
from sqlalchemy import Connection
import database.runs
from database.users import User
from routers.dependencies import expdb_connection, fetch_user
from routers.types import SystemString64
from tagging_service import (
    require_authenticated,
    ensure_not_tagged,
    ensure_tag_exists,
    ensure_tag_owner_or_admin,
)

router = APIRouter(prefix="/runs", tags=["runs"])

@router.post(path="/tag")
def tag_run(
    run_id: int = Body(),
    tag: str = SystemString64,
    user: User | None = Depends(fetch_user),
    expdb: Connection = Depends(expdb_connection),
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    tags = database.runs.get_tags(run_id, expdb)
    ensure_not_tagged(tags, tag, run_id, "Entity")
    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}

@router.post(path="/untag")
def untag_run(
    run_id: int = Body(),
    tag: str = SystemString64,
    user: User | None = Depends(fetch_user),
    expdb: Connection = Depends(expdb_connection),
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    existing = database.runs.get_tag(run_id, tag, expdb)
    ensure_tag_exists(existing, run_id, tag, "Entity")
    ensure_tag_owner_or_admin(existing, user, run_id, tag)
    database.runs.delete_tag(run_id, tag, expdb)
    tags = database.runs.get_tags(run_id, expdb)
    return {"run_tag": {"id": str(run_id), "tag": tags}}

You can reuse the same tagging_service in the flows/tasks routers by just wiring the appropriate database.* functions and entity name. This keeps each router very small and removes the line‑for‑line duplication while preserving current behavior and error payloads.

def tag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    """Attach `tag` to the run; rejects duplicates (case-insensitive)."""
    # Reject unauthenticated callers before touching the database.
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    current_tags = database.runs.get_tags(run_id, expdb)
    # Case-insensitive duplicate check mirrors the legacy server behavior.
    if tag.casefold() in (t.casefold() for t in current_tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={run_id}; tag={tag}",
            },
        )
    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
    # Re-read so the response reflects the stored state after the insert.
    updated_tags = database.runs.get_tags(run_id, expdb)
    return {"run_tag": {"id": str(run_id), "tag": updated_tags}}
Comment on lines +15 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

tag_run does not validate that run_id exists before inserting a tag.

No existence check is performed before writing to the tag table. A non-existent run_id will either silently create an orphaned row (no FK constraint) or produce an unhandled DB exception. The same major issue was already identified for tag_flow; tag_task carries it too.

🛡️ Proposed fix
     if user is None:
         raise HTTPException(
             status_code=HTTPStatus.PRECONDITION_FAILED,
             detail={"code": "103", "message": "Authentication failed"},
         )
+    if database.runs.get(run_id, expdb) is None:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail={"code": "...", "message": "Unknown run."},
+        )
     tags = database.runs.get_tags(run_id, expdb)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/runs.py` around lines 15 - 39, Check that the run exists
before inserting a tag: in tag_run, call an existence/read helper (e.g.,
database.runs.get or database.runs.get_by_id) using run_id and expdb before
calling database.runs.tag; if the helper returns no row, raise an HTTPException
(HTTPStatus.NOT_FOUND) with an appropriate detail payload (e.g.,
{"code":"404","message":"Run not found", "additional_information":
f"id={run_id}"}). Do the same existence check path before the duplicate-tag
check so you don't query tags for a missing run, and mirror this pattern for
tag_flow and tag_task (use the same database.runs.* helper names shown in this
diff to locate where to add the check).



@router.post(path="/untag")
def untag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    """Remove `tag` from the run; only the tag's uploader or an admin may do so."""
    # Reject unauthenticated callers before touching the database.
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    tag_row = database.runs.get_tag(run_id, tag, expdb)
    if tag_row is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={run_id}; tag={tag}",
            },
        )
    # Only the original uploader of the tag, or an admin, may remove it.
    is_owner = tag_row.uploader == user.user_id
    is_admin = UserGroup.ADMIN in user.groups
    if not (is_owner or is_admin):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={run_id}; tag={tag}",
            },
        )
    database.runs.delete_tag(run_id, tag, expdb)
    remaining = database.runs.get_tags(run_id, expdb)
    return {"run_tag": {"id": str(run_id), "tag": remaining}}
Loading