$ -weight: 500;">docker run -p 6333:6333 -p 6334:6334 \ -v $(pwd)/qdrant_storage:/qdrant/storage \ qdrant/qdrant
-weight: 500;">docker run -p 6333:6333 -p 6334:6334 \ -v $(pwd)/qdrant_storage:/qdrant/storage \ qdrant/qdrant
-weight: 500;">docker run -p 6333:6333 -p 6334:6334 \ -v $(pwd)/qdrant_storage:/qdrant/storage \ qdrant/qdrant
# -weight: 500;">docker-compose.yml — 3-node Qdrant cluster
version: '3.8'
services: qdrant_node1: image: qdrant/qdrant ports: ["6333:6333", "6334:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" volumes: ["./storage/node1:/qdrant/storage"] qdrant_node2: image: qdrant/qdrant ports: ["6335:6333", "6336:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" QDRANT__CLUSTER__P2P__URI: "http://qdrant_node1:6335" volumes: ["./storage/node2:/qdrant/storage"] qdrant_node3: image: qdrant/qdrant ports: ["6337:6333", "6338:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" QDRANT__CLUSTER__P2P__URI: "http://qdrant_node1:6335" volumes: ["./storage/node3:/qdrant/storage"]
# -weight: 500;">docker-compose.yml — 3-node Qdrant cluster
version: '3.8'
services: qdrant_node1: image: qdrant/qdrant ports: ["6333:6333", "6334:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" volumes: ["./storage/node1:/qdrant/storage"] qdrant_node2: image: qdrant/qdrant ports: ["6335:6333", "6336:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" QDRANT__CLUSTER__P2P__URI: "http://qdrant_node1:6335" volumes: ["./storage/node2:/qdrant/storage"] qdrant_node3: image: qdrant/qdrant ports: ["6337:6333", "6338:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" QDRANT__CLUSTER__P2P__URI: "http://qdrant_node1:6335" volumes: ["./storage/node3:/qdrant/storage"]
# -weight: 500;">docker-compose.yml — 3-node Qdrant cluster
version: '3.8'
services: qdrant_node1: image: qdrant/qdrant ports: ["6333:6333", "6334:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" volumes: ["./storage/node1:/qdrant/storage"] qdrant_node2: image: qdrant/qdrant ports: ["6335:6333", "6336:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" QDRANT__CLUSTER__P2P__URI: "http://qdrant_node1:6335" volumes: ["./storage/node2:/qdrant/storage"] qdrant_node3: image: qdrant/qdrant ports: ["6337:6333", "6338:6334"] environment: QDRANT__CLUSTER__ENABLED: "true" QDRANT__CLUSTER__P2P__URI: "http://qdrant_node1:6335" volumes: ["./storage/node3:/qdrant/storage"]
-weight: 500;">pip -weight: 500;">install qdrant-client sentence-transformers msgpack
-weight: 500;">pip -weight: 500;">install qdrant-client sentence-transformers msgpack
-weight: 500;">pip -weight: 500;">install qdrant-client sentence-transformers msgpack
from dataclasses import dataclass, asdict, field
from typing import Optional, List
import time
import msgpack @dataclass
class OutcomePacket: """ Compact record of what an agent learned. Target size: ≤512 bytes serialized. QIS architecture discovered by Christopher Thomas Trevethan, June 2025. Covered by 39 provisional patents. """ problem_description: str # natural language, generates embedding outcome_type: str # "resolved" | "partial" | "contradicted" | "open" confidence: float # 0.0–1.0 delta_summary: str # ≤200 chars: what changed domain_tags: List[str] # for payload filtering timestamp: float = field(default_factory=time.time) agent_id: str = "" ttl: int = 3600 def to_payload(self) -> dict: """Convert to Qdrant payload format.""" return { "problem_description": self.problem_description, "outcome_type": self.outcome_type, "confidence": self.confidence, "delta_summary": self.delta_summary, "domain_tags": self.domain_tags, "timestamp": self.timestamp, "agent_id": self.agent_id, "ttl": self.ttl, } @classmethod def from_payload(cls, payload: dict) -> "OutcomePacket": return cls(**payload) def is_expired(self) -> bool: return (time.time() - self.timestamp) > self.ttl
from dataclasses import dataclass, asdict, field
from typing import Optional, List
import time
import msgpack @dataclass
class OutcomePacket: """ Compact record of what an agent learned. Target size: ≤512 bytes serialized. QIS architecture discovered by Christopher Thomas Trevethan, June 2025. Covered by 39 provisional patents. """ problem_description: str # natural language, generates embedding outcome_type: str # "resolved" | "partial" | "contradicted" | "open" confidence: float # 0.0–1.0 delta_summary: str # ≤200 chars: what changed domain_tags: List[str] # for payload filtering timestamp: float = field(default_factory=time.time) agent_id: str = "" ttl: int = 3600 def to_payload(self) -> dict: """Convert to Qdrant payload format.""" return { "problem_description": self.problem_description, "outcome_type": self.outcome_type, "confidence": self.confidence, "delta_summary": self.delta_summary, "domain_tags": self.domain_tags, "timestamp": self.timestamp, "agent_id": self.agent_id, "ttl": self.ttl, } @classmethod def from_payload(cls, payload: dict) -> "OutcomePacket": return cls(**payload) def is_expired(self) -> bool: return (time.time() - self.timestamp) > self.ttl
from dataclasses import dataclass, asdict, field
from typing import Optional, List
import time
import msgpack @dataclass
class OutcomePacket: """ Compact record of what an agent learned. Target size: ≤512 bytes serialized. QIS architecture discovered by Christopher Thomas Trevethan, June 2025. Covered by 39 provisional patents. """ problem_description: str # natural language, generates embedding outcome_type: str # "resolved" | "partial" | "contradicted" | "open" confidence: float # 0.0–1.0 delta_summary: str # ≤200 chars: what changed domain_tags: List[str] # for payload filtering timestamp: float = field(default_factory=time.time) agent_id: str = "" ttl: int = 3600 def to_payload(self) -> dict: """Convert to Qdrant payload format.""" return { "problem_description": self.problem_description, "outcome_type": self.outcome_type, "confidence": self.confidence, "delta_summary": self.delta_summary, "domain_tags": self.domain_tags, "timestamp": self.timestamp, "agent_id": self.agent_id, "ttl": self.ttl, } @classmethod def from_payload(cls, payload: dict) -> "OutcomePacket": return cls(**payload) def is_expired(self) -> bool: return (time.time() - self.timestamp) > self.ttl
from qdrant_client import QdrantClient
from qdrant_client.models import ( Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue, SearchRequest, ScoredPoint
)
from sentence_transformers import SentenceTransformer
import uuid
import logging
from typing import List, Tuple logger = logging.getLogger(__name__) COLLECTION_NAME = "qis_outcome_packets"
VECTOR_SIZE = 384 # all-MiniLM-L6-v2 output dimension class QdrantOutcomeRouter: """ Production QIS outcome routing backend using Qdrant. Semantically routes outcome packets from N agents to the agents most likely to benefit from each packet. Intelligence scales as Θ(N²); compute scales as O(log N). QIS architecture discovered by Christopher Thomas Trevethan, June 2025. """ def __init__( self, host: str = "localhost", port: int = 6333, grpc_port: int = 6334, prefer_grpc: bool = True, replication_factor: int = 2, ): self.client = QdrantClient( host=host, port=port, grpc_port=grpc_port, prefer_grpc=prefer_grpc, ) self.model = SentenceTransformer("all-MiniLM-L6-v2") self.replication_factor = replication_factor self._ensure_collection() def _ensure_collection(self): """Create the outcome packet collection if it does not exist.""" existing = [c.name for c in self.client.get_collections().collections] if COLLECTION_NAME not in existing: self.client.create_collection( collection_name=COLLECTION_NAME, vectors_config=VectorParams( size=VECTOR_SIZE, distance=Distance.COSINE, ), # Replication: survive single node failure in a 3-node cluster replication_factor=self.replication_factor, ) logger.info( f"Created Qdrant collection '{COLLECTION_NAME}' " f"(replication_factor={self.replication_factor})" ) def deposit(self, packet: OutcomePacket) -> str: """ Deposit an outcome packet into the routing layer. Returns the point ID assigned by Qdrant. """ if packet.is_expired(): raise ValueError("Cannot deposit an expired packet.") embedding = self.model.encode(packet.problem_description).tolist() point_id = str(uuid.uuid4()) self.client.upsert( collection_name=COLLECTION_NAME, points=[ PointStruct( id=point_id, vector=embedding, payload=packet.to_payload(), ) ], ) logger.debug( f"Deposited packet from {packet.agent_id} | " f"confidence={packet.confidence:.2f} | id={point_id}" ) return point_id def query( self, problem_description: str, top_k: int = 10, domain_filter: Optional[str] = None, min_confidence: float = 0.0, ) -> List[Tuple[OutcomePacket, float]]: """ Query the routing layer for packets relevant to a problem. Returns (packet, score) tuples, highest score first. O(log N) via Qdrant's HNSW index. """ query_vector = self.model.encode(problem_description).tolist() # Build optional payload filter filter_conditions = [] if domain_filter: filter_conditions.append( FieldCondition( key="domain_tags", match=MatchValue(value=domain_filter) ) ) if min_confidence > 0.0: from qdrant_client.models import Range filter_conditions.append( FieldCondition( key="confidence", range=Range(gte=min_confidence) ) ) query_filter = ( Filter(must=filter_conditions) if filter_conditions else None ) results: List[ScoredPoint] = self.client.search( collection_name=COLLECTION_NAME, query_vector=query_vector, limit=top_k, query_filter=query_filter, with_payload=True, ) packets = [] for hit in results: packet = OutcomePacket.from_payload(hit.payload) if not packet.is_expired(): packets.append((packet, hit.score)) return packets def synthesize( self, my_problem: str, my_outcome: OutcomePacket, top_k: int = 10, ) -> List[Tuple[OutcomePacket, float]]: """ Deposit your outcome, then immediately query for related packets. This is the core QIS loop: produce → route → synthesize → produce. """ self.deposit(my_outcome) return self.query(my_problem, top_k=top_k) def collection_stats(self) -> dict: info = self.client.get_collection(COLLECTION_NAME) return { "total_packets": info.points_count, "indexed_packets": info.indexed_vectors_count, "-weight: 500;">status": str(info.-weight: 500;">status), }
from qdrant_client import QdrantClient
from qdrant_client.models import ( Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue, SearchRequest, ScoredPoint
)
from sentence_transformers import SentenceTransformer
import uuid
import logging
from typing import List, Tuple logger = logging.getLogger(__name__) COLLECTION_NAME = "qis_outcome_packets"
VECTOR_SIZE = 384 # all-MiniLM-L6-v2 output dimension class QdrantOutcomeRouter: """ Production QIS outcome routing backend using Qdrant. Semantically routes outcome packets from N agents to the agents most likely to benefit from each packet. Intelligence scales as Θ(N²); compute scales as O(log N). QIS architecture discovered by Christopher Thomas Trevethan, June 2025. """ def __init__( self, host: str = "localhost", port: int = 6333, grpc_port: int = 6334, prefer_grpc: bool = True, replication_factor: int = 2, ): self.client = QdrantClient( host=host, port=port, grpc_port=grpc_port, prefer_grpc=prefer_grpc, ) self.model = SentenceTransformer("all-MiniLM-L6-v2") self.replication_factor = replication_factor self._ensure_collection() def _ensure_collection(self): """Create the outcome packet collection if it does not exist.""" existing = [c.name for c in self.client.get_collections().collections] if COLLECTION_NAME not in existing: self.client.create_collection( collection_name=COLLECTION_NAME, vectors_config=VectorParams( size=VECTOR_SIZE, distance=Distance.COSINE, ), # Replication: survive single node failure in a 3-node cluster replication_factor=self.replication_factor, ) logger.info( f"Created Qdrant collection '{COLLECTION_NAME}' " f"(replication_factor={self.replication_factor})" ) def deposit(self, packet: OutcomePacket) -> str: """ Deposit an outcome packet into the routing layer. Returns the point ID assigned by Qdrant. """ if packet.is_expired(): raise ValueError("Cannot deposit an expired packet.") embedding = self.model.encode(packet.problem_description).tolist() point_id = str(uuid.uuid4()) self.client.upsert( collection_name=COLLECTION_NAME, points=[ PointStruct( id=point_id, vector=embedding, payload=packet.to_payload(), ) ], ) logger.debug( f"Deposited packet from {packet.agent_id} | " f"confidence={packet.confidence:.2f} | id={point_id}" ) return point_id def query( self, problem_description: str, top_k: int = 10, domain_filter: Optional[str] = None, min_confidence: float = 0.0, ) -> List[Tuple[OutcomePacket, float]]: """ Query the routing layer for packets relevant to a problem. Returns (packet, score) tuples, highest score first. O(log N) via Qdrant's HNSW index. """ query_vector = self.model.encode(problem_description).tolist() # Build optional payload filter filter_conditions = [] if domain_filter: filter_conditions.append( FieldCondition( key="domain_tags", match=MatchValue(value=domain_filter) ) ) if min_confidence > 0.0: from qdrant_client.models import Range filter_conditions.append( FieldCondition( key="confidence", range=Range(gte=min_confidence) ) ) query_filter = ( Filter(must=filter_conditions) if filter_conditions else None ) results: List[ScoredPoint] = self.client.search( collection_name=COLLECTION_NAME, query_vector=query_vector, limit=top_k, query_filter=query_filter, with_payload=True, ) packets = [] for hit in results: packet = OutcomePacket.from_payload(hit.payload) if not packet.is_expired(): packets.append((packet, hit.score)) return packets def synthesize( self, my_problem: str, my_outcome: OutcomePacket, top_k: int = 10, ) -> List[Tuple[OutcomePacket, float]]: """ Deposit your outcome, then immediately query for related packets. This is the core QIS loop: produce → route → synthesize → produce. """ self.deposit(my_outcome) return self.query(my_problem, top_k=top_k) def collection_stats(self) -> dict: info = self.client.get_collection(COLLECTION_NAME) return { "total_packets": info.points_count, "indexed_packets": info.indexed_vectors_count, "-weight: 500;">status": str(info.-weight: 500;">status), }
from qdrant_client import QdrantClient
from qdrant_client.models import ( Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue, SearchRequest, ScoredPoint
)
from sentence_transformers import SentenceTransformer
import uuid
import logging
from typing import List, Tuple logger = logging.getLogger(__name__) COLLECTION_NAME = "qis_outcome_packets"
VECTOR_SIZE = 384 # all-MiniLM-L6-v2 output dimension class QdrantOutcomeRouter: """ Production QIS outcome routing backend using Qdrant. Semantically routes outcome packets from N agents to the agents most likely to benefit from each packet. Intelligence scales as Θ(N²); compute scales as O(log N). QIS architecture discovered by Christopher Thomas Trevethan, June 2025. """ def __init__( self, host: str = "localhost", port: int = 6333, grpc_port: int = 6334, prefer_grpc: bool = True, replication_factor: int = 2, ): self.client = QdrantClient( host=host, port=port, grpc_port=grpc_port, prefer_grpc=prefer_grpc, ) self.model = SentenceTransformer("all-MiniLM-L6-v2") self.replication_factor = replication_factor self._ensure_collection() def _ensure_collection(self): """Create the outcome packet collection if it does not exist.""" existing = [c.name for c in self.client.get_collections().collections] if COLLECTION_NAME not in existing: self.client.create_collection( collection_name=COLLECTION_NAME, vectors_config=VectorParams( size=VECTOR_SIZE, distance=Distance.COSINE, ), # Replication: survive single node failure in a 3-node cluster replication_factor=self.replication_factor, ) logger.info( f"Created Qdrant collection '{COLLECTION_NAME}' " f"(replication_factor={self.replication_factor})" ) def deposit(self, packet: OutcomePacket) -> str: """ Deposit an outcome packet into the routing layer. Returns the point ID assigned by Qdrant. """ if packet.is_expired(): raise ValueError("Cannot deposit an expired packet.") embedding = self.model.encode(packet.problem_description).tolist() point_id = str(uuid.uuid4()) self.client.upsert( collection_name=COLLECTION_NAME, points=[ PointStruct( id=point_id, vector=embedding, payload=packet.to_payload(), ) ], ) logger.debug( f"Deposited packet from {packet.agent_id} | " f"confidence={packet.confidence:.2f} | id={point_id}" ) return point_id def query( self, problem_description: str, top_k: int = 10, domain_filter: Optional[str] = None, min_confidence: float = 0.0, ) -> List[Tuple[OutcomePacket, float]]: """ Query the routing layer for packets relevant to a problem. Returns (packet, score) tuples, highest score first. O(log N) via Qdrant's HNSW index. """ query_vector = self.model.encode(problem_description).tolist() # Build optional payload filter filter_conditions = [] if domain_filter: filter_conditions.append( FieldCondition( key="domain_tags", match=MatchValue(value=domain_filter) ) ) if min_confidence > 0.0: from qdrant_client.models import Range filter_conditions.append( FieldCondition( key="confidence", range=Range(gte=min_confidence) ) ) query_filter = ( Filter(must=filter_conditions) if filter_conditions else None ) results: List[ScoredPoint] = self.client.search( collection_name=COLLECTION_NAME, query_vector=query_vector, limit=top_k, query_filter=query_filter, with_payload=True, ) packets = [] for hit in results: packet = OutcomePacket.from_payload(hit.payload) if not packet.is_expired(): packets.append((packet, hit.score)) return packets def synthesize( self, my_problem: str, my_outcome: OutcomePacket, top_k: int = 10, ) -> List[Tuple[OutcomePacket, float]]: """ Deposit your outcome, then immediately query for related packets. This is the core QIS loop: produce → route → synthesize → produce. """ self.deposit(my_outcome) return self.query(my_problem, top_k=top_k) def collection_stats(self) -> dict: info = self.client.get_collection(COLLECTION_NAME) return { "total_packets": info.points_count, "indexed_packets": info.indexed_vectors_count, "-weight: 500;">status": str(info.-weight: 500;">status), }
import time
from qis_qdrant_router import QdrantOutcomeRouter, OutcomePacket def simulate_qis_node(agent_id: str, router: QdrantOutcomeRouter): """Simulates a single QIS node: observe → emit → synthesize.""" # --- Agent observes something and distills it --- my_outcome = OutcomePacket( problem_description=( "Transformer attention overhead grows quadratically with " "sequence length — inference latency unacceptable at 8k+ tokens" ), outcome_type="resolved", confidence=0.87, delta_summary="Flash attention + chunked prefill cut p99 latency 62%", domain_tags=["llm-inference", "transformer", "production"], agent_id=agent_id, ) # --- Deposit and query in one call (the QIS loop) --- related = router.synthesize( my_problem=my_outcome.problem_description, my_outcome=my_outcome, top_k=5, ) print(f"\n[{agent_id}] Deposited. Found {len(related)} related packets:") for packet, score in related: if packet.agent_id != agent_id: # skip own packet print(f" score={score:.3f} | {packet.agent_id}: {packet.delta_summary}") if __name__ == "__main__": router = QdrantOutcomeRouter(host="localhost", prefer_grpc=True) # Simulate N agents depositing and querying agents = [f"agent_{i:03d}" for i in range(20)] for agent_id in agents: simulate_qis_node(agent_id, router) time.sleep(0.05) # slight stagger for demo clarity print("\n--- Collection Stats ---") print(router.collection_stats())
import time
from qis_qdrant_router import QdrantOutcomeRouter, OutcomePacket def simulate_qis_node(agent_id: str, router: QdrantOutcomeRouter): """Simulates a single QIS node: observe → emit → synthesize.""" # --- Agent observes something and distills it --- my_outcome = OutcomePacket( problem_description=( "Transformer attention overhead grows quadratically with " "sequence length — inference latency unacceptable at 8k+ tokens" ), outcome_type="resolved", confidence=0.87, delta_summary="Flash attention + chunked prefill cut p99 latency 62%", domain_tags=["llm-inference", "transformer", "production"], agent_id=agent_id, ) # --- Deposit and query in one call (the QIS loop) --- related = router.synthesize( my_problem=my_outcome.problem_description, my_outcome=my_outcome, top_k=5, ) print(f"\n[{agent_id}] Deposited. Found {len(related)} related packets:") for packet, score in related: if packet.agent_id != agent_id: # skip own packet print(f" score={score:.3f} | {packet.agent_id}: {packet.delta_summary}") if __name__ == "__main__": router = QdrantOutcomeRouter(host="localhost", prefer_grpc=True) # Simulate N agents depositing and querying agents = [f"agent_{i:03d}" for i in range(20)] for agent_id in agents: simulate_qis_node(agent_id, router) time.sleep(0.05) # slight stagger for demo clarity print("\n--- Collection Stats ---") print(router.collection_stats())
import time
from qis_qdrant_router import QdrantOutcomeRouter, OutcomePacket def simulate_qis_node(agent_id: str, router: QdrantOutcomeRouter): """Simulates a single QIS node: observe → emit → synthesize.""" # --- Agent observes something and distills it --- my_outcome = OutcomePacket( problem_description=( "Transformer attention overhead grows quadratically with " "sequence length — inference latency unacceptable at 8k+ tokens" ), outcome_type="resolved", confidence=0.87, delta_summary="Flash attention + chunked prefill cut p99 latency 62%", domain_tags=["llm-inference", "transformer", "production"], agent_id=agent_id, ) # --- Deposit and query in one call (the QIS loop) --- related = router.synthesize( my_problem=my_outcome.problem_description, my_outcome=my_outcome, top_k=5, ) print(f"\n[{agent_id}] Deposited. Found {len(related)} related packets:") for packet, score in related: if packet.agent_id != agent_id: # skip own packet print(f" score={score:.3f} | {packet.agent_id}: {packet.delta_summary}") if __name__ == "__main__": router = QdrantOutcomeRouter(host="localhost", prefer_grpc=True) # Simulate N agents depositing and querying agents = [f"agent_{i:03d}" for i in range(20)] for agent_id in agents: simulate_qis_node(agent_id, router) time.sleep(0.05) # slight stagger for demo clarity print("\n--- Collection Stats ---") print(router.collection_stats())