Pathway \u2014 three shapes, one primitive.
dendrite.dispatch(...) returns a Pathway \u2014 a per-trace event handle that exposes three consumption shapes on the same primitive. The developer picks the shape that fits the workflow; the SDK doesn't force a style.
One Axon serves every shape.
The worker has no idea which shape the caller will use \u2014 it just declares a capability and replies with AGENT_OUTPUT. The shape lives entirely on the orchestrator side.
# worker.py - one Axon that streams PLAN / THOUGHT_DELTA / AGENT_OUTPUT import asyncio from cosmonapse import Axon, Dendrite, SignalType, connect_synapse async def planner(input, context): # Inside a real Neuron you would stream chunks via emit_thought_delta. # Here we just return a final answer so the example stays self-contained. return {"plan": ["step-1", "step-2"]} async def main(): synapse = await connect_synapse("cosmo://127.0.0.1:7070") worker = Dendrite(synapse=synapse, namespace="demo", role="worker") worker.attach_axon(Axon( neuron_id="planner", neuron_fn=planner, capabilities=["plan"], )) async with worker: await asyncio.sleep(float("inf")) asyncio.run(main())
await pw.wait() \u2014 sequential.
The familiar request/reply shape. Blocks until the first AGENT_OUTPUT, CLARIFICATION, ERROR, or FINAL arrives. Pathway auto-closes when the call returns. Use this when the orchestrator just needs the answer.
# Shape 1 - sequential / request-reply. # Block until the first terminal Signal arrives, then close the Pathway. from cosmonapse import Dendrite, connect_synapse async def main(): synapse = await connect_synapse("cosmo://127.0.0.1:7070") orch = Dendrite(synapse=synapse, namespace="demo") async with orch: sig = await orch.dispatch_and_wait( capabilities=["plan"], input={"goal": "ship feature X"}, timeout_s=5.0, ) print(sig.payload["output"]) await synapse.close()
@pw.on(SignalType.X) \u2014 reactive.
Trace-scoped callbacks. Each callback fires for matching Signals on this trace, not the whole namespace. Useful for streaming (THOUGHT_DELTA), structured cognition (PLAN, TOOL_CALL), or any case where the orchestrator wants to react to intermediate events.
# Shape 2 - reactive / trace-scoped callbacks. # Pathway.on(SignalType.X) registers a handler that fires for each matching # Signal on this trace - not the whole namespace. from cosmonapse import Dendrite, SignalType, connect_synapse async def main(): synapse = await connect_synapse("cosmo://127.0.0.1:7070") orch = Dendrite(synapse=synapse, namespace="demo") async with orch: pw = await orch.dispatch_and_subscribe( capabilities=["plan"], input={"goal": "..."}, ) @pw.on(SignalType.THOUGHT_DELTA) async def stream(s): print(s.payload["delta"], end="") @pw.on(SignalType.PLAN) async def on_plan(s): print("\nPLAN:", s.payload["steps"]) @pw.on(SignalType.AGENT_OUTPUT) async def done(s): print("DONE:", s.payload["output"]) await pw.close() await asyncio.sleep(2.0) # give callbacks time to fire
async for sig in pw \u2014 streaming.
Walks every Signal on the trace as it arrives. Closes on FINAL / ERROR or when you break out of the loop. The natural shape for log pipes, UI streams, or building a custom workflow walker on top.
# Shape 3 - streaming iteration. # Walk every Signal on the trace as it arrives, until you break or it closes. from cosmonapse import Dendrite, SignalType, connect_synapse async def main(): synapse = await connect_synapse("cosmo://127.0.0.1:7070") orch = Dendrite(synapse=synapse, namespace="demo") async with orch: async with await orch.dispatch( capabilities=["plan"], input={"goal": "..."}, ) as pw: async for sig in pw: print(sig.type, sig.payload) if sig.type is SignalType.AGENT_OUTPUT: break
scope="terminal" \u2014 decentralised.
scope="all" (the default) delivers every PATHWAY_TYPES Signal on the trace. scope="terminal" filters to FINAL / ERROR / CLARIFICATION only \u2014 the decentralised pattern where intermediate orchestration happens peer-to-peer and the Cortex only wakes for events that demand its attention. FINAL and ERROR still auto-close the Pathway regardless of scope.
# scope="terminal" - decentralised orchestration. # The Cortex only wakes for FINAL / ERROR / CLARIFICATION. Intermediate # AGENT_OUTPUT / PLAN / TOOL_CALL fly past on the bus and are handled # peer-to-peer by other orchestrators in the same trace. async with orch: pw = await orch.dispatch( capabilities=["plan"], input={"goal": "..."}, scope="terminal", ) sig = await pw.wait(timeout_s=60.0) # only FINAL / ERROR / CLARIFICATION resolves this
Watch traces you didn't start.
observe_pathway(trace_id) opens a Pathway in observer role. No TASK is emitted; the Pathway just attaches to inbound Signals matching the given trace_id. Useful for monitors, dashboards, or secondary orchestrators that want to follow a workflow without driving it.
# observe_pathway - watch a trace another peer started. # No TASK is emitted; the Pathway just attaches to inbound Signals # matching the given trace_id. async with monitor_dendrite: pw = await monitor_dendrite.observe_pathway(trace_id="trc_01J...") assert pw.role == "observer" async for sig in pw: log.info("observed", type=sig.type.value, neuron=sig.neuron)
Bidding
dispatch_offer / on_task_offer / bid \u2014 atomic claim for heterogeneous capability routing.
Capability routing
Address TASKs by capability instead of neuron_id. Once-only delivery via the .routed subject.
Pathway API
Full reference for the Pathway class \u2014 wait, wait_for, on, iteration, scope, lifecycle.