
# Block Types

Every block in a workflow has a `type` field that determines its behavior. All blocks share the common fields from `BaseBlockDef` — this page covers the type-specific fields.

## `linear`

Single LLM call through a soul. The most common block type.

```yaml
blocks:
  research:
    type: linear
    soul_ref: researcher
```

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `soul_ref` | `str` | required | Soul ID to use for this block |

The soul receives the current workflow state as context and its `system_prompt` as the system message. The LLM response becomes the block's output, stored at `state.results[block_id]`.

If the block has exits defined, the soul can use the `delegate` tool to pick an exit port — see Dispatch & Delegate.

## `gate`

LLM quality gate — evaluates another block's output and routes on pass/fail.

```yaml
blocks:
  quality_check:
    type: gate
    soul_ref: reviewer
    eval_key: draft_step
    pass: publish
    fail: revise
```

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `soul_ref` | `str` | required | Soul ID for the gate evaluator |
| `eval_key` | `str` | required | Block ID whose output is being evaluated |
| `extract_field` | `str` | none | JSON field to extract from the target block's output before evaluation |
| `pass` | `str` | none | Target block on pass (shorthand for exit routing) |
| `fail` | `str` | none | Target block on fail (shorthand for exit routing) |

The gate soul receives the output of `eval_key` and makes a pass/fail judgment. If `extract_field` is set, only that JSON field is extracted before the soul sees it.

`pass` and `fail` are shorthand for exit ports — they automatically create two `ExitDef` entries with IDs `"pass"` and `"fail"`. Both must be set together or both omitted. When both are omitted, the gate result is determined by the soul's output and standard exit conditions.
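To make the `extract_field` behavior concrete, here is a minimal sketch of how the evaluated output might be narrowed before the gate soul sees it. The helper name `prepare_gate_input` is hypothetical, not part of the documented API:

```python
import json

def prepare_gate_input(results, eval_key, extract_field=None):
    # Hypothetical helper: pull the evaluated block's output from upstream results.
    raw = results[eval_key]
    if extract_field is None:
        return raw
    # Narrow to a single JSON field before the gate soul evaluates it.
    payload = json.loads(raw) if isinstance(raw, str) else raw
    return payload[extract_field]
```

With `extract_field: score`, a draft whose output is `{"score": 7, "text": "..."}` would be reduced to just `7` before evaluation.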

## `code`

Runs Python code in a sandboxed environment. The code must define a `def main(data)` function.

```yaml
blocks:
  transform:
    type: code
    code: |
      import json
      def main(data):
          raw = data.get("research", "")
          parsed = json.loads(raw) if raw.startswith("{") else {"text": raw}
          return {"structured": parsed}
    timeout_seconds: 15
    allowed_imports: [json, re]
```

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `code` | `str` | required | Python source code with a `def main(data)` function |
| `timeout_seconds` | `int` | 30 | Execution timeout in seconds (overrides base default of 300) |
| `allowed_imports` | `List[str]` | safe whitelist | Whitelist of importable modules. If omitted, defaults to: `json`, `re`, `math`, `datetime`, `collections`, `itertools`, `hashlib`, `base64`, `time`, `urllib.parse`. Set explicitly to expand or restrict. |

The `main` function receives a `data` dict containing all upstream block results. It must return a dict — the return value becomes the block's output.
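A minimal body that satisfies this contract might look like the following; the `"research"` key is an assumed upstream block ID, as in the example above:

```python
def main(data):
    # data: dict of upstream block results, keyed by block ID
    raw = data.get("research", "")
    return {
        "word_count": len(raw.split()),  # simple derived metric
        "is_empty": not raw.strip(),
    }
```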

## `loop`

Iterates inner blocks for multiple rounds with optional break conditions.

```yaml
blocks:
  refine:
    type: loop
    inner_block_refs: [draft, review]
    max_rounds: 3
    break_condition:
      eval_key: review.verdict
      operator: equals
      value: approved
    carry_context:
      enabled: true
      mode: last
      inject_as: previous_feedback
```

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `inner_block_refs` | `List[str]` | required | Block IDs to execute each round (min 1) |
| `max_rounds` | `int` | 5 | Maximum iterations (1–50) |
| `break_condition` | `ConditionDef` or `ConditionGroupDef` | none | Condition to exit the loop early |
| `carry_context` | `CarryContextConfig` | none | Pass context between rounds |
| `break_on_exit` | `str` | none | Exit handle that triggers a loop break |
| `retry_on_exit` | `str` | none | Exit handle that triggers another round |
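Reading the example above, a `break_condition` resolves a dotted `eval_key` against the round's results and compares with the given operator. A rough sketch of that evaluation, covering only the `equals` operator (helper names are illustrative, not the actual implementation):

```python
def resolve_dotted(results, key):
    # "review.verdict" -> results["review"]["verdict"]
    value = results
    for part in key.split("."):
        value = value[part]
    return value

def should_break(results, cond):
    # Only the "equals" operator from the example is sketched here.
    if cond["operator"] != "equals":
        raise NotImplementedError(cond["operator"])
    return resolve_dotted(results, cond["eval_key"]) == cond["value"]
```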

### `carry_context` (CarryContextConfig)

Controls how context flows between loop rounds:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `enabled` | `bool` | true | Enable context carrying |
| `mode` | `str` | `"last"` | `"last"` (only previous round) or `"all"` (accumulate all rounds) |
| `source_blocks` | `List[str]` | none | Specific blocks to carry from (default: all inner blocks) |
| `inject_as` | `str` | `"previous_round_context"` | Key name for injected context |

If `stateful: true` is set on inner blocks, they maintain conversation history across rounds — the soul remembers prior iterations.

## `workflow`

Executes a child workflow as a sub-step, with parent-child run linkage and independent error handling.

```yaml
blocks:
  sub_pipeline:
    type: workflow
    workflow_ref: analysis-pipeline
    inputs:
      topic: research.output
    outputs:
      summary: analysis.result
    on_error: catch
```

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `workflow_ref` | `str` | required | ID of the child workflow file to execute |
| `inputs` | `Dict[str, str]` | none | Parent state key → child state key mapping |
| `outputs` | `Dict[str, str]` | none | Parent path → child dotted path mapping |
| `max_depth` | `int` | none | Maximum nesting depth limit |
| `on_error` | `str` | `"raise"` | `"raise"` (propagate) or `"catch"` (absorb error, continue parent) |

The child workflow runs as a separate execution with its own run record linked to the parent. The child's blocks execute independently — they do not see the parent's state unless explicitly mapped via `inputs`.

When `on_error: catch` is set, the parent workflow continues execution even if the child fails.
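The two `on_error` modes can be sketched as a simple try/except wrapper; `run_child` and its return shape are illustrative assumptions, not the documented API:

```python
def run_child(execute, on_error="raise"):
    # execute: callable that runs the child workflow and returns its outputs
    try:
        return {"ok": True, "outputs": execute()}
    except Exception as exc:
        if on_error == "catch":
            # Absorb the failure so the parent can keep going.
            return {"ok": False, "error": str(exc)}
        raise  # "raise": propagate the child's failure to the parent
```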

## `dispatch`

Parallel branching — each exit port gets its own soul and task instruction. All branches execute concurrently.

```yaml
blocks:
  analyze:
    type: dispatch
    exits:
      - id: sentiment
        label: Sentiment Analysis
        soul_ref: sentiment_analyst
        task: Analyze the sentiment of the input text.
      - id: entities
        label: Entity Extraction
        soul_ref: entity_extractor
        task: Extract all named entities from the input.
```

The dispatch block uses `DispatchExitDef` (not the standard `ExitDef`):

| Field | Type | Description |
| --- | --- | --- |
| `id` | `str` | Unique exit port ID |
| `label` | `str` | Human-readable label |
| `soul_ref` | `str` | Soul ID for this branch |
| `task` | `str` | Task instruction for this branch's soul |

All branches run concurrently via `asyncio.gather`. Each branch gets its own budget session for cost isolation. Results are stored per-exit at `state.results["{block_id}.{exit_id}"]` and combined at `state.results[block_id]` as a JSON array.
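The fan-out and result-storage pattern described above can be sketched as follows; `run_branch` is a stand-in for the real soul invocation, and the function names are assumptions for illustration:

```python
import asyncio
import json

async def run_branch(exit_id, task):
    # Stand-in for invoking the branch's soul; real code calls the LLM here.
    await asyncio.sleep(0)
    return {"exit": exit_id, "task": task}

async def dispatch(block_id, exits, results):
    # Run every branch concurrently; gather preserves the order of exits.
    branch_outputs = await asyncio.gather(
        *(run_branch(e["id"], e["task"]) for e in exits)
    )
    for e, out in zip(exits, branch_outputs):
        results[f"{block_id}.{e['id']}"] = out          # per-exit result
    results[block_id] = json.dumps(branch_outputs)      # combined JSON array
    return results
```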

## Examples

A simple sequential pipeline, chained with `depends`:

```yaml
blocks:
  step_a:
    type: linear
    soul_ref: researcher
  step_b:
    type: linear
    soul_ref: writer
    depends: step_a
  step_c:
    type: gate
    soul_ref: reviewer
    eval_key: step_b
    depends: step_b
workflow:
  name: Pipeline
  entry: step_a
```
An iterative refinement loop, where the `review` gate routes back to `draft` on failure:

```yaml
blocks:
  draft:
    type: linear
    soul_ref: writer
    stateful: true
  review:
    type: gate
    soul_ref: reviewer
    eval_key: draft
    pass: done
    fail: draft
  refine_loop:
    type: loop
    inner_block_refs: [draft, review]
    max_rounds: 3
    break_on_exit: pass
    carry_context:
      enabled: true
      mode: last
  done:
    type: code
    code: |
      def main(data):
          return {"final": data.get("draft", "")}
    depends: refine_loop
workflow:
  name: Iterative Refinement
  entry: refine_loop
```