A Python task pipeline manager with DAG-based dependency resolution and an interactive TUI. Supports local parallel execution and SLURM/HPC clusters via submitit.
We start by setting up a Pipeline:
```python
from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())
```

To this pipeline object you can now add tasks. There are two ways to add tasks: (1) via decorators and (2) via a functional interface. Before we consider the differences, we start with the parts that both share.
To add tasks via decorators, use the `@task("datapipeline")` decorator from `depio.decorators.task`:
```python
import time
import pathlib
from typing import Annotated

from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Product, Dependency
from depio.decorators import task

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())

BLD = pathlib.Path("build")
BLD.mkdir(exist_ok=True)

print("Touching an initial file")
(BLD/"input.txt").touch()

@task("datapipeline")
def slowfunction(output: Annotated[pathlib.Path, Product],
                 input: Annotated[pathlib.Path, Dependency] = None,
                 sec: int = 0):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output, 'w') as f:
        f.write("Hallo from depio")

defaultpipeline.add_task(slowfunction(BLD/"output1.txt", input=BLD/"input.txt", sec=2))
defaultpipeline.add_task(slowfunction(BLD/"output2.txt", input=BLD/"input.txt", sec=3))
defaultpipeline.add_task(slowfunction(BLD/"final1.txt", BLD/"output1.txt", sec=1))

exit(defaultpipeline.run())
```

First, we add a folder `build` in which we want to produce our artifacts.
Then, we create an initial artifact build/input.txt via touch.
Then the interesting part begins:
We define a function `slowfunction` that takes a couple of seconds to produce an output file from a given input file.
We annotate the function with the `@task` decorator and use `typing.Annotated` to tell depio which arguments are dependencies and which are products of the function.
depio will parse this for us and set up the dependencies between the tasks.
Finally, we add the function calls to the pipeline via add_task and run the pipeline.
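Under the hood, this dependency detection relies on the metadata that `typing.Annotated` attaches to each parameter. The following stdlib-only sketch, with stand-in `Product`/`Dependency` marker classes, illustrates the kind of introspection involved; it is not depio's actual implementation:

```python
import pathlib
from typing import Annotated, get_type_hints

# Hypothetical marker classes standing in for depio's Product/Dependency,
# which live in depio.Task.
class Product: pass
class Dependency: pass

def slowfunction(output: Annotated[pathlib.Path, Product],
                 input: Annotated[pathlib.Path, Dependency],
                 sec: int = 0):
    ...

# Resolve the annotations, keeping the Annotated metadata intact.
hints = get_type_hints(slowfunction, include_extras=True)
products = [name for name, hint in hints.items()
            if hasattr(hint, "__metadata__") and Product in hint.__metadata__]
dependencies = [name for name, hint in hints.items()
                if hasattr(hint, "__metadata__") and Dependency in hint.__metadata__]
print(products)      # ['output']
print(dependencies)  # ['input']
```

From lists like these, the pipeline can connect a task that produces a file to every task that depends on it.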
```python
import time
import pathlib
from typing import Annotated

from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Product, Dependency, Task

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())

BLD = pathlib.Path("build")
BLD.mkdir(exist_ok=True)

print("Touching an initial file")
(BLD/"input.txt").touch()

def slowfunction(input: Annotated[pathlib.Path, Dependency],
                 output: Annotated[pathlib.Path, Product],
                 sec: int = 0):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output, 'w') as f:
        f.write("Hallo from depio")

defaultpipeline.add_task(Task("functionaldemo1", slowfunction, [BLD/"input.txt", BLD/"output1.txt"], {"sec": 2}))
defaultpipeline.add_task(Task("functionaldemo2", slowfunction, [BLD/"output1.txt", BLD/"final1.txt"], {"sec": 1}))

exit(defaultpipeline.run())
```

The main difference is that you have to pass the args and kwargs manually, but you can also override the task name. You can also define the DAG yourself:
```python
import time

from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Task

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())

def slowfunction(sec: int = 0):
    print(f"A function that is doing something for {sec} seconds.")
    time.sleep(sec)

t1 = defaultpipeline.add_task(Task("functionaldemo1", slowfunction, [1]))
t2 = defaultpipeline.add_task(Task("functionaldemo2", slowfunction, [1]))
t3 = defaultpipeline.add_task(Task("functionaldemo3", slowfunction, [1]))
t4 = defaultpipeline.add_task(Task("functionaldemo4", slowfunction, [2], depends_on=[t3]))
t5 = defaultpipeline.add_task(Task("functionaldemo5", slowfunction, [3], depends_on=[t4]))

exit(defaultpipeline.run())
```

Notice how depio deduplicates tasks: if the same function is called with identical arguments, `add_task` returns the already-registered instance rather than adding a duplicate.
When using the functional interface with hard-coded dependencies (`depends_on`), always save the return value of `add_task` and use that object when wiring up downstream tasks.
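The execution order that falls out of these `depends_on` edges is a topological sort of the task graph. Here is a minimal stdlib-only sketch of that idea; `topological_order` and its arguments are illustrative names, not depio's API:

```python
from collections import deque

def topological_order(tasks, depends_on):
    """tasks: list of task names; depends_on: dict mapping a task to its prerequisites."""
    indegree = {t: len(depends_on.get(t, [])) for t in tasks}
    dependents = {t: [] for t in tasks}
    for t, prereqs in depends_on.items():
        for p in prereqs:
            dependents[p].append(t)
    # Tasks with no unmet prerequisites are immediately runnable.
    ready = deque(t for t in tasks if indegree[t] == 0)
    order = []
    while ready:
        t = ready.popleft()
        order.append(t)
        for d in dependents[t]:
            indegree[d] -= 1
            if indegree[d] == 0:
                ready.append(d)
    if len(order) != len(tasks):
        raise ValueError("cycle detected in task graph")
    return order

print(topological_order(["t3", "t4", "t5"],
                        {"t4": ["t3"], "t5": ["t4"]}))  # ['t3', 't4', 't5']
```

In practice a parallel executor can submit every task in the `ready` set at once, which is how independent tasks end up running concurrently.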
To run on a SLURM/HPC cluster, you just have to replace the executor with a `SubmitItExecutor`, like so:
```python
import os
import pathlib
import time
from typing import Annotated

from depio.Executors import SubmitItExecutor
from depio.Pipeline import Pipeline
from depio.decorators import task
from depio.Task import Product, Dependency

BLD = pathlib.Path("build")
BLD.mkdir(exist_ok=True)

SLURM = pathlib.Path("slurm")
SLURM.mkdir(exist_ok=True)

# Configure the slurm jobs
os.environ["SBATCH_RESERVATION"] = "<your reservation>"

defaultpipeline = Pipeline(depioExecutor=SubmitItExecutor(folder=SLURM))

@task("datapipeline")
def slowfunction(input: Annotated[pathlib.Path, Dependency],
                 output: Annotated[pathlib.Path, Product],
                 sec: int = 0):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output, 'w') as f:
        f.write("Hallo from depio")

defaultpipeline.add_task(slowfunction(BLD/"input.txt", BLD/"output1.txt", sec=2))
defaultpipeline.add_task(slowfunction(BLD/"input.txt", BLD/"output2.txt", sec=3))
defaultpipeline.add_task(slowfunction(BLD/"output1.txt", BLD/"final1.txt", sec=1))

exit(defaultpipeline.run())
```

SLURM executor settings (partition, time limit, memory, GPU count, job queue limits) can be configured in `.depio/config.json`; see the Configuration section below.
Here is how you can use it with Hydra:
```python
import os
import pathlib
import time
from typing import Annotated

from omegaconf import DictConfig, OmegaConf
import hydra

from depio.Executors import SubmitItExecutor
from depio.Pipeline import Pipeline
from depio.decorators import task
from depio.Task import Product, Dependency, IgnoredForEq

SLURM = pathlib.Path("slurm")
SLURM.mkdir(exist_ok=True)

CONFIG = pathlib.Path("config")
CONFIG.mkdir(exist_ok=True)

os.environ["SBATCH_RESERVATION"] = "isec-team"

defaultpipeline = Pipeline(depioExecutor=SubmitItExecutor(folder=SLURM))

@task("datapipeline")
def slowfunction(input: Annotated[pathlib.Path, Dependency],
                 output: Annotated[pathlib.Path, Product],
                 cfg: Annotated[DictConfig, IgnoredForEq],
                 sec: int = 0):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output, 'w') as f:
        f.write(OmegaConf.to_yaml(cfg))

@hydra.main(version_base=None, config_path=str(CONFIG), config_name="config")
def my_hydra(cfg: Annotated[DictConfig, IgnoredForEq]) -> None:
    BLD = pathlib.Path(cfg["bld_path"])
    BLD.mkdir(exist_ok=True)
    defaultpipeline.add_task(slowfunction(None, BLD/"input.txt", cfg, sec=4))
    defaultpipeline.add_task(slowfunction(BLD/"input.txt", BLD/f"output_{cfg['attack'].name}.txt", cfg, sec=2))
    defaultpipeline.add_task(slowfunction(BLD/f"output_{cfg['attack'].name}.txt", BLD/f"final_{cfg['attack'].name}.txt", cfg, sec=1))

if __name__ == "__main__":
    my_hydra()
    exit(defaultpipeline.run())
```

Then you can run Hydra multiruns to generate a bunch of tasks:

```shell
python demo_hydra.py -m attack=ours,otherattack1,otherattack2
```

You can also use it for sweeps.
To use different build modes, set the `buildmode` parameter when creating the task:

```python
from depio.BuildMode import BuildMode

@task("datapipeline", buildmode=BuildMode.ALWAYS)
def funcdec(output: Annotated[pathlib.Path, Product]):
    with open(output, 'w') as f:
        f.write("Hallo from depio")
```

There are seven values to choose from:

- `BuildMode.NEVER` — Never run the task; always skip it.
- `BuildMode.IF_MISSING` — Run if any product file is missing. Does not check input timestamps or upstream task results.
- `BuildMode.ALWAYS` — Always run, unconditionally.
- `BuildMode.IF_NEW` — Run if any product is missing, or if any upstream task ran in this pipeline invocation.
- `BuildMode.IF_OLDER` — Run if any product is missing, or if any product is older than its path dependencies (make-style timestamp comparison).
- `BuildMode.IF_OLD` — Run if any product is missing, or if any product is older than a configurable age threshold (`max_age_seconds` in `.depio/config.json`, default 24 h). Can also be set per-task via `@task(..., max_age=3600)`.
- `BuildMode.IF_CODE_CHANGED` — Run if any product is missing, or if the task function's source code has changed since the last successful run. Hashes are stored in `.depio/task_hashes.json`. Enable per-task via `@task(..., track_code=True)`.
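To make the make-style check behind `BuildMode.IF_OLDER` concrete, here is a stdlib-only sketch; `needs_rebuild` is a hypothetical helper for illustration, not depio's API:

```python
import os
import pathlib
import tempfile

def needs_rebuild(products, dependencies):
    """Rebuild if any product is missing, or older than any path dependency."""
    if any(not p.exists() for p in products):
        return True
    oldest_product = min(p.stat().st_mtime for p in products)
    newest_dep = max((d.stat().st_mtime for d in dependencies), default=0.0)
    return oldest_product < newest_dep

with tempfile.TemporaryDirectory() as tmp:
    dep = pathlib.Path(tmp) / "input.txt"
    prod = pathlib.Path(tmp) / "output.txt"
    dep.touch()
    r1 = needs_rebuild([prod], [dep])   # product missing -> rebuild
    prod.touch()
    os.utime(dep, (2_000_000_000, 2_000_000_000))   # dependency now newer
    r2 = needs_rebuild([prod], [dep])   # product stale -> rebuild
    os.utime(prod, (3_000_000_000, 3_000_000_000))  # product newer again
    r3 = needs_rebuild([prod], [dep])   # up to date -> skip
    print(r1, r2, r3)  # True True False
```

`BuildMode.IF_OLD` differs only in the comparison: instead of dependency timestamps, the product's age is checked against the configured threshold.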
In addition, there are flags you can pass to the pipeline:

- `clear_screen: bool` — Clear the screen on each refresh so the TUI stays at the top.
- `hide_successful_terminated_tasks: bool` — Hide successfully finished or skipped tasks from the list.
- `submit_only_if_runnable: bool` — Only submit tasks that are immediately ready for execution.
- `refreshrate: float` — Polling interval in seconds. Can also be set in `.depio/config.json`.
- `quiet: bool` — Disable the TUI entirely; useful for scripted or CI runs.
depio supports callbacks that fire when a task or the whole pipeline finishes:

```python
from depio.hooks import TaskResult, PipelineResult

def on_done(result: TaskResult):
    print(f"{result.name} finished with status {result.status}")

pipeline = Pipeline(
    depioExecutor=ParallelExecutor(),
    on_task_finished=on_done,
)
```

To automatically save each task's stdout/stderr to disk, use the built-in save hook:

```python
from pathlib import Path

pipeline = Pipeline(
    depioExecutor=ParallelExecutor(),
    on_task_finished=Pipeline.make_save_hook(Path("outputs/")),
)
```

Available callbacks on `Pipeline`: `on_task_finished`, `on_task_failed`, `on_pipeline_finished`.
Per-task callbacks can also be set directly on `Task` objects: `on_finished`, `on_task_failed`.
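For a feel of how such hooks behave at runtime, here is a stdlib-only sketch of the callback pattern; `run_tasks` and this simplified `TaskResult` are illustrative stand-ins, not depio's implementation:

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class TaskResult:
    name: str
    status: str

def run_tasks(tasks, on_task_finished: Optional[Callable[[TaskResult], None]] = None):
    """Run (name, fn) pairs in order, invoking the hook after each one."""
    for name, fn in tasks:
        try:
            fn()
            result = TaskResult(name, "success")
        except Exception:
            result = TaskResult(name, "failed")
        if on_task_finished:
            on_task_finished(result)

seen = []
run_tasks([("t1", lambda: None), ("t2", lambda: 1 / 0)],
          on_task_finished=lambda r: seen.append((r.name, r.status)))
print(seen)  # [('t1', 'success'), ('t2', 'failed')]
```

The key point: the hook receives a result object after each task, so it can log, persist output, or trigger notifications without the task functions knowing about it.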
On first run, depio creates `.depio/config.json` with sensible defaults:

```json
{
  "pipeline": { "refreshrate": 1.0 },
  "task": {
    "default_buildmode": "IF_MISSING",
    "max_age_seconds": 86400,
    "code_hash_method": "source"
  },
  "executor": {
    "parallel": {},
    "slurm": {
      "max_jobs_pending": 45,
      "max_jobs_queued": 20,
      "partition": "gpu",
      "time_minutes": 2880,
      "mem_gb": 32,
      "gpus_per_node": 0
    }
  }
}
```

Edit this file to change defaults for your project without touching any code.
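As an illustration of how a project-level config file like this can be merged over built-in defaults, here is a stdlib-only sketch; `load_config` and `DEFAULTS` are hypothetical, depio manages its config internally:

```python
import json
import pathlib
import tempfile

# Illustrative subset of the defaults shown above.
DEFAULTS = {
    "pipeline": {"refreshrate": 1.0},
    "task": {"default_buildmode": "IF_MISSING", "max_age_seconds": 86400},
}

def load_config(path: pathlib.Path) -> dict:
    config = json.loads(json.dumps(DEFAULTS))  # deep copy of the defaults
    if path.exists():
        # Section-wise merge: user values override, unset keys keep defaults.
        for section, values in json.loads(path.read_text()).items():
            config.setdefault(section, {}).update(values)
    return config

with tempfile.TemporaryDirectory() as tmp:
    cfg_file = pathlib.Path(tmp) / "config.json"
    cfg_file.write_text('{"pipeline": {"refreshrate": 0.5}}')
    cfg = load_config(cfg_file)
    print(cfg["pipeline"]["refreshrate"])    # 0.5 (overridden)
    print(cfg["task"]["default_buildmode"])  # IF_MISSING (default kept)
```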
Create an editable install:

```shell
pip install -e .
```

Run the tests with:

```shell
pytest
```

For licensing, see LICENCE; for the security policy, see SECURITY.