// Example 06 · Bidding
Competitive bidding for capability dispatch.
TASK_OFFER / BID / TASK_AWARDED is the atomic-claim variant of capability-routed dispatch. Use it when multiple Dendrites can do the work but the producer wants strict one-of-N selection, cost-aware routing, or confidence-weighted choice.
Why
Capability routing covers the homogeneous case.
Bidding covers the rest.
why.md
# Capability-routed dispatch on a separate subject + queue group gives
# once-only delivery WITHIN a matching cap profile. But heterogeneous
# deployments - different Dendrites with different but overlapping
# cap sets - still get at-least-once across profiles.
#
# Bidding solves that: producer broadcasts a TASK_OFFER, candidates BID,
# the producer picks a winner and emits a TASK_AWARDED naming exactly
# one Axon. The winner processes; everyone else sees TASK_DECLINED and
# releases any tentative reservation.
Worker
on_task_offer + bid()
Workers register an on_task_offer handler scoped to a capability and call bid() to compete. bid() uses the private publish path so workers with role="worker" can still bid even though the role guard blocks them from dispatching new TASKs.
worker.py
# worker.py - hosts an Axon and bids on offers matching its capability.
import asyncio

from cosmonapse import Axon, Dendrite, connect_synapse


async def summarize(input, context):
    return {"summary": input["text"][:80] + "..."}


async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    worker = Dendrite(
        synapse=synapse,
        namespace="demo",
        dendrite_id="worker-a",
        role="worker",
    )
    worker.attach_axon(Axon(
        neuron_id="summarizer-a",
        neuron_fn=summarize,
        capabilities=["summarize", "english"],
    ))

    @worker.on_task_offer(capability="summarize")
    async def respond(offer):
        # bid() bypasses the orchestrator role guard - bidding is how
        # workers announce capability, not how they orchestrate.
        await worker.bid(
            offer,
            neuron="summarizer-a",
            cost=0.002,        # estimated USD
            eta_ms=300,        # estimated latency
            confidence=0.92,   # self-assessment 0..1
        )

    async with worker:
        await asyncio.sleep(float("inf"))


asyncio.run(main())
Producer
dispatch_offer returns a Pathway.
The producer emits one TASK_OFFER, drains BIDs for deadline_ms, picks a winner per the select= strategy, emits TASK_AWARDED, and returns a Pathway scoped to the awarded workflow. From the caller's perspective it's just await pw.wait().
producer.py
# producer.py - emit TASK_OFFER, collect BIDs, pick a winner, await result.
import asyncio

from cosmonapse import Dendrite, connect_synapse


async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    orch = Dendrite(synapse=synapse, namespace="demo")

    async with orch:
        pw = await orch.dispatch_offer(
            input={"text": "... a long article ..."},
            capabilities=["summarize"],
            deadline_ms=250,        # collect BIDs for this window
            select="lowest_cost",   # or first_bid / highest_confidence
        )
        sig = await pw.wait(timeout_s=5.0)
        print("winner:", sig.neuron, "result:", sig.payload["output"])


asyncio.run(main())
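The deadline_ms window can be pictured as a bounded drain of an inbox. The following is a minimal sketch of that idea using only the standard library, not the cosmonapse implementation: the names collect_bids, inbox and demo are hypothetical, BIDs are modelled as plain dicts, and summarizer-c is an invented late bidder added to show the cutoff.

```python
import asyncio


async def collect_bids(inbox: asyncio.Queue, deadline_ms: int) -> list:
    """Drain BIDs from `inbox` until `deadline_ms` has elapsed (hypothetical helper)."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + deadline_ms / 1000
    bids = []
    # Keep waiting for the next BID, but never past the shared deadline.
    while (remaining := deadline - loop.time()) > 0:
        try:
            bids.append(await asyncio.wait_for(inbox.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # window closed while waiting for the next BID
    return bids


async def demo() -> list:
    inbox = asyncio.Queue()
    # Two BIDs are already waiting when the window opens.
    inbox.put_nowait({"neuron": "summarizer-a", "cost": 0.002})
    inbox.put_nowait({"neuron": "summarizer-b", "cost": 0.005})

    async def late_bidder():
        await asyncio.sleep(0.3)  # arrives well after the 50 ms window
        await inbox.put({"neuron": "summarizer-c", "cost": 0.001})

    straggler = asyncio.create_task(late_bidder())
    bids = await collect_bids(inbox, deadline_ms=50)
    straggler.cancel()
    return bids
```

Calling asyncio.run(demo()) returns only the two on-time bids. In this sketch the late bid is excluded even though it is the cheapest, which is the trade-off a short window makes.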
Wire
What crosses the bus.
flow.txt
# What flows on the bus:
#
#   producer --[TASK_OFFER]--------> *broadcast*
#   worker-a --[BID cost=0.002]----> producer
#   worker-b --[BID cost=0.005]----> producer
#                  (wait deadline_ms; pick winner)
#   producer --[TASK_AWARDED neuron=summarizer-a]--> bus
#   producer --[TASK_DECLINED neuron=summarizer-b]-> bus (informational)
# worker-a's Dendrite sees TASK_AWARDED for one of its Axons and
# synthesises an internal TASK to route to it; AGENT_OUTPUT flows back
# on the producer's Pathway.
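The settlement step in the flow above can be modelled without a bus. This is an illustrative sketch only: settle and react are hypothetical helpers, and the tuples stand in for whatever envelope the real signals use. It shows one TASK_AWARDED for the winner, a TASK_DECLINED for every other bidder, and each worker reacting to the message that names it.

```python
def settle(bidders, winner):
    """Return the (signal_type, neuron) pairs a producer would emit (hypothetical)."""
    if winner not in bidders:
        raise ValueError(f"winner {winner!r} did not bid")
    # Exactly one award, then one decline per losing bidder.
    messages = [("TASK_AWARDED", winner)]
    messages += [("TASK_DECLINED", n) for n in bidders if n != winner]
    return messages


def react(neuron, messages):
    """Decide what a bidder does once settlement messages arrive (hypothetical)."""
    for kind, target in messages:
        if target == neuron:
            # The winner runs the task; a declined bidder frees its reservation.
            return "run" if kind == "TASK_AWARDED" else "release"
    return "ignore"  # this neuron never bid on the offer
```

With bidders summarizer-a and summarizer-b and summarizer-a as the winner, react returns "run" for summarizer-a and "release" for summarizer-b.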
Strategies
select= picks the winner.
strategies.py
# select= picks the bid-evaluation strategy. All three see the same
# BID stream; they only differ in which one becomes the winner.

# first_bid - latency wins. Returns the first BID to arrive; cancels
# the wait window early. Best when any answer is fine.
await orch.dispatch_offer(input=..., capabilities=..., select="first_bid")

# lowest_cost - drains deadline_ms; picks the bidder with smallest cost.
# Best for cost-aware fanout where bidders price honestly.
await orch.dispatch_offer(input=..., capabilities=..., select="lowest_cost")

# highest_confidence - drains deadline_ms; picks the bidder with the
# largest self-assessment. Best for quality-critical work.
await orch.dispatch_offer(input=..., capabilities=..., select="highest_confidence")
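To make the difference between the three strategies concrete, here is a small stand-alone sketch of the selection rule applied to an already-collected list of bids. It is an assumption about how such a rule could look, not the library's code: the Bid dataclass and pick_winner are hypothetical, and ties are broken by arrival order here, which the source does not specify.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Bid:
    """Illustrative stand-in for a BID; fields mirror the bid() arguments."""
    neuron: str
    cost: float        # estimated USD
    eta_ms: int        # estimated latency
    confidence: float  # self-assessment 0..1


def pick_winner(bids, select):
    """Pick a winner from `bids`, which must be in arrival order (hypothetical)."""
    if not bids:
        return None  # nobody bid inside the window
    if select == "first_bid":
        return bids[0]
    if select == "lowest_cost":
        # min() keeps the earliest bid on a tie (assumed tie-break).
        return min(bids, key=lambda b: b.cost)
    if select == "highest_confidence":
        # max() also keeps the earliest bid on a tie (assumed tie-break).
        return max(bids, key=lambda b: b.confidence)
    raise ValueError(f"unknown select strategy: {select!r}")
```

For example, given a summarizer-b bid (cost 0.005, confidence 0.97, both invented for illustration) that arrives before a summarizer-a bid (cost 0.002, confidence 0.92), first_bid and highest_confidence pick summarizer-b while lowest_cost picks summarizer-a.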