Tools: Engineering the Nexus Release: How I Built Secure E2EE Network Sync into a Linux Clipboard Manager (v1.5.1) (2026)

Tools: Engineering the Nexus Release: How I Built Secure E2EE Network Sync into a Linux Clipboard Manager (v1.5.1) (2026)

A deep dive into the architecture behind DotGhostBoard v1.5.0 — zero-config mDNS discovery, X25519 ECDH pairing, AES-256-GCM sync, rate limiting, and PyQt6 threading — all without a central server.

Why This Was Hard

Layer 1 — Zero-Config Device Discovery (mDNS)

Layer 2 — Secure Device Pairing (X25519 + PBKDF2 + AES-GCM)

Layer 3 — The Local REST API, Rate Limiting & Thread-Safe UI

The Full Sync Flow — End to End

Securing the Build: GPG-Signed Releases

Lessons Learned

What's Next — v2.0.0 Cerberus DotGhostBoard is a privacy-first clipboard manager for Linux, built under the DotSuite umbrella. No telemetry. No Electron. No cloud. Pure PyQt6 + SQLite. 📦 GitHub Release v1.5.1 · 🖥️ OpenDesktop Building a clipboard manager is trivial. Building one that syncs securely across devices on a local network without a central server, without trusting the network, and without ever sending data to the cloud — that's a different problem. With v1.5.0 (Nexus), I rebuilt the core architecture from scratch to solve exactly that. Here's what the final system looks like before we dive into each layer: Three layers. Each one independently secure. Let's break them down. The first UX problem: how do devices find each other without the user typing an IP address? Answer: mDNS via zeroconf. Every device broadcasts itself on the LAN under a custom service type _dotghost._tcp.local.. Other instances listen and populate the UI automatically. Because the app runs on PyQt6, the discovery engine lives in its own QThread — blocking network I/O never touches the main thread: Why QThread and not threading.Thread?

peer_found and peer_lost are pyqtSignals. They cross the thread boundary safely into the main UI thread via Qt's queued connection mechanism. Using a raw Python thread here would cause a race condition against the UI. Finding a peer is one thing. Trusting it is another. A local network isn't inherently safe — public Wi-Fi, ARP spoofing, a compromised router. The pairing protocol defends against all of it with a three-phase handshake: The PIN is a 6-digit out-of-band value shown on both screens — a human-verified channel that breaks any MITM attempt. Even if an attacker intercepts the traffic, they can't decrypt the public keys without the PIN. Why X25519 over RSA or classic ECDH on P-256?X25519 is faster, has a smaller key size (32 bytes), is immune to invalid-curve attacks by design, and is the default in TLS 1.3. It's the right choice for a constrained local protocol. Once the handshake completes, the shared secret is stored in ghost.db and the ephemeral private keys are immediately garbage-collected. With discovery and pairing solved, the actual sync transport is a minimal HTTPServer running in a background thread. It's bound to 0.0.0.0 but protected by two hard gates: Gate 1 — Peer Identity: every /api/sync request must carry a node_id that maps to a stored trusted peer. Unknown nodes get a 403 immediately. Gate 2 — E2EE Payload: even if someone spoofs a node_id, they can't forge a valid AES-GCM ciphertext without the shared secret. Wrong key = InvalidTag exception = instant drop. Why HTTPServer over WebSockets or raw TCP?HTTP gives request/response semantics for free, works through most firewalls, and is trivially testable with curl. The overhead is negligible for clipboard payloads. When v3.x arrives, the transport will be upgraded to WebRTC for true NAT-piercing P2P. Here's what happens when you copy something on Device A and it appears on Device B: Zero plaintext on the wire. Zero server in the middle. Zero cloud. A secure app with an unsigned binary is still a supply chain risk. Every release artifact — both the .AppImage and the .deb — is GPG-signed in CI and verified before upload. Users can verify any release with: mDNS is fragile on some Linux setups. If avahi-daemon is running and competing for port 5353, zeroconf will fail silently. Detect the conflict early and surface it in the UI — don't leave the user with an empty peers list and no explanation. PyInstaller and cryptography need explicit hidden imports. The cryptography package uses dynamic loading for its backend. Without --hidden-import cryptography.hazmat.primitives.asymmetric.x25519 and the aead module, the AppImage crashes at runtime with a clean ImportError that's impossible to debug without knowing where to look. dpkg-sig hangs in CI without --pinentry-mode loopback. It silently waits for a terminal that doesn't exist. Always pass the full GPG options explicitly in non-interactive environments. Rate limiting shared state needs a lock. The sliding window dict is accessed from multiple HTTP handler threads simultaneously. Without a threading.Lock, you get a race condition under concurrent pairing attempts that's near-impossible to reproduce locally. The next release is Cerberus — a Zero-Knowledge Password Vault. The AES-256 infrastructure from v1.4.0 (Eclipse) already lays the foundation. What's coming on top: The core design decision in Cerberus: detection happens at the shape of a string, not its meaning. A 1500-word article that mentions "password" doesn't trigger anything. A 40-character base64 string with high Shannon entropy does. If you find a security issue, please reach out directly before opening a public issue. ⚠️ A Note on the current DEB Release (v1.5.1)

Being transparent with the community is a core value of DotSuite. Known Issue: In the current .deb release, some users might notice the UI defaulting to Light Mode on specific GTK-based distros (like Kali). Additionally, the update helper might trigger a Polkit permission error due to setuid restrictions in the /tmp/ directory. The Fix: I am already working on v1.5.2, which migrates the update path to ~/.local/state/ and forces the Fusion style engine to ensure a consistent Dark Mode experience. The fix will be live tomorrow. Stay tuned, and thanks for the support! DotSuite — built for the shadows 👻 Templates let you quickly answer FAQs or store snippets for re-use. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse

Code Block

Copy

┌─────────────────────────────────────────────────────────────┐ │ LOCAL NETWORK (LAN) │ │ │ │ ┌──────────────┐ mDNS Discovery ┌──────────────┐ │ │ │ Device A │ ◄──────────────────► │ Device B │ │ │ │ (Arch) │ │ (Kali) │ │ │ │ │ X25519 Handshake │ │ │ │ │ ghostboard │ ──── PIN + ECDH ───► │ ghostboard │ │ │ │ │ │ │ │ │ │ HTTPServer │ ◄── AES-256-GCM ──── │ HTTPServer │ │ │ │ :PORT │ /api/sync E2EE │ :PORT │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ ghost.db ghost.db │ │ (trusted_peers) (trusted_peers) │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ LOCAL NETWORK (LAN) │ │ │ │ ┌──────────────┐ mDNS Discovery ┌──────────────┐ │ │ │ Device A │ ◄──────────────────► │ Device B │ │ │ │ (Arch) │ │ (Kali) │ │ │ │ │ X25519 Handshake │ │ │ │ │ ghostboard │ ──── PIN + ECDH ───► │ ghostboard │ │ │ │ │ │ │ │ │ │ HTTPServer │ ◄── AES-256-GCM ──── │ HTTPServer │ │ │ │ :PORT │ /api/sync E2EE │ :PORT │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ ghost.db ghost.db │ │ (trusted_peers) (trusted_peers) │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ LOCAL NETWORK (LAN) │ │ │ │ ┌──────────────┐ mDNS Discovery ┌──────────────┐ │ │ │ Device A │ ◄──────────────────► │ Device B │ │ │ │ (Arch) │ │ (Kali) │ │ │ │ │ X25519 Handshake │ │ │ │ │ ghostboard │ ──── PIN + ECDH ───► │ ghostboard │ │ │ │ │ │ │ │ │ │ HTTPServer │ ◄── AES-256-GCM ──── │ HTTPServer │ │ │ │ :PORT │ /api/sync E2EE │ :PORT │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ ghost.db ghost.db │ │ (trusted_peers) (trusted_peers) │ └─────────────────────────────────────────────────────────────┘ # core/network_discovery.py import socket from zeroconf import ServiceBrowser, Zeroconf, ServiceInfo, IPVersion from PyQt6.QtCore import pyqtSignal, QThread _SERVICE_TYPE = "_dotghost._tcp.local." class DotGhostDiscovery(QThread): peer_found = pyqtSignal(str, str, str, int) # node_id, name, ip, port peer_lost = pyqtSignal(str) # node_id def __init__(self, node_id: str, device_name: str, port: int): super().__init__() self.node_id = node_id self.device_name = device_name self.port = port self.zeroconf = None def run(self): self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only) properties = { b'node_id': self.node_id.encode('utf-8'), b'device_name': self.device_name.encode('utf-8'), b'version': b'1', } instance_name = f"{self.node_id}.{_SERVICE_TYPE}" self.info = ServiceInfo( type_=_SERVICE_TYPE, name=instance_name, addresses=[socket.inet_aton(get_local_ip())], port=self.port, properties=properties, server=f"{self.node_id}.local." ) self.zeroconf.register_service(self.info) self.browser = ServiceBrowser(self.zeroconf, _SERVICE_TYPE, self) self.exec() # Qt event loop keeps the thread alive # ── zeroconf callbacks ────────────────────────────────────── def add_service(self, zc: Zeroconf, type_: str, name: str): info = zc.get_service_info(type_, name) if not info: return props = info.properties node_id = props.get(b'node_id', b'').decode() dev_name = props.get(b'device_name', b'Unknown').decode() if node_id == self.node_id: # skip self return ip = socket.inet_ntoa(info.addresses[0]) self.peer_found.emit(node_id, dev_name, ip, info.port) def remove_service(self, zc: Zeroconf, type_: str, name: str): node_id = name.replace(f".{_SERVICE_TYPE}", "") self.peer_lost.emit(node_id) def stop(self): if self.zeroconf: self.zeroconf.unregister_service(self.info) self.zeroconf.close() self.quit() # core/network_discovery.py import socket from zeroconf import ServiceBrowser, Zeroconf, ServiceInfo, IPVersion from PyQt6.QtCore import pyqtSignal, QThread _SERVICE_TYPE = "_dotghost._tcp.local." class DotGhostDiscovery(QThread): peer_found = pyqtSignal(str, str, str, int) # node_id, name, ip, port peer_lost = pyqtSignal(str) # node_id def __init__(self, node_id: str, device_name: str, port: int): super().__init__() self.node_id = node_id self.device_name = device_name self.port = port self.zeroconf = None def run(self): self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only) properties = { b'node_id': self.node_id.encode('utf-8'), b'device_name': self.device_name.encode('utf-8'), b'version': b'1', } instance_name = f"{self.node_id}.{_SERVICE_TYPE}" self.info = ServiceInfo( type_=_SERVICE_TYPE, name=instance_name, addresses=[socket.inet_aton(get_local_ip())], port=self.port, properties=properties, server=f"{self.node_id}.local." ) self.zeroconf.register_service(self.info) self.browser = ServiceBrowser(self.zeroconf, _SERVICE_TYPE, self) self.exec() # Qt event loop keeps the thread alive # ── zeroconf callbacks ────────────────────────────────────── def add_service(self, zc: Zeroconf, type_: str, name: str): info = zc.get_service_info(type_, name) if not info: return props = info.properties node_id = props.get(b'node_id', b'').decode() dev_name = props.get(b'device_name', b'Unknown').decode() if node_id == self.node_id: # skip self return ip = socket.inet_ntoa(info.addresses[0]) self.peer_found.emit(node_id, dev_name, ip, info.port) def remove_service(self, zc: Zeroconf, type_: str, name: str): node_id = name.replace(f".{_SERVICE_TYPE}", "") self.peer_lost.emit(node_id) def stop(self): if self.zeroconf: self.zeroconf.unregister_service(self.info) self.zeroconf.close() self.quit() # core/network_discovery.py import socket from zeroconf import ServiceBrowser, Zeroconf, ServiceInfo, IPVersion from PyQt6.QtCore import pyqtSignal, QThread _SERVICE_TYPE = "_dotghost._tcp.local." class DotGhostDiscovery(QThread): peer_found = pyqtSignal(str, str, str, int) # node_id, name, ip, port peer_lost = pyqtSignal(str) # node_id def __init__(self, node_id: str, device_name: str, port: int): super().__init__() self.node_id = node_id self.device_name = device_name self.port = port self.zeroconf = None def run(self): self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only) properties = { b'node_id': self.node_id.encode('utf-8'), b'device_name': self.device_name.encode('utf-8'), b'version': b'1', } instance_name = f"{self.node_id}.{_SERVICE_TYPE}" self.info = ServiceInfo( type_=_SERVICE_TYPE, name=instance_name, addresses=[socket.inet_aton(get_local_ip())], port=self.port, properties=properties, server=f"{self.node_id}.local." ) self.zeroconf.register_service(self.info) self.browser = ServiceBrowser(self.zeroconf, _SERVICE_TYPE, self) self.exec() # Qt event loop keeps the thread alive # ── zeroconf callbacks ────────────────────────────────────── def add_service(self, zc: Zeroconf, type_: str, name: str): info = zc.get_service_info(type_, name) if not info: return props = info.properties node_id = props.get(b'node_id', b'').decode() dev_name = props.get(b'device_name', b'Unknown').decode() if node_id == self.node_id: # skip self return ip = socket.inet_ntoa(info.addresses[0]) self.peer_found.emit(node_id, dev_name, ip, info.port) def remove_service(self, zc: Zeroconf, type_: str, name: str): node_id = name.replace(f".{_SERVICE_TYPE}", "") self.peer_lost.emit(node_id) def stop(self): if self.zeroconf: self.zeroconf.unregister_service(self.info) self.zeroconf.close() self.quit() Device A Device B │ │ │ 1. Generate ephemeral X25519 key │ │ 2. Derive wrap key from PIN+salt │ │ 3. Encrypt pubkey → send ─────────►│ │ │ 4. Decrypt pubkey with PIN+salt │ │ 5. Generate ephemeral X25519 key │◄──────────────── send encrypted ────│ 6. Derive shared secret (ECDH) │ │ 7. Encrypt own pubkey → send │ 8. Derive shared secret (ECDH) │ │ 9. Discard ephemeral keys │ 9. Discard ephemeral keys │ │ │ Shared Secret stored in DB │ Device A Device B │ │ │ 1. Generate ephemeral X25519 key │ │ 2. Derive wrap key from PIN+salt │ │ 3. Encrypt pubkey → send ─────────►│ │ │ 4. Decrypt pubkey with PIN+salt │ │ 5. Generate ephemeral X25519 key │◄──────────────── send encrypted ────│ 6. Derive shared secret (ECDH) │ │ 7. Encrypt own pubkey → send │ 8. Derive shared secret (ECDH) │ │ 9. Discard ephemeral keys │ 9. Discard ephemeral keys │ │ │ Shared Secret stored in DB │ Device A Device B │ │ │ 1. Generate ephemeral X25519 key │ │ 2. Derive wrap key from PIN+salt │ │ 3. Encrypt pubkey → send ─────────►│ │ │ 4. Decrypt pubkey with PIN+salt │ │ 5. Generate ephemeral X25519 key │◄──────────────── send encrypted ────│ 6. Derive shared secret (ECDH) │ │ 7. Encrypt own pubkey → send │ 8. Derive shared secret (ECDH) │ │ 9. Discard ephemeral keys │ 9. Discard ephemeral keys │ │ │ Shared Secret stored in DB │ # core/pairing.py import os import base64 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import x25519 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.ciphers.aead import AESGCM _KDF_ITERATIONS = 100_000 # OWASP minimum for PBKDF2-SHA256 def derive_handshake_key(pin: str, salt: bytes) -> bytes: """ Derive a 256-bit wrapping key from a 6-digit PIN + dynamic salt. The salt is generated fresh per-pairing session and sent in plaintext — its job is to prevent precomputed PIN dictionaries, not to be secret. """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=_KDF_ITERATIONS, ) return kdf.derive(pin.encode("utf-8")) def generate_pairing_keys() -> tuple[x25519.X25519PrivateKey, bytes]: """ Generate a fresh ephemeral X25519 key pair. These keys live only for the duration of the handshake. """ private_key = x25519.X25519PrivateKey.generate() public_key_bytes = private_key.public_key().public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) return private_key, public_key_bytes def encrypt_pairing_payload(public_key_bytes: bytes, handshake_key: bytes) -> str: """ Encrypt the public key using the PIN-derived wrapping key. Layout: [ 12 bytes nonce | ciphertext + 16 byte GCM tag ] """ aesgcm = AESGCM(handshake_key) nonce = os.urandom(12) ciphertext = aesgcm.encrypt(nonce, public_key_bytes, None) return base64.b64encode(nonce + ciphertext).decode("utf-8") def decrypt_pairing_payload(payload: str, handshake_key: bytes) -> bytes: """Reverse of encrypt_pairing_payload. Raises InvalidTag on wrong PIN.""" raw = base64.b64decode(payload) nonce, ciphertext = raw[:12], raw[12:] aesgcm = AESGCM(handshake_key) return aesgcm.decrypt(nonce, ciphertext, None) def derive_shared_secret( private_key: x25519.X25519PrivateKey, peer_public_key_bytes: bytes ) -> bytes: """ Complete the ECDH exchange. The result is a raw 32-byte shared secret. Both sides arrive at the same value without it ever being transmitted. """ peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes) return private_key.exchange(peer_public_key) # core/pairing.py import os import base64 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import x25519 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.ciphers.aead import AESGCM _KDF_ITERATIONS = 100_000 # OWASP minimum for PBKDF2-SHA256 def derive_handshake_key(pin: str, salt: bytes) -> bytes: """ Derive a 256-bit wrapping key from a 6-digit PIN + dynamic salt. The salt is generated fresh per-pairing session and sent in plaintext — its job is to prevent precomputed PIN dictionaries, not to be secret. """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=_KDF_ITERATIONS, ) return kdf.derive(pin.encode("utf-8")) def generate_pairing_keys() -> tuple[x25519.X25519PrivateKey, bytes]: """ Generate a fresh ephemeral X25519 key pair. These keys live only for the duration of the handshake. """ private_key = x25519.X25519PrivateKey.generate() public_key_bytes = private_key.public_key().public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) return private_key, public_key_bytes def encrypt_pairing_payload(public_key_bytes: bytes, handshake_key: bytes) -> str: """ Encrypt the public key using the PIN-derived wrapping key. Layout: [ 12 bytes nonce | ciphertext + 16 byte GCM tag ] """ aesgcm = AESGCM(handshake_key) nonce = os.urandom(12) ciphertext = aesgcm.encrypt(nonce, public_key_bytes, None) return base64.b64encode(nonce + ciphertext).decode("utf-8") def decrypt_pairing_payload(payload: str, handshake_key: bytes) -> bytes: """Reverse of encrypt_pairing_payload. Raises InvalidTag on wrong PIN.""" raw = base64.b64decode(payload) nonce, ciphertext = raw[:12], raw[12:] aesgcm = AESGCM(handshake_key) return aesgcm.decrypt(nonce, ciphertext, None) def derive_shared_secret( private_key: x25519.X25519PrivateKey, peer_public_key_bytes: bytes ) -> bytes: """ Complete the ECDH exchange. The result is a raw 32-byte shared secret. Both sides arrive at the same value without it ever being transmitted. """ peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes) return private_key.exchange(peer_public_key) # core/pairing.py import os import base64 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import x25519 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.ciphers.aead import AESGCM _KDF_ITERATIONS = 100_000 # OWASP minimum for PBKDF2-SHA256 def derive_handshake_key(pin: str, salt: bytes) -> bytes: """ Derive a 256-bit wrapping key from a 6-digit PIN + dynamic salt. The salt is generated fresh per-pairing session and sent in plaintext — its job is to prevent precomputed PIN dictionaries, not to be secret. """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=_KDF_ITERATIONS, ) return kdf.derive(pin.encode("utf-8")) def generate_pairing_keys() -> tuple[x25519.X25519PrivateKey, bytes]: """ Generate a fresh ephemeral X25519 key pair. These keys live only for the duration of the handshake. """ private_key = x25519.X25519PrivateKey.generate() public_key_bytes = private_key.public_key().public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) return private_key, public_key_bytes def encrypt_pairing_payload(public_key_bytes: bytes, handshake_key: bytes) -> str: """ Encrypt the public key using the PIN-derived wrapping key. Layout: [ 12 bytes nonce | ciphertext + 16 byte GCM tag ] """ aesgcm = AESGCM(handshake_key) nonce = os.urandom(12) ciphertext = aesgcm.encrypt(nonce, public_key_bytes, None) return base64.b64encode(nonce + ciphertext).decode("utf-8") def decrypt_pairing_payload(payload: str, handshake_key: bytes) -> bytes: """Reverse of encrypt_pairing_payload. Raises InvalidTag on wrong PIN.""" raw = base64.b64decode(payload) nonce, ciphertext = raw[:12], raw[12:] aesgcm = AESGCM(handshake_key) return aesgcm.decrypt(nonce, ciphertext, None) def derive_shared_secret( private_key: x25519.X25519PrivateKey, peer_public_key_bytes: bytes ) -> bytes: """ Complete the ECDH exchange. The result is a raw 32-byte shared secret. Both sides arrive at the same value without it ever being transmitted. """ peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes) return private_key.exchange(peer_public_key) # Storage schema for trusted peers """ CREATE TABLE trusted_peers ( id INTEGER PRIMARY KEY AUTOINCREMENT, node_id TEXT UNIQUE NOT NULL, device_name TEXT NOT NULL, shared_secret BLOB NOT NULL, -- raw 32 bytes from ECDH paired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """ # Storage schema for trusted peers """ CREATE TABLE trusted_peers ( id INTEGER PRIMARY KEY AUTOINCREMENT, node_id TEXT UNIQUE NOT NULL, device_name TEXT NOT NULL, shared_secret BLOB NOT NULL, -- raw 32 bytes from ECDH paired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """ # Storage schema for trusted peers """ CREATE TABLE trusted_peers ( id INTEGER PRIMARY KEY AUTOINCREMENT, node_id TEXT UNIQUE NOT NULL, device_name TEXT NOT NULL, shared_secret BLOB NOT NULL, -- raw 32 bytes from ECDH paired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """ # core/api_server.py import json import time import urllib.parse from collections import defaultdict from http.server import BaseHTTPRequestHandler, HTTPServer from threading import Lock class _RateLimiter: """Sliding window rate limiter — 3 pairing attempts per 60s per IP.""" def __init__(self, max_attempts: int = 3, window: int = 60): self._attempts = defaultdict(list) self._lock = Lock() self.max = max_attempts self.window = window def is_allowed(self, ip: str) -> bool: now = time.time() with self._lock: # drop timestamps outside the window self._attempts[ip] = [ t for t in self._attempts[ip] if now - t < self.window ] if len(self._attempts[ip]) >= self.max: return False self._attempts[ip].append(now) return True _rate_limiter = _RateLimiter() class GhostAPIHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass # silence default HTTP logs def _send_response(self, code: int, body: dict): payload = json.dumps(body).encode() self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(payload))) self.end_headers() self.wfile.write(payload) def do_POST(self): parsed = urllib.parse.urlparse(self.path) client_ip = self.client_address[0] # ── /api/pair — pairing PIN exchange ────────────────────── if parsed.path == '/api/pair': if not _rate_limiter.is_allowed(client_ip): self._send_response(429, { "status": "error", "message": "Too many pairing attempts. Try again later." }) return # ... PIN verification and key exchange logic self._send_response(200, {"status": "paired"}) return # ── /api/sync — incoming E2EE clipboard item ─────────────── if parsed.path == '/api/sync': body = self.rfile.read(int(self.headers.get('Content-Length', 0))) data = json.loads(body) peer_node_id = data.get("node_id") peer = storage.get_trusted_peer(peer_node_id) if not peer: self._send_response(403, { "status": "error", "message": "Untrusted peer" }) return try: plaintext = decrypt_from_peer( data.get("payload"), peer["shared_secret"] ) except Exception: # Wrong key or tampered payload — silent drop self._send_response(403, { "status": "error", "message": "Decryption failed" }) return item_id = storage.add_item("text", plaintext) # Cross-thread UI update via Qt signal — safe from any thread self.server.qthread_parent.sync_received.emit(item_id, plaintext) self._send_response(201, {"status": "synced"}) # core/api_server.py import json import time import urllib.parse from collections import defaultdict from http.server import BaseHTTPRequestHandler, HTTPServer from threading import Lock class _RateLimiter: """Sliding window rate limiter — 3 pairing attempts per 60s per IP.""" def __init__(self, max_attempts: int = 3, window: int = 60): self._attempts = defaultdict(list) self._lock = Lock() self.max = max_attempts self.window = window def is_allowed(self, ip: str) -> bool: now = time.time() with self._lock: # drop timestamps outside the window self._attempts[ip] = [ t for t in self._attempts[ip] if now - t < self.window ] if len(self._attempts[ip]) >= self.max: return False self._attempts[ip].append(now) return True _rate_limiter = _RateLimiter() class GhostAPIHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass # silence default HTTP logs def _send_response(self, code: int, body: dict): payload = json.dumps(body).encode() self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(payload))) self.end_headers() self.wfile.write(payload) def do_POST(self): parsed = urllib.parse.urlparse(self.path) client_ip = self.client_address[0] # ── /api/pair — pairing PIN exchange ────────────────────── if parsed.path == '/api/pair': if not _rate_limiter.is_allowed(client_ip): self._send_response(429, { "status": "error", "message": "Too many pairing attempts. Try again later." }) return # ... PIN verification and key exchange logic self._send_response(200, {"status": "paired"}) return # ── /api/sync — incoming E2EE clipboard item ─────────────── if parsed.path == '/api/sync': body = self.rfile.read(int(self.headers.get('Content-Length', 0))) data = json.loads(body) peer_node_id = data.get("node_id") peer = storage.get_trusted_peer(peer_node_id) if not peer: self._send_response(403, { "status": "error", "message": "Untrusted peer" }) return try: plaintext = decrypt_from_peer( data.get("payload"), peer["shared_secret"] ) except Exception: # Wrong key or tampered payload — silent drop self._send_response(403, { "status": "error", "message": "Decryption failed" }) return item_id = storage.add_item("text", plaintext) # Cross-thread UI update via Qt signal — safe from any thread self.server.qthread_parent.sync_received.emit(item_id, plaintext) self._send_response(201, {"status": "synced"}) # core/api_server.py import json import time import urllib.parse from collections import defaultdict from http.server import BaseHTTPRequestHandler, HTTPServer from threading import Lock class _RateLimiter: """Sliding window rate limiter — 3 pairing attempts per 60s per IP.""" def __init__(self, max_attempts: int = 3, window: int = 60): self._attempts = defaultdict(list) self._lock = Lock() self.max = max_attempts self.window = window def is_allowed(self, ip: str) -> bool: now = time.time() with self._lock: # drop timestamps outside the window self._attempts[ip] = [ t for t in self._attempts[ip] if now - t < self.window ] if len(self._attempts[ip]) >= self.max: return False self._attempts[ip].append(now) return True _rate_limiter = _RateLimiter() class GhostAPIHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass # silence default HTTP logs def _send_response(self, code: int, body: dict): payload = json.dumps(body).encode() self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(payload))) self.end_headers() self.wfile.write(payload) def do_POST(self): parsed = urllib.parse.urlparse(self.path) client_ip = self.client_address[0] # ── /api/pair — pairing PIN exchange ────────────────────── if parsed.path == '/api/pair': if not _rate_limiter.is_allowed(client_ip): self._send_response(429, { "status": "error", "message": "Too many pairing attempts. Try again later." }) return # ... PIN verification and key exchange logic self._send_response(200, {"status": "paired"}) return # ── /api/sync — incoming E2EE clipboard item ─────────────── if parsed.path == '/api/sync': body = self.rfile.read(int(self.headers.get('Content-Length', 0))) data = json.loads(body) peer_node_id = data.get("node_id") peer = storage.get_trusted_peer(peer_node_id) if not peer: self._send_response(403, { "status": "error", "message": "Untrusted peer" }) return try: plaintext = decrypt_from_peer( data.get("payload"), peer["shared_secret"] ) except Exception: # Wrong key or tampered payload — silent drop self._send_response(403, { "status": "error", "message": "Decryption failed" }) return item_id = storage.add_item("text", plaintext) # Cross-thread UI update via Qt signal — safe from any thread self.server.qthread_parent.sync_received.emit(item_id, plaintext) self._send_response(201, {"status": "synced"}) Device A (sender) Device B (receiver) ───────────────── ──────────────────── 1. User copies text 2. ClipboardMonitor detects change 3. Encrypt with shared_secret [ AES-256-GCM | random 12-byte nonce ] 4. POST /api/sync ──────────────────────► 5. GhostAPIHandler.do_POST() { 6. Lookup peer by node_id "node_id": "abc123", 7. Decrypt with shared_secret "payload": "<base64 ciphertext>" 8. storage.add_item() } 9. sync_received.emit() 10. UI updates in main thread ◄──────────────── 201 { "status": "synced" } Device A (sender) Device B (receiver) ───────────────── ──────────────────── 1. User copies text 2. ClipboardMonitor detects change 3. Encrypt with shared_secret [ AES-256-GCM | random 12-byte nonce ] 4. POST /api/sync ──────────────────────► 5. GhostAPIHandler.do_POST() { 6. Lookup peer by node_id "node_id": "abc123", 7. Decrypt with shared_secret "payload": "<base64 ciphertext>" 8. storage.add_item() } 9. sync_received.emit() 10. UI updates in main thread ◄──────────────── 201 { "status": "synced" } Device A (sender) Device B (receiver) ───────────────── ──────────────────── 1. User copies text 2. ClipboardMonitor detects change 3. Encrypt with shared_secret [ AES-256-GCM | random 12-byte nonce ] 4. POST /api/sync ──────────────────────► 5. GhostAPIHandler.do_POST() { 6. Lookup peer by node_id "node_id": "abc123", 7. Decrypt with shared_secret "payload": "<base64 ciphertext>" 8. storage.add_item() } 9. sync_received.emit() 10. UI updates in main thread ◄──────────────── 201 { "status": "synced" } # .github/workflows/build-all.yml (signing steps) - name: Sign AppImage (GPG) run: | echo "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --import --batch --yes gpg --batch --yes --pinentry-mode loopback \ --passphrase "${{ secrets.GPG_PASSPHRASE }}" \ --detach-sign --armor \ DotGhostBoard-*.AppImage - name: Verify AppImage signature run: | gpg --verify DotGhostBoard-*.AppImage.asc DotGhostBoard-*.AppImage echo "✅ AppImage signature verified" - name: Sign DEB Package (GPG) run: | echo "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --import --batch --yes dpkg-sig --sign builder \ -k "${{ secrets.GPG_KEY_ID }}" \ --gpg-options "--passphrase ${{ secrets.GPG_PASSPHRASE }} --pinentry-mode loopback --batch --yes" \ dotghostboard_*.deb - name: Generate SHA256 checksums run: | cd out && sha256sum * > SHA256SUMS.txt # .github/workflows/build-all.yml (signing steps) - name: Sign AppImage (GPG) run: | echo "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --import --batch --yes gpg --batch --yes --pinentry-mode loopback \ --passphrase "${{ secrets.GPG_PASSPHRASE }}" \ --detach-sign --armor \ DotGhostBoard-*.AppImage - name: Verify AppImage signature run: | gpg --verify DotGhostBoard-*.AppImage.asc DotGhostBoard-*.AppImage echo "✅ AppImage signature verified" - name: Sign DEB Package (GPG) run: | echo "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --import --batch --yes dpkg-sig --sign builder \ -k "${{ secrets.GPG_KEY_ID }}" \ --gpg-options "--passphrase ${{ secrets.GPG_PASSPHRASE }} --pinentry-mode loopback --batch --yes" \ dotghostboard_*.deb - name: Generate SHA256 checksums run: | cd out && sha256sum * > SHA256SUMS.txt # .github/workflows/build-all.yml (signing steps) - name: Sign AppImage (GPG) run: | echo "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --import --batch --yes gpg --batch --yes --pinentry-mode loopback \ --passphrase "${{ secrets.GPG_PASSPHRASE }}" \ --detach-sign --armor \ DotGhostBoard-*.AppImage - name: Verify AppImage signature run: | gpg --verify DotGhostBoard-*.AppImage.asc DotGhostBoard-*.AppImage echo "✅ AppImage signature verified" - name: Sign DEB Package (GPG) run: | echo "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --import --batch --yes dpkg-sig --sign builder \ -k "${{ secrets.GPG_KEY_ID }}" \ --gpg-options "--passphrase ${{ secrets.GPG_PASSPHRASE }} --pinentry-mode loopback --batch --yes" \ dotghostboard_*.deb - name: Generate SHA256 checksums run: | cd out && sha256sum * > SHA256SUMS.txt # Verify AppImage gpg --verify DotGhostBoard-1.5.1-x86_64.AppImage.asc \ DotGhostBoard-1.5.1-x86_64.AppImage # Verify DEB dpkg-sig --verify dotghostboard_1.5.1_amd64.deb # Verify checksum sha256sum -c SHA256SUMS.txt # Verify AppImage gpg --verify DotGhostBoard-1.5.1-x86_64.AppImage.asc \ DotGhostBoard-1.5.1-x86_64.AppImage # Verify DEB dpkg-sig --verify dotghostboard_1.5.1_amd64.deb # Verify checksum sha256sum -c SHA256SUMS.txt # Verify AppImage gpg --verify DotGhostBoard-1.5.1-x86_64.AppImage.asc \ DotGhostBoard-1.5.1-x86_64.AppImage # Verify DEB dpkg-sig --verify dotghostboard_1.5.1_amd64.deb # Verify checksum sha256sum -c SHA256SUMS.txt - A fully isolated vault.db (separate file, separate connection, locked when not in use) - Pattern-based secret detection using Regex — JWT, AWS keys, GitHub tokens, high-entropy hex strings — not keyword matching - Auto-clear: wipes the clipboard 30 seconds after a Vault paste - Paranoia Mode: a toggle that suspends all DB writes temporarily