-
-
Notifications
You must be signed in to change notification settings - Fork 42
Add tag/untag endpoints for tasks, flows, and runs #250
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
289dcd9
2297b44
c7d717c
59a7927
18e713f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| from sqlalchemy import Connection, Row, text | ||
|
|
||
|
|
||
| def get_tags(id_: int, expdb: Connection) -> list[str]: | ||
| tag_rows = expdb.execute( | ||
| text( | ||
| """ | ||
| SELECT `tag` | ||
| FROM run_tag | ||
| WHERE `id` = :run_id | ||
| """, | ||
| ), | ||
| parameters={"run_id": id_}, | ||
| ) | ||
| return [row.tag for row in tag_rows] | ||
|
|
||
|
|
||
| def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None: | ||
| connection.execute( | ||
| text( | ||
| """ | ||
| INSERT INTO run_tag(`id`, `tag`, `uploader`) | ||
| VALUES (:run_id, :tag, :user_id) | ||
| """, | ||
| ), | ||
| parameters={"run_id": id_, "tag": tag_, "user_id": user_id}, | ||
| ) | ||
|
|
||
|
|
||
| def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None: | ||
| return connection.execute( | ||
| text( | ||
| """ | ||
| SELECT `id`, `tag`, `uploader` | ||
| FROM run_tag | ||
| WHERE `id` = :run_id AND `tag` = :tag | ||
| """, | ||
| ), | ||
| parameters={"run_id": id_, "tag": tag_}, | ||
| ).one_or_none() | ||
|
|
||
|
|
||
| def delete_tag(id_: int, tag_: str, connection: Connection) -> None: | ||
| connection.execute( | ||
| text( | ||
| """ | ||
| DELETE FROM run_tag | ||
| WHERE `id` = :run_id AND `tag` = :tag | ||
| """, | ||
| ), | ||
| parameters={"run_id": id_, "tag": tag_}, | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,3 +104,40 @@ def get_tags(id_: int, expdb: Connection) -> list[str]: | |
| parameters={"task_id": id_}, | ||
| ) | ||
| return [row.tag for row in tag_rows] | ||
|
|
||
|
|
||
| def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider extracting the common tag insert/get/delete logic into shared helpers parameterized by table and id column, then have the task-specific functions delegate to them. You can keep the new For example, introduce a small internal helper that only varies by table name (and optionally id column), then delegate from the public functions: def _insert_tag(
*,
table: str,
id_column: str,
id_: int,
tag_: str,
user_id: int,
connection: Connection,
) -> None:
connection.execute(
text(
f"""
INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
VALUES (:id, :tag, :user_id)
"""
),
parameters={"id": id_, "tag": tag_, "user_id": user_id},
)
def _get_tag(
*,
table: str,
id_column: str,
id_: int,
tag_: str,
connection: Connection,
) -> Row | None:
return connection.execute(
text(
f"""
SELECT `{id_column}` AS id, `tag`, `uploader`
FROM {table}
WHERE `{id_column}` = :id AND `tag` = :tag
"""
),
parameters={"id": id_, "tag": tag_},
).one_or_none()
def _delete_tag(
*,
table: str,
id_column: str,
id_: int,
tag_: str,
connection: Connection,
) -> None:
connection.execute(
text(
f"""
DELETE FROM {table}
WHERE `{id_column}` = :id AND `tag` = :tag
"""
),
parameters={"id": id_, "tag": tag_},
)Then your task‑specific functions become thin wrappers: def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
_insert_tag(
table="task_tag",
id_column="id",
id_=id_,
tag_=tag_,
user_id=user_id,
connection=connection,
)
def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
return _get_tag(
table="task_tag",
id_column="id",
id_=id_,
tag_=tag_,
connection=connection,
)
def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
_delete_tag(
table="task_tag",
id_column="id",
id_=id_,
tag_=tag_,
connection=connection,
)You can then reuse the same |
||
| connection.execute( | ||
| text( | ||
| """ | ||
| INSERT INTO task_tag(`id`, `tag`, `uploader`) | ||
| VALUES (:task_id, :tag, :user_id) | ||
| """, | ||
| ), | ||
| parameters={"task_id": id_, "tag": tag_, "user_id": user_id}, | ||
| ) | ||
|
|
||
|
|
||
| def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None: | ||
| return connection.execute( | ||
| text( | ||
| """ | ||
| SELECT `id`, `tag`, `uploader` | ||
| FROM task_tag | ||
| WHERE `id` = :task_id AND `tag` = :tag | ||
| """, | ||
| ), | ||
| parameters={"task_id": id_, "tag": tag_}, | ||
| ).one_or_none() | ||
|
|
||
|
|
||
| def delete_tag(id_: int, tag_: str, connection: Connection) -> None: | ||
| connection.execute( | ||
| text( | ||
| """ | ||
| DELETE FROM task_tag | ||
| WHERE `id` = :task_id AND `tag` = :tag | ||
| """, | ||
| ), | ||
| parameters={"task_id": id_, "tag": tag_}, | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,82 @@ | ||
| from http import HTTPStatus | ||
| from typing import Annotated, Literal | ||
| from typing import Annotated, Any, Literal | ||
|
|
||
| from fastapi import APIRouter, Depends, HTTPException | ||
| from fastapi import APIRouter, Body, Depends, HTTPException | ||
| from sqlalchemy import Connection | ||
|
|
||
| import database.flows | ||
| from core.conversions import _str_to_num | ||
| from routers.dependencies import expdb_connection | ||
| from database.users import User, UserGroup | ||
| from routers.dependencies import expdb_connection, fetch_user | ||
| from routers.types import SystemString64 | ||
| from schemas.flows import Flow, Parameter, Subflow | ||
|
|
||
| router = APIRouter(prefix="/flows", tags=["flows"]) | ||
|
|
||
|
|
||
| @router.post(path="/tag") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider extracting the repeated auth and tag/untag logic into reusable helper functions so the individual endpoints just supply resource-specific behavior and response keys. You can cut down duplication and make future tag changes safer by extracting the shared behavior into small helpers, then wiring them up in each router. For example, the auth check and error payloads are identical across tag/untag endpoints and across routers: def _require_user(user: User | None) -> User:
if user is None:
raise HTTPException(
status_code=HTTPStatus.PRECONDITION_FAILED,
detail={"code": "103", "message": "Authentication failed"},
)
return userThen in your endpoints: @router.post("/tag")
def tag_flow(
flow_id: Annotated[int, Body()],
tag: Annotated[str, SystemString64],
user: Annotated[User | None, Depends(fetch_user)] = None,
expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
user = _require_user(user)
tags = database.flows.get_tags(flow_id, expdb)
if tag.casefold() in (t.casefold() for t in tags):
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={flow_id}; tag={tag}",
},
)
database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}You can also encapsulate the generic tag/untag logic so tasks/runs/flows only provide resource-specific functions and the response key: # tagging_helpers.py
from collections.abc import Callable
def make_tag_endpoint(
get_tags: Callable[[int, Connection], list[str]],
tag_fn: Callable[..., None],
response_key: str,
):
def endpoint(
entity_id: Annotated[int, Body()],
tag: Annotated[str, SystemString64],
user: Annotated[User | None, Depends(fetch_user)] = None,
expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
user = _require_user(user)
tags = get_tags(entity_id, expdb)
if tag.casefold() in (t.casefold() for t in tags):
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={entity_id}; tag={tag}",
},
)
tag_fn(entity_id, tag, user_id=user.user_id, connection=expdb)
return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}
return endpointThen in the flows router: from tagging_helpers import make_tag_endpoint
tag_flow = router.post("/tag")(
make_tag_endpoint(
get_tags=database.flows.get_tags,
tag_fn=database.flows.tag,
response_key="flow_tag",
)
)A similar |
||
| def tag_flow( | ||
| flow_id: Annotated[int, Body()], | ||
| tag: Annotated[str, SystemString64], | ||
| user: Annotated[User | None, Depends(fetch_user)] = None, | ||
| expdb: Annotated[Connection, Depends(expdb_connection)] = None, | ||
| ) -> dict[str, dict[str, Any]]: | ||
| if user is None: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.PRECONDITION_FAILED, | ||
| detail={"code": "103", "message": "Authentication failed"}, | ||
| ) | ||
| tags = database.flows.get_tags(flow_id, expdb) | ||
| if tag.casefold() in [t.casefold() for t in tags]: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| detail={ | ||
| "code": "473", | ||
| "message": "Entity already tagged by this tag.", | ||
| "additional_information": f"id={flow_id}; tag={tag}", | ||
| }, | ||
| ) | ||
| database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb) | ||
|
Comment on lines
+29
to
+39
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # Search for implementation_tag table definition and constraints
find . -type f \( -name "*.sql" -o -name "*.py" \) | head -30Repository: openml/server-api Length of output: 898 🏁 Script executed: # Look for database models and schema definitions
rg "implementation_tag" -A 5 -B 2 --max-count=20Repository: openml/server-api Length of output: 1655 🏁 Script executed: # Find the database.flows module to examine tag() and get_tags() implementations
fd "database" --type f -e py | head -20Repository: openml/server-api Length of output: 43 🏁 Script executed: # Check the SQL schema files for implementation_tag constraints
cat ./docker/mysql/data/openml.sql | grep -A 10 "implementation_tag"Repository: openml/server-api Length of output: 43 🏁 Script executed: # Also check the _expdb schema
cat ./docker/mysql/data/openml_expdb.sql | grep -A 10 "implementation_tag"Repository: openml/server-api Length of output: 43 🏁 Script executed: # Read the full flows.py database module to see tag() and get_tags() functions
cat -n src/database/flows.pyRepository: openml/server-api Length of output: 3895 🏁 Script executed: # Try to read schema files with head/tail to see structure
head -100 ./docker/mysql/data/openml.sql | grep -A 15 "implementation_tag"Repository: openml/server-api Length of output: 43 🏁 Script executed: # Read the router flows.py file to see the full context of lines 29-39
cat -n src/routers/openml/flows.py | head -60Repository: openml/server-api Length of output: 2676 🏁 Script executed: # Search for implementation_tag table creation in SQL files
fd -e sql -x grep -l "implementation_tag" {}Repository: openml/server-api Length of output: 43 🏁 Script executed: # Try to read the SQL files line by line
wc -l ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sqlRepository: openml/server-api Length of output: 143 🏁 Script executed: # Search for UNIQUE constraint patterns in SQL files
rg "UNIQUE.*implementation_tag|implementation_tag.*UNIQUE" --include="*.sql"Repository: openml/server-api Length of output: 502 🏁 Script executed: # Check for PRIMARY KEY or UNIQUE constraints on (id, tag)
rg -A 20 "CREATE TABLE.*implementation_tag" --include="*.sql"Repository: openml/server-api Length of output: 502 🏁 Script executed: # Read the SQL files directly to find implementation_tag table definition
cat ./docker/mysql/data/openml.sql | head -5
cat ./docker/mysql/data/openml_expdb.sql | head -5Repository: openml/server-api Length of output: 158 🏁 Script executed: # Search for implementation_tag CREATE TABLE in both files
grep -n "implementation_tag" ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sql | head -20Repository: openml/server-api Length of output: 43 🏁 Script executed: # Try using sed or awk to extract lines around implementation_tag
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml.sql | head -30
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml_expdb.sql | head -30Repository: openml/server-api Length of output: 43 🏁 Script executed: # Search for database migration or setup files
find . -type f \( -name "*migration*" -o -name "*schema*" -o -name "*setup*" \) | grep -E "\.(py|sql)$"Repository: openml/server-api Length of output: 380 🏁 Script executed: # Search for IntegrityError handling in the codebase
rg "IntegrityError" -t pyRepository: openml/server-api Length of output: 43 🏁 Script executed: # Check database/__init__.py and database/setup.py for schema or constraint info
cat -n src/database/setup.pyRepository: openml/server-api Length of output: 1145 🏁 Script executed: # Check if there's any documentation about the schema in README files
find . -name "README*" -o -name "readme*" | head -5Repository: openml/server-api Length of output: 142 🏁 Script executed: # Search for error handling around database.flows.tag() calls
rg "database\.flows\.tag|\.tag\(" -A 5 -B 5 src/routers/Repository: openml/server-api Length of output: 2753 🏁 Script executed: # Check if there's try/except around tag operations in router
rg "try|except|IntegrityError|SQLAlchemy" -A 3 src/routers/openml/flows.pyRepository: openml/server-api Length of output: 43 🏁 Script executed: # Look for ON DUPLICATE KEY UPDATE or similar patterns in database module
rg "ON DUPLICATE|INSERT IGNORE|INSERT OR" src/database/Repository: openml/server-api Length of output: 43 🏁 Script executed: # Check the actual imports and error handling setup in database/flows.py
head -20 src/database/flows.pyRepository: openml/server-api Length of output: 563 TOCTOU race condition in duplicate-tag check The The Fix: Add a Note: The same pattern exists in 🤖 Prompt for AI Agents |
||
| tags = database.flows.get_tags(flow_id, expdb) | ||
| return {"flow_tag": {"id": str(flow_id), "tag": tags}} | ||
|
|
||
|
|
||
| @router.post(path="/untag") | ||
| def untag_flow( | ||
| flow_id: Annotated[int, Body()], | ||
| tag: Annotated[str, SystemString64], | ||
| user: Annotated[User | None, Depends(fetch_user)] = None, | ||
| expdb: Annotated[Connection, Depends(expdb_connection)] = None, | ||
| ) -> dict[str, dict[str, Any]]: | ||
| if user is None: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.PRECONDITION_FAILED, | ||
| detail={"code": "103", "message": "Authentication failed"}, | ||
| ) | ||
| existing = database.flows.get_tag(flow_id, tag, expdb) | ||
| if existing is None: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| detail={ | ||
| "code": "477", | ||
| "message": "Tag not found.", | ||
| "additional_information": f"id={flow_id}; tag={tag}", | ||
| }, | ||
| ) | ||
| if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| detail={ | ||
| "code": "478", | ||
| "message": "Tag is not owned by you.", | ||
| "additional_information": f"id={flow_id}; tag={tag}", | ||
| }, | ||
| ) | ||
| database.flows.delete_tag(flow_id, tag, expdb) | ||
| tags = database.flows.get_tags(flow_id, expdb) | ||
| return {"flow_tag": {"id": str(flow_id), "tag": tags}} | ||
|
|
||
|
|
||
| @router.get("/exists/{name}/{external_version}") | ||
| def flow_exists( | ||
| name: str, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| from http import HTTPStatus | ||
| from typing import Annotated, Any | ||
|
|
||
| from fastapi import APIRouter, Body, Depends, HTTPException | ||
| from sqlalchemy import Connection | ||
|
|
||
| import database.runs | ||
| from database.users import User, UserGroup | ||
| from routers.dependencies import expdb_connection, fetch_user | ||
| from routers.types import SystemString64 | ||
|
|
||
| router = APIRouter(prefix="/runs", tags=["runs"]) | ||
|
|
||
|
|
||
| @router.post(path="/tag") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider extracting the repeated auth and tagging checks into shared helper functions so the router endpoints only handle wiring to the database layer. You can reduce the complexity here by extracting the duplicated tagging logic into shared helpers, and keeping the router functions as thin wiring to the DB layer. For example, centralize the auth + duplicate‑check + error construction in a small tagging helper/service: # tagging_service.py
from http import HTTPStatus
from fastapi import HTTPException
from database.users import User, UserGroup
def require_authenticated(user: User | None) -> User:
if user is None:
raise HTTPException(
status_code=HTTPStatus.PRECONDITION_FAILED,
detail={"code": "103", "message": "Authentication failed"},
)
return user
def ensure_not_tagged(tags: list[str], tag: str, entity_id: int, entity_name: str) -> None:
if tag.casefold() in [t.casefold() for t in tags]:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": f"{entity_name} already tagged by this tag.",
"additional_information": f"id={entity_id}; tag={tag}",
},
)
def ensure_tag_exists(existing, entity_id: int, tag: str, entity_name: str) -> None:
if existing is None:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "477",
"message": "Tag not found.",
"additional_information": f"id={entity_id}; tag={tag}",
},
)
def ensure_tag_owner_or_admin(existing, user: User, entity_id: int, tag: str) -> None:
if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "478",
"message": "Tag is not owned by you.",
"additional_information": f"id={entity_id}; tag={tag}",
},
)Then your new router becomes mostly plumbing, similar to flows/tasks routers: from fastapi import APIRouter, Body, Depends
from sqlalchemy import Connection
import database.runs
from database.users import User
from routers.dependencies import expdb_connection, fetch_user
from routers.types import SystemString64
from tagging_service import (
require_authenticated,
ensure_not_tagged,
ensure_tag_exists,
ensure_tag_owner_or_admin,
)
router = APIRouter(prefix="/runs", tags=["runs"])
@router.post(path="/tag")
def tag_run(
run_id: int = Body(),
tag: str = SystemString64,
user: User | None = Depends(fetch_user),
expdb: Connection = Depends(expdb_connection),
) -> dict[str, dict[str, Any]]:
user = require_authenticated(user)
tags = database.runs.get_tags(run_id, expdb)
ensure_not_tagged(tags, tag, run_id, "Entity")
database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}
@router.post(path="/untag")
def untag_run(
run_id: int = Body(),
tag: str = SystemString64,
user: User | None = Depends(fetch_user),
expdb: Connection = Depends(expdb_connection),
) -> dict[str, dict[str, Any]]:
user = require_authenticated(user)
existing = database.runs.get_tag(run_id, tag, expdb)
ensure_tag_exists(existing, run_id, tag, "Entity")
ensure_tag_owner_or_admin(existing, user, run_id, tag)
database.runs.delete_tag(run_id, tag, expdb)
tags = database.runs.get_tags(run_id, expdb)
return {"run_tag": {"id": str(run_id), "tag": tags}}You can reuse the same |
||
| def tag_run( | ||
| run_id: Annotated[int, Body()], | ||
| tag: Annotated[str, SystemString64], | ||
| user: Annotated[User | None, Depends(fetch_user)] = None, | ||
| expdb: Annotated[Connection, Depends(expdb_connection)] = None, | ||
| ) -> dict[str, dict[str, Any]]: | ||
| if user is None: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.PRECONDITION_FAILED, | ||
| detail={"code": "103", "message": "Authentication failed"}, | ||
| ) | ||
| tags = database.runs.get_tags(run_id, expdb) | ||
| if tag.casefold() in [t.casefold() for t in tags]: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| detail={ | ||
| "code": "473", | ||
| "message": "Entity already tagged by this tag.", | ||
| "additional_information": f"id={run_id}; tag={tag}", | ||
| }, | ||
| ) | ||
| database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb) | ||
| tags = database.runs.get_tags(run_id, expdb) | ||
| return {"run_tag": {"id": str(run_id), "tag": tags}} | ||
|
Comment on lines
+15
to
+39
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No existence check is performed before writing to the tag table. A non-existent 🛡️ Proposed fix if user is None:
raise HTTPException(
status_code=HTTPStatus.PRECONDITION_FAILED,
detail={"code": "103", "message": "Authentication failed"},
)
+ if database.runs.get(run_id, expdb) is None:
+ raise HTTPException(
+ status_code=HTTPStatus.NOT_FOUND,
+ detail={"code": "...", "message": "Unknown run."},
+ )
tags = database.runs.get_tags(run_id, expdb)🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| @router.post(path="/untag") | ||
| def untag_run( | ||
| run_id: Annotated[int, Body()], | ||
| tag: Annotated[str, SystemString64], | ||
| user: Annotated[User | None, Depends(fetch_user)] = None, | ||
| expdb: Annotated[Connection, Depends(expdb_connection)] = None, | ||
| ) -> dict[str, dict[str, Any]]: | ||
| if user is None: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.PRECONDITION_FAILED, | ||
| detail={"code": "103", "message": "Authentication failed"}, | ||
| ) | ||
| existing = database.runs.get_tag(run_id, tag, expdb) | ||
| if existing is None: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| detail={ | ||
| "code": "477", | ||
| "message": "Tag not found.", | ||
| "additional_information": f"id={run_id}; tag={tag}", | ||
| }, | ||
| ) | ||
| if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups: | ||
| raise HTTPException( | ||
| status_code=HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| detail={ | ||
| "code": "478", | ||
| "message": "Tag is not owned by you.", | ||
| "additional_information": f"id={run_id}; tag={tag}", | ||
| }, | ||
| ) | ||
| database.runs.delete_tag(run_id, tag, expdb) | ||
| tags = database.runs.get_tags(run_id, expdb) | ||
| return {"run_tag": {"id": str(run_id), "tag": tags}} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider extracting the common tagging SQL into shared internal helper functions and have the specific tag/get_tag/delete_tag wrappers delegate to them to avoid duplication.
You can reduce repetition by extracting the shared tagging logic into a small internal helper and having the flow/task/run-specific functions delegate to it. That keeps behavior in one place while preserving the public API.
For example, in your data layer module:
Then your flow-specific functions become tiny wrappers:
You can apply the same pattern to the existing task/run tagging functions by just changing
table/id_column. This centralizes the behavior so it’s less likely to diverge over time while keeping the public interface unchanged.