Worker side

One Axon serves every shape.

The worker has no idea which shape the caller will use: it just declares a capability and replies with AGENT_OUTPUT. The shape lives entirely on the orchestrator side.

worker.py
# worker.py - one Axon that declares the "plan" capability and returns a final answer
import asyncio
from cosmonapse import Axon, Dendrite, SignalType, connect_synapse

async def planner(input, context):
    # Inside a real Neuron you would stream chunks via emit_thought_delta.
    # Here we just return a final answer so the example stays self-contained.
    return {"plan": ["step-1", "step-2"]}

async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    worker = Dendrite(synapse=synapse, namespace="demo", role="worker")
    worker.attach_axon(Axon(
        neuron_id="planner", neuron_fn=planner,
        capabilities=["plan"],
    ))
    async with worker:
        await asyncio.sleep(float("inf"))

asyncio.run(main())
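To make the "worker does not know the shape" point concrete, here is a minimal sketch in plain Python that does not use cosmonapse at all. `ToyAxon`, `ToyDendrite`, and `handle_task` are hypothetical names, and the matching rule (an axon must declare every requested capability) is an assumption, not the library's documented behaviour. The reply has the same form whatever the caller later does with it.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, List, Optional

# Hypothetical stand-ins for illustration only; not the cosmonapse classes.
NeuronFn = Callable[[Any, dict], Awaitable[Any]]

@dataclass
class ToyAxon:
    neuron_id: str
    neuron_fn: NeuronFn
    capabilities: List[str]

@dataclass
class ToyDendrite:
    axons: List[ToyAxon] = field(default_factory=list)

    def attach_axon(self, axon: ToyAxon) -> None:
        self.axons.append(axon)

    async def handle_task(self, capabilities: List[str], input: Any) -> Optional[dict]:
        # Assumed rule: pick the first axon declaring every requested capability.
        for axon in self.axons:
            if set(capabilities) <= set(axon.capabilities):
                output = await axon.neuron_fn(input, {})
                # Same reply form regardless of how the caller consumes it.
                return {"type": "AGENT_OUTPUT", "neuron": axon.neuron_id,
                        "payload": {"output": output}}
        return None  # no axon offers the requested capabilities

async def planner(input, context):
    return {"plan": ["step-1", "step-2"]}

async def demo() -> Optional[dict]:
    worker = ToyDendrite()
    worker.attach_axon(ToyAxon("planner", planner, ["plan"]))
    return await worker.handle_task(["plan"], {"goal": "ship feature X"})

reply = asyncio.run(demo())
print(reply["type"], reply["payload"]["output"])
```

Nothing in `handle_task` refers to waiting, callbacks, or iteration, which is the property the worker side relies on.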
Shape 1

await pw.wait() - sequential.

The familiar request/reply shape. Blocks until the first AGENT_OUTPUT, CLARIFICATION, ERROR, or FINAL arrives. Pathway auto-closes when the call returns. Use this when the orchestrator just needs the answer.

wait.py
# Shape 1 - sequential / request-reply.
# Block until the first terminal Signal arrives, then close the Pathway.
import asyncio
from cosmonapse import Dendrite, connect_synapse

async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    orch = Dendrite(synapse=synapse, namespace="demo")

    async with orch:
        sig = await orch.dispatch_and_wait(
            capabilities=["plan"],
            input={"goal": "ship feature X"},
            timeout_s=5.0,
        )
        print(sig.payload["output"])
    await synapse.close()

asyncio.run(main())
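The blocking behaviour described for this shape can be sketched with the standard library alone. The code below is an illustration under assumptions, not how cosmonapse implements it: `wait_for_first` is a hypothetical helper, signals are plain dicts, and raising on timeout is a choice made for this sketch (the text above does not say what the real call does when `timeout_s` expires).

```python
import asyncio

# Signal types that end the wait, as listed in the description of this shape.
RESOLVING = {"AGENT_OUTPUT", "CLARIFICATION", "ERROR", "FINAL"}

async def wait_for_first(queue: asyncio.Queue, timeout_s: float) -> dict:
    """Return the first resolving signal from `queue`, or raise on timeout."""
    async def _drain() -> dict:
        while True:
            sig = await queue.get()
            if sig["type"] in RESOLVING:
                return sig
            # Intermediate signals (e.g. THOUGHT_DELTA) are skipped.
    return await asyncio.wait_for(_drain(), timeout=timeout_s)

async def demo() -> tuple:
    q: asyncio.Queue = asyncio.Queue()
    q.put_nowait({"type": "THOUGHT_DELTA", "payload": {"delta": "thinking"}})
    q.put_nowait({"type": "AGENT_OUTPUT", "payload": {"output": "done"}})
    first = await wait_for_first(q, timeout_s=1.0)

    timed_out = False
    try:
        # An empty queue never resolves, so the timeout fires.
        await wait_for_first(asyncio.Queue(), timeout_s=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    return first, timed_out

first, timed_out = asyncio.run(demo())
print(first["payload"]["output"], timed_out)
```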
Shape 2

@pw.on(SignalType.X) - reactive.

Trace-scoped callbacks. Each callback fires for matching Signals on this trace, not the whole namespace. Useful for streaming (THOUGHT_DELTA), structured cognition (PLAN, TOOL_CALL), or any case where the orchestrator wants to react to intermediate events.

on.py
# Shape 2 - reactive / trace-scoped callbacks.
# Pathway.on(SignalType.X) registers a handler that fires for each matching
# Signal on this trace - not the whole namespace.
import asyncio
from cosmonapse import Dendrite, SignalType, connect_synapse

async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    orch = Dendrite(synapse=synapse, namespace="demo")

    async with orch:
        pw = await orch.dispatch_and_subscribe(
            capabilities=["plan"], input={"goal": "..."},
        )

        @pw.on(SignalType.THOUGHT_DELTA)
        async def stream(s): print(s.payload["delta"], end="")

        @pw.on(SignalType.PLAN)
        async def on_plan(s): print("\nPLAN:", s.payload["steps"])

        @pw.on(SignalType.AGENT_OUTPUT)
        async def done(s):
            print("DONE:", s.payload["output"])
            await pw.close()

        await asyncio.sleep(2.0)   # give callbacks time to fire

asyncio.run(main())
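What "trace-scoped" means for callbacks can be shown with a small stand-alone sketch. `ToyPathway` and its `deliver` method are hypothetical and exist only for this illustration; signals are plain dicts with an assumed `trace_id` key. The point is that a handler registered on one trace never sees Signals from another trace in the same namespace.

```python
import asyncio
from collections import defaultdict

class ToyPathway:
    """Hypothetical sketch: handlers keyed by signal type, bound to one trace."""

    def __init__(self, trace_id: str):
        self.trace_id = trace_id
        self._handlers = defaultdict(list)

    def on(self, signal_type: str):
        # Decorator factory, mirroring the @pw.on(...) usage above.
        def register(fn):
            self._handlers[signal_type].append(fn)
            return fn
        return register

    async def deliver(self, sig: dict) -> None:
        # Signals that belong to other traces are ignored.
        if sig["trace_id"] != self.trace_id:
            return
        for fn in self._handlers[sig["type"]]:
            await fn(sig)

async def demo() -> list:
    seen = []
    pw = ToyPathway("trc_a")

    @pw.on("THOUGHT_DELTA")
    async def stream(s):
        seen.append(s["payload"]["delta"])

    await pw.deliver({"trace_id": "trc_a", "type": "THOUGHT_DELTA", "payload": {"delta": "hel"}})
    await pw.deliver({"trace_id": "trc_b", "type": "THOUGHT_DELTA", "payload": {"delta": "XXX"}})
    await pw.deliver({"trace_id": "trc_a", "type": "THOUGHT_DELTA", "payload": {"delta": "lo"}})
    return seen

seen = asyncio.run(demo())
print(seen)
```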
Shape 3

async for sig in pw - streaming.

Walks every Signal on the trace as it arrives. Closes on FINAL / ERROR or when you break out of the loop. The natural shape for log pipes, UI streams, or building a custom workflow walker on top.

iter.py
# Shape 3 - streaming iteration.
# Walk every Signal on the trace as it arrives, until you break or it closes.
import asyncio
from cosmonapse import Dendrite, SignalType, connect_synapse

async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    orch = Dendrite(synapse=synapse, namespace="demo")

    async with orch:
        async with await orch.dispatch(
            capabilities=["plan"], input={"goal": "..."},
        ) as pw:
            async for sig in pw:
                print(sig.type, sig.payload)
                if sig.type is SignalType.AGENT_OUTPUT:
                    break

asyncio.run(main())
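The "closes on FINAL / ERROR" rule maps naturally onto Python's async iterator protocol. The sketch below is a guess at the general idea, not the library's code: `ToyStream` is hypothetical, and it assumes the closing Signal itself is yielded before iteration stops, which the text above does not confirm.

```python
import asyncio

CLOSING = {"FINAL", "ERROR"}  # types that end iteration, per the text above

class ToyStream:
    """Hypothetical async iterator over the signals of one trace."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed = False

    def push(self, sig: dict) -> None:
        self._queue.put_nowait(sig)

    def __aiter__(self):
        return self

    async def __anext__(self) -> dict:
        if self._closed:
            raise StopAsyncIteration
        sig = await self._queue.get()
        if sig["type"] in CLOSING:
            # Assumption: yield the closing signal, then stop on the next step.
            self._closed = True
        return sig

async def demo() -> list:
    pw = ToyStream()
    for t in ("PLAN", "AGENT_OUTPUT", "FINAL", "THOUGHT_DELTA"):
        pw.push({"type": t})
    # The trailing THOUGHT_DELTA is never read: the stream closed on FINAL.
    return [sig["type"] async for sig in pw]

types = asyncio.run(demo())
print(types)
```

Breaking out of the loop early, as iter.py does on AGENT_OUTPUT, needs no extra support from the iterator.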
scope

scope="terminal" - decentralised.

scope="all" (the default) delivers every PATHWAY_TYPES Signal on the trace. scope="terminal" filters to FINAL / ERROR / CLARIFICATION only. This is the decentralised pattern, where intermediate orchestration happens peer-to-peer and the Cortex only wakes for events that demand its attention. FINAL and ERROR still auto-close the Pathway regardless of scope.

scope.py
# scope="terminal" - decentralised orchestration.
# The Cortex only wakes for FINAL / ERROR / CLARIFICATION. Intermediate
# AGENT_OUTPUT / PLAN / TOOL_CALL fly past on the bus and are handled
# peer-to-peer by other orchestrators in the same trace.
async with orch:
    pw = await orch.dispatch(
        capabilities=["plan"],
        input={"goal": "..."},
        scope="terminal",
    )
    sig = await pw.wait(timeout_s=60.0)   # only FINAL / ERROR / CLARIFICATION resolves this
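The two scopes amount to a delivery filter plus a separate auto-close rule. A minimal sketch of that logic, with hypothetical helper names (`delivered`, `closes_pathway`) and signal types written as plain strings rather than the real `SignalType` enum:

```python
TERMINAL = frozenset({"FINAL", "ERROR", "CLARIFICATION"})
AUTO_CLOSE = frozenset({"FINAL", "ERROR"})

def delivered(signal_type: str, scope: str = "all") -> bool:
    """Would a Pathway with this scope hand the signal to the orchestrator?"""
    if scope == "all":
        # Assumes signal_type is already one of the PATHWAY_TYPES.
        return True
    if scope == "terminal":
        return signal_type in TERMINAL
    raise ValueError(f"unknown scope: {scope!r}")

def closes_pathway(signal_type: str) -> bool:
    """FINAL and ERROR close the Pathway regardless of scope."""
    return signal_type in AUTO_CLOSE

trace = ["PLAN", "AGENT_OUTPUT", "CLARIFICATION", "TOOL_CALL", "FINAL"]
woken = [t for t in trace if delivered(t, scope="terminal")]
print(woken)  # ['CLARIFICATION', 'FINAL']
```

Note that CLARIFICATION is delivered under scope="terminal" but does not close the Pathway in this sketch, since only FINAL and ERROR are described as auto-closing.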
observe_pathway

Watch traces you didn't start.

observe_pathway(trace_id) opens a Pathway in observer role. No TASK is emitted; the Pathway just attaches to inbound Signals matching the given trace_id. Useful for monitors, dashboards, or secondary orchestrators that want to follow a workflow without driving it.

observe.py
# observe_pathway - watch a trace another peer started.
# No TASK is emitted; the Pathway just attaches to inbound Signals
# matching the given trace_id.
async with monitor_dendrite:
    pw = await monitor_dendrite.observe_pathway(trace_id="trc_01J...")
    assert pw.role == "observer"
    async for sig in pw:
        log.info("observed", type=sig.type.value, neuron=sig.neuron)
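The difference between starting a trace and observing one can be sketched with a toy in-memory bus. Everything here is hypothetical (`ToyBus`, `ToyPathway`, and the "driver" role name, which the text above does not mention); the sketch only shows that observing attaches a listener for a trace_id without publishing a TASK.

```python
class ToyBus:
    """Hypothetical in-memory bus that records everything published."""

    def __init__(self):
        self.log = []
        self.pathways = []

    def publish(self, sig: dict) -> None:
        self.log.append(sig)
        for pw in self.pathways:
            pw.receive(sig)

class ToyPathway:
    def __init__(self, trace_id: str, role: str):
        self.trace_id = trace_id
        self.role = role
        self.inbox = []

    def receive(self, sig: dict) -> None:
        # Only signals on this Pathway's trace are kept.
        if sig["trace_id"] == self.trace_id:
            self.inbox.append(sig)

def dispatch(bus: ToyBus, trace_id: str) -> ToyPathway:
    # Starting a trace emits a TASK, then listens for replies.
    bus.publish({"trace_id": trace_id, "type": "TASK"})
    pw = ToyPathway(trace_id, role="driver")  # role name is an assumption
    bus.pathways.append(pw)
    return pw

def observe_pathway(bus: ToyBus, trace_id: str) -> ToyPathway:
    # Observing only attaches; nothing is published.
    pw = ToyPathway(trace_id, role="observer")
    bus.pathways.append(pw)
    return pw

bus = ToyBus()
driver = dispatch(bus, "trc_1")
monitor = observe_pathway(bus, "trc_1")
bus.publish({"trace_id": "trc_1", "type": "AGENT_OUTPUT"})
bus.publish({"trace_id": "trc_2", "type": "AGENT_OUTPUT"})

task_count = sum(1 for s in bus.log if s["type"] == "TASK")
observed = [s["type"] for s in monitor.inbox]
print(task_count, observed)
```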