Sinks
Sinks define where agent output goes after a run completes. They are most useful in daemon mode and flow pipelines, where agents run unattended and their results need to be routed somewhere: a webhook, a file, a custom function, or another agent.
Sinks are configured in the spec.sinks list.
Quick Example
spec:
sinks:
- type: webhook
url: https://hooks.slack.com/services/T.../B.../xxx
headers:
Content-Type: application/json
- type: file
path: ./output/results.json
format: jsonSink Types
| Type | Description |
|---|---|
webhook | HTTP POST to a URL |
file | Write to a local file |
custom | Call a Python function |
Webhook
Sends a JSON payload to a URL via HTTP POST. Useful for Slack, Discord, PagerDuty, or any HTTP endpoint.
sinks:
- type: webhook
url: https://hooks.slack.com/services/T.../B.../xxx
headers:
Content-Type: application/json
Authorization: Bearer ${WEBHOOK_TOKEN}
timeout_seconds: 30
retry_count: 3| Field | Type | Default | Description |
|---|---|---|---|
url | str | (required) | Destination URL |
method | str | "POST" | HTTP method |
headers | dict | {} | HTTP headers (supports ${VAR} substitution) |
timeout_seconds | int | 30 | Request timeout |
retry_count | int | 0 | Number of retry attempts on failure |
Payload Format
The webhook POST body is a JSON object:
{
"agent_name": "monitor-agent",
"run_id": "a1b2c3d4e5f6",
"prompt": "Check system health and report status.",
"output": "All 3 services healthy. Response times: api=120ms, web=85ms, db=45ms.",
"success": true,
"error": null,
"tokens_in": 850,
"tokens_out": 400,
"duration_ms": 4200,
"model": "gpt-5-mini",
"provider": "openai",
"trigger_type": "cron",
"trigger_metadata": {},
"timestamp": "2025-01-15T09:00:05Z"
}File
Appends agent output to a local file. Parent directories are created if they do not exist. Supports JSON and plain text formats.
sinks:
- type: file
path: ./output/results.json
format: json| Field | Type | Default | Description |
|---|---|---|---|
path | str | (required) | Output file path |
format | str | "json" | Output format: "json" or "text" |
jsonappends one JSON object per line (JSONL), same schema as the webhook payloadtextappends one human-readable line per result:[timestamp] agent-name | OK | output
Custom
Calls a Python function with the run result. Use this for custom integrations like database writes, email, message queues, or anything else.
sinks:
- type: custom
module: my_sinks
function: send_to_database| Field | Type | Default | Description |
|---|---|---|---|
module | str | (required) | Python module path (must be importable) |
function | str | (required) | Function name to call |
The function signature:
def send_to_database(result: dict) -> None:
"""Called by InitRunner after each agent run.
Args:
result: Run result dict (same schema as webhook payload).
"""
# ... process resultMultiple Sinks
An agent can have multiple sinks. All sinks fire after each run completes:
spec:
sinks:
# Log to file
- type: file
path: ./logs/runs.json
format: json
# Notify Slack
- type: webhook
url: ${SLACK_WEBHOOK_URL}
# Store in database
- type: custom
module: my_sinks
function: store_resultSinks with Daemon Mode
Sinks are most commonly used with triggers and daemon mode. When a trigger fires and an agent run completes, all configured sinks receive the result:
spec:
triggers:
- type: cron
schedule: "0 */6 * * *"
prompt: "Check system health and report status."
sinks:
- type: webhook
url: ${SLACK_WEBHOOK_URL}
- type: file
path: ./logs/health-checks.json
format: jsoninitrunner run role.yaml --daemonEvery 6 hours, the agent runs, and the output is sent to both Slack and the log file.
Delegate Sink
The delegate sink routes one agent's output to one or more other agents. It is flow-only: you configure it under a flow agent's sink: field (spec.agents.<name>.sink), not in a role's spec.sinks list. Only successful runs are forwarded.
# Single target
spec:
agents:
writer:
role: roles/writer.yaml
sink:
type: delegate
target: editor
# Fan-out to multiple targets
spec:
agents:
triager:
role: roles/triager.yaml
sink:
type: delegate
target:
- researcher
- responder| Field | Type | Default | Description |
|---|---|---|---|
type | str | (required) | Must be "delegate" |
target | str | list[str] | (required) | Target agent name(s) |
strategy | "all" | "keyword" | "sense" | "ensemble" | "all" | Routing strategy for multi-target delegates |
ensemble | EnsembleConfig | null | null | Voting config. Required when strategy is ensemble, rejected otherwise |
loop_back | LoopBackConfig | null | null | Bounded loop-back edge for critic/refine patterns |
keep_existing_sinks | bool | false | When true, the agent's role-level sinks also fire alongside the delegate |
queue_size | int | 100 | Daemon ingress queue capacity (bounded backpressure for trigger-driven runs) |
timeout_seconds | int | 60 | Reserved (kept for schema compatibility) |
For startup ordering, fan-in wiring, and full worked pipelines, see Flow. For routing multiple agents as a coordinated unit, see Team Mode.
Routing Strategy
The strategy field only matters when a delegate has multiple targets. With a single target it has no effect.
| Strategy | Behavior | API calls |
|---|---|---|
all | Fan-out: every target receives every message (default) | None |
keyword | Intent sensing keyword scoring picks one target | None |
sense | Keyword scoring first, LLM tiebreaker when ambiguous | 0 or 1 per message |
ensemble | Fan-out to all targets, then vote and keep one winner | Depends on mode |
The keyword and sense strategies use the two-pass intent sensing logic. See Flow for the full routing walkthrough.
Ensemble Voting
With strategy: ensemble, the same prompt fans out to every target (like all), then a reducer keeps one winning answer that flows downstream as a single result. Ensemble requires at least two targets and an ensemble: block. The ensemble block is rejected for any other strategy.
spec:
agents:
drafter:
role: roles/drafter.yaml
sink:
type: delegate
strategy: ensemble
target:
- gpt
- claude
- gemini
ensemble:
mode: majority| Field | Type | Default | Description |
|---|---|---|---|
mode | "majority" | "weighted" | "judge" | "majority" | How the winning answer is chosen |
judge_model | str | "openai:gpt-4o-mini" | Model used to score candidates when mode is judge |
judge_criteria | list[str] | [] | Criteria the judge checks. Empty list falls back to clarity, completeness, accuracy |
weights | dict[str, float] | null | null | Per-target weight for weighted mode. Keys must be target names, non-negative, not all zero |
The three modes:
majority: the most frequent identical answer wins. Ties break on the lowest topology index, so the result is deterministic. Resolves in-process with no extra API calls.weighted: the highest-weight target wins, with ties breaking on the lowest index. Requires a non-emptyweightsmap. Resolves in-process with no extra API calls.judge: an LLM judge scores each candidate, and the answer passing the most criteria wins (ties break on lowest index). Costs one judge call per candidate.
Weighted mode example:
sink:
type: delegate
strategy: ensemble
target:
- fast-model
- strong-model
ensemble:
mode: weighted
weights:
fast-model: 1.0
strong-model: 2.0Each vote is recorded on the audit chain with trigger_type ensemble_vote and a vote trace. See Audit for the audit details and Flow for fan-in behavior.
Loop-Back Routing
A loop_back edge turns a forward delegation into a bounded refine loop, the classic writer to critic to writer pattern. It is the only cycle a flow permits. Every other cycle is rejected at validation.
spec:
agents:
writer:
role: roles/writer.yaml
sink:
type: delegate
target: critic
critic:
role: roles/critic.yaml
sink:
type: delegate
target: publisher
loop_back:
target: writer
max_iterations: 4
until:
output: "contains:APPROVED"| Field | Type | Default | Description |
|---|---|---|---|
type | str | "loop-back" | Discriminator. Note the hyphenated value loop-back differs from the loop_back field name |
target | str | (required) | Agent the loop returns to. Must be a known agent and must not be one of the sink's forward targets |
max_iterations | int | 3 | Hard cap on loop rounds, bounded 1 to 20 |
until | dict[str, str] | null | null | Optional early-exit predicate. Only the output key is supported |
The until value is one of:
contains:<text>: case-insensitive substring match against the latest output, for example acontains:APPROVEDsentinel.<op><number>: compares the first number parsed from the output, where<op>is one of>,>=,<,<=,==. For example">0.8"for a self-reported confidence score.
The loop stops when max_iterations rounds complete or the until predicate matches the latest output, whichever comes first. The flow depth limit remains a final backstop. See Flow for a full worked loop example.
Validate a flow and inspect its sink summaries with:
initrunner flow validate flow.yamlThe Sink column renders the delegate summary, for example delegate: a, b [ensemble:majority] for an ensemble sink, with a (loop-back: writer x4) note when a loop-back edge is set.