Tasks & Workflows¶
This page covers two related runtime layers:
- Tasks (miminions.task): bind a Minion to a unit of work and run many of them concurrently with asyncio.TaskGroup.
- Workflow tracing (miminions.workflow): small, serializable records that capture what an agent did (its turns and tool calls), persisted by the miminions execution CLI.
Import from the subpackages
The top-level import miminions re-exports nothing. Import tasks from miminions.task, and workflow trace models from their submodules (miminions.workflow has no package-level exports).
Part 1 — Tasks¶
A Task is a dataclass describing a unit of work (id, name, description, status, priority, timing). An AgentTask extends it by binding an agent plus the args/kwargs to pass to that agent's run(). The TaskRuntime collects AgentTasks and executes them concurrently.
Python 3.12+
TaskRuntime.run() uses asyncio.TaskGroup, which requires Python 3.11+ — already satisfied by MiMinions' >= 3.12 floor.
Quick Start¶
import asyncio

from miminions.agent import create_minion
from miminions.task import TaskRuntime, AgentTask


async def main():
    runtime = TaskRuntime()

    # Bind agents to tasks. Each AgentTask.agent must expose an async run().
    # create_minion(...) returns a Minion, which does.
    runtime.add_task(AgentTask(
        name="haiku",
        agent=create_minion("Poet", provider="test"),
        args=["Write a haiku about minions."],
    ))
    runtime.add_task(AgentTask(
        name="summary",
        agent=create_minion("Summarizer", provider="test"),
        args=["Summarize what an agentic framework is."],
    ))

    # Runs every task concurrently in one TaskGroup.
    results = await runtime.run()

    # The dict is keyed by task.id (a uuid1), not by the name you set.
    for task_id, info in results.items():
        print(task_id, info["status"], info["result"])


asyncio.run(main())
run() returns a dict keyed by task.id (the random uuid1 id, not the name you set); each value is {"status": TaskStatus, "result": <agent output>}. Both fields are also written back onto the stored AgentTask (.status, .result). To recover a task's name from its id, look it up via runtime.get_tasks()[task_id].name.
In-code docstring is inaccurate
TaskRuntime.run()'s own docstring says it returns "task names as keys" — that wording is wrong. The code builds the dict from self.tasks.items(), which is keyed by task.id.
Synchronous entry point
From non-async code, call runtime.run_sync(). It spins up a fresh event loop, runs run(), and returns the same result dict.
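The "fresh event loop" behaviour described here can be pictured with the general asyncio pattern below. This is a sketch under assumptions: run() and run_sync() are hypothetical module-level stand-ins, and the actual run_sync() may manage its loop differently.

```python
import asyncio


async def run() -> dict:
    """Stand-in for an async entry point that returns a result dict."""
    await asyncio.sleep(0)
    return {"task-1": {"status": "COMPLETED", "result": "ok"}}


def run_sync() -> dict:
    """Drive run() to completion on a fresh event loop, then close the loop."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(run())
    finally:
        loop.close()


sync_results = run_sync()
```

A wrapper written this way cannot be called from a thread whose event loop is already running, because asyncio refuses to run a second loop there; inside async code, await the coroutine directly instead.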
AgentTask fields¶
AgentTask is a @dataclass extending Task. Sensible defaults mean you usually only set agent and args.
| Field | Type | Default | Notes |
|---|---|---|---|
| id | str | random uuid1() | Unique task id; used as the key in TaskRuntime.tasks. |
| name | str | random name | Human-readable label only, not the key in the results dict (the id is). |
| description | str | random sentence | Free-text description. |
| status | TaskStatus | PENDING | Updated by the runtime as the task progresses. |
| priority | TaskPriority | MEDIUM | Advisory priority level (see below). |
| start_time / end_time | datetime \| None | None | Not auto-populated by the runtime. |
| agent | Agent | None | The bound agent. Must expose an awaitable run(). |
| args | list | [] | Positional args forwarded to agent.run(*args, ...). |
| kwargs | dict | {} | Keyword args forwarded to agent.run(..., **kwargs). |
| max_turns | int | 5 | Not currently applied (see limitations). |
| call_back | callable \| None | None | Stored on the task; not invoked by the current runtime. |
| result | AgentRunResult | None | Populated with the agent's output after run(). |
Status & priority¶
| TaskStatus value | Meaning |
|---|---|
| PENDING | Created, waiting. |
| INITIALIZED | Runtime initialized. |
| IDLE | Ready to execute. |
| RUNNING | Currently executing. |
| IN_PROGRESS | Alias kept for CLI compatibility. |
| PAUSED | Execution paused. |
| COMPLETED | Finished successfully. |
| FAILED | Raised an exception during the run. |
| CANCELLED | Cancelled. |
| TaskPriority value |
|---|
| LOW |
| MEDIUM |
| HIGH |
| CRITICAL |
Managing the task set¶
TaskRuntime keeps tasks in a dict keyed by task.id.
runtime = TaskRuntime()
task = AgentTask(name="job", agent=create_minion("A", provider="test"), args=["hi"])
runtime.add_task(task)
runtime.get_tasks() # -> {task.id: AgentTask, ...}
runtime.filter_tasks("status", TaskStatus.PENDING) # -> list of matching tasks
runtime.update_task(task.id, priority=TaskPriority.HIGH)
# get_task_status is a coroutine: await it from inside an async function.
await runtime.get_task_status(task.id) # -> TaskStatus (or {id: status} if no id given)
runtime.clear_tasks()
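The attribute-equality filter used above, and the fact that priority is advisory, can be illustrated with a small standalone sketch. Priority, StubTask, and this filter_tasks function are hypothetical stand-ins; in particular the numeric enum values are an assumption made so that tasks can be sorted, and they are not taken from TaskPriority.

```python
from dataclasses import dataclass
from enum import Enum


class Priority(Enum):
    """Hypothetical stand-in for TaskPriority; numeric values are assumed."""
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class StubTask:
    id: str
    name: str
    priority: Priority = Priority.MEDIUM


def filter_tasks(tasks: dict[str, StubTask], attribute: str, value) -> list[StubTask]:
    """Return the tasks whose named attribute equals value."""
    return [t for t in tasks.values() if getattr(t, attribute, None) == value]


tasks = {
    "a": StubTask("a", "index"),
    "b": StubTask("b", "deploy", Priority.HIGH),
    "c": StubTask("c", "notify"),
}
medium = filter_tasks(tasks, "priority", Priority.MEDIUM)

# Priority is only a label, so any ordering is up to the caller.
urgent_first = sorted(tasks.values(), key=lambda t: t.priority.value, reverse=True)
```

If priority should influence execution order, the caller would have to apply an ordering like this before submitting work.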
Limitations — read before relying on this
- Failures are swallowed. When a bound agent raises, the task's status is set to TaskStatus.FAILED, but the exception itself is not re-raised or surfaced in the result dict (result stays whatever it was, usually None). Inspect status == TaskStatus.FAILED to detect failures; the underlying error is not reported.
- max_turns is not enforced. AgentTask.max_turns is stored but the current runtime does not pass it to the agent or cap turns.
- call_back is not invoked. It is stored on the task but the runtime never calls it.
- No per-task timing. start_time / end_time are not auto-populated.
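Because the underlying error is not reported, one possible workaround is to wrap the agent so that the exception is kept on the wrapper before it propagates. The sketch below uses only the standard library. CapturingAgent and FlakyAgent are hypothetical names, and it assumes the runtime needs nothing from the bound agent beyond an awaitable run().

```python
import asyncio


class CapturingAgent:
    """Hypothetical wrapper: delegates to inner.run() and keeps any exception."""

    def __init__(self, inner):
        self.inner = inner
        self.error: Exception | None = None

    async def run(self, *args, **kwargs):
        try:
            return await self.inner.run(*args, **kwargs)
        except Exception as exc:
            self.error = exc  # keep the error for later inspection
            raise  # re-raise so the caller still sees a failure


class FlakyAgent:
    """Stand-in agent whose run() always fails."""

    async def run(self, prompt: str) -> str:
        raise ValueError(f"cannot handle: {prompt}")


async def demo() -> CapturingAgent:
    wrapped = CapturingAgent(FlakyAgent())
    try:
        await wrapped.run("hello")
    except ValueError:
        pass  # a runtime that swallows failures would mark the task FAILED here
    return wrapped


wrapped = asyncio.run(demo())
```

After a run, any task whose status is FAILED could then be paired with its wrapper's error attribute to recover the cause.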
Part 2 — Workflow tracing¶
The miminions.workflow models are lightweight, JSON-serializable records that describe what an agent run did: each agent turn and each tool call, in order. They are the schema behind the miminions execution interaction log.
No package-level exports
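import miminions.workflow exposes nothing. Import the concrete symbols from the submodules: the record models (WorkflowTrace, WorkflowRun, AgentRunRecord) from miminions.workflow.models, and WorkflowController from miminions.workflow.controller.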
The records¶
| Type | Captures |
|---|---|
| AgentRunRecord | One agent turn: prompt in, output out (plus id, created_at). |
| ToolCallRecord | One tool invocation: tool_name, args, kwargs, result, error, status, execution_time_ms, order, timestamp. |
| WorkflowTrace | An ordered list of agent and tool records, the single source of truth for sequence. |
| WorkflowRun | A completed run: agent_name + a WorkflowTrace (plus id, created_at, schema_version). |
Every record has to_dict() / from_dict() for round-tripping to JSON.
Building a trace by hand¶
from miminions.workflow.models import WorkflowTrace, WorkflowRun, AgentRunRecord

trace = WorkflowTrace()
trace.add_agent_record(AgentRunRecord(prompt="Add 2 and 3", output="5"))
trace.add_tool_record(
    tool_name="add",
    kwargs={"a": 2, "b": 3},
    result=5,
    status="success",
    execution_time_ms=0.4,
)

# Aggregate helpers
print(trace.tool_usage_counts())  # {'add': 1}
print(trace.most_used_tool())     # 'add'

run = WorkflowRun(agent_name="Calculator", trace=trace)
serialized = run.to_dict()  # JSON-ready dict
restored = WorkflowRun.from_dict(serialized)
WorkflowController — trace a Minion's tool calls¶
WorkflowController bridges a Minion and a trace. You create the AgentRunRecord and WorkflowTrace, hand them to the controller, and every execute(...) it proxies to the agent is recorded into the trace automatically.
from miminions.agent import create_minion
from miminions.workflow.models import AgentRunRecord, WorkflowTrace
from miminions.workflow.controller import WorkflowController
agent = create_minion("Calculator", provider="test")
agent.register_tool("add", "Add two numbers", lambda a, b: a + b)
record = AgentRunRecord(prompt="Add 6 and 7")
trace = WorkflowTrace()
controller = WorkflowController(agent, agent_record=record, trace=trace)
# Proxies to agent.execute(...) and appends a ToolCallRecord to the trace
result = controller.execute("add", arguments={"a": 6, "b": 7})
run = controller.finish_run(output="13") # -> WorkflowRun
Note
The controller only appends records — it never starts a run on its own. It uses the agent's execute / execute_async (which return ToolExecutionResult and never raise), so a failing tool is recorded with an error and status rather than crashing the trace. There is also an async await controller.execute_async(...).
How workflows are driven today¶
miminions workflow is planned, not enabled
A workflow CLI command group exists in the codebase but is not registered with the CLI ("not yet implemented") — miminions workflow ... does not run today. Do not script against it yet.
In the current release, workflow orchestration is driven through:
- The TaskRuntime API (above) for running agent-bound work concurrently in Python.
- The miminions execution commands, which persist each tool run as a WorkflowRun (AgentRunRecord + ToolCallRecord) under ~/.miminions/interactions.json. The execution interaction list/show commands read those traces back.
So the trace models on this page are real and in use — just consumed via execution and TaskRuntime, not a workflow command.
API Reference¶
TaskRuntime¶
| Method | Description |
|---|---|
| add_task(task: AgentTask) | Add a task (keyed by task.id). |
| get_tasks() -> dict[str, AgentTask] | All tasks by id. |
| filter_tasks(attribute, value) -> list | Tasks whose attribute equals value. |
| update_task(task_id, **attrs) | Set attributes on a task; raises ValueError if id missing. |
| clear_tasks() | Remove all tasks. |
| async run() -> dict | Run all tasks concurrently; returns {task_id: {"status", "result"}} (keyed by task.id). |
| run_sync() -> dict | Synchronous wrapper around run() (manages its own event loop). |
| async get_task_status(task_id=None) -> TaskStatus | One task's status, or {id: status} for all when task_id is None. |
WorkflowTrace¶
| Method | Description |
|---|---|
| add_agent_record(record) | Append an AgentRunRecord. |
| add_tool_record(tool_name, *, args=None, kwargs=None, result=None, error=None, status=None, execution_time_ms=None) -> ToolCallRecord | Append a tool call and return it. |
| tool_usage_counts() -> dict[str, int] | Count of calls per tool. |
| most_used_tool() -> str \| None | Most-called tool, or None. |
| to_dict() / from_dict(data) | JSON round-trip. |
WorkflowController¶
| Method | Description |
|---|---|
| WorkflowController(agent, agent_record, trace, agent_name=None) | Bind an agent + record + trace; the record is appended to the trace. |
| execute(tool_name, arguments=None, **kwargs) -> ToolExecutionResult | Proxy to agent.execute and record the call. |
| async execute_async(tool_name, arguments=None, **kwargs) | Async variant. |
| finish_run(output) -> WorkflowRun | Set the agent record's output and return a WorkflowRun. |
See Also¶
- Agent: the Minion each AgentTask binds to
- CLI & Chat: the execution commands that persist WorkflowRuns
- Workspaces: nodes, rules, and on-disk layout