Add tag/untag endpoints for tasks, flows, and runs #250

Open
YassinNouh21 wants to merge 5 commits into openml:main from YassinNouh21:feature/tag-untag-endpoints

Conversation

@YassinNouh21

Summary

  • Adds POST /tasks/tag, /tasks/untag, /flows/tag, /flows/untag, /runs/tag, /runs/untag
  • Follows existing dataset tag pattern; untag adds ownership check (error 478) with admin bypass
  • New src/database/runs.py and src/routers/openml/runs.py for run endpoints

Closes #26, #27, #34, #35, #45, #46

Test plan

  • 30 new tests (11 task, 10 flow, 9 run) — all passing
  • Full suite (pytest -m "not php_api and not slow") — 167 passed, 0 failures
  • E2E curl tests against live API for all 6 endpoints

Add tag(), get_tag(), and delete_tag() to tasks.py and flows.py.
Create new runs.py with get_tags(), tag(), get_tag(), and delete_tag().
- POST /tasks/tag, /tasks/untag
- POST /flows/tag, /flows/untag
- POST /runs/tag, /runs/untag (new router, registered in main.py)

Untag checks tag ownership (error 478) with admin bypass.
Covers auth rejection, tagging, duplicate detection, untagging,
ownership enforcement, and tag-not-found errors.
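The duplicate-detection and ownership rules described above can be sketched in isolation as two small predicates (hypothetical helper names; the real endpoints inline these checks):

```python
def is_duplicate_tag(existing_tags: list[str], new_tag: str) -> bool:
    # Case-insensitive membership test, mirroring the endpoints' casefold comparison.
    return new_tag.casefold() in (t.casefold() for t in existing_tags)


def may_untag(tag_uploader: int, user_id: int, is_admin: bool) -> bool:
    # Untag is allowed for the tag's uploader, or for any admin (error 478 otherwise).
    return is_admin or tag_uploader == user_id


print(is_duplicate_tag(["Weka", "study_1"], "weka"))       # True
print(may_untag(tag_uploader=16, user_id=2, is_admin=False))  # False
```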

coderabbitai bot commented Feb 21, 2026

Walkthrough

The PR adds tagging support for flows, runs, and tasks. It introduces database helper functions tag, get_tag, and delete_tag in src/database/flows.py, src/database/runs.py, and src/database/tasks.py; new POST endpoints to tag and untag entities in src/routers/openml/flows.py, src/routers/openml/runs.py, and src/routers/openml/tasks.py with authentication and ownership checks; registers the runs router in src/main.py; and adds tests under tests/routers/openml covering auth failures, successful tag/untag, duplicate-tag and tag-not-found cases, and ownership enforcement.

🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning — Docstring coverage is 4.26%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)

  • Title check ✅ — The title clearly and accurately describes the main change: adding tag/untag endpoints for three entities (tasks, flows, runs).
  • Description check ✅ — The description is related to the changeset, listing the new endpoints, test coverage, and linked issues.
  • Linked Issues check ✅ — Code changes implement tag/untag endpoints for tasks, flows, and runs as required by the linked issues. New database functions and API endpoints match the stated objectives.
  • Out of Scope Changes check ✅ — All changes are directly related to implementing tag/untag functionality for tasks, flows, and runs. Router registration and supporting database functions are within scope.



@sourcery-ai sourcery-ai bot left a comment
Hey - I've found 7 issues, and left some high level feedback:

  • The tag/untag logic for tasks, flows, and runs (including the DB helpers) is almost identical; consider extracting shared utilities (e.g., a generic tag_entity/untag_entity helper or a small service layer) to reduce duplication and keep behavior changes consistent across entity types.
  • You’re performing case-insensitive duplicate checks in Python with tag.casefold() on every request; if this check needs to be robust at scale, consider enforcing case-insensitive uniqueness at the database level (collation/index or LOWER(tag) constraint) to avoid race conditions and duplicated tags with different casing.
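The database-level alternative suggested here could look like the following sketch, illustrated with in-memory SQLite and a hypothetical `task_tag` shape (the production schema is MySQL, where a functional index on `(LOWER(tag))` would play the same role):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_tag (id INTEGER, tag TEXT, uploader INTEGER)")
# Unique per (id, lowercased tag): the database now rejects case-variant
# duplicates atomically, closing the race window left by a check-then-insert
# performed in Python.
conn.execute("CREATE UNIQUE INDEX uq_task_tag ON task_tag (id, LOWER(tag))")

conn.execute("INSERT INTO task_tag VALUES (1, 'Weka', 16)")
try:
    conn.execute("INSERT INTO task_tag VALUES (1, 'weka', 2)")
    duplicate_allowed = True
except sqlite3.IntegrityError:
    duplicate_allowed = False

print(duplicate_allowed)  # False
```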
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The tag/untag logic for tasks, flows, and runs (including the DB helpers) is almost identical; consider extracting shared utilities (e.g., a generic `tag_entity`/`untag_entity` helper or a small service layer) to reduce duplication and keep behavior changes consistent across entity types.
- You’re performing case-insensitive duplicate checks in Python with `tag.casefold()` on every request; if this check needs to be robust at scale, consider enforcing case-insensitive uniqueness at the database level (collation/index or `LOWER(tag)` constraint) to avoid race conditions and duplicated tags with different casing.

## Individual Comments

### Comment 1
<location> `src/routers/openml/flows.py:24-40` </location>
<code_context>
+            status_code=HTTPStatus.PRECONDITION_FAILED,
+            detail={"code": "103", "message": "Authentication failed"},
+        )
+    tags = database.flows.get_tags(flow_id, expdb)
+    if tag.casefold() in [t.casefold() for t in tags]:
+        raise HTTPException(
+            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+            detail={
+                "code": "473",
+                "message": "Entity already tagged by this tag.",
+                "additional_information": f"id={flow_id}; tag={tag}",
+            },
+        )
+    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
+    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Re-read tags from the database after insert to avoid returning a potentially stale tag list.

This builds the response from the previously fetched `tags` plus `tag`, so any tag added between `get_tags` and `tag` won’t be included in the response. To avoid this race and match `untag_flow`’s behavior, re-query `get_tags` after the insert and return that list instead.

```suggestion
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )

    tags = database.flows.get_tags(flow_id, expdb)
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )

    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)

    # Re-read tags after inserting to avoid returning a potentially stale tag list.
    tags = database.flows.get_tags(flow_id, expdb)

    return {"flow_tag": {"id": str(flow_id), "tag": tags}}
```
</issue_to_address>

### Comment 2
<location> `tests/routers/openml/task_tag_test.py:80-89` </location>
<code_context>
+def test_task_untag(expdb_test: Connection, py_api: TestClient) -> None:
</code_context>

<issue_to_address>
**suggestion (testing):** Add a test to cover the case where a non-admin uploader can successfully untag their own task tag.

Current tests cover admin untagging their own tag and blocking a non-owner (`code` 478), but they don’t verify that non-admin users can untag tags they created. Please add a test that:

1. Tags a task using a non-admin user (e.g., `ApiKey.SOME_USER`), then
2. Calls `/tasks/untag` as the same user and asserts success.

This ensures the ownership rule for non-admin uploaders is enforced and protected against regressions.

Suggested implementation:

```python
def test_task_untag(expdb_test: Connection, py_api: TestClient) -> None:
    task_id, tag = 59, "test"
    py_api.post(
        f"/tasks/tag?api_key={ApiKey.ADMIN}",
        json={"task_id": task_id, "tag": tag},
    )
    response = py_api.post(
        f"/tasks/untag?api_key={ApiKey.ADMIN}",
        json={"task_id": task_id, "tag": tag},
    )
    assert response.status_code == HTTPStatus.OK


def test_task_untag_non_admin_can_untag_own_tag(
    expdb_test: Connection, py_api: TestClient
) -> None:
    task_id, tag = 60, "user-tag"
    # Non-admin user tags the task
    tag_response = py_api.post(
        f"/tasks/tag?api_key={ApiKey.SOME_USER}",
        json={"task_id": task_id, "tag": tag},
    )
    assert tag_response.status_code == HTTPStatus.OK

    # Same non-admin user un-tags their own tag
    untag_response = py_api.post(
        f"/tasks/untag?api_key={ApiKey.SOME_USER}",
        json={"task_id": task_id, "tag": tag},
    )
    assert untag_response.status_code == HTTPStatus.OK

```

If `ApiKey.SOME_USER` is not the correct non-admin uploader key in your test suite, replace it with the appropriate enum member that represents a non-admin but authorized uploader (for example, `ApiKey.USER`, `ApiKey.UPLOADER`, etc., consistent with the rest of your tests).
</issue_to_address>

### Comment 3
<location> `src/routers/openml/flows.py:17` </location>
<code_context>
 router = APIRouter(prefix="/flows", tags=["flows"])


+@router.post(path="/tag")
+def tag_flow(
+    flow_id: Annotated[int, Body()],
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the repeated auth and tag/untag logic into reusable helper functions so the individual endpoints just supply resource-specific behavior and response keys.

You can cut down duplication and make future tag changes safer by extracting the shared behavior into small helpers, then wiring them up in each router.

For example, the auth check and error payloads are identical across tag/untag endpoints and across routers:

```python
def _require_user(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user
```

Then in your endpoints:

```python
@router.post("/tag")
def tag_flow(
    flow_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)

    tags = database.flows.get_tags(flow_id, expdb)
    if tag.casefold() in (t.casefold() for t in tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )
    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}
```

You can also encapsulate the generic tag/untag logic so tasks/runs/flows only provide resource-specific functions and the response key:

```python
# tagging_helpers.py
from collections.abc import Callable

def make_tag_endpoint(
    get_tags: Callable[[int, Connection], list[str]],
    tag_fn: Callable[..., None],
    response_key: str,
):
    def endpoint(
        entity_id: Annotated[int, Body()],
        tag: Annotated[str, SystemString64],
        user: Annotated[User | None, Depends(fetch_user)] = None,
        expdb: Annotated[Connection, Depends(expdb_connection)] = None,
    ) -> dict[str, dict[str, Any]]:
        user = _require_user(user)
        tags = get_tags(entity_id, expdb)
        if tag.casefold() in (t.casefold() for t in tags):
            raise HTTPException(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                detail={
                    "code": "473",
                    "message": "Entity already tagged by this tag.",
                    "additional_information": f"id={entity_id}; tag={tag}",
                },
            )
        tag_fn(entity_id, tag, user_id=user.user_id, connection=expdb)
        return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}
    return endpoint
```

Then in the flows router:

```python
from tagging_helpers import make_tag_endpoint

tag_flow = router.post("/tag")(
    make_tag_endpoint(
        get_tags=database.flows.get_tags,
        tag_fn=database.flows.tag,
        response_key="flow_tag",
    )
)
```

A similar `make_untag_endpoint` can encapsulate the `get_tag` / ownership / delete logic, taking an `owner_id_attr` or a small predicate so that flows/tasks/runs don’t re-implement it. This keeps your new flow endpoints while reducing repetition across routers and making future tag behavior changes localized.
</issue_to_address>

### Comment 4
<location> `src/routers/openml/tasks.py:154` </location>
<code_context>
 router = APIRouter(prefix="/flows", tags=["flows"])


+@router.post(path="/tag")
+def tag_flow(
+    flow_id: Annotated[int, Body()],
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the shared tagging/untagging logic into reusable helper functions so the task endpoints become thin wrappers around common behavior.

You can reduce duplication and keep the endpoint behavior identical by extracting the common tagging logic into small helpers and using them from `tag_task`/`untag_task` (and later from the flow/run routers).

Example of how to refactor this router while preserving all behavior:

```python
# helpers (can live in this module or a shared tagging module)

def _require_user(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user


def _tag_entity(
    entity_id: int,
    tag: str,
    user: User,
    connection: Connection,
    *,
    get_tags: callable,
    tag_fn: callable,
    response_key: str,
) -> dict[str, dict[str, Any]]:
    tags = get_tags(entity_id, connection)
    if tag.casefold() in (t.casefold() for t in tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    tag_fn(entity_id, tag, user_id=user.user_id, connection=connection)
    return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}


def _untag_entity(
    entity_id: int,
    tag: str,
    user: User,
    connection: Connection,
    *,
    get_tag: callable,
    delete_tag: callable,
    get_tags: callable,
    response_key: str,
) -> dict[str, dict[str, Any]]:
    existing = get_tag(entity_id, tag, connection)
    if existing is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    delete_tag(entity_id, tag, connection)
    tags = get_tags(entity_id, connection)
    return {response_key: {"id": str(entity_id), "tag": tags}}
```

Your task endpoints become very small and declarative:

```python
@router.post(path="/tag")
def tag_task(
    task_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)
    return _tag_entity(
        task_id,
        tag,
        user,
        expdb,
        get_tags=database.tasks.get_tags,
        tag_fn=database.tasks.tag,
        response_key="task_tag",
    )


@router.post(path="/untag")
def untag_task(
    task_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)
    return _untag_entity(
        task_id,
        tag,
        user,
        expdb,
        get_tag=database.tasks.get_tag,
        delete_tag=database.tasks.delete_tag,
        get_tags=database.tasks.get_tags,
        response_key="task_tag",
    )
```

This keeps all error codes/messages and response shapes intact but centralizes the tagging rules. The same helpers can then be reused by the flow/run routers by passing their own `get_tags`/`get_tag`/`tag`/`delete_tag` functions and appropriate `response_key`.
</issue_to_address>

### Comment 5
<location> `src/routers/openml/runs.py:15` </location>
<code_context>
 router = APIRouter(prefix="/flows", tags=["flows"])


+@router.post(path="/tag")
+def tag_flow(
+    flow_id: Annotated[int, Body()],
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the repeated auth and tagging checks into shared helper functions so the router endpoints only handle wiring to the database layer.

You can reduce the complexity here by extracting the duplicated tagging logic into shared helpers, and keeping the router functions as thin wiring to the DB layer.

For example, centralize the auth + duplicate‑check + error construction in a small tagging helper/service:

```python
# tagging_service.py
from http import HTTPStatus
from fastapi import HTTPException
from database.users import User, UserGroup

def require_authenticated(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user

def ensure_not_tagged(tags: list[str], tag: str, entity_id: int, entity_name: str) -> None:
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": f"{entity_name} already tagged by this tag.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_exists(existing, entity_id: int, tag: str, entity_name: str) -> None:
    if existing is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_owner_or_admin(existing, user: User, entity_id: int, tag: str) -> None:
    if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
```

Then your new router becomes mostly plumbing, similar to flows/tasks routers:

```python
from typing import Annotated, Any

from fastapi import APIRouter, Body, Depends
from sqlalchemy import Connection

import database.runs
from database.users import User
from routers.dependencies import expdb_connection, fetch_user
from routers.types import SystemString64
from tagging_service import (
    require_authenticated,
    ensure_not_tagged,
    ensure_tag_exists,
    ensure_tag_owner_or_admin,
)

router = APIRouter(prefix="/runs", tags=["runs"])

@router.post(path="/tag")
def tag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    tags = database.runs.get_tags(run_id, expdb)
    ensure_not_tagged(tags, tag, run_id, "Entity")
    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}

@router.post(path="/untag")
def untag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    existing = database.runs.get_tag(run_id, tag, expdb)
    ensure_tag_exists(existing, run_id, tag, "Entity")
    ensure_tag_owner_or_admin(existing, user, run_id, tag)
    database.runs.delete_tag(run_id, tag, expdb)
    tags = database.runs.get_tags(run_id, expdb)
    return {"run_tag": {"id": str(run_id), "tag": tags}}
```

You can reuse the same `tagging_service` in the flows/tasks routers by just wiring the appropriate `database.*` functions and entity name. This keeps each router very small and removes the line‑for‑line duplication while preserving current behavior and error payloads.
</issue_to_address>

### Comment 6
<location> `src/database/flows.py:53` </location>
<code_context>
     )


+def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
+    connection.execute(
+        text(
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the common tagging SQL into shared internal helper functions and have the specific tag/get_tag/delete_tag wrappers delegate to them to avoid duplication.

You can reduce repetition by extracting the shared tagging logic into a small internal helper and having the flow/task/run-specific functions delegate to it. That keeps behavior in one place while preserving the public API.

For example, in your data layer module:

```python
# Internal helpers – table/column names are fixed constants, not user input.
def _tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` as id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )
```

Then your flow-specific functions become tiny wrappers:

```python
def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )
```

You can apply the same pattern to the existing task/run tagging functions by just changing `table`/`id_column`. This centralizes the behavior so it’s less likely to diverge over time while keeping the public interface unchanged.
</issue_to_address>

### Comment 7
<location> `src/database/tasks.py:109` </location>
<code_context>
     )


+def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
+    connection.execute(
+        text(
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the common tag insert/get/delete logic into shared helpers parameterized by table and id column, then have the task-specific functions delegate to them.

You can keep the new `tag/get_tag/delete_tag` APIs but route them through a shared helper to avoid copy‑pasted SQL across tasks/flows/runs.

For example, introduce a small internal helper that only varies by table name (and optionally id column), then delegate from the public functions:

```python
def _insert_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` AS id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )
```

Then your task‑specific functions become thin wrappers:

```python
def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _insert_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )
```

You can then reuse the same `_insert_tag/_get_tag/_delete_tag` helpers for flows and runs by just changing `table=` (and `id_column=` if needed). This keeps the public API intact while centralizing tagging behavior and reducing future synchronization overhead.
</issue_to_address>



router = APIRouter(prefix="/flows", tags=["flows"])


@router.post(path="/tag")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the repeated auth and tag/untag logic into reusable helper functions so the individual endpoints just supply resource-specific behavior and response keys.

You can cut down duplication and make future tag changes safer by extracting the shared behavior into small helpers, then wiring them up in each router.

For example, the auth check and error payloads are identical across tag/untag endpoints and across routers:

def _require_user(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user

Then in your endpoints:

@router.post("/tag")
def tag_flow(
    flow_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)

    tags = database.flows.get_tags(flow_id, expdb)
    if tag.casefold() in (t.casefold() for t in tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )
    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}
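A quick sanity check of the casefold() duplicate test used above (tag values are made up for illustration):

```python
# Case-insensitive duplicate check, mirroring the tag endpoints above.
tags = ["OpenmlWeka", "weka"]

def already_tagged(tag: str, existing: list[str]) -> bool:
    # casefold() is an aggressive lower(); it also normalizes e.g. German ß.
    return tag.casefold() in (t.casefold() for t in existing)
```

With this, "WEKA" counts as a duplicate of "weka" while "sklearn" does not.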

You can also encapsulate the generic tag/untag logic so tasks/runs/flows only provide resource-specific functions and the response key:

# tagging_helpers.py
from collections.abc import Callable

def make_tag_endpoint(
    get_tags: Callable[[int, Connection], list[str]],
    tag_fn: Callable[..., None],
    response_key: str,
):
    def endpoint(
        entity_id: Annotated[int, Body()],
        tag: Annotated[str, SystemString64],
        user: Annotated[User | None, Depends(fetch_user)] = None,
        expdb: Annotated[Connection, Depends(expdb_connection)] = None,
    ) -> dict[str, dict[str, Any]]:
        user = _require_user(user)
        tags = get_tags(entity_id, expdb)
        if tag.casefold() in (t.casefold() for t in tags):
            raise HTTPException(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                detail={
                    "code": "473",
                    "message": "Entity already tagged by this tag.",
                    "additional_information": f"id={entity_id}; tag={tag}",
                },
            )
        tag_fn(entity_id, tag, user_id=user.user_id, connection=expdb)
        return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}
    return endpoint

Then in the flows router:

from tagging_helpers import make_tag_endpoint

tag_flow = router.post("/tag")(
    make_tag_endpoint(
        get_tags=database.flows.get_tags,
        tag_fn=database.flows.tag,
        response_key="flow_tag",
    )
)

A similar make_untag_endpoint can encapsulate the get_tag / ownership / delete logic, taking an owner_id_attr or a small predicate so that flows/tasks/runs don’t re-implement it. This keeps your new flow endpoints while reducing repetition across routers and making future tag behavior changes localized.

return template.replace("[CONSTANT:base_url]", server_url)


@router.post(path="/tag")

issue (complexity): Consider extracting the shared tagging/untagging logic into reusable helper functions so the task endpoints become thin wrappers around common behavior.

You can reduce duplication and keep the endpoint behavior identical by extracting the common tagging logic into small helpers and using them from tag_task/untag_task (and later from the flow/run routers).

Example of how to refactor this router while preserving all behavior:

# helpers (can live in this module or a shared tagging module)
from collections.abc import Callable

def _require_user(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user


def _tag_entity(
    entity_id: int,
    tag: str,
    user: User,
    connection: Connection,
    *,
    get_tags: Callable[[int, Connection], list[str]],
    tag_fn: Callable[..., None],
    response_key: str,
) -> dict[str, dict[str, Any]]:
    tags = get_tags(entity_id, connection)
    if tag.casefold() in (t.casefold() for t in tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    tag_fn(entity_id, tag, user_id=user.user_id, connection=connection)
    return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}


def _untag_entity(
    entity_id: int,
    tag: str,
    user: User,
    connection: Connection,
    *,
    get_tag: Callable[[int, str, Connection], Any],
    delete_tag: Callable[[int, str, Connection], None],
    get_tags: Callable[[int, Connection], list[str]],
    response_key: str,
) -> dict[str, dict[str, Any]]:
    existing = get_tag(entity_id, tag, connection)
    if existing is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    delete_tag(entity_id, tag, connection)
    tags = get_tags(entity_id, connection)
    return {response_key: {"id": str(entity_id), "tag": tags}}

Your task endpoints become very small and declarative:

@router.post(path="/tag")
def tag_task(
    task_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)
    return _tag_entity(
        task_id,
        tag,
        user,
        expdb,
        get_tags=database.tasks.get_tags,
        tag_fn=database.tasks.tag,
        response_key="task_tag",
    )


@router.post(path="/untag")
def untag_task(
    task_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)
    return _untag_entity(
        task_id,
        tag,
        user,
        expdb,
        get_tag=database.tasks.get_tag,
        delete_tag=database.tasks.delete_tag,
        get_tags=database.tasks.get_tags,
        response_key="task_tag",
    )

This keeps all error codes/messages and response shapes intact but centralizes the tagging rules. The same helpers can then be reused by the flow/run routers by passing their own get_tags/get_tag/tag/delete_tag functions and appropriate response_key.

router = APIRouter(prefix="/runs", tags=["runs"])


@router.post(path="/tag")

issue (complexity): Consider extracting the repeated auth and tagging checks into shared helper functions so the router endpoints only handle wiring to the database layer.

You can reduce the complexity here by extracting the duplicated tagging logic into shared helpers, and keeping the router functions as thin wiring to the DB layer.

For example, centralize the auth + duplicate‑check + error construction in a small tagging helper/service:

# tagging_service.py
from http import HTTPStatus
from fastapi import HTTPException
from database.users import User, UserGroup

def require_authenticated(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user

def ensure_not_tagged(tags: list[str], tag: str, entity_id: int, entity_name: str) -> None:
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": f"{entity_name} already tagged by this tag.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_exists(existing, entity_id: int, tag: str, entity_name: str) -> None:
    if existing is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_owner_or_admin(existing, user: User, entity_id: int, tag: str) -> None:
    if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

Then your new router becomes mostly plumbing, similar to flows/tasks routers:

from typing import Annotated, Any

from fastapi import APIRouter, Body, Depends
from sqlalchemy import Connection

import database.runs
from database.users import User
from routers.dependencies import expdb_connection, fetch_user
from routers.types import SystemString64
from tagging_service import (
    require_authenticated,
    ensure_not_tagged,
    ensure_tag_exists,
    ensure_tag_owner_or_admin,
)

router = APIRouter(prefix="/runs", tags=["runs"])

@router.post(path="/tag")
def tag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    tags = database.runs.get_tags(run_id, expdb)
    ensure_not_tagged(tags, tag, run_id, "Entity")
    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}

@router.post(path="/untag")
def untag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    existing = database.runs.get_tag(run_id, tag, expdb)
    ensure_tag_exists(existing, run_id, tag, "Entity")
    ensure_tag_owner_or_admin(existing, user, run_id, tag)
    database.runs.delete_tag(run_id, tag, expdb)
    tags = database.runs.get_tags(run_id, expdb)
    return {"run_tag": {"id": str(run_id), "tag": tags}}

You can reuse the same tagging_service in the flows/tasks routers by just wiring the appropriate database.* functions and entity name. This keeps each router very small and removes the line‑for‑line duplication while preserving current behavior and error payloads.

)


def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:

issue (complexity): Consider extracting the common tagging SQL into shared internal helper functions and have the specific tag/get_tag/delete_tag wrappers delegate to them to avoid duplication.

You can reduce repetition by extracting the shared tagging logic into a small internal helper and having the flow/task/run-specific functions delegate to it. That keeps behavior in one place while preserving the public API.

For example, in your data layer module:

# Internal helpers – table/column names are fixed constants, not user input.
def _tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` as id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )

Then your flow-specific functions become tiny wrappers:

def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )

You can apply the same pattern to the existing task/run tagging functions by just changing table/id_column. This centralizes the behavior so it’s less likely to diverge over time while keeping the public interface unchanged.

return [row.tag for row in tag_rows]


def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:

issue (complexity): Consider extracting the common tag insert/get/delete logic into shared helpers parameterized by table and id column, then have the task-specific functions delegate to them.

You can keep the new tag/get_tag/delete_tag APIs but route them through a shared helper to avoid copy‑pasted SQL across tasks/flows/runs.

For example, introduce a small internal helper that only varies by table name (and optionally id column), then delegate from the public functions:

def _insert_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` AS id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )

Then your task‑specific functions become thin wrappers:

def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _insert_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )

You can then reuse the same _insert_tag/_get_tag/_delete_tag helpers for flows and runs by just changing table= (and id_column= if needed). This keeps the public API intact while centralizing tagging behavior and reducing future synchronization overhead.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (10)
tests/routers/openml/task_tag_test.py (1)

31-41: Response assertion is tightly coupled to task 59's initial seed state.

The assertion {"task_tag": {"id": str(task_id), "tag": [tag]}} implicitly requires task 59 to have zero tags in the seed database. If seed data ever gains a tag for task 59, the test will fail with a confusing mismatch. Asserting tag in response.json()["task_tag"]["tag"] (subset check) would be more resilient.

Additionally, there's no test for tagging a non-existent task_id — this edge case would currently return a misleading success response or an FK error, neither of which is validated.
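The exact-match vs subset-check distinction is easy to see without any fixtures; the payload below is a hypothetical response for a task that already carried a seeded tag:

```python
# Hypothetical response: task 59 already carried a "seeded" tag.
response_json = {"task_tag": {"id": "59", "tag": ["seeded", "study_1"]}}
tag = "study_1"

# Exact-match assertion breaks as soon as seed data gains an extra tag.
exact_match = response_json == {"task_tag": {"id": "59", "tag": [tag]}}

# Subset check stays green regardless of pre-existing tags.
subset_check = tag in response_json["task_tag"]["tag"]
```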

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/task_tag_test.py` around lines 31 - 41, The
test_task_tag currently asserts an exact JSON matching which assumes task 59 had
no preexisting tags; change the assertion to check that tag is present in
response.json()["task_tag"]["tag"] (i.e., a subset/contains check) instead of
equality to allow pre-seeded tags, and keep the existing verification via
get_tags(id_=task_id, expdb=expdb_test). Also add a new test that posts to the
same endpoint ("/tasks/tag" using py_api.post) with a non-existent task_id and
asserts the correct error behavior (e.g., 4xx status and appropriate error body)
to verify handling of missing FK/invalid task IDs.
src/routers/openml/runs.py (1)

27-38: No run-existence validation before tagging.

database.runs.get_tags returns [] for a non-existent run_id, so the endpoint proceeds to INSERT a tag for an entity that doesn't exist. Depending on whether run_tag.id has an FK to run, this either silently creates an orphaned row or throws an unhandled FK IntegrityError. Adding a database.runs.get() (currently absent from src/database/runs.py) and a 404 guard here would close this.
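A sketch of what such a guard could look like, with an in-memory SQLite table standing in for the expdb run table; since database.runs.get() does not exist yet, every name below is an assumption:

```python
import sqlite3
from http import HTTPStatus

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE run (id INTEGER PRIMARY KEY)")
conn.execute("INSERT INTO run (id) VALUES (1)")

def get_run(run_id: int, connection: sqlite3.Connection):
    # Hypothetical database.runs.get(): returns the row or None.
    return connection.execute(
        "SELECT id FROM run WHERE id = ?", (run_id,)
    ).fetchone()

def guard_run_exists(run_id: int, connection: sqlite3.Connection) -> None:
    # In the router this would raise HTTPException(HTTPStatus.NOT_FOUND, ...).
    if get_run(run_id, connection) is None:
        raise LookupError(f"{int(HTTPStatus.NOT_FOUND)}: run not found, id={run_id}")
```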

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/runs.py` around lines 27 - 38, The code calls
database.runs.get_tags(run_id, expdb) and then database.runs.tag(...) without
first verifying the run exists, which can create orphaned tags or trigger FK
errors; add an existence check by calling database.runs.get(run_id, expdb) (or
equivalent lookup) before checking tags, and if it returns no result raise
HTTPException with status_code HTTPStatus.NOT_FOUND and a descriptive detail
(include id=run_id); only proceed to duplicate-tag check and
database.runs.tag(...) after the run is confirmed to exist.
tests/routers/openml/run_tag_test.py (1)

44-54: test_run_tag only covers the admin key; task and flow tests parametrize over admin, non-owner, and owner.

Tagging by non-admin users is a valid path (any authenticated user can tag), and it's exercised in test_task_tag. Aligning test_run_tag to also parametrize over ApiKey.SOME_USER / ApiKey.OWNER_USER would give consistent coverage across entity types.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/run_tag_test.py` around lines 44 - 54, The test_run_tag
currently only exercises the admin path; update test_run_tag to parametrize over
API keys (e.g., ApiKey.ADMIN, ApiKey.SOME_USER, ApiKey.OWNER_USER) so it mirrors
test_task_tag's coverage: add a parameter (api_key) to the test signature, apply
a pytest.mark.parametrize decorator with the three ApiKey values, and use that
api_key value in the POST URL (replace ApiKey.ADMIN in the
f"/runs/tag?api_key=..." call). Keep the same assertions and follow-up tags
check using get_tags(id_=run_id, expdb=expdb_test).
src/database/tasks.py (1)

109-143: connection vs expdb naming divergence from existing module convention.

The three new functions use connection: Connection as the parameter name, while the pre-existing functions in this file (e.g., get_tags at line 95, get at line 7, etc.) all use expdb: Connection. The same divergence exists in src/database/flows.py and src/database/runs.py. Not a bug, but it creates a mild inconsistency in the public API surface of these modules.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/tasks.py` around lines 109 - 143, Rename the Connection
parameter in the new functions to match the module convention: change the
parameter name "connection: Connection" to "expdb: Connection" in the functions
tag, get_tag, and delete_tag so they align with existing functions like get_tags
and get; update all internal references (parameters dicts, calls) to use expdb
where applicable. Ensure the signature and any callers are adjusted consistently
to avoid breaking the public API.
src/routers/openml/tasks.py (1)

166-177: Task existence is not validated before tagging — and database.tasks.get() is already available.

Unlike the runs case, database.tasks.get(task_id, expdb) already exists and is used by get_task. Adding an existence guard here is a one-liner:

✨ Proposed guard
     if user is None:
         raise HTTPException(
             status_code=HTTPStatus.PRECONDITION_FAILED,
             detail={"code": "103", "message": "Authentication failed"},
         )
+    if not database.tasks.get(task_id, expdb):
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail="Task not found",
+        )
     tags = database.tasks.get_tags(task_id, expdb)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/tasks.py` around lines 166 - 177, Add an existence guard
before tagging: call database.tasks.get(task_id, expdb) and if it returns falsy,
raise an HTTPException for "not found" (e.g., 404) before proceeding to use
database.tasks.get_tags and database.tasks.tag; keep the rest of the logic
(checking tag duplication using tags = database.tasks.get_tags(...), raising the
duplicate tag HTTPException, then calling database.tasks.tag(...)) unchanged so
you don't attempt to tag a non-existent task.
src/database/runs.py (1)

1-52: No get() function and expdb/connection naming divergence.

The module is missing a get(run_id, expdb) lookup function. Its absence prevents the router from performing an entity-existence check before tagging (see the router review below). Adding one now — consistent with database.tasks.get() — would complete the module's API.

The same expdb (read) / connection (write) naming asymmetry noted in src/database/tasks.py applies here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/runs.py` around lines 1 - 52, Add a read-only lookup function
get(id_: int, expdb: Connection) -> Row | None that mirrors
database.tasks.get(): execute a SELECT (e.g., SELECT * FROM run WHERE id =
:run_id) against the experiment DB and return one_or_none(); also normalize
parameter naming so read operations use expdb and write operations use
connection — change get_tag signature/usage to accept expdb: Connection (not
connection) for reads, keep tag and delete_tag using connection: Connection for
writes, and update any callers accordingly.
tests/routers/openml/flow_tag_test.py (2)

40-68: Hard-coded flow_id=1 and pre-seeded tag assumptions are fragile

test_flow_tag_returns_existing_tags and test_flow_tag_fails_if_tag_exists assume flow 1 already carries "OpenmlWeka" and "weka" in the test DB. If the seed data changes, both tests fail with a confusing mismatch and no pointer to the cause. Consider using the flow fixture and setting up the required tags explicitly at the start of each test (similar to test_flow_untag), or documenting the seed-data dependency clearly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/flow_tag_test.py` around lines 40 - 68, Both tests
hard-code flow_id=1 and assume pre-seeded tags ("OpenmlWeka", "weka"), which is
fragile; update test_flow_tag_returns_existing_tags and
test_flow_tag_fails_if_tag_exists to use the flow fixture (e.g., obtain flow.id
from the flow fixture) and explicitly create or attach the required tags at the
start of each test (mirroring the setup pattern used in test_flow_untag) before
calling the POST /flows/tag endpoint, then use that flow.id in assertions and
the expected error payload so tests no longer rely on external seed data.

112-123: Admin bypass for untag is not tested

The implementation at src/routers/openml/flows.py Line 65 contains an admin-bypass path (UserGroup.ADMIN not in user.groups), but the test suite covers only the rejection case (non-owner, non-admin). A test where a different admin key removes another user's tag would fully exercise this logic.

Would you like me to draft the missing test_flow_untag_admin_can_untag_any_tag test case?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/flow_tag_test.py` around lines 112 - 123, Add a new test
named test_flow_untag_admin_can_untag_any_tag in
tests/routers/openml/flow_tag_test.py that creates a tag on a Flow as a
non-admin user (use Flow fixture and TestClient/ApiKey.SOME_USER), then calls
the untag endpoint using the admin API key (ApiKey.ADMIN) to remove that tag;
assert the untag response is successful (HTTP 200) and verify the tag is
actually removed from the flow (e.g., fetch flow tags or check the response
body). This exercises the admin-bypass logic in the untag handler (the branch
checking UserGroup.ADMIN in src/routers/openml/flows.py).
src/routers/openml/flows.py (2)

40-40: Response tag list is built from a pre-insert snapshot, inconsistent with untag_flow

untag_flow re-queries get_tags after delete_tag (Line 75), giving a consistent post-operation view. tag_flow constructs the list from the stale pre-insert tags snapshot ([*tags, tag]). While identical in practice for single requests, the two endpoints are inconsistent and the pattern is harder to reason about. Consider re-querying after the insert:

♻️ Proposed refactor
     database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
-    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}
+    tags = database.flows.get_tags(flow_id, expdb)
+    return {"flow_tag": {"id": str(flow_id), "tag": tags}}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` at line 40, The response in tag_flow currently
returns a tags list built from the pre-insert snapshot ([*tags, tag]) which is
inconsistent with untag_flow; after performing the insert (the operation that
adds the tag in tag_flow), re-call get_tags(flow_id) to fetch the authoritative,
post-insert tag list and return that in the {"flow_tag": {"id": str(flow_id),
"tag": ...}} response so both tag_flow and untag_flow use the same fresh source
of truth.

32-32: INTERNAL_SERVER_ERROR (500) used for application-level domain errors

Codes 473, 477, and 478 are client-originated errors (duplicate tag, tag not found, wrong ownership) and should map to 4xx status codes (e.g., 409 Conflict for 473, 404 for 477, 403 for 478). Using 500 misrepresents the error to clients and monitoring tools. This applies equally to the same pattern in untag_flow (Lines 58, 67).

Acknowledged that the PR intentionally mirrors the existing dataset tag pattern — flagging for awareness if that pattern is ever revisited.
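If that pattern is ever revisited, the remapping could be as small as a lookup table; this is a sketch of the proposed 4xx codes, not the current behavior (which returns 500 for all three):

```python
from http import HTTPStatus

# Proposed mapping for the domain error codes discussed above;
# anything unmapped keeps the legacy 500 behavior.
DOMAIN_ERROR_STATUS = {
    "473": HTTPStatus.CONFLICT,   # duplicate tag
    "477": HTTPStatus.NOT_FOUND,  # tag not found
    "478": HTTPStatus.FORBIDDEN,  # tag owned by someone else
}

def status_for(code: str) -> HTTPStatus:
    return DOMAIN_ERROR_STATUS.get(code, HTTPStatus.INTERNAL_SERVER_ERROR)
```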

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` at line 32, Change the HTTP status mapping for
application-level domain errors in the tag_flow and untag_flow handlers: when
catching AppErrors with code 473, return 409 Conflict; with code 477, return 404
Not Found; with code 478, return 403 Forbidden instead of
HTTPStatus.INTERNAL_SERVER_ERROR; update the error->status_code logic in the
tag_flow and untag_flow exception handlers (the block that checks err.code on
AppError) to return these 4xx codes so client-originated errors are represented
correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/routers/openml/flows.py`:
- Around line 17-40: tag_flow currently skips validating that the flow exists
which can create orphaned tags or DB errors; before fetching tags or calling
database.flows.tag, call database.flows.get(flow_id, expdb) (same as get_flow)
and if it returns None raise HTTPException with HTTPStatus.NOT_FOUND and the
same error payload pattern used in get_flow; then continue with the existing
duplicate-tag check and database.flows.tag to safely ensure tags are only added
for existing flows.
- Around line 29-39: The current get_tags -> tag sequence has a TOCTOU race: add
a UNIQUE constraint on implementation_tag for (id, tag) at the schema level, and
modify the call site around database.flows.tag(flow_id, tag, user_id=...) to
catch the DB integrity exception (e.g., IntegrityError or the DB driver's
unique-constraint error) and translate it into the same HTTPException payload
(status_code HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473" with id and
tag) instead of letting the exception propagate; alternatively, change
database.flows.tag to use an idempotent INSERT (INSERT IGNORE / ON DUPLICATE KEY
UPDATE) or perform an atomic existence-check-and-insert in the DB and surface
the 473 response on duplicate—apply the same pattern to database.flows.tag usage
in tasks.py, runs.py, and datasets.py.

In `@src/routers/openml/runs.py`:
- Around line 27-38: The pre-check in the tag path (using database.runs.get_tags
and casefold) can race with the subsequent INSERT in database.runs.tag, so wrap
the call to database.runs.tag in a try/except that catches the DB IntegrityError
(or the specific DB exception your DB layer raises) and re-raises the same
HTTPException currently used for duplicates (status_code
HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473", message "Entity already
tagged by this tag.", additional_information f"id={run_id}; tag={tag}") so
concurrent inserts surface the clean 473 instead of an opaque 500; apply the
same pattern to tag_flow and tag_task where get_tags + tag are used.

In `@src/routers/openml/tasks.py`:
- Around line 166-177: The current TOCTOU flaw in the tags flow (calls to
database.tasks.get_tags and database.tasks.tag) can let concurrent inserts
bypass the pre-check and leak raw DB errors; wrap the call to
database.tasks.tag(task_id, tag, user_id=user.user_id, connection=expdb) in a
try/except that catches the DB IntegrityError and raises the same HTTPException
with status_code HTTPStatus.INTERNAL_SERVER_ERROR and the existing detail
payload (code "473", message "Entity already tagged by this tag.",
additional_information containing id and tag); keep the original behavior of
returning {"task_tag": {"id": str(task_id), "tag": [*tags, tag]}} on success.

In `@tests/routers/openml/flow_tag_test.py`:
- Around line 86-101: The setup POST that applies the tag is currently sent
fire-and-forget in test_flow_untag; change it to capture its response and assert
it succeeded before calling the untag endpoint. Specifically, after calling
py_api.post(f"/flows/tag?api_key={ApiKey.ADMIN}", json={"flow_id": flow.id,
"tag": tag}) store the response, assert response.status_code == HTTPStatus.OK
(and optionally check the JSON contains the tag), then proceed to call
py_api.post for untag and the existing assertions; this ensures failures in the
tag step (in test_flow_untag / py_api.post / ApiKey.ADMIN / flow.id) produce
clear test errors.

---

Nitpick comments:
In `@src/database/runs.py`:
- Around line 1-52: Add a read-only lookup function get(id_: int, expdb:
Connection) -> Row | None that mirrors database.tasks.get(): execute a SELECT
(e.g., SELECT * FROM run WHERE id = :run_id) against the experiment DB and
return one_or_none(); also normalize parameter naming so read operations use
expdb and write operations use connection — change get_tag signature/usage to
accept expdb: Connection (not connection) for reads, keep tag and delete_tag
using connection: Connection for writes, and update any callers accordingly.
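The suggested read helper can be sketched against an in-memory database. The schema below is illustrative only (the real expdb `run` table differs), and sqlite3's `fetchone()` plays the role of SQLAlchemy's `one_or_none()`:

```python
import sqlite3


def get(id_: int, expdb: sqlite3.Connection):
    """Read-only lookup mirroring database.tasks.get(); returns the row or None.

    Table and column names here are assumptions for the sketch, not the
    real expdb schema.
    """
    cursor = expdb.execute("SELECT * FROM run WHERE id = :run_id", {"run_id": id_})
    return cursor.fetchone()


# Demo fixture: an in-memory database with one run row.
expdb = sqlite3.connect(":memory:")
expdb.execute("CREATE TABLE run (id INTEGER PRIMARY KEY, uploader INTEGER)")
expdb.execute("INSERT INTO run VALUES (1, 16)")
```

A `None` return is what the router would translate into a 404 before tagging.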

In `@src/database/tasks.py`:
- Around line 109-143: Rename the Connection parameter in the new functions to
match the module convention: change the parameter name "connection: Connection"
to "expdb: Connection" in the functions tag, get_tag, and delete_tag so they
align with existing functions like get_tags and get; update all internal
references (parameters dicts, calls) to use expdb where applicable. Ensure the
signature and any callers are adjusted consistently to avoid breaking the public
API.

In `@src/routers/openml/flows.py`:
- Line 40: The response in tag_flow currently returns a tags list built from the
pre-insert snapshot ([*tags, tag]) which is inconsistent with untag_flow; after
performing the insert (the operation that adds the tag in tag_flow), re-call
get_tags(flow_id) to fetch the authoritative, post-insert tag list and return
that in the {"flow_tag": {"id": str(flow_id), "tag": ...}} response so both
tag_flow and untag_flow use the same fresh source of truth.
- Line 32: Change the HTTP status mapping for application-level domain errors in
the tag_flow and untag_flow handlers: when catching AppErrors with code 473,
return 409 Conflict; with code 477, return 404 Not Found; with code 478, return
403 Forbidden instead of HTTPStatus.INTERNAL_SERVER_ERROR; update the
error->status_code logic in the tag_flow and untag_flow exception handlers (the
block that checks err.code on AppError) to return these 4xx codes so
client-originated errors are represented correctly.
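The proposed remapping amounts to a small lookup table. The error codes below are the ones used in this PR; the HTTP statuses are the reviewer's suggestion, not current behaviour:

```python
from http import HTTPStatus

# Proposed mapping of OpenML application error codes to client-facing
# HTTP statuses (reviewer suggestion, not what the handlers return today).
ERROR_STATUS = {
    "473": HTTPStatus.CONFLICT,    # entity already tagged
    "477": HTTPStatus.NOT_FOUND,   # tag not found
    "478": HTTPStatus.FORBIDDEN,   # tag owned by another user
}


def status_for(code: str) -> HTTPStatus:
    """Fall back to 500 only for codes that are genuine server errors."""
    return ERROR_STATUS.get(code, HTTPStatus.INTERNAL_SERVER_ERROR)
```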

In `@src/routers/openml/runs.py`:
- Around line 27-38: The code calls database.runs.get_tags(run_id, expdb) and
then database.runs.tag(...) without first verifying the run exists, which can
create orphaned tags or trigger FK errors; add an existence check by calling
database.runs.get(run_id, expdb) (or equivalent lookup) before checking tags,
and if it returns no result raise HTTPException with status_code
HTTPStatus.NOT_FOUND and a descriptive detail (include id=run_id); only proceed
to duplicate-tag check and database.runs.tag(...) after the run is confirmed to
exist.

In `@src/routers/openml/tasks.py`:
- Around line 166-177: Add an existence guard before tagging: call
database.tasks.get(task_id, expdb) and if it returns falsy, raise an
HTTPException for "not found" (e.g., 404) before proceeding to use
database.tasks.get_tags and database.tasks.tag; keep the rest of the logic
(checking tag duplication using tags = database.tasks.get_tags(...), raising the
duplicate tag HTTPException, then calling database.tasks.tag(...)) unchanged so
you don't attempt to tag a non-existent task.

In `@tests/routers/openml/flow_tag_test.py`:
- Around line 40-68: Both tests hard-code flow_id=1 and assume pre-seeded tags
("OpenmlWeka", "weka"), which is fragile; update
test_flow_tag_returns_existing_tags and test_flow_tag_fails_if_tag_exists to use
the flow fixture (e.g., obtain flow.id from the flow fixture) and explicitly
create or attach the required tags at the start of each test (mirroring the
setup pattern used in test_flow_untag) before calling the POST /flows/tag
endpoint, then use that flow.id in assertions and the expected error payload so
tests no longer rely on external seed data.
- Around line 112-123: Add a new test named
test_flow_untag_admin_can_untag_any_tag in tests/routers/openml/flow_tag_test.py
that creates a tag on a Flow as a non-admin user (use Flow fixture and
TestClient/ApiKey.SOME_USER), then calls the untag endpoint using the admin API
key (ApiKey.ADMIN) to remove that tag; assert the untag response is successful
(HTTP 200) and verify the tag is actually removed from the flow (e.g., fetch
flow tags or check the response body). This exercises the admin-bypass logic in
the untag handler (the branch checking UserGroup.ADMIN in
src/routers/openml/flows.py).

In `@tests/routers/openml/run_tag_test.py`:
- Around line 44-54: The test_run_tag currently only exercises the admin path;
update test_run_tag to parametrize over API keys (e.g., ApiKey.ADMIN,
ApiKey.SOME_USER, ApiKey.OWNER_USER) so it mirrors test_task_tag's coverage: add
a parameter (api_key) to the test signature, apply a pytest.mark.parametrize
decorator with the three ApiKey values, and use that api_key value in the POST
URL (replace ApiKey.ADMIN in the f"/runs/tag?api_key=..." call). Keep the same
assertions and follow-up tags check using get_tags(id_=run_id,
expdb=expdb_test).

In `@tests/routers/openml/task_tag_test.py`:
- Around line 31-41: The test_task_tag currently asserts an exact JSON matching
which assumes task 59 had no preexisting tags; change the assertion to check
that tag is present in response.json()["task_tag"]["tag"] (i.e., a
subset/contains check) instead of equality to allow pre-seeded tags, and keep
the existing verification via get_tags(id_=task_id, expdb=expdb_test). Also add
a new test that posts to the same endpoint ("/tasks/tag" using py_api.post) with
a non-existent task_id and asserts the correct error behavior (e.g., 4xx status
and appropriate error body) to verify handling of missing FK/invalid task IDs.

Comment on lines 17 to 40
@router.post(path="/tag")
def tag_flow(
flow_id: Annotated[int, Body()],
tag: Annotated[str, SystemString64],
user: Annotated[User | None, Depends(fetch_user)] = None,
expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
if user is None:
raise HTTPException(
status_code=HTTPStatus.PRECONDITION_FAILED,
detail={"code": "103", "message": "Authentication failed"},
)
tags = database.flows.get_tags(flow_id, expdb)
if tag.casefold() in [t.casefold() for t in tags]:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={flow_id}; tag={tag}",
},
)
database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}

⚠️ Potential issue | 🟠 Major

tag_flow does not validate that the flow exists before inserting a tag

get_flow (Line 97) demonstrates the pattern — call database.flows.get() and raise 404 if the result is None. tag_flow skips this step entirely. A caller supplying a non-existent flow_id will either silently insert an orphaned row into implementation_tag (no foreign-key constraint) or receive an unhandled DB exception rather than a clean 404.

🛡️ Proposed fix
 @router.post(path="/tag")
 def tag_flow(
     flow_id: Annotated[int, Body()],
     tag: Annotated[str, SystemString64],
     user: Annotated[User | None, Depends(fetch_user)] = None,
     expdb: Annotated[Connection, Depends(expdb_connection)] = None,
 ) -> dict[str, dict[str, Any]]:
     if user is None:
         raise HTTPException(
             status_code=HTTPStatus.PRECONDITION_FAILED,
             detail={"code": "103", "message": "Authentication failed"},
         )
+    if database.flows.get(flow_id, expdb) is None:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail={"code": "180", "message": "Unknown flow."},
+        )
     tags = database.flows.get_tags(flow_id, expdb)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` around lines 17 - 40, tag_flow currently skips
validating that the flow exists which can create orphaned tags or DB errors;
before fetching tags or calling database.flows.tag, call
database.flows.get(flow_id, expdb) (same as get_flow) and if it returns None
raise HTTPException with HTTPStatus.NOT_FOUND and the same error payload pattern
used in get_flow; then continue with the existing duplicate-tag check and
database.flows.tag to safely ensure tags are only added for existing flows.

Comment on lines +29 to +39
tags = database.flows.get_tags(flow_id, expdb)
if tag.casefold() in [t.casefold() for t in tags]:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={flow_id}; tag={tag}",
},
)
database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Search for implementation_tag table definition and constraints
find . -type f \( -name "*.sql" -o -name "*.py" \) | head -30

Repository: openml/server-api

Length of output: 898


🏁 Script executed:

# Look for database models and schema definitions
rg "implementation_tag" -A 5 -B 2 --max-count=20

Repository: openml/server-api

Length of output: 1655


🏁 Script executed:

# Find the database.flows module to examine tag() and get_tags() implementations
fd "database" --type f -e py | head -20

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check the SQL schema files for implementation_tag constraints
cat ./docker/mysql/data/openml.sql | grep -A 10 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Also check the _expdb schema
cat ./docker/mysql/data/openml_expdb.sql | grep -A 10 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Read the full flows.py database module to see tag() and get_tags() functions
cat -n src/database/flows.py

Repository: openml/server-api

Length of output: 3895


🏁 Script executed:

# Try to read schema files with head/tail to see structure
head -100 ./docker/mysql/data/openml.sql | grep -A 15 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Read the router flows.py file to see the full context of lines 29-39
cat -n src/routers/openml/flows.py | head -60

Repository: openml/server-api

Length of output: 2676


🏁 Script executed:

# Search for implementation_tag table creation in SQL files
fd -e sql -x grep -l "implementation_tag" {}

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Try to read the SQL files line by line
wc -l ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sql

Repository: openml/server-api

Length of output: 143


🏁 Script executed:

# Search for UNIQUE constraint patterns in SQL files
rg "UNIQUE.*implementation_tag|implementation_tag.*UNIQUE" --include="*.sql"

Repository: openml/server-api

Length of output: 502


🏁 Script executed:

# Check for PRIMARY KEY or UNIQUE constraints on (id, tag)
rg -A 20 "CREATE TABLE.*implementation_tag" --include="*.sql"

Repository: openml/server-api

Length of output: 502


🏁 Script executed:

# Read the SQL files directly to find implementation_tag table definition
cat ./docker/mysql/data/openml.sql | head -5
cat ./docker/mysql/data/openml_expdb.sql | head -5

Repository: openml/server-api

Length of output: 158


🏁 Script executed:

# Search for implementation_tag CREATE TABLE in both files
grep -n "implementation_tag" ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sql | head -20

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Try using sed or awk to extract lines around implementation_tag
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml.sql | head -30
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml_expdb.sql | head -30

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Search for database migration or setup files
find . -type f \( -name "*migration*" -o -name "*schema*" -o -name "*setup*" \) | grep -E "\.(py|sql)$"

Repository: openml/server-api

Length of output: 380


🏁 Script executed:

# Search for IntegrityError handling in the codebase
rg "IntegrityError" -t py

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check database/__init__.py and database/setup.py for schema or constraint info
cat -n src/database/setup.py

Repository: openml/server-api

Length of output: 1145


🏁 Script executed:

# Check if there's any documentation about the schema in README files
find . -name "README*" -o -name "readme*" | head -5

Repository: openml/server-api

Length of output: 142


🏁 Script executed:

# Search for error handling around database.flows.tag() calls
rg "database\.flows\.tag|\.tag\(" -A 5 -B 5 src/routers/

Repository: openml/server-api

Length of output: 2753


🏁 Script executed:

# Check if there's try/except around tag operations in router
rg "try|except|IntegrityError|SQLAlchemy" -A 3 src/routers/openml/flows.py

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Look for ON DUPLICATE KEY UPDATE or similar patterns in database module
rg "ON DUPLICATE|INSERT IGNORE|INSERT OR" src/database/

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check the actual imports and error handling setup in database/flows.py
head -20 src/database/flows.py

Repository: openml/server-api

Length of output: 563


TOCTOU race condition in duplicate-tag check

The get_tags → casefold check → tag() insert sequence is not atomic. Two concurrent requests for the same (flow_id, tag) can both pass Line 30 and both reach the INSERT on Line 39, resulting in either a silent duplicate row or an unhandled database exception.

The tag() function (in src/database/flows.py) executes a plain INSERT with no error handling. No try/except wraps the call, and the query lacks ON DUPLICATE KEY UPDATE or similar protection. Even if a unique constraint exists on (id, tag) in the schema, the resulting IntegrityError would propagate uncaught rather than returning the proper 473 response.

Fix: Add a UNIQUE constraint on (id, tag) in implementation_tag and wrap the tag() call in a try/except to catch and translate IntegrityError to code 473, or use INSERT IGNORE/ON DUPLICATE KEY UPDATE with a post-insert existence check.

Note: The same pattern exists in tasks.py, runs.py, and datasets.py.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` around lines 29 - 39, The current get_tags ->
tag sequence has a TOCTOU race: add a UNIQUE constraint on implementation_tag
for (id, tag) at the schema level, and modify the call site around
database.flows.tag(flow_id, tag, user_id=...) to catch the DB integrity
exception (e.g., IntegrityError or the DB driver's unique-constraint error) and
translate it into the same HTTPException payload (status_code
HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473" with id and tag) instead of
letting the exception propagate; alternatively, change database.flows.tag to use
an idempotent INSERT (INSERT IGNORE / ON DUPLICATE KEY UPDATE) or perform an
atomic existence-check-and-insert in the DB and surface the 473 response on
duplicate—apply the same pattern to database.flows.tag usage in tasks.py,
runs.py, and datasets.py.
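The schema-level half of that fix can be sketched with sqlite3, using `INSERT OR IGNORE` as a stand-in for MySQL's `INSERT IGNORE`. The `implementation_tag` column layout below is illustrative, not the real expdb schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# A UNIQUE constraint on (id, tag) makes the duplicate check atomic at the
# database level, independent of any application-side pre-check.
conn.execute(
    "CREATE TABLE implementation_tag ("
    "  id INTEGER, tag TEXT, uploader INTEGER, UNIQUE (id, tag))"
)


def tag_once(connection: sqlite3.Connection, flow_id: int, tag: str, user_id: int) -> bool:
    """Idempotent insert: True if the tag was added, False if it already existed."""
    cursor = connection.execute(
        "INSERT OR IGNORE INTO implementation_tag (id, tag, uploader)"
        " VALUES (?, ?, ?)",
        (flow_id, tag, user_id),
    )
    return cursor.rowcount == 1
```

Note one gap relative to the handlers: the UNIQUE constraint is case-sensitive, while the router's pre-check uses casefold(), so "Weka" and "weka" would still both insert unless the column gets a case-insensitive collation.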

Comment on lines 27 to 38
tags = database.runs.get_tags(run_id, expdb)
if tag.casefold() in [t.casefold() for t in tags]:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={run_id}; tag={tag}",
},
)
database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}

⚠️ Potential issue | 🟡 Minor

TOCTOU between duplicate check and INSERT can surface a raw DB error instead of error 473.

get_tags and tag() are not executed atomically. Two concurrent requests with the same (run_id, tag) can both pass the casefold() pre-check before either inserts; the second INSERT then raises an uncaught IntegrityError (assuming a UNIQUE constraint on run_tag(id, tag)), which propagates as an opaque 500 rather than the clean 473. Catching IntegrityError around the INSERT closes the gap:

🛡️ Proposed fix
+from sqlalchemy.exc import IntegrityError

 ...
-    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
-    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}
+    try:
+        database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
+    except IntegrityError:
+        raise HTTPException(
+            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+            detail={
+                "code": "473",
+                "message": "Entity already tagged by this tag.",
+                "additional_information": f"id={run_id}; tag={tag}",
+            },
+        )
+    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}

The same pattern applies to tag_flow and tag_task.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
     tags = database.runs.get_tags(run_id, expdb)
     if tag.casefold() in [t.casefold() for t in tags]:
         raise HTTPException(
             status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
             detail={
                 "code": "473",
                 "message": "Entity already tagged by this tag.",
                 "additional_information": f"id={run_id}; tag={tag}",
             },
         )
-    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
-    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}
+    try:
+        database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
+    except IntegrityError:
+        raise HTTPException(
+            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+            detail={
+                "code": "473",
+                "message": "Entity already tagged by this tag.",
+                "additional_information": f"id={run_id}; tag={tag}",
+            },
+        )
+    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/runs.py` around lines 27 - 38, The pre-check in the tag
path (using database.runs.get_tags and casefold) can race with the subsequent
INSERT in database.runs.tag, so wrap the call to database.runs.tag in a
try/except that catches the DB IntegrityError (or the specific DB exception your
DB layer raises) and re-raises the same HTTPException currently used for
duplicates (status_code HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473",
message "Entity already tagged by this tag.", additional_information
f"id={run_id}; tag={tag}") so concurrent inserts surface the clean 473 instead
of an opaque 500; apply the same pattern to tag_flow and tag_task where get_tags
+ tag are used.

Comment on lines 166 to 177
tags = database.tasks.get_tags(task_id, expdb)
if tag.casefold() in [t.casefold() for t in tags]:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={task_id}; tag={tag}",
},
)
database.tasks.tag(task_id, tag, user_id=user.user_id, connection=expdb)
return {"task_tag": {"id": str(task_id), "tag": [*tags, tag]}}

⚠️ Potential issue | 🟡 Minor

Same TOCTOU race as tag_run: concurrent inserts bypass error 473 and surface a raw DB error.

The identical fix applies — catch IntegrityError around the database.tasks.tag(...) call. See the src/routers/openml/runs.py comment for the proposed pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/tasks.py` around lines 166 - 177, The current TOCTOU flaw
in the tags flow (calls to database.tasks.get_tags and database.tasks.tag) can
let concurrent inserts bypass the pre-check and leak raw DB errors; wrap the
call to database.tasks.tag(task_id, tag, user_id=user.user_id, connection=expdb)
in a try/except that catches the DB IntegrityError and raises the same
HTTPException with status_code HTTPStatus.INTERNAL_SERVER_ERROR and the existing
detail payload (code "473", message "Entity already tagged by this tag.",
additional_information containing id and tag); keep the original behavior of
returning {"task_tag": {"id": str(task_id), "tag": [*tags, tag]}} on success.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/routers/openml/run_tag_test.py`:
- Line 31: Update the incorrect type annotation for the test parameter named
"key" in tests/routers/openml/run_tag_test.py to allow None (e.g., change from
"key: ApiKey" to "key: ApiKey | None" or "Optional[ApiKey]") so it matches the
parametrized cases that include None; apply the same change for the second
occurrence around the other test (referenced at the other occurrence).
- Around line 59-62: The three setup POST calls using
py_api.post("/runs/tag?api_key=...") in tests test_run_tag_fails_if_tag_exists,
test_run_untag, and test_run_untag_fails_if_not_owner must assert their
responses so silent failures don't mask the actual test intent; update each
setup to capture the response and assert the expected success (e.g.,
HTTPStatus.OK or the expected response code/value) before proceeding with the
rest of the test, so subsequent asserts (like checking error codes
"473"/"477"/"478") fail for the right reasons.

- Re-read tags from DB after insert instead of appending locally
- Fix type annotations: key param is ApiKey | None when parametrize includes None
- Assert setup POST responses in tests to catch silent failures
- Add test for non-admin user untagging their own tag
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
tests/routers/openml/flow_tag_test.py (1)

113-125: Consider adding a positive ownership test (non-admin untags their own tag).

Same parity gap as in run_tag_test.py — the negative path (test_flow_untag_fails_if_not_owner) is covered but the symmetric positive case (non-admin user successfully untagging their own tag) is missing, unlike the tasks test suite.

➕ Suggested test
def test_flow_untag_non_admin_own_tag(flow: Flow, expdb_test: Connection, py_api: TestClient) -> None:
    tag = "user_tag"
    setup = py_api.post(
        f"/flows/tag?api_key={ApiKey.SOME_USER}",
        json={"flow_id": flow.id, "tag": tag},
    )
    assert setup.status_code == HTTPStatus.OK
    response = py_api.post(
        f"/flows/untag?api_key={ApiKey.SOME_USER}",
        json={"flow_id": flow.id, "tag": tag},
    )
    assert response.status_code == HTTPStatus.OK

    tags = get_tags(flow_id=flow.id, expdb=expdb_test)
    assert tag not in tags
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/flow_tag_test.py` around lines 113 - 125, Add a
symmetric positive test verifying a non-admin can untag their own tag: create a
new test named something like test_flow_untag_non_admin_own_tag that uses the
flow fixture, posts a tag with py_api using ApiKey.SOME_USER, asserts the tag
setup returns HTTPStatus.OK, then calls the untag endpoint with the same
ApiKey.SOME_USER and asserts HTTPStatus.OK, and finally calls
get_tags(flow_id=flow.id, expdb=expdb_test) to assert the tag is no longer
present; mirror the existing test_flow_untag_fails_if_not_owner flow but use the
non-admin user for both tag and untag and include the expdb_test check.
tests/routers/openml/run_tag_test.py (1)

125-137: Consider adding a positive ownership test (non-admin untags their own tag).

test_run_untag_fails_if_not_owner covers the rejection path; the analogous positive case — a non-admin user successfully untagging a tag they own — is absent here, unlike test_task_untag_non_admin_own_tag in task_tag_test.py. The ownership predicate is identical across modules, so the gap is low-risk, but parity with the task tests would be cleaner.

➕ Suggested test
def test_run_untag_non_admin_own_tag(run_id: int, expdb_test: Connection, py_api: TestClient) -> None:
    tag = "user_tag"
    setup = py_api.post(
        f"/runs/tag?api_key={ApiKey.SOME_USER}",
        json={"run_id": run_id, "tag": tag},
    )
    assert setup.status_code == HTTPStatus.OK
    response = py_api.post(
        f"/runs/untag?api_key={ApiKey.SOME_USER}",
        json={"run_id": run_id, "tag": tag},
    )
    assert response.status_code == HTTPStatus.OK

    tags = get_tags(id_=run_id, expdb=expdb_test)
    assert tag not in tags
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/run_tag_test.py` around lines 125 - 137, Add a positive
test that ensures a non-admin user can untag a tag they created: implement a new
test function (e.g., test_run_untag_non_admin_own_tag) which uses py_api to POST
to /runs/tag with ApiKey.SOME_USER and json {"run_id": run_id, "tag":
"user_tag"}, asserts the setup returned HTTPStatus.OK, then POSTs to /runs/untag
with the same ApiKey.SOME_USER and tag, asserts HTTPStatus.OK, and finally
verifies the tag was removed by calling get_tags(id_=run_id, expdb=expdb_test)
and asserting the tag is not present; reference the existing
test_run_untag_fails_if_not_owner, ApiKey.SOME_USER, /runs/tag, /runs/untag, and
get_tags to place the test consistently with other tag tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/routers/openml/runs.py`:
- Around line 15-39: Check that the run exists before inserting a tag: in
tag_run, call an existence/read helper (e.g., database.runs.get or
database.runs.get_by_id) using run_id and expdb before calling
database.runs.tag; if the helper returns no row, raise an HTTPException
(HTTPStatus.NOT_FOUND) with an appropriate detail payload (e.g.,
{"code":"404","message":"Run not found", "additional_information":
f"id={run_id}"}). Do the same existence check path before the duplicate-tag
check so you don't query tags for a missing run, and mirror this pattern for
tag_flow and tag_task (use the same database.runs.* helper names shown in this
diff to locate where to add the check).

In `@src/routers/openml/tasks.py`:
- Around line 154-178: tag_task currently writes tags without verifying the task
exists, which can create orphan rows or leak DB errors; before calling
database.tasks.tag, call the task-existence check used elsewhere (e.g., the same
lookup used by get_task or a database.tasks.get/get_by_id with the provided
expdb connection) and if not found raise an HTTPException 404 with the same
response shape used by get_task; only proceed to call database.tasks.tag and
then database.tasks.get_tags when the task exists.

---

Duplicate comments:
In `@src/routers/openml/flows.py`:
- Around line 17-41: tag_flow currently skips an existence check and is
vulnerable to a TOCTOU race between get_tags and tag; before proceeding call
database.flows.get(flow_id, expdb) and raise an HTTPException (same auth-style
payload) if it returns None, then perform the tag insertion inside a try/except
that catches the DB integrity error (e.g., IntegrityError from your DB layer)
around database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
and translate that error into the existing 473 HTTPException for duplicate tags;
keep the case-insensitive pre-check with database.flows.get_tags(flow_id, expdb)
as an early fast-fail but rely on the IntegrityError guard as the definitive
protection and still re-fetch tags after successful insertion.

In `@src/routers/openml/runs.py`:
- Around line 27-37: The current get_tags -> casefold check ->
database.runs.tag(...) sequence is vulnerable to TOCTOU; keep the pre-check if
desired but make the insert atomic by wrapping the call to
database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb) in a
try/except that catches the DB integrity exception (e.g.,
sqlalchemy.exc.IntegrityError or the DB client’s unique constraint error) and on
that exception raise the same HTTPException (status_code
HTTPStatus.INTERNAL_SERVER_ERROR with code "473" and the same detail payload) so
concurrent duplicate inserts are handled correctly; reference
database.runs.get_tags and database.runs.tag in the change.

---

Nitpick comments:
In `@tests/routers/openml/flow_tag_test.py`:
- Around line 113-125: Add a symmetric positive test verifying a non-admin can
untag their own tag: create a new test named something like
test_flow_untag_non_admin_own_tag that uses the flow fixture, posts a tag with
py_api using ApiKey.SOME_USER, asserts the tag setup returns HTTPStatus.OK, then
calls the untag endpoint with the same ApiKey.SOME_USER and asserts
HTTPStatus.OK, and finally calls get_tags(flow_id=flow.id, expdb=expdb_test) to
assert the tag is no longer present; mirror the existing
test_flow_untag_fails_if_not_owner flow but use the non-admin user for both tag
and untag and include the expdb_test check.

In `@tests/routers/openml/run_tag_test.py`:
- Around line 125-137: Add a positive test that ensures a non-admin user can
untag a tag they created: implement a new test function (e.g.,
test_run_untag_non_admin_own_tag) which uses py_api to POST to /runs/tag with
ApiKey.SOME_USER and json {"run_id": run_id, "tag": "user_tag"}, asserts the
setup returned HTTPStatus.OK, then POSTs to /runs/untag with the same
ApiKey.SOME_USER and tag, asserts HTTPStatus.OK, and finally verifies the tag
was removed by calling get_tags(id_=run_id, expdb=expdb_test) and asserting the
tag is not present; reference the existing test_run_untag_fails_if_not_owner,
ApiKey.SOME_USER, /runs/tag, /runs/untag, and get_tags to place the test
consistently with other tag tests.

Comment on lines +15 to +39
```python
@router.post(path="/tag")
def tag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    tags = database.runs.get_tags(run_id, expdb)
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={run_id}; tag={tag}",
            },
        )
    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
    tags = database.runs.get_tags(run_id, expdb)
    return {"run_tag": {"id": str(run_id), "tag": tags}}
```
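The duplicate-tag guard in this endpoint compares tags case-insensitively via `str.casefold()`. A minimal standalone sketch of that comparison (the helper name `is_duplicate_tag` is illustrative, not part of the codebase):

```python
def is_duplicate_tag(new_tag: str, existing_tags: list[str]) -> bool:
    # Case-insensitive membership test, mirroring the guard in tag_run.
    return new_tag.casefold() in [t.casefold() for t in existing_tags]


existing = ["Weka", "study_14"]
print(is_duplicate_tag("WEKA", existing))      # True: differs only by case
print(is_duplicate_tag("study_99", existing))  # False
```

`casefold()` is a more aggressive lowercasing than `lower()` and is the recommended way to do caseless matching in Python.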

⚠️ Potential issue | 🟠 Major

tag_run does not validate that run_id exists before inserting a tag.

No existence check is performed before writing to the tag table. A non-existent run_id will either silently create an orphaned row (no FK constraint) or produce an unhandled DB exception. The same major issue was already identified for tag_flow; tag_task carries it too.

🛡️ Proposed fix
```diff
     if user is None:
         raise HTTPException(
             status_code=HTTPStatus.PRECONDITION_FAILED,
             detail={"code": "103", "message": "Authentication failed"},
         )
+    if database.runs.get(run_id, expdb) is None:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail={"code": "...", "message": "Unknown run."},
+        )
     tags = database.runs.get_tags(run_id, expdb)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/runs.py` around lines 15 - 39, Check that the run exists
before inserting a tag: in tag_run, call an existence/read helper (e.g.,
database.runs.get or database.runs.get_by_id) using run_id and expdb before
calling database.runs.tag; if the helper returns no row, raise an HTTPException
(HTTPStatus.NOT_FOUND) with an appropriate detail payload (e.g.,
{"code":"404","message":"Run not found", "additional_information":
f"id={run_id}"}). Do the same existence check path before the duplicate-tag
check so you don't query tags for a missing run, and mirror this pattern for
tag_flow and tag_task (use the same database.runs.* helper names shown in this
diff to locate where to add the check).
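Stripped of the FastAPI plumbing, the guard the review asks for can be sketched as a plain function over in-memory stand-ins. Note the `database.runs.get` helper name and the exact error payloads are assumptions made by the review, not confirmed parts of the codebase:

```python
from http import HTTPStatus


class HTTPError(Exception):
    """Minimal stand-in for fastapi.HTTPException."""

    def __init__(self, status_code: int, detail: dict):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


# Hypothetical in-memory stand-ins for the database.runs helpers.
RUNS = {1: {"id": 1}}
RUN_TAGS = {1: ["Weka"]}


def tag_run(run_id: int, tag: str) -> dict:
    # Existence check first, so a missing run yields a clean 404
    # instead of an orphaned tag row or an unhandled DB error.
    if RUNS.get(run_id) is None:
        raise HTTPError(
            HTTPStatus.NOT_FOUND,
            {"code": "404", "message": "Run not found",
             "additional_information": f"id={run_id}"},
        )
    tags = RUN_TAGS.setdefault(run_id, [])
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPError(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            {"code": "473", "message": "Entity already tagged by this tag."},
        )
    tags.append(tag)
    return {"run_tag": {"id": str(run_id), "tag": tags}}
```

The same ordering (existence check, then duplicate check, then insert) would apply unchanged to the flow and task variants.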

Comment on lines +154 to +178
```python
@router.post(path="/tag")
def tag_task(
    task_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    tags = database.tasks.get_tags(task_id, expdb)
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={task_id}; tag={tag}",
            },
        )
    database.tasks.tag(task_id, tag, user_id=user.user_id, connection=expdb)
    tags = database.tasks.get_tags(task_id, expdb)
    return {"task_tag": {"id": str(task_id), "tag": tags}}
```

⚠️ Potential issue | 🟠 Major

tag_task silently creates orphaned rows when task_id does not exist.

Unlike get_task (Line 222), tag_task performs no existence check before writing to the tag table. If the caller supplies a non-existent task_id, the insert either silently produces an orphaned row (no FK constraint) or surfaces an unhandled DB exception rather than a clean 404. The same pattern was flagged as a major issue for tag_flow.

🛡️ Proposed fix
```diff
     if user is None:
         raise HTTPException(
             status_code=HTTPStatus.PRECONDITION_FAILED,
             detail={"code": "103", "message": "Authentication failed"},
         )
+    if database.tasks.get(task_id, expdb) is None:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail={"code": "370", "message": "Unknown task."},
+        )
     tags = database.tasks.get_tags(task_id, expdb)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/tasks.py` around lines 154 - 178, tag_task currently
writes tags without verifying the task exists, which can create orphan rows or
leak DB errors; before calling database.tasks.tag, call the task-existence check
used elsewhere (e.g., the same lookup used by get_task or a
database.tasks.get/get_by_id with the provided expdb connection) and if not
found raise an HTTPException 404 with the same response shape used by get_task;
only proceed to call database.tasks.tag and then database.tasks.get_tags when
the task exists.

@YassinNouh21

Hi @PGijsbers, are you going to look into this?



Development

Successfully merging this pull request may close these issues.

POST /task/tag

1 participant