← Examples/No Orchestrator
Pick your stack

The same topology, expressed across language and transport. Slide between tabs — the routing logic never changes; only the imports, the synapse you connect to, and how you launch it do.

01 · Install

The dev devsynapse ships in the CLI — no external broker to install.

# SDK + CLI + httpx (used by the HuggingFace Neuron wrapper)
pip install -e cosmonapse-core/packages/python-sdk
pip install -e cosmonapse-core/packages/cli
pip install httpx

# Hugging Face token — read scope is enough
$ export HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxx
02 · Start the Synapse

One bus, one namespace. The CLI also streams every Signal that crosses it to stdout, so the synapse terminal doubles as a Doppler.

$ cosmo synapse start memory --namespace=quickstart

  URL:        cosmo://127.0.0.1:7070
  Namespace:  quickstart
  Transport:  TCP + NDJSON  (single-host dev only)
  ────────────────────────────────────────────────
03 · Worker — it decides for itself

Note the Axon is built but never attached to the Dendrite. Attaching would make the Dendrite auto-handle every TASK addressed to its neuron_id; here we want the worker to decide for itself, so we subscribe to TASK directly and run the Neuron only when the hash names us.

worker_a.py
import asyncio, hashlib, os
from cosmonapse import Axon, Neuron, Dendrite, SignalType, connect_synapse

SYNAPSE_URL = "cosmo://127.0.0.1:7070"   # ← the only line that changes per transport
NAMESPACE   = "quickstart"
MY_ID       = "worker-a"                 # worker-b differs only here
PEERS       = ("worker-a", "worker-b")   # every peer knows the pool

def owner_of(trace_id: str) -> str:
    """Pure function: every peer computes the SAME owner — no coordination."""
    h = int(hashlib.sha1(trace_id.encode()).hexdigest(), 16)
    return PEERS[h % len(PEERS)]

async def main():
    # The Axon is NOT attached — we route by hash, not by neuron-id.
    axon = Axon(
        neuron_id=MY_ID,
        neuron_fn=Neuron(
            source="huggingface",
            endpoint="https://router.huggingface.co",
            model="meta-llama/Llama-3.1-8B-Instruct",
            api_key=os.environ["HF_TOKEN"],
            use_chat_api=True, max_new_tokens=128,
        ),
        capabilities=["chat"],
    )

    synapse  = await connect_synapse(SYNAPSE_URL)
    dendrite = Dendrite(synapse=synapse, namespace=NAMESPACE,
                        dendrite_id=MY_ID, heartbeat_s=0)

    async def on_task(task):
        if owner_of(task.trace_id) != MY_ID:
            return                              # a peer owns this one
        print(f"[{MY_ID}] claims {task.trace_id[4:12]}")
        reply = await axon.handle_task(task)   # run the Neuron
        await dendrite.publish(reply)          # emit AGENT_OUTPUT

    try:
        async with dendrite:
            await dendrite.subscribe(SignalType.TASK, on_task)
            print(f"{MY_ID} listening — no cortex, no queue")
            await asyncio.Event().wait()
    finally:
        await synapse.close()

asyncio.run(main())
04 · Producer — drops work in, routes nothing

The producer fires tasks addressed to a logical "pool" and waits for results. It never picks a worker — the AGENT_OUTPUT tells it who answered via sig.neuron.

producer.py
import asyncio
from cosmonapse import Dendrite, connect_synapse, new_trace_id

SYNAPSE_URL = "cosmo://127.0.0.1:7070"

async def main():
    synapse  = await connect_synapse(SYNAPSE_URL)
    dendrite = Dendrite(synapse=synapse, namespace="quickstart",
                        dendrite_id="producer", heartbeat_s=0)
    pending = {}

    @dendrite.on_agent_output
    async def _on_output(sig):
        fut = pending.pop(sig.trace_id, None)
        if fut and not fut.done():
            fut.set_result((sig.neuron, sig.payload.get("output", {})))

    prompts = ["the sun", "the moon", "the sea", "the wind"]
    try:
        async with dendrite:
            for p in prompts:
                trace_id = new_trace_id()
                fut = asyncio.get_running_loop().create_future()
                pending[trace_id] = fut
                # The producer does NO routing — it just drops work in.
                await dendrite.dispatch_task(neuron="pool",
                                             input={"prompt": p}, trace_id=trace_id)
                who, out = await asyncio.wait_for(fut, timeout=60)
                print(f"{who} answered: {out.get('response', '').strip()}")
    finally:
        await synapse.close()

asyncio.run(main())
05 · Run the topology

Separate terminals, one Synapse shared by all. Start the bus first, then the workers, then the driver.

# terminal 1 — the bus
$ cosmo synapse start memory --namespace=quickstart

# terminal 2 — first worker
$ python worker_a.py

# terminal 3 — second worker
$ python worker_b.py

# terminal 4 — the producer
$ python producer.py

Ownership is decided by the trace id, so the split is deterministic — not strictly alternating:

$ python producer.py
# …meanwhile, in the two worker terminals:
[worker-b] claims a3f2c1d8
[worker-a] claims 7b1e0942
[worker-a] claims 11ce88a4
[worker-b] claims 92aa5b30

# producer.py prints, in order:
worker-b answered: Golden disc ascends — silence breaks into light.
worker-a answered: Pale lantern in the dark — tides remember her face.
worker-a answered: Salt sighs against stone, an old song the wind forgot.
worker-b answered: Invisible river — it bends the wheat into prayer.
Extend the pattern

More peers. Add an id to PEERS on every worker. The hash spreads load across the new size automatically — no central change.

Smoother rebalancing. Swap the modulo for consistent hashing with virtual nodes, so adding or removing a peer only moves a fraction of traces instead of reshuffling all of them.

Broker-side balancing instead. If you don't need the workers to decide, give them the same queue_group on subscribe(...) and let the Synapse hand each task to exactly one — no hashing required.

Live membership. Attach a registry_store and derive PEERS from REGISTER / DEREGISTER so the pool tracks workers joining and leaving.