01 · Install

Python 3.11 or newer.

# Python 3.11+
pip install cosmonapse flask
02 · Start a Synapse

cosmo synapse start memory boots a local TCP+NDJSON broker — no Docker, no NATS, no Postgres. To watch Signals crossing the bus, attach cosmo doppler in another terminal (Step 06). The --namespace flag scopes all Signals so multiple projects can share the same server.

# Boot a local TCP dev synapse — no Docker, no NATS, no Postgres.
# Streams every Signal to stdout as it crosses the bus.
$ cosmo synapse start memory --namespace=quickstart

  URL:        cosmo://127.0.0.1:7070
  Namespace:  quickstart
  Transport:  TCP + NDJSON  (single-host dev only)
  ────────────────────────────────────────────────

Leave this terminal open. Swap the URL for nats:// or kafka:// when you move to production — the rest of your code stays the same.

03 · Write a Neuron

A Neuron is a plain async function — no imports from Cosmonapse, no decorators, no protocol knowledge. It receives input (the TASK payload) and context (fetched by the Axon via context_ref) and returns a plain dict.

# A Neuron is a plain async function. Zero protocol knowledge.

async def hello_neuron(input: dict, context: list) -> dict:
    name = input.get("name", "world")
    return {"message": f"Hello, {name}!"}
04 · Wire an Axon and Dendrite

The Axon wraps the Neuron and gives it an identity on the bus. The Dendrite is the only component that touches the Synapse — it hosts the Axon, emits REGISTER / HEARTBEAT / DEREGISTER, and routes inbound TASKs.

worker.py
import asyncio
from cosmonapse import Axon, Dendrite, connect_synapse

async def main():
    # 1. Wrap the Neuron in an Axon.
    axon = Axon(
        neuron_id="hello-neuron",
        neuron_fn=hello_neuron,
        capabilities=["greet"],
        version="1.0.0",
    )

    # 2. Connect to the Synapse.
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")

    # 3. Build a worker Dendrite and attach the Axon.
    #    role="worker" — workers host Axons and reply to TASKs, but
    #    cannot dispatch new ones (the role guard sits on emit()).
    worker = Dendrite(
        synapse=synapse,
        namespace="quickstart",
        role="worker",
    )
    worker.attach_axon(axon)

    async with worker:
        await asyncio.sleep(float("inf"))  # run until cancelled

asyncio.run(main())

Run this in a second terminal. The worker registers on the bus and waits for tasks. Any process that dispatches a TASK to "hello-neuron" on namespace "quickstart" will be routed here.

05 · Connect an HTTP interface

An orchestrator Dendrite has no Axon — its job is to dispatch tasks and collect results. Flask is synchronous; cosmonapse is async. The bridge is a concurrent.futures.Future per trace_id: the asyncio handler resolves it, the Flask route blocks on it.

server.py
import asyncio, concurrent.futures, threading
from flask import Flask, request, jsonify
from cosmonapse import Dendrite, connect_synapse, new_trace_id

# asyncio loop in a background thread — Flask stays synchronous.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

pending: dict[str, concurrent.futures.Future] = {}
orch: Dendrite = None

async def setup():
    global orch
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    orch = Dendrite(synapse=synapse, namespace="quickstart")

    @orch.on_agent_output
    async def on_output(sig):
        fut = pending.pop(sig.trace_id, None)
        if fut and not fut.done(): fut.set_result(sig.payload["output"])

    await orch.start()

# NEW: orch.dispatch_and_wait(...) is now the preferred RPC shape.
# The Future/dict pattern above predates Pathway; new code should just do:
#   sig = await orch.dispatch_and_wait(neuron="hello-neuron", input=..., timeout_s=5)
asyncio.run_coroutine_threadsafe(setup(), loop).result(timeout=10)

app = Flask(__name__)

@app.post("/task")
def submit():
    trace_id = new_trace_id()
    fut = concurrent.futures.Future()
    pending[trace_id] = fut

    async def dispatch():
        await orch.dispatch_task(neuron="hello-neuron",
                                  input=request.get_json(), trace_id=trace_id)

    asyncio.run_coroutine_threadsafe(dispatch(), loop).result(timeout=5)
    return jsonify(fut.result(timeout=10))

app.run(port=5000)
06 · Watch the Signals flow

Attach a Doppler to see every Signal as it crosses the Synapse. It is a passive read-only subscriber — it never competes with Dendrites for messages.

$ cosmo doppler --url=cosmo://127.0.0.1:7070 --namespace=quickstart

  REGISTER      neuron=hello-neuron  capabilities=['greet']
  TASK          trace=trc_…  neuron=hello-neuron
  AGENT_OUTPUT  trace=trc_…  neuron=hello-neuron  → {message: Hello, Cosmonapse!}

# filter to specific types
$ cosmo doppler --url=cosmo://127.0.0.1:7070 -n quickstart --type TASK --type AGENT_OUTPUT
07 · Test it

With the synapse, worker, and server all running, send a task:

$ curl -s -X POST http://localhost:5000/task \
       -H "Content-Type: application/json" \
       -d '{"name": "Cosmonapse"}'

{"message": "Hello, Cosmonapse!"}

Watch the Doppler terminal — you'll see the full REGISTER → TASK → AGENT_OUTPUT trace as it happens.