Route by what a worker can do,
not by its name.
A router Dendrite holds a RegistryStore and discovers workers from the REGISTER signals they broadcast. Each task names a capability; the router calls find_neurons(capability=…) to find a live worker that advertises it. Workers join and leave at runtime — no hard-coded address lists. The same code runs over five language × transport stacks — pick one below.
The SDK now supports capability dispatch without an explicit RegistryStore lookup: await orch.dispatch(capabilities=["summarize"], input=...) publishes on a separate subject cosmonapse.<ns>.TASK.routed with a queue group keyed on each Dendrite’s aggregate capabilities. The broker delivers each TASK exactly once within a matching cap profile — no router-side discovery code needed. Use the RegistryStore-based pattern below when you need richer selection (preferred version, locality, cost) or use bidding for atomic claim across heterogeneous workers.
The same topology, expressed across language and transport. Slide between tabs — the routing logic never changes; only the imports, the synapse you connect to, and how you launch it do.
These workers are plain functions — no model provider — so the only dependency is the SDK itself (plus a transport driver where the bus isn't in-process).
# SDK + the bundled cosmo CLE — the devsynapse needs no broker.
pip install cosmonapseOne bus, one namespace. The CLI also streams every Signal that crosses it to stdout, so the synapse terminal doubles as a Doppler.
$ cosmo synapse start memory --namespace=quickstart URL: cosmo://127.0.0.1:7070 Namespace: quickstart Transport: TCP + NDJSON (single-host dev only) ────────────────────────────────────────────────
One worker.py serves both roles; the ROLES map picks the capabilities and the function. Each Axon advertises its capabilities in REGISTER — that is what the router searches on.
import asyncio, sys from cosmonapse import Axon, Dendrite, connect_synapse SYNAPSE_URL = "cosmo://127.0.0.1:7070" # ← the only line that changes per transport NAMESPACE = "quickstart" # A Neuron is just an async function. Each worker advertises a # different capability — swap in a real model when you're ready. async def summarize(input, context): return {"result": f"summary: {input['text'][:40]}…"} async def translate(input, context): return {"result": f"[fr] {input['text']}"} # role -> (capabilities advertised in REGISTER, neuron_fn) ROLES = { "summarizer": (["summarize", "text"], summarize), "translator": (["translate", "text"], translate), } async def main(): role = sys.argv[1] # "summarizer" | "translator" capabilities, fn = ROLES[role] axon = Axon(neuron_id=role, neuron_fn=fn, capabilities=capabilities) synapse = await connect_synapse(SYNAPSE_URL) dendrite = Dendrite(synapse=synapse, namespace=NAMESPACE, dendrite_id=role, heartbeat_s=5.0) # re-announce every 5s so a late router finds us dendrite.attach_axon(axon) try: async with dendrite: print(f"{role} ready — advertising {capabilities}") await asyncio.Event().wait() finally: await synapse.close() asyncio.run(main())
The router is a Dendrite with a registry_store. Because it has a store, it auto-subscribes to REGISTER / HEARTBEAT / DEREGISTER and keeps a live view of the namespace. Each task calls find_neurons(capability=…) instead of naming a worker — workers can join or leave at runtime.
import asyncio from cosmonapse import Dendrite, MemoryRegistryStore, connect_synapse, new_trace_id SYNAPSE_URL = "cosmo://127.0.0.1:7070" NAMESPACE = "quickstart" # Tasks are tagged with the CAPABILITY they need — never a worker id. TASKS = [ {"capability": "summarize", "text": "Cosmonapse is an A2A protocol that…"}, {"capability": "translate", "text": "Hello, world"}, {"capability": "summarize", "text": "Neurons are plain functions that…"}, ] class CapabilityRouter: """Routes each task to a worker advertising the required capability, discovered live from the RegistryStore — no hard-coded worker ids.""" def __init__(self, dendrite): self._dendrite = dendrite self._pending = {} @dendrite.on_agent_output async def _on_output(sig): fut = self._pending.pop(sig.trace_id, None) if fut and not fut.done(): fut.set_result(sig.payload.get("output", {})) async def route(self, capability, payload, timeout=30.0): # Ask the registry who can do this right now. candidates = await self._dendrite.find_neurons(capability=capability) if not candidates: raise RuntimeError(f"no live neuron advertises {capability!r}") target = candidates[0].neuron_id # first live match trace_id = new_trace_id() fut = asyncio.get_running_loop().create_future() self._pending[trace_id] = fut await self._dendrite.dispatch_task( neuron=target, input=payload, trace_id=trace_id, ) print(f"→ {capability:9} routed to {target}") return await asyncio.wait_for(fut, timeout=timeout) async def wait_for_caps(dendrite, needed, timeout=15.0): """Block until every required capability has a live provider.""" loop = asyncio.get_running_loop() deadline = loop.time() + timeout while loop.time() < deadline: ready = [c for c in needed if await dendrite.find_neurons(capability=c)] if len(ready) == len(needed): return await asyncio.sleep(0.25) raise TimeoutError(f"workers for {needed} never appeared") async def main(): synapse = await connect_synapse(SYNAPSE_URL) dendrite = Dendrite( synapse=synapse, namespace=NAMESPACE, dendrite_id="router", registry_store=MemoryRegistryStore(), # ← powers find_neurons() heartbeat_s=0, # the router hosts no Axon ) router = CapabilityRouter(dendrite) try: async with dendrite: # subscribes to REGISTER/HEARTBEAT/DEREGISTER needed = {t["capability"] for t in TASKS} print(f"waiting for workers advertising {needed}…") await wait_for_caps(dendrite, needed) for t in TASKS: out = await router.route(t["capability"], {"text": t["text"]}) print(f" ← {out.get('result', '')}") finally: await synapse.close() asyncio.run(main())
Separate terminals, one Synapse shared by all. Start the bus first, then the workers, then the driver.
# terminal 1 — the bus $ cosmo synapse start memory --namespace=quickstart # terminal 2 — the summarizer $ python worker.py summarizer # terminal 3 — the translator $ python worker.py translator # terminal 4 — the router $ python router.py
Each worker re-broadcasts REGISTER every 5s, so even though the router starts last it discovers both within one heartbeat. Watch tasks land on the worker that owns the capability:
$ python router.py
waiting for workers advertising {'summarize', 'translate'}…
→ summarize routed to summarizer
← summary: Cosmonapse is an A2A protocol that…
→ translate routed to translator
← [fr] Hello, world
→ summarize routed to summarizer
← summary: Neurons are plain functions that…Many providers per capability. find_neurons(capability=…) returns every live match — load-balance across them (round-robin, least-recently-seen, or by version) instead of always taking the first.
Persist the registry. Swap MemoryRegistryStore for SqliteRegistryStore or PostgresRegistryStore (Python) so the view survives restarts and is shared across multiple router processes.
Watch members come and go. Kill a worker — it stops heartbeating and is marked deregistered; registry_snapshot(include_deregistered=True) shows the full history, while routing skips anything offline.
Change transport. Every other tab is the same topology — only the install, the synapse you connect to, and the launch commands change. The capability-routing logic is byte-for-byte identical.