First five minutes.
Install Cosmonapse, write a Neuron, wire an Axon and Dendrite, boot a local Synapse, watch Signals flow. No Docker. No running broker. Just pip install and a few files.
Python 3.11 or newer.
# Python 3.11+
pip install cosmonapse flaskcosmo synapse start memory boots a local TCP+NDJSON broker — no Docker, no NATS, no Postgres. To watch Signals crossing the bus, attach cosmo doppler in another terminal (Step 06). The --namespace flag scopes all Signals so multiple projects can share the same server.
# Boot a local TCP dev synapse — no Docker, no NATS, no Postgres. # Streams every Signal to stdout as it crosses the bus. $ cosmo synapse start memory --namespace=quickstart URL: cosmo://127.0.0.1:7070 Namespace: quickstart Transport: TCP + NDJSON (single-host dev only) ────────────────────────────────────────────────
Leave this terminal open. Swap the URL for nats:// or kafka:// when you move to production — the rest of your code stays the same.
A Neuron is a plain async function — no imports from Cosmonapse, no decorators, no protocol knowledge. It receives input (the TASK payload) and context (fetched by the Axon via context_ref) and returns a plain dict.
# A Neuron is a plain async function. Zero protocol knowledge. async def hello_neuron(input: dict, context: list) -> dict: name = input.get("name", "world") return {"message": f"Hello, {name}!"}
The Axon wraps the Neuron and gives it an identity on the bus. The Dendrite is the only component that touches the Synapse — it hosts the Axon, emits REGISTER / HEARTBEAT / DEREGISTER, and routes inbound TASKs.
import asyncio from cosmonapse import Axon, Dendrite, connect_synapse async def main(): # 1. Wrap the Neuron in an Axon. axon = Axon( neuron_id="hello-neuron", neuron_fn=hello_neuron, capabilities=["greet"], version="1.0.0", ) # 2. Connect to the Synapse. synapse = await connect_synapse("cosmo://127.0.0.1:7070") # 3. Build a worker Dendrite and attach the Axon. # role="worker" — workers host Axons and reply to TASKs, but # cannot dispatch new ones (the role guard sits on emit()). worker = Dendrite( synapse=synapse, namespace="quickstart", role="worker", ) worker.attach_axon(axon) async with worker: await asyncio.sleep(float("inf")) # run until cancelled asyncio.run(main())
Run this in a second terminal. The worker registers on the bus and waits for tasks. Any process that dispatches a TASK to "hello-neuron" on namespace "quickstart" will be routed here.
An orchestrator Dendrite has no Axon — its job is to dispatch tasks and collect results. Flask is synchronous; cosmonapse is async. The bridge is a concurrent.futures.Future per trace_id: the asyncio handler resolves it, the Flask route blocks on it.
import asyncio, concurrent.futures, threading from flask import Flask, request, jsonify from cosmonapse import Dendrite, connect_synapse, new_trace_id # asyncio loop in a background thread — Flask stays synchronous. loop = asyncio.new_event_loop() threading.Thread(target=loop.run_forever, daemon=True).start() pending: dict[str, concurrent.futures.Future] = {} orch: Dendrite = None async def setup(): global orch synapse = await connect_synapse("cosmo://127.0.0.1:7070") orch = Dendrite(synapse=synapse, namespace="quickstart") @orch.on_agent_output async def on_output(sig): fut = pending.pop(sig.trace_id, None) if fut and not fut.done(): fut.set_result(sig.payload["output"]) await orch.start() # NEW: orch.dispatch_and_wait(...) is now the preferred RPC shape. # The Future/dict pattern above predates Pathway; new code should just do: # sig = await orch.dispatch_and_wait(neuron="hello-neuron", input=..., timeout_s=5) asyncio.run_coroutine_threadsafe(setup(), loop).result(timeout=10) app = Flask(__name__) @app.post("/task") def submit(): trace_id = new_trace_id() fut = concurrent.futures.Future() pending[trace_id] = fut async def dispatch(): await orch.dispatch_task(neuron="hello-neuron", input=request.get_json(), trace_id=trace_id) asyncio.run_coroutine_threadsafe(dispatch(), loop).result(timeout=5) return jsonify(fut.result(timeout=10)) app.run(port=5000)
Attach a Doppler to see every Signal as it crosses the Synapse. It is a passive read-only subscriber — it never competes with Dendrites for messages.
$ cosmo doppler --url=cosmo://127.0.0.1:7070 --namespace=quickstart REGISTER neuron=hello-neuron capabilities=['greet'] TASK trace=trc_… neuron=hello-neuron AGENT_OUTPUT trace=trc_… neuron=hello-neuron → {message: Hello, Cosmonapse!} # filter to specific types $ cosmo doppler --url=cosmo://127.0.0.1:7070 -n quickstart --type TASK --type AGENT_OUTPUT
With the synapse, worker, and server all running, send a task:
$ curl -s -X POST http://localhost:5000/task \ -H "Content-Type: application/json" \ -d '{"name": "Cosmonapse"}' {"message": "Hello, Cosmonapse!"}
Watch the Doppler terminal — you'll see the full REGISTER → TASK → AGENT_OUTPUT trace as it happens.