Tools: QIS Outcome Routing with Qdrant: Scaling to a Distributed Multi-Node Backend

Why Qdrant for Production QIS Deployments

The OutcomePacket (unchanged from previous article)

The QdrantOutcomeRouter

Running the Loop

What This Demonstrates

Replication Factor and Fault Tolerance

The Series So Far

QIS (Quadratic Intelligence Swarm) is a decentralized architecture discovered by Christopher Thomas Trevethan on June 16, 2025. Intelligence scales as Θ(N²) across N agents. Each agent pays O(log N) compute cost. No orchestrator. No aggregator. Raw data never leaves the node. 39 provisional patents filed.

Series: QIS Complete Guide · Central Orchestrator Is Your Bottleneck · Python Outcome Routing Intro · ChromaDB Transport Swap

Understanding QIS, Part 65

The previous article swapped the in-memory QIS router for ChromaDB. One line of config, same architecture, persistent storage. The point was to demonstrate that the transport layer is a plug-in.

ChromaDB is the right choice for a single process or a small fleet on one machine. It is not the right choice when you need your routing backend to survive node failures, handle hundreds of concurrent agents, or scale horizontally across a Kubernetes cluster.

This article swaps ChromaDB for Qdrant, a purpose-built vector database with native clustering, replication, and a production gRPC API.

The QIS architecture code does not change. The OutcomePacket fields do not change. The routing interface does not change. Only the backing store changes.

That is the proof. The quadratic intelligence scaling comes from the loop (outcome packets produced, routed by semantic similarity, synthesized locally, new packets produced), not from any specific transport.

Why Qdrant for Production QIS Deployments

Before the code: why Qdrant over ChromaDB at scale?

The key difference for QIS at scale: Qdrant's distributed mode shards your outcome packet collection across multiple nodes automatically. A QIS network of 1,000 agents generating hundreds of packets per minute needs a routing backend that can keep up. Qdrant clusters can serve tens of millions of vectors, and sub-10ms p99 query latency is achievable depending on hardware, index configuration, and filters.

The N(N-1)/2 synthesis opportunities grow fast. At 1,000 agents, that is 499,500 (roughly 500,000) potential cross-agent synthesis paths.
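The pair count quoted above is easy to verify. A minimal sketch, assuming nothing beyond the N(N-1)/2 formula itself (the helper name `synthesis_pairs` is illustrative and not part of the QIS code):

```python
def synthesis_pairs(n_agents: int) -> int:
    """Number of unordered agent pairs: N(N-1)/2."""
    if n_agents < 0:
        raise ValueError("n_agents must be non-negative")
    # n * (n - 1) is always even, so integer division is exact.
    return n_agents * (n_agents - 1) // 2


# Print the pair count for a few network sizes.
for n in (10, 100, 1_000, 10_000):
    print(f"{n:>6} agents -> {synthesis_pairs(n):>12,} pairs")
```

For 1,000 agents this gives exactly 499,500, which the text rounds to ~500,000. Multiplying the agent count by 10 multiplies the pair count by roughly 100.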
Your routing backend needs to handle that volume without becoming the bottleneck that QIS was designed to eliminate.

Qdrant can be started in two ways: Option A, a single Docker container (development), or Option B, a three-node Docker Compose cluster (production multi-node).

The OutcomePacket (unchanged from previous article)

The OutcomePacket fields are identical to the ChromaDB version. The only change is swapping to_bytes() / from_bytes() for to_payload() / from_payload(), because Qdrant stores structured payload dicts natively alongside the vector.

What This Demonstrates

Run the demo loop with 20 agents and you see the phase transition: the first few agents find nothing. By agent 8-10, every agent starts getting relevant matches. By agent 20, the routing layer has enough density that every new problem lands near existing outcome packets.

This is the QIS cold-start curve described in the cold start article. The shape is the same whether the routing backend is in-memory, ChromaDB, or Qdrant. The curve is an architectural property of the loop, not the storage layer.

The production difference with Qdrant: this demo running with 1,000 agents across a 3-node cluster looks exactly the same to the application code. Qdrant handles the sharding, replication, and fault tolerance. The QdrantOutcomeRouter interface is unchanged.

Replication Factor and Fault Tolerance

In the Docker Compose setup, replication_factor=2 means every outcome packet is stored on 2 of the 3 nodes. If one node goes down, routing can continue from the surviving replica. Qdrant uses Raft consensus only for cluster topology and collection metadata; point reads and writes do not go through it, so the query path has no central coordinator.

This is consistent with the QIS architecture principle: there is no single point of failure. The routing layer is a distributed index, not a central aggregator. The agents synthesize locally. The Qdrant cluster stores the packets. Neither is a bottleneck.

For a write-heavy deployment (many agents depositing frequently), set replication_factor=1 and rely on Qdrant's write-ahead log for durability. Note that this protects against process crashes, not node loss: with a single replica, the shards on a failed node are unavailable until it returns. For read-heavy deployments (many agents querying the same outcome space), replication_factor=3 places every shard on all three nodes, so any surviving node can serve reads.

Same OutcomePacket. Same synthesize() loop. Same N(N-1)/2 scaling property. Three different transport layers.

The next article in this series: implementing the routing layer over a REST API, showing that QIS outcome routing works even when the "transport" is a simple HTTP endpoint with no vector database at all. The quadratic scaling does not require HNSW or Qdrant or ChromaDB. It requires the loop.

QIS (Quadratic Intelligence Swarm) was discovered by Christopher Thomas Trevethan on June 16, 2025. Covered by 39 provisional patents. The architecture, not any specific transport layer, is the discovery.

Full spec: qisprotocol.com | GitHub
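The replication_factor guidance in the article reduces to simple arithmetic. The sketch below is a deliberately simplified model, not Qdrant's actual placement logic: it assumes each node holds at most one replica of a given shard, and it ignores write consistency settings and shard recovery. The function name `tolerated_node_failures` is hypothetical.

```python
def tolerated_node_failures(replication_factor: int, node_count: int) -> int:
    """Node losses a shard can absorb while at least one replica survives.

    Simplified model (assumption): one replica of a shard per node, so the
    effective number of copies is capped by the cluster size.
    """
    if replication_factor < 1:
        raise ValueError("replication_factor must be >= 1")
    if node_count < 1:
        raise ValueError("node_count must be >= 1")
    effective_replicas = min(replication_factor, node_count)
    return effective_replicas - 1


# Compare the three settings discussed in the article on a 3-node cluster.
for rf in (1, 2, 3):
    failures = tolerated_node_failures(rf, node_count=3)
    print(f"replication_factor={rf}: survives {failures} node failure(s)")
```

Under this model, replication_factor=1 tolerates no node loss, 2 tolerates one, and 3 tolerates two on a three-node cluster. Higher factors buy read availability at the cost of more copies to write per deposit.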

Option A: Docker (development)

    docker run -p 6333:6333 -p 6334:6334 \
        -v $(pwd)/qdrant_storage:/qdrant/storage \
        qdrant/qdrant

Option B: Docker Compose (production multi-node)

    # docker-compose.yml: 3-node Qdrant cluster
    # --uri sets a peer's own address; --bootstrap points a new peer at an
    # existing one. 6335 is Qdrant's default internal p2p port. Check the
    # Qdrant distributed deployment docs for the version you run.
    version: '3.8'
    services:
      qdrant_node1:
        image: qdrant/qdrant
        ports: ["6333:6333", "6334:6334"]
        environment:
          QDRANT__CLUSTER__ENABLED: "true"
        command: ./qdrant --uri http://qdrant_node1:6335
        volumes: ["./storage/node1:/qdrant/storage"]
      qdrant_node2:
        image: qdrant/qdrant
        ports: ["6335:6333", "6336:6334"]
        environment:
          QDRANT__CLUSTER__ENABLED: "true"
        command: ./qdrant --bootstrap http://qdrant_node1:6335 --uri http://qdrant_node2:6335
        volumes: ["./storage/node2:/qdrant/storage"]
      qdrant_node3:
        image: qdrant/qdrant
        ports: ["6337:6333", "6338:6334"]
        environment:
          QDRANT__CLUSTER__ENABLED: "true"
        command: ./qdrant --bootstrap http://qdrant_node1:6335 --uri http://qdrant_node3:6335
        volumes: ["./storage/node3:/qdrant/storage"]

Python dependencies:

    pip install qdrant-client sentence-transformers

The OutcomePacket

    # qis_qdrant_router.py
    # (the demo imports both classes from this module, so they are kept together)
    from dataclasses import dataclass, field
    from typing import List
    import time


    @dataclass
    class OutcomePacket:
        """
        Compact record of what an agent learned.
        Target size: ≤512 bytes serialized.

        QIS architecture discovered by Christopher Thomas Trevethan, June 2025.
        Covered by 39 provisional patents.
        """
        problem_description: str   # natural language, generates embedding
        outcome_type: str          # "resolved" | "partial" | "contradicted" | "open"
        confidence: float          # 0.0–1.0
        delta_summary: str         # ≤200 chars: what changed
        domain_tags: List[str]     # for payload filtering
        timestamp: float = field(default_factory=time.time)
        agent_id: str = ""
        ttl: int = 3600            # seconds

        def to_payload(self) -> dict:
            """Convert to Qdrant payload format."""
            return {
                "problem_description": self.problem_description,
                "outcome_type": self.outcome_type,
                "confidence": self.confidence,
                "delta_summary": self.delta_summary,
                "domain_tags": self.domain_tags,
                "timestamp": self.timestamp,
                "agent_id": self.agent_id,
                "ttl": self.ttl,
            }

        @classmethod
        def from_payload(cls, payload: dict) -> "OutcomePacket":
            """Rebuild a packet from a Qdrant payload dict."""
            return cls(**payload)

        def is_expired(self) -> bool:
            """True once the packet is older than its TTL."""
            return (time.time() - self.timestamp) > self.ttl

The QdrantOutcomeRouter

    # qis_qdrant_router.py (continued; OutcomePacket is defined above)
    import logging
    import uuid
    from typing import List, Optional, Tuple

    from qdrant_client import QdrantClient
    from qdrant_client.models import (
        Distance, VectorParams, PointStruct,
        Filter, FieldCondition, MatchValue, Range, ScoredPoint,
    )
    from sentence_transformers import SentenceTransformer

    logger = logging.getLogger(__name__)

    COLLECTION_NAME = "qis_outcome_packets"
    VECTOR_SIZE = 384  # all-MiniLM-L6-v2 output dimension


    class QdrantOutcomeRouter:
        """
        Production QIS outcome routing backend using Qdrant.

        Semantically routes outcome packets from N agents to the agents
        most likely to benefit from each packet. Intelligence scales as
        Θ(N²); compute scales as O(log N).

        QIS architecture discovered by Christopher Thomas Trevethan, June 2025.
        """

        def __init__(
            self,
            host: str = "localhost",
            port: int = 6333,
            grpc_port: int = 6334,
            prefer_grpc: bool = True,
            replication_factor: int = 2,
        ):
            self.client = QdrantClient(
                host=host,
                port=port,
                grpc_port=grpc_port,
                prefer_grpc=prefer_grpc,
            )
            self.model = SentenceTransformer("all-MiniLM-L6-v2")
            self.replication_factor = replication_factor
            self._ensure_collection()

        def _ensure_collection(self):
            """Create the outcome packet collection if it does not exist."""
            existing = [c.name for c in self.client.get_collections().collections]
            if COLLECTION_NAME not in existing:
                self.client.create_collection(
                    collection_name=COLLECTION_NAME,
                    vectors_config=VectorParams(
                        size=VECTOR_SIZE,
                        distance=Distance.COSINE,
                    ),
                    # Replication: survive single node failure in a 3-node cluster
                    replication_factor=self.replication_factor,
                )
                logger.info(
                    f"Created Qdrant collection '{COLLECTION_NAME}' "
                    f"(replication_factor={self.replication_factor})"
                )

        def deposit(self, packet: OutcomePacket) -> str:
            """
            Deposit an outcome packet into the routing layer.
            Returns the point ID (a UUID generated client-side).
            """
            if packet.is_expired():
                raise ValueError("Cannot deposit an expired packet.")

            embedding = self.model.encode(packet.problem_description).tolist()
            point_id = str(uuid.uuid4())

            self.client.upsert(
                collection_name=COLLECTION_NAME,
                points=[
                    PointStruct(
                        id=point_id,
                        vector=embedding,
                        payload=packet.to_payload(),
                    )
                ],
            )
            logger.debug(
                f"Deposited packet from {packet.agent_id} | "
                f"confidence={packet.confidence:.2f} | id={point_id}"
            )
            return point_id

        def query(
            self,
            problem_description: str,
            top_k: int = 10,
            domain_filter: Optional[str] = None,
            min_confidence: float = 0.0,
        ) -> List[Tuple[OutcomePacket, float]]:
            """
            Query the routing layer for packets relevant to a problem.
            Returns (packet, score) tuples, highest score first.
            O(log N) via Qdrant's HNSW index.
            """
            query_vector = self.model.encode(problem_description).tolist()

            # Build optional payload filter
            filter_conditions = []
            if domain_filter:
                # MatchValue on a list field matches if any element equals the value
                filter_conditions.append(
                    FieldCondition(
                        key="domain_tags",
                        match=MatchValue(value=domain_filter),
                    )
                )
            if min_confidence > 0.0:
                filter_conditions.append(
                    FieldCondition(
                        key="confidence",
                        range=Range(gte=min_confidence),
                    )
                )
            query_filter = (
                Filter(must=filter_conditions) if filter_conditions else None
            )

            # Note: newer qdrant-client releases deprecate search() in favor of
            # query_points(); check the client version you have installed.
            results: List[ScoredPoint] = self.client.search(
                collection_name=COLLECTION_NAME,
                query_vector=query_vector,
                limit=top_k,
                query_filter=query_filter,
                with_payload=True,
            )

            # Expired packets are dropped client-side, so fewer than top_k
            # results may be returned.
            packets = []
            for hit in results:
                packet = OutcomePacket.from_payload(hit.payload)
                if not packet.is_expired():
                    packets.append((packet, hit.score))
            return packets

        def synthesize(
            self,
            my_problem: str,
            my_outcome: OutcomePacket,
            top_k: int = 10,
        ) -> List[Tuple[OutcomePacket, float]]:
            """
            Deposit your outcome, then immediately query for related packets.
            This is the core QIS loop: produce → route → synthesize → produce.
            """
            self.deposit(my_outcome)
            return self.query(my_problem, top_k=top_k)

        def collection_stats(self) -> dict:
            info = self.client.get_collection(COLLECTION_NAME)
            return {
                "total_packets": info.points_count,
                "indexed_packets": info.indexed_vectors_count,
                "status": str(info.status),
            }

Running the Loop

    import time

    from qis_qdrant_router import QdrantOutcomeRouter, OutcomePacket


    def simulate_qis_node(agent_id: str, router: QdrantOutcomeRouter):
        """Simulates a single QIS node: observe → emit → synthesize."""

        # --- Agent observes something and distills it ---
        my_outcome = OutcomePacket(
            problem_description=(
                "Transformer attention overhead grows quadratically with "
                "sequence length; inference latency unacceptable at 8k+ tokens"
            ),
            outcome_type="resolved",
            confidence=0.87,
            delta_summary="Flash attention + chunked prefill cut p99 latency 62%",
            domain_tags=["llm-inference", "transformer", "production"],
            agent_id=agent_id,
        )

        # --- Deposit and query in one call (the QIS loop) ---
        related = router.synthesize(
            my_problem=my_outcome.problem_description,
            my_outcome=my_outcome,
            top_k=5,
        )

        # The packet just deposited can appear in its own results, so drop it
        # before counting and printing.
        others = [(p, s) for p, s in related if p.agent_id != agent_id]
        print(f"\n[{agent_id}] Deposited. Found {len(others)} related packets:")
        for packet, score in others:
            print(f"  score={score:.3f} | {packet.agent_id}: {packet.delta_summary}")


    if __name__ == "__main__":
        router = QdrantOutcomeRouter(host="localhost", prefer_grpc=True)

        # Simulate N agents depositing and querying
        agents = [f"agent_{i:03d}" for i in range(20)]
        for agent_id in agents:
            simulate_qis_node(agent_id, router)
            time.sleep(0.05)  # slight stagger for demo clarity

        print("\n--- Collection Stats ---")
        print(router.collection_stats())
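The payload conversion and TTL check on OutcomePacket can be exercised without a running Qdrant server. The sketch below uses a trimmed copy of the dataclass so it runs on its own; building the payload with `dataclasses.fields` is a compact variant chosen here for brevity, not the article's explicit dict, and the sample values are made up for illustration.

```python
import time
from dataclasses import dataclass, field, fields
from typing import List


@dataclass
class OutcomePacket:
    """Trimmed stand-in for the article's OutcomePacket (same fields)."""
    problem_description: str
    outcome_type: str
    confidence: float
    delta_summary: str
    domain_tags: List[str]
    timestamp: float = field(default_factory=time.time)
    agent_id: str = ""
    ttl: int = 3600  # seconds

    def to_payload(self) -> dict:
        # One payload key per dataclass field.
        return {f.name: getattr(self, f.name) for f in fields(self)}

    @classmethod
    def from_payload(cls, payload: dict) -> "OutcomePacket":
        return cls(**payload)

    def is_expired(self) -> bool:
        return (time.time() - self.timestamp) > self.ttl


# Illustrative values only.
fresh = OutcomePacket(
    problem_description="example problem",
    outcome_type="resolved",
    confidence=0.9,
    delta_summary="example delta",
    domain_tags=["demo"],
    agent_id="agent_000",
)

# Round trip: packet -> payload dict -> packet.
restored = OutcomePacket.from_payload(fresh.to_payload())

# Same payload, but stamped two hours ago, which exceeds the 1-hour TTL.
stale_payload = {**fresh.to_payload(), "timestamp": time.time() - 7200}
stale = OutcomePacket.from_payload(stale_payload)

print("round trip equal:", restored == fresh)
print("fresh expired:", fresh.is_expired())
print("stale expired:", stale.is_expired())
```

Because `from_payload` passes the dict straight to the constructor, any extra key in a stored payload would raise a TypeError, so payload keys and dataclass fields need to stay in sync across versions.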