← Examples/Capability-based routing
Pick your stack

The same topology, expressed across language and transport. Slide between tabs — the routing logic never changes; only the imports, the synapse you connect to, and how you launch it do.

01 · Install

These workers are plain functions — no model provider — so the only dependency is the SDK itself (plus a transport driver where the bus isn't in-process).

# SDK + the bundled cosmo CLE — the devsynapse needs no broker.
pip install cosmonapse
02 · Start the Synapse

One bus, one namespace. The CLI also streams every Signal that crosses it to stdout, so the synapse terminal doubles as a Doppler.

$ cosmo synapse start memory --namespace=quickstart

  URL:        cosmo://127.0.0.1:7070
  Namespace:  quickstart
  Transport:  TCP + NDJSON  (single-host dev only)
  ────────────────────────────────────────────────
03 · Workers — two specialised Neurons

One worker.py serves both roles; the ROLES map picks the capabilities and the function. Each Axon advertises its capabilities in REGISTER — that is what the router searches on.

worker.py
import asyncio, sys
from cosmonapse import Axon, Dendrite, connect_synapse

SYNAPSE_URL = "cosmo://127.0.0.1:7070"   # ← the only line that changes per transport
NAMESPACE   = "quickstart"

# A Neuron is just an async function. Each worker advertises a
# different capability — swap in a real model when you're ready.
async def summarize(input, context):
    return {"result": f"summary: {input['text'][:40]}…"}

async def translate(input, context):
    return {"result": f"[fr] {input['text']}"}

# role -> (capabilities advertised in REGISTER, neuron_fn)
ROLES = {
    "summarizer": (["summarize", "text"], summarize),
    "translator": (["translate", "text"], translate),
}

async def main():
    role = sys.argv[1]                  # "summarizer" | "translator"
    capabilities, fn = ROLES[role]

    axon = Axon(neuron_id=role, neuron_fn=fn, capabilities=capabilities)

    synapse  = await connect_synapse(SYNAPSE_URL)
    dendrite = Dendrite(synapse=synapse, namespace=NAMESPACE,
                        dendrite_id=role,
                        heartbeat_s=5.0)   # re-announce every 5s so a late router finds us
    dendrite.attach_axon(axon)

    try:
        async with dendrite:
            print(f"{role} ready — advertising {capabilities}")
            await asyncio.Event().wait()
    finally:
        await synapse.close()

asyncio.run(main())
04 · The Router — discover by capability

The router is a Dendrite with a registry_store. Because it has a store, it auto-subscribes to REGISTER / HEARTBEAT / DEREGISTER and keeps a live view of the namespace. Each task calls find_neurons(capability=…) instead of naming a worker — workers can join or leave at runtime.

router.py
import asyncio
from cosmonapse import Dendrite, MemoryRegistryStore, connect_synapse, new_trace_id

SYNAPSE_URL = "cosmo://127.0.0.1:7070"
NAMESPACE   = "quickstart"

# Tasks are tagged with the CAPABILITY they need — never a worker id.
TASKS = [
    {"capability": "summarize", "text": "Cosmonapse is an A2A protocol that…"},
    {"capability": "translate", "text": "Hello, world"},
    {"capability": "summarize", "text": "Neurons are plain functions that…"},
]

class CapabilityRouter:
    """Routes each task to a worker advertising the required capability,
    discovered live from the RegistryStore — no hard-coded worker ids."""

    def __init__(self, dendrite):
        self._dendrite = dendrite
        self._pending  = {}

        @dendrite.on_agent_output
        async def _on_output(sig):
            fut = self._pending.pop(sig.trace_id, None)
            if fut and not fut.done():
                fut.set_result(sig.payload.get("output", {}))

    async def route(self, capability, payload, timeout=30.0):
        # Ask the registry who can do this right now.
        candidates = await self._dendrite.find_neurons(capability=capability)
        if not candidates:
            raise RuntimeError(f"no live neuron advertises {capability!r}")
        target = candidates[0].neuron_id        # first live match

        trace_id = new_trace_id()
        fut = asyncio.get_running_loop().create_future()
        self._pending[trace_id] = fut
        await self._dendrite.dispatch_task(
            neuron=target, input=payload, trace_id=trace_id,
        )
        print(f"→ {capability:9} routed to {target}")
        return await asyncio.wait_for(fut, timeout=timeout)

async def wait_for_caps(dendrite, needed, timeout=15.0):
    """Block until every required capability has a live provider."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        ready = [c for c in needed if await dendrite.find_neurons(capability=c)]
        if len(ready) == len(needed):
            return
        await asyncio.sleep(0.25)
    raise TimeoutError(f"workers for {needed} never appeared")

async def main():
    synapse  = await connect_synapse(SYNAPSE_URL)
    dendrite = Dendrite(
        synapse=synapse, namespace=NAMESPACE, dendrite_id="router",
        registry_store=MemoryRegistryStore(),   # ← powers find_neurons()
        heartbeat_s=0,                       # the router hosts no Axon
    )
    router = CapabilityRouter(dendrite)

    try:
        async with dendrite:           # subscribes to REGISTER/HEARTBEAT/DEREGISTER
            needed = {t["capability"] for t in TASKS}
            print(f"waiting for workers advertising {needed}…")
            await wait_for_caps(dendrite, needed)
            for t in TASKS:
                out = await router.route(t["capability"], {"text": t["text"]})
                print(f"   ← {out.get('result', '')}")
    finally:
        await synapse.close()

asyncio.run(main())
05 · Run the topology

Separate terminals, one Synapse shared by all. Start the bus first, then the workers, then the driver.

# terminal 1 — the bus
$ cosmo synapse start memory --namespace=quickstart

# terminal 2 — the summarizer
$ python worker.py summarizer

# terminal 3 — the translator
$ python worker.py translator

# terminal 4 — the router
$ python router.py

Each worker re-broadcasts REGISTER every 5s, so even though the router starts last it discovers both within one heartbeat. Watch tasks land on the worker that owns the capability:

$ python router.py
waiting for workers advertising {'summarize', 'translate'}…
→ summarize routed to summarizer
   ← summary: Cosmonapse is an A2A protocol that…
→ translate routed to translator
   ← [fr] Hello, world
→ summarize routed to summarizer
   ← summary: Neurons are plain functions that…
Extend the pattern

Many providers per capability. find_neurons(capability=…) returns every live match — load-balance across them (round-robin, least-recently-seen, or by version) instead of always taking the first.

Persist the registry. Swap MemoryRegistryStore for SqliteRegistryStore or PostgresRegistryStore (Python) so the view survives restarts and is shared across multiple router processes.

Watch members come and go. Kill a worker — it stops heartbeating and is marked deregistered; registry_snapshot(include_deregistered=True) shows the full history, while routing skips anything offline.

Change transport. Every other tab is the same topology — only the install, the synapse you connect to, and the launch commands change. The capability-routing logic is byte-for-byte identical.