┌─────────────────────────────────────────────────────────────┐
│                     LOCAL NETWORK (LAN)                     │
│                                                             │
│  ┌──────────────┐    mDNS Discovery    ┌──────────────┐     │
│  │   Device A   │ ◄──────────────────► │   Device B   │     │
│  │   (Arch)     │                      │   (Kali)     │     │
│  │              │   X25519 Handshake   │              │     │
│  │  ghostboard  │ ──── PIN + ECDH ───► │  ghostboard  │     │
│  │              │                      │              │     │
│  │  HTTPServer  │ ◄── AES-256-GCM ──── │  HTTPServer  │     │
│  │  :PORT       │    /api/sync E2EE    │  :PORT       │     │
│  └──────┬───────┘                      └──────┬───────┘     │
│         │                                     │             │
│     ghost.db                              ghost.db          │
│  (trusted_peers)                       (trusted_peers)      │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                     LOCAL NETWORK (LAN)                     │
│                                                             │
│  ┌──────────────┐    mDNS Discovery    ┌──────────────┐     │
│  │   Device A   │ ◄──────────────────► │   Device B   │     │
│  │   (Arch)     │                      │   (Kali)     │     │
│  │              │   X25519 Handshake   │              │     │
│  │  ghostboard  │ ──── PIN + ECDH ───► │  ghostboard  │     │
│  │              │                      │              │     │
│  │  HTTPServer  │ ◄── AES-256-GCM ──── │  HTTPServer  │     │
│  │  :PORT       │    /api/sync E2EE    │  :PORT       │     │
│  └──────┬───────┘                      └──────┬───────┘     │
│         │                                     │             │
│     ghost.db                              ghost.db          │
│  (trusted_peers)                       (trusted_peers)      │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                     LOCAL NETWORK (LAN)                     │
│                                                             │
│  ┌──────────────┐    mDNS Discovery    ┌──────────────┐     │
│  │   Device A   │ ◄──────────────────► │   Device B   │     │
│  │   (Arch)     │                      │   (Kali)     │     │
│  │              │   X25519 Handshake   │              │     │
│  │  ghostboard  │ ──── PIN + ECDH ───► │  ghostboard  │     │
│  │              │                      │              │     │
│  │  HTTPServer  │ ◄── AES-256-GCM ──── │  HTTPServer  │     │
│  │  :PORT       │    /api/sync E2EE    │  :PORT       │     │
│  └──────┬───────┘                      └──────┬───────┘     │
│         │                                     │             │
│     ghost.db                              ghost.db          │
│  (trusted_peers)                       (trusted_peers)      │
└─────────────────────────────────────────────────────────────┘
# core/network_discovery.py
import socket
from zeroconf import ServiceBrowser, Zeroconf, ServiceInfo, IPVersion
from PyQt6.QtCore import pyqtSignal, QThread

_SERVICE_TYPE = "_dotghost._tcp.local."


class DotGhostDiscovery(QThread):
    """Advertise this node over mDNS and watch the LAN for other DotGhost nodes.

    The thread registers one ``_dotghost._tcp`` service describing this device
    and browses for the same service type. The instance itself is handed to
    ``ServiceBrowser`` as the listener, so zeroconf invokes ``add_service``,
    ``update_service`` and ``remove_service`` on it.

    Signals:
        peer_found(node_id, device_name, ip, port): a peer was resolved.
        peer_lost(node_id): a peer's service record was removed.
    """

    peer_found = pyqtSignal(str, str, str, int)  # node_id, name, ip, port
    peer_lost = pyqtSignal(str)                  # node_id

    def __init__(self, node_id: str, device_name: str, port: int):
        super().__init__()
        self.node_id = node_id
        self.device_name = device_name
        self.port = port
        # Created in run(); initialised here so stop() is safe at any time.
        self.zeroconf = None
        self.info = None
        self.browser = None

    def run(self):
        """Register this node's service, start browsing, then run the Qt event loop."""
        self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only)

        properties = {
            b'node_id': self.node_id.encode('utf-8'),
            b'device_name': self.device_name.encode('utf-8'),
            b'version': b'1',
        }

        instance_name = f"{self.node_id}.{_SERVICE_TYPE}"
        self.info = ServiceInfo(
            type_=_SERVICE_TYPE,
            name=instance_name,
            # NOTE(review): get_local_ip() is not defined in the visible code;
            # presumably it returns this host's LAN IPv4 address as a dotted
            # string — confirm where it is imported from.
            addresses=[socket.inet_aton(get_local_ip())],
            port=self.port,
            properties=properties,
            server=f"{self.node_id}.local.",
        )

        self.zeroconf.register_service(self.info)
        self.browser = ServiceBrowser(self.zeroconf, _SERVICE_TYPE, self)

        self.exec()  # Qt event loop keeps the thread alive

    # ── zeroconf callbacks ──────────────────────────────────────

    @staticmethod
    def _txt_value(props, key: bytes, default: str = "") -> str:
        """Decode one TXT record value; tolerate missing, empty or non-UTF-8 data."""
        raw = props.get(key)
        if not raw:
            # Key absent, or present without a value (zeroconf reports None).
            return default
        return raw.decode("utf-8", errors="replace")

    def add_service(self, zc: Zeroconf, type_: str, name: str):
        """Resolve a newly announced service and emit ``peer_found`` for real peers."""
        info = zc.get_service_info(type_, name)
        if not info:
            return

        props = info.properties or {}
        node_id = self._txt_value(props, b'node_id')
        dev_name = self._txt_value(props, b'device_name', 'Unknown')

        # Skip our own advertisement and records that carry no usable identity.
        if not node_id or node_id == self.node_id:
            return

        # TXT data comes from the network: a record may resolve without an address.
        if not info.addresses:
            return

        ip = socket.inet_ntoa(info.addresses[0])
        self.peer_found.emit(node_id, dev_name, ip, info.port)

    def update_service(self, zc: Zeroconf, type_: str, name: str):
        """Accept record updates without acting on them.

        zeroconf expects listeners to provide this method; updates were not
        handled before, so this stays a no-op to leave signal behaviour as is.
        """

    def remove_service(self, zc: Zeroconf, type_: str, name: str):
        """Emit ``peer_lost`` with the node id taken from the instance name."""
        node_id = name.removesuffix(f".{_SERVICE_TYPE}")
        # Mirror add_service: never report ourselves as a peer.
        if node_id and node_id != self.node_id:
            self.peer_lost.emit(node_id)

    def stop(self):
        """Unregister the service, close zeroconf and end the thread's event loop.

        Safe to call more than once and before ``run`` has finished setting up.
        """
        zc, self.zeroconf = self.zeroconf, None
        if zc is not None:
            try:
                if self.info is not None:
                    zc.unregister_service(self.info)
            finally:
                zc.close()
        self.quit()
# core/network_discovery.py
import socket
from zeroconf import ServiceBrowser, Zeroconf, ServiceInfo, IPVersion
from PyQt6.QtCore import pyqtSignal, QThread

_SERVICE_TYPE = "_dotghost._tcp.local."


class DotGhostDiscovery(QThread):
    """Advertise this node over mDNS and watch the LAN for other DotGhost nodes.

    The thread registers one ``_dotghost._tcp`` service describing this device
    and browses for the same service type. The instance itself is handed to
    ``ServiceBrowser`` as the listener, so zeroconf invokes ``add_service``,
    ``update_service`` and ``remove_service`` on it.

    Signals:
        peer_found(node_id, device_name, ip, port): a peer was resolved.
        peer_lost(node_id): a peer's service record was removed.
    """

    peer_found = pyqtSignal(str, str, str, int)  # node_id, name, ip, port
    peer_lost = pyqtSignal(str)                  # node_id

    def __init__(self, node_id: str, device_name: str, port: int):
        super().__init__()
        self.node_id = node_id
        self.device_name = device_name
        self.port = port
        # Created in run(); initialised here so stop() is safe at any time.
        self.zeroconf = None
        self.info = None
        self.browser = None

    def run(self):
        """Register this node's service, start browsing, then run the Qt event loop."""
        self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only)

        properties = {
            b'node_id': self.node_id.encode('utf-8'),
            b'device_name': self.device_name.encode('utf-8'),
            b'version': b'1',
        }

        instance_name = f"{self.node_id}.{_SERVICE_TYPE}"
        self.info = ServiceInfo(
            type_=_SERVICE_TYPE,
            name=instance_name,
            # NOTE(review): get_local_ip() is not defined in the visible code;
            # presumably it returns this host's LAN IPv4 address as a dotted
            # string — confirm where it is imported from.
            addresses=[socket.inet_aton(get_local_ip())],
            port=self.port,
            properties=properties,
            server=f"{self.node_id}.local.",
        )

        self.zeroconf.register_service(self.info)
        self.browser = ServiceBrowser(self.zeroconf, _SERVICE_TYPE, self)

        self.exec()  # Qt event loop keeps the thread alive

    # ── zeroconf callbacks ──────────────────────────────────────

    @staticmethod
    def _txt_value(props, key: bytes, default: str = "") -> str:
        """Decode one TXT record value; tolerate missing, empty or non-UTF-8 data."""
        raw = props.get(key)
        if not raw:
            # Key absent, or present without a value (zeroconf reports None).
            return default
        return raw.decode("utf-8", errors="replace")

    def add_service(self, zc: Zeroconf, type_: str, name: str):
        """Resolve a newly announced service and emit ``peer_found`` for real peers."""
        info = zc.get_service_info(type_, name)
        if not info:
            return

        props = info.properties or {}
        node_id = self._txt_value(props, b'node_id')
        dev_name = self._txt_value(props, b'device_name', 'Unknown')

        # Skip our own advertisement and records that carry no usable identity.
        if not node_id or node_id == self.node_id:
            return

        # TXT data comes from the network: a record may resolve without an address.
        if not info.addresses:
            return

        ip = socket.inet_ntoa(info.addresses[0])
        self.peer_found.emit(node_id, dev_name, ip, info.port)

    def update_service(self, zc: Zeroconf, type_: str, name: str):
        """Accept record updates without acting on them.

        zeroconf expects listeners to provide this method; updates were not
        handled before, so this stays a no-op to leave signal behaviour as is.
        """

    def remove_service(self, zc: Zeroconf, type_: str, name: str):
        """Emit ``peer_lost`` with the node id taken from the instance name."""
        node_id = name.removesuffix(f".{_SERVICE_TYPE}")
        # Mirror add_service: never report ourselves as a peer.
        if node_id and node_id != self.node_id:
            self.peer_lost.emit(node_id)

    def stop(self):
        """Unregister the service, close zeroconf and end the thread's event loop.

        Safe to call more than once and before ``run`` has finished setting up.
        """
        zc, self.zeroconf = self.zeroconf, None
        if zc is not None:
            try:
                if self.info is not None:
                    zc.unregister_service(self.info)
            finally:
                zc.close()
        self.quit()
# core/network_discovery.py
import socket
from zeroconf import ServiceBrowser, Zeroconf, ServiceInfo, IPVersion
from PyQt6.QtCore import pyqtSignal, QThread

_SERVICE_TYPE = "_dotghost._tcp.local."


class DotGhostDiscovery(QThread):
    """Advertise this node over mDNS and watch the LAN for other DotGhost nodes.

    The thread registers one ``_dotghost._tcp`` service describing this device
    and browses for the same service type. The instance itself is handed to
    ``ServiceBrowser`` as the listener, so zeroconf invokes ``add_service``,
    ``update_service`` and ``remove_service`` on it.

    Signals:
        peer_found(node_id, device_name, ip, port): a peer was resolved.
        peer_lost(node_id): a peer's service record was removed.
    """

    peer_found = pyqtSignal(str, str, str, int)  # node_id, name, ip, port
    peer_lost = pyqtSignal(str)                  # node_id

    def __init__(self, node_id: str, device_name: str, port: int):
        super().__init__()
        self.node_id = node_id
        self.device_name = device_name
        self.port = port
        # Created in run(); initialised here so stop() is safe at any time.
        self.zeroconf = None
        self.info = None
        self.browser = None

    def run(self):
        """Register this node's service, start browsing, then run the Qt event loop."""
        self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only)

        properties = {
            b'node_id': self.node_id.encode('utf-8'),
            b'device_name': self.device_name.encode('utf-8'),
            b'version': b'1',
        }

        instance_name = f"{self.node_id}.{_SERVICE_TYPE}"
        self.info = ServiceInfo(
            type_=_SERVICE_TYPE,
            name=instance_name,
            # NOTE(review): get_local_ip() is not defined in the visible code;
            # presumably it returns this host's LAN IPv4 address as a dotted
            # string — confirm where it is imported from.
            addresses=[socket.inet_aton(get_local_ip())],
            port=self.port,
            properties=properties,
            server=f"{self.node_id}.local.",
        )

        self.zeroconf.register_service(self.info)
        self.browser = ServiceBrowser(self.zeroconf, _SERVICE_TYPE, self)

        self.exec()  # Qt event loop keeps the thread alive

    # ── zeroconf callbacks ──────────────────────────────────────

    @staticmethod
    def _txt_value(props, key: bytes, default: str = "") -> str:
        """Decode one TXT record value; tolerate missing, empty or non-UTF-8 data."""
        raw = props.get(key)
        if not raw:
            # Key absent, or present without a value (zeroconf reports None).
            return default
        return raw.decode("utf-8", errors="replace")

    def add_service(self, zc: Zeroconf, type_: str, name: str):
        """Resolve a newly announced service and emit ``peer_found`` for real peers."""
        info = zc.get_service_info(type_, name)
        if not info:
            return

        props = info.properties or {}
        node_id = self._txt_value(props, b'node_id')
        dev_name = self._txt_value(props, b'device_name', 'Unknown')

        # Skip our own advertisement and records that carry no usable identity.
        if not node_id or node_id == self.node_id:
            return

        # TXT data comes from the network: a record may resolve without an address.
        if not info.addresses:
            return

        ip = socket.inet_ntoa(info.addresses[0])
        self.peer_found.emit(node_id, dev_name, ip, info.port)

    def update_service(self, zc: Zeroconf, type_: str, name: str):
        """Accept record updates without acting on them.

        zeroconf expects listeners to provide this method; updates were not
        handled before, so this stays a no-op to leave signal behaviour as is.
        """

    def remove_service(self, zc: Zeroconf, type_: str, name: str):
        """Emit ``peer_lost`` with the node id taken from the instance name."""
        node_id = name.removesuffix(f".{_SERVICE_TYPE}")
        # Mirror add_service: never report ourselves as a peer.
        if node_id and node_id != self.node_id:
            self.peer_lost.emit(node_id)

    def stop(self):
        """Unregister the service, close zeroconf and end the thread's event loop.

        Safe to call more than once and before ``run`` has finished setting up.
        """
        zc, self.zeroconf = self.zeroconf, None
        if zc is not None:
            try:
                if self.info is not None:
                    zc.unregister_service(self.info)
            finally:
                zc.close()
        self.quit()
Device A                                Device B
   │                                     │
   │  1. Generate ephemeral X25519 key   │
   │  2. Derive wrap key from PIN+salt   │
   │  3. Encrypt pubkey → send ─────────►│
   │                                     │  4. Decrypt pubkey with PIN+salt
   │                                     │  5. Generate ephemeral X25519 key
   │◄──────────────── send encrypted ────│  6. Derive shared secret (ECDH)
   │                                     │  7. Encrypt own pubkey → send
   │  8. Derive shared secret (ECDH)     │
   │  9. Discard ephemeral keys          │  9. Discard ephemeral keys
   │                                     │
   │     Shared Secret stored in DB      │
Device A                                Device B
   │                                     │
   │  1. Generate ephemeral X25519 key   │
   │  2. Derive wrap key from PIN+salt   │
   │  3. Encrypt pubkey → send ─────────►│
   │                                     │  4. Decrypt pubkey with PIN+salt
   │                                     │  5. Generate ephemeral X25519 key
   │◄──────────────── send encrypted ────│  6. Derive shared secret (ECDH)
   │                                     │  7. Encrypt own pubkey → send
   │  8. Derive shared secret (ECDH)     │
   │  9. Discard ephemeral keys          │  9. Discard ephemeral keys
   │                                     │
   │     Shared Secret stored in DB      │
Device A                                Device B
   │                                     │
   │  1. Generate ephemeral X25519 key   │
   │  2. Derive wrap key from PIN+salt   │
   │  3. Encrypt pubkey → send ─────────►│
   │                                     │  4. Decrypt pubkey with PIN+salt
   │                                     │  5. Generate ephemeral X25519 key
   │◄──────────────── send encrypted ────│  6. Derive shared secret (ECDH)
   │                                     │  7. Encrypt own pubkey → send
   │  8. Derive shared secret (ECDH)     │
   │  9. Discard ephemeral keys          │  9. Discard ephemeral keys
   │                                     │
   │     Shared Secret stored in DB      │
# core/pairing.py
import os
import base64
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# PBKDF2-HMAC-SHA256 work factor. Both devices must derive with the same value
# or their handshake keys will not match, so changing it is a protocol change.
# NOTE(review): current OWASP guidance for PBKDF2-HMAC-SHA256 is higher than
# this (600,000 at the time of review) — confirm and plan a coordinated bump.
_KDF_ITERATIONS = 100_000

_NONCE_LEN = 12    # AES-GCM nonce size written by encrypt_pairing_payload
_GCM_TAG_LEN = 16  # authentication tag appended to the ciphertext by AESGCM


def derive_handshake_key(pin: str, salt: bytes) -> bytes:
    """Derive a 256-bit wrapping key from a 6-digit PIN + dynamic salt.

    The salt is generated fresh per-pairing session and sent in plaintext —
    its job is to prevent precomputed PIN dictionaries, not to be secret.

    Args:
        pin: The pairing PIN as typed by the user.
        salt: Per-session salt shared with the peer.

    Returns:
        32 bytes suitable as an AES-256-GCM key.
    """
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=_KDF_ITERATIONS,
    )
    return kdf.derive(pin.encode("utf-8"))


def generate_pairing_keys() -> tuple[x25519.X25519PrivateKey, bytes]:
    """Generate a fresh ephemeral X25519 key pair.

    These keys live only for the duration of the handshake.

    Returns:
        ``(private_key, public_key_bytes)`` where the public key is the raw
        32-byte encoding.
    """
    private_key = x25519.X25519PrivateKey.generate()
    public_key_bytes = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    return private_key, public_key_bytes


def encrypt_pairing_payload(public_key_bytes: bytes, handshake_key: bytes) -> str:
    """Encrypt the public key using the PIN-derived wrapping key.

    Layout: [ 12 bytes nonce | ciphertext + 16 byte GCM tag ]

    Returns:
        The layout above, base64-encoded as ASCII text.
    """
    aesgcm = AESGCM(handshake_key)
    nonce = os.urandom(_NONCE_LEN)
    ciphertext = aesgcm.encrypt(nonce, public_key_bytes, None)
    return base64.b64encode(nonce + ciphertext).decode("utf-8")


def decrypt_pairing_payload(payload: str, handshake_key: bytes) -> bytes:
    """Reverse of encrypt_pairing_payload.

    Args:
        payload: Base64 text produced by ``encrypt_pairing_payload``.
        handshake_key: Key from ``derive_handshake_key``.

    Returns:
        The decrypted public key bytes.

    Raises:
        InvalidTag: Wrong PIN, tampered data, or a payload too short to hold a
            nonce and a GCM tag.
        binascii.Error: ``payload`` is not valid base64 (a ``ValueError``).
    """
    # validate=True rejects stray characters instead of silently dropping them.
    raw = base64.b64decode(payload, validate=True)
    if len(raw) < _NONCE_LEN + _GCM_TAG_LEN:
        # A truncated payload cannot authenticate; report it the same way as
        # any other failed decryption so callers handle one failure type.
        raise InvalidTag("pairing payload is too short")
    nonce, ciphertext = raw[:_NONCE_LEN], raw[_NONCE_LEN:]
    return AESGCM(handshake_key).decrypt(nonce, ciphertext, None)


def derive_shared_secret(
    private_key: x25519.X25519PrivateKey,
    peer_public_key_bytes: bytes
) -> bytes:
    """Complete the ECDH exchange.

    The result is a raw 32-byte shared secret. Both sides arrive at the same
    value without it ever being transmitted.

    NOTE(review): the raw X25519 output is returned as-is; the cryptography
    docs recommend passing it through a KDF before using it as a cipher key.
    Doing so here would change the value peers agree on, so it is only flagged.

    Raises:
        ValueError: ``peer_public_key_bytes`` is not a valid X25519 public key
            (raised by the cryptography library).
    """
    peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes)
    return private_key.exchange(peer_public_key)
# core/pairing.py
import os
import base64
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# PBKDF2-HMAC-SHA256 work factor. Both devices must derive with the same value
# or their handshake keys will not match, so changing it is a protocol change.
# NOTE(review): current OWASP guidance for PBKDF2-HMAC-SHA256 is higher than
# this (600,000 at the time of review) — confirm and plan a coordinated bump.
_KDF_ITERATIONS = 100_000

_NONCE_LEN = 12    # AES-GCM nonce size written by encrypt_pairing_payload
_GCM_TAG_LEN = 16  # authentication tag appended to the ciphertext by AESGCM


def derive_handshake_key(pin: str, salt: bytes) -> bytes:
    """Derive a 256-bit wrapping key from a 6-digit PIN + dynamic salt.

    The salt is generated fresh per-pairing session and sent in plaintext —
    its job is to prevent precomputed PIN dictionaries, not to be secret.

    Args:
        pin: The pairing PIN as typed by the user.
        salt: Per-session salt shared with the peer.

    Returns:
        32 bytes suitable as an AES-256-GCM key.
    """
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=_KDF_ITERATIONS,
    )
    return kdf.derive(pin.encode("utf-8"))


def generate_pairing_keys() -> tuple[x25519.X25519PrivateKey, bytes]:
    """Generate a fresh ephemeral X25519 key pair.

    These keys live only for the duration of the handshake.

    Returns:
        ``(private_key, public_key_bytes)`` where the public key is the raw
        32-byte encoding.
    """
    private_key = x25519.X25519PrivateKey.generate()
    public_key_bytes = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    return private_key, public_key_bytes


def encrypt_pairing_payload(public_key_bytes: bytes, handshake_key: bytes) -> str:
    """Encrypt the public key using the PIN-derived wrapping key.

    Layout: [ 12 bytes nonce | ciphertext + 16 byte GCM tag ]

    Returns:
        The layout above, base64-encoded as ASCII text.
    """
    aesgcm = AESGCM(handshake_key)
    nonce = os.urandom(_NONCE_LEN)
    ciphertext = aesgcm.encrypt(nonce, public_key_bytes, None)
    return base64.b64encode(nonce + ciphertext).decode("utf-8")


def decrypt_pairing_payload(payload: str, handshake_key: bytes) -> bytes:
    """Reverse of encrypt_pairing_payload.

    Args:
        payload: Base64 text produced by ``encrypt_pairing_payload``.
        handshake_key: Key from ``derive_handshake_key``.

    Returns:
        The decrypted public key bytes.

    Raises:
        InvalidTag: Wrong PIN, tampered data, or a payload too short to hold a
            nonce and a GCM tag.
        binascii.Error: ``payload`` is not valid base64 (a ``ValueError``).
    """
    # validate=True rejects stray characters instead of silently dropping them.
    raw = base64.b64decode(payload, validate=True)
    if len(raw) < _NONCE_LEN + _GCM_TAG_LEN:
        # A truncated payload cannot authenticate; report it the same way as
        # any other failed decryption so callers handle one failure type.
        raise InvalidTag("pairing payload is too short")
    nonce, ciphertext = raw[:_NONCE_LEN], raw[_NONCE_LEN:]
    return AESGCM(handshake_key).decrypt(nonce, ciphertext, None)


def derive_shared_secret(
    private_key: x25519.X25519PrivateKey,
    peer_public_key_bytes: bytes
) -> bytes:
    """Complete the ECDH exchange.

    The result is a raw 32-byte shared secret. Both sides arrive at the same
    value without it ever being transmitted.

    NOTE(review): the raw X25519 output is returned as-is; the cryptography
    docs recommend passing it through a KDF before using it as a cipher key.
    Doing so here would change the value peers agree on, so it is only flagged.

    Raises:
        ValueError: ``peer_public_key_bytes`` is not a valid X25519 public key
            (raised by the cryptography library).
    """
    peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes)
    return private_key.exchange(peer_public_key)
# core/pairing.py
import os
import base64
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# PBKDF2-HMAC-SHA256 work factor. Both devices must derive with the same value
# or their handshake keys will not match, so changing it is a protocol change.
# NOTE(review): current OWASP guidance for PBKDF2-HMAC-SHA256 is higher than
# this (600,000 at the time of review) — confirm and plan a coordinated bump.
_KDF_ITERATIONS = 100_000

_NONCE_LEN = 12    # AES-GCM nonce size written by encrypt_pairing_payload
_GCM_TAG_LEN = 16  # authentication tag appended to the ciphertext by AESGCM


def derive_handshake_key(pin: str, salt: bytes) -> bytes:
    """Derive a 256-bit wrapping key from a 6-digit PIN + dynamic salt.

    The salt is generated fresh per-pairing session and sent in plaintext —
    its job is to prevent precomputed PIN dictionaries, not to be secret.

    Args:
        pin: The pairing PIN as typed by the user.
        salt: Per-session salt shared with the peer.

    Returns:
        32 bytes suitable as an AES-256-GCM key.
    """
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=_KDF_ITERATIONS,
    )
    return kdf.derive(pin.encode("utf-8"))


def generate_pairing_keys() -> tuple[x25519.X25519PrivateKey, bytes]:
    """Generate a fresh ephemeral X25519 key pair.

    These keys live only for the duration of the handshake.

    Returns:
        ``(private_key, public_key_bytes)`` where the public key is the raw
        32-byte encoding.
    """
    private_key = x25519.X25519PrivateKey.generate()
    public_key_bytes = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    return private_key, public_key_bytes


def encrypt_pairing_payload(public_key_bytes: bytes, handshake_key: bytes) -> str:
    """Encrypt the public key using the PIN-derived wrapping key.

    Layout: [ 12 bytes nonce | ciphertext + 16 byte GCM tag ]

    Returns:
        The layout above, base64-encoded as ASCII text.
    """
    aesgcm = AESGCM(handshake_key)
    nonce = os.urandom(_NONCE_LEN)
    ciphertext = aesgcm.encrypt(nonce, public_key_bytes, None)
    return base64.b64encode(nonce + ciphertext).decode("utf-8")


def decrypt_pairing_payload(payload: str, handshake_key: bytes) -> bytes:
    """Reverse of encrypt_pairing_payload.

    Args:
        payload: Base64 text produced by ``encrypt_pairing_payload``.
        handshake_key: Key from ``derive_handshake_key``.

    Returns:
        The decrypted public key bytes.

    Raises:
        InvalidTag: Wrong PIN, tampered data, or a payload too short to hold a
            nonce and a GCM tag.
        binascii.Error: ``payload`` is not valid base64 (a ``ValueError``).
    """
    # validate=True rejects stray characters instead of silently dropping them.
    raw = base64.b64decode(payload, validate=True)
    if len(raw) < _NONCE_LEN + _GCM_TAG_LEN:
        # A truncated payload cannot authenticate; report it the same way as
        # any other failed decryption so callers handle one failure type.
        raise InvalidTag("pairing payload is too short")
    nonce, ciphertext = raw[:_NONCE_LEN], raw[_NONCE_LEN:]
    return AESGCM(handshake_key).decrypt(nonce, ciphertext, None)


def derive_shared_secret(
    private_key: x25519.X25519PrivateKey,
    peer_public_key_bytes: bytes
) -> bytes:
    """Complete the ECDH exchange.

    The result is a raw 32-byte shared secret. Both sides arrive at the same
    value without it ever being transmitted.

    NOTE(review): the raw X25519 output is returned as-is; the cryptography
    docs recommend passing it through a KDF before using it as a cipher key.
    Doing so here would change the value peers agree on, so it is only flagged.

    Raises:
        ValueError: ``peer_public_key_bytes`` is not a valid X25519 public key
            (raised by the cryptography library).
    """
    peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes)
    return private_key.exchange(peer_public_key)
# Storage schema for trusted peers
"""
CREATE TABLE trusted_peers (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    node_id       TEXT UNIQUE NOT NULL,
    device_name   TEXT NOT NULL,
    shared_secret BLOB NOT NULL,  -- raw 32 bytes from ECDH
    paired_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
# Storage schema for trusted peers
"""
CREATE TABLE trusted_peers (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    node_id       TEXT UNIQUE NOT NULL,
    device_name   TEXT NOT NULL,
    shared_secret BLOB NOT NULL,  -- raw 32 bytes from ECDH
    paired_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
# Storage schema for trusted peers
"""
CREATE TABLE trusted_peers (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    node_id       TEXT UNIQUE NOT NULL,
    device_name   TEXT NOT NULL,
    shared_secret BLOB NOT NULL,  -- raw 32 bytes from ECDH
    paired_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
# core/api_server.py
import json
import time
import urllib.parse
from collections import defaultdict
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Lock


class _RateLimiter:
    """Sliding window rate limiter — 3 pairing attempts per 60s per IP."""

    def __init__(self, max_attempts: int = 3, window: int = 60):
        self._attempts = defaultdict(list)  # ip -> timestamps of accepted attempts
        self._lock = Lock()
        self.max = max_attempts
        self.window = window

    def is_allowed(self, ip: str) -> bool:
        """Record an attempt from *ip* and report whether it is within the limit.

        Rejected attempts are not recorded, so a blocked client becomes
        eligible again as soon as its oldest accepted attempt leaves the window.
        """
        # Monotonic clock: interval maths must not be affected by wall-clock
        # adjustments (NTP steps, manual changes).
        now = time.monotonic()
        with self._lock:
            # drop timestamps outside the window
            recent = [t for t in self._attempts[ip] if now - t < self.window]
            self._attempts[ip] = recent
            if len(recent) >= self.max:
                return False
            recent.append(now)
            return True


_rate_limiter = _RateLimiter()


class GhostAPIHandler(BaseHTTPRequestHandler):
    """HTTP handler for ``POST /api/pair`` and ``POST /api/sync``.

    Every request ends with a JSON response; bad input is answered with a 4xx
    status instead of an unhandled exception.
    """

    # Largest request body that will be read. The length comes from an
    # untrusted header, so it must be bounded before calling rfile.read().
    max_body_bytes = 16 * 1024 * 1024

    def log_message(self, format, *args):
        pass  # silence default HTTP logs

    def _send_response(self, code: int, body: dict):
        """Serialise *body* as JSON and send it with status *code*."""
        payload = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _reject(self, code: int, message: str):
        """Send the standard ``{"status": "error", ...}`` body with *code*."""
        self._send_response(code, {"status": "error", "message": message})

    def _read_json_object(self):
        """Read the request body and parse it as a JSON object.

        Returns:
            The parsed dict, or ``None`` after an error response has already
            been sent (bad Content-Length, oversized body, invalid JSON, or a
            JSON value that is not an object).
        """
        try:
            length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            # A negative length would make rfile.read() block until EOF.
            self._reject(400, "Invalid Content-Length")
            return None
        if length > self.max_body_bytes:
            self._reject(413, "Payload too large")
            return None

        try:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            data = json.loads(self.rfile.read(length))
        except ValueError:
            self._reject(400, "Malformed JSON body")
            return None
        if not isinstance(data, dict):
            self._reject(400, "JSON object expected")
            return None
        return data

    def do_POST(self):
        """Dispatch a POST request by path; unknown paths get a 404."""
        route = urllib.parse.urlparse(self.path).path
        if route == '/api/pair':
            self._handle_pair()
        elif route == '/api/sync':
            self._handle_sync()
        else:
            self._reject(404, "Unknown endpoint")

    # ── /api/pair — pairing PIN exchange ──────────────────────
    def _handle_pair(self):
        """Rate-limit pairing attempts per client IP, then run the pairing step."""
        client_ip = self.client_address[0]
        if not _rate_limiter.is_allowed(client_ip):
            self._send_response(429, {
                "status": "error",
                "message": "Too many pairing attempts. Try again later."
            })
            return

        # ... PIN verification and key exchange logic
        self._send_response(200, {"status": "paired"})

    # ── /api/sync — incoming E2EE clipboard item ───────────────
    def _handle_sync(self):
        """Decrypt an item from a trusted peer, store it and notify the UI."""
        data = self._read_json_object()
        if data is None:
            return  # error response already sent

        peer_node_id = data.get("node_id")
        peer = storage.get_trusted_peer(peer_node_id)
        if not peer:
            self._reject(403, "Untrusted peer")
            return

        try:
            plaintext = decrypt_from_peer(
                data.get("payload"),
                peer["shared_secret"]
            )
        except Exception:
            # Wrong key or tampered payload — refuse without revealing why.
            self._reject(403, "Decryption failed")
            return

        item_id = storage.add_item("text", plaintext)

        # Cross-thread UI update via Qt signal — safe from any thread
        self.server.qthread_parent.sync_received.emit(item_id, plaintext)
        self._send_response(201, {"status": "synced"})
# core/api_server.py
import json
import time
import urllib.parse
from collections import defaultdict
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Lock


class _RateLimiter:
    """Sliding window rate limiter — 3 pairing attempts per 60s per IP."""

    def __init__(self, max_attempts: int = 3, window: int = 60):
        self._attempts = defaultdict(list)  # ip -> timestamps of accepted attempts
        self._lock = Lock()
        self.max = max_attempts
        self.window = window

    def is_allowed(self, ip: str) -> bool:
        """Record an attempt from *ip* and report whether it is within the limit.

        Rejected attempts are not recorded, so a blocked client becomes
        eligible again as soon as its oldest accepted attempt leaves the window.
        """
        # Monotonic clock: interval maths must not be affected by wall-clock
        # adjustments (NTP steps, manual changes).
        now = time.monotonic()
        with self._lock:
            # drop timestamps outside the window
            recent = [t for t in self._attempts[ip] if now - t < self.window]
            self._attempts[ip] = recent
            if len(recent) >= self.max:
                return False
            recent.append(now)
            return True


_rate_limiter = _RateLimiter()


class GhostAPIHandler(BaseHTTPRequestHandler):
    """HTTP handler for ``POST /api/pair`` and ``POST /api/sync``.

    Every request ends with a JSON response; bad input is answered with a 4xx
    status instead of an unhandled exception.
    """

    # Largest request body that will be read. The length comes from an
    # untrusted header, so it must be bounded before calling rfile.read().
    max_body_bytes = 16 * 1024 * 1024

    def log_message(self, format, *args):
        pass  # silence default HTTP logs

    def _send_response(self, code: int, body: dict):
        """Serialise *body* as JSON and send it with status *code*."""
        payload = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _reject(self, code: int, message: str):
        """Send the standard ``{"status": "error", ...}`` body with *code*."""
        self._send_response(code, {"status": "error", "message": message})

    def _read_json_object(self):
        """Read the request body and parse it as a JSON object.

        Returns:
            The parsed dict, or ``None`` after an error response has already
            been sent (bad Content-Length, oversized body, invalid JSON, or a
            JSON value that is not an object).
        """
        try:
            length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            # A negative length would make rfile.read() block until EOF.
            self._reject(400, "Invalid Content-Length")
            return None
        if length > self.max_body_bytes:
            self._reject(413, "Payload too large")
            return None

        try:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            data = json.loads(self.rfile.read(length))
        except ValueError:
            self._reject(400, "Malformed JSON body")
            return None
        if not isinstance(data, dict):
            self._reject(400, "JSON object expected")
            return None
        return data

    def do_POST(self):
        """Dispatch a POST request by path; unknown paths get a 404."""
        route = urllib.parse.urlparse(self.path).path
        if route == '/api/pair':
            self._handle_pair()
        elif route == '/api/sync':
            self._handle_sync()
        else:
            self._reject(404, "Unknown endpoint")

    # ── /api/pair — pairing PIN exchange ──────────────────────
    def _handle_pair(self):
        """Rate-limit pairing attempts per client IP, then run the pairing step."""
        client_ip = self.client_address[0]
        if not _rate_limiter.is_allowed(client_ip):
            self._send_response(429, {
                "status": "error",
                "message": "Too many pairing attempts. Try again later."
            })
            return

        # ... PIN verification and key exchange logic
        self._send_response(200, {"status": "paired"})

    # ── /api/sync — incoming E2EE clipboard item ───────────────
    def _handle_sync(self):
        """Decrypt an item from a trusted peer, store it and notify the UI."""
        data = self._read_json_object()
        if data is None:
            return  # error response already sent

        peer_node_id = data.get("node_id")
        peer = storage.get_trusted_peer(peer_node_id)
        if not peer:
            self._reject(403, "Untrusted peer")
            return

        try:
            plaintext = decrypt_from_peer(
                data.get("payload"),
                peer["shared_secret"]
            )
        except Exception:
            # Wrong key or tampered payload — refuse without revealing why.
            self._reject(403, "Decryption failed")
            return

        item_id = storage.add_item("text", plaintext)

        # Cross-thread UI update via Qt signal — safe from any thread
        self.server.qthread_parent.sync_received.emit(item_id, plaintext)
        self._send_response(201, {"status": "synced"})
# core/api_server.py
import json
import time
import urllib.parse
from collections import defaultdict
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Lock


class _RateLimiter:
    """Sliding window rate limiter — 3 pairing attempts per 60s per IP."""

    def __init__(self, max_attempts: int = 3, window: int = 60):
        self._attempts = defaultdict(list)  # ip -> timestamps of accepted attempts
        self._lock = Lock()
        self.max = max_attempts
        self.window = window

    def is_allowed(self, ip: str) -> bool:
        """Record an attempt from *ip* and report whether it is within the limit.

        Rejected attempts are not recorded, so a blocked client becomes
        eligible again as soon as its oldest accepted attempt leaves the window.
        """
        # Monotonic clock: interval maths must not be affected by wall-clock
        # adjustments (NTP steps, manual changes).
        now = time.monotonic()
        with self._lock:
            # drop timestamps outside the window
            recent = [t for t in self._attempts[ip] if now - t < self.window]
            self._attempts[ip] = recent
            if len(recent) >= self.max:
                return False
            recent.append(now)
            return True


_rate_limiter = _RateLimiter()


class GhostAPIHandler(BaseHTTPRequestHandler):
    """HTTP handler for ``POST /api/pair`` and ``POST /api/sync``.

    Every request ends with a JSON response; bad input is answered with a 4xx
    status instead of an unhandled exception.
    """

    # Largest request body that will be read. The length comes from an
    # untrusted header, so it must be bounded before calling rfile.read().
    max_body_bytes = 16 * 1024 * 1024

    def log_message(self, format, *args):
        pass  # silence default HTTP logs

    def _send_response(self, code: int, body: dict):
        """Serialise *body* as JSON and send it with status *code*."""
        payload = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _reject(self, code: int, message: str):
        """Send the standard ``{"status": "error", ...}`` body with *code*."""
        self._send_response(code, {"status": "error", "message": message})

    def _read_json_object(self):
        """Read the request body and parse it as a JSON object.

        Returns:
            The parsed dict, or ``None`` after an error response has already
            been sent (bad Content-Length, oversized body, invalid JSON, or a
            JSON value that is not an object).
        """
        try:
            length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            # A negative length would make rfile.read() block until EOF.
            self._reject(400, "Invalid Content-Length")
            return None
        if length > self.max_body_bytes:
            self._reject(413, "Payload too large")
            return None

        try:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            data = json.loads(self.rfile.read(length))
        except ValueError:
            self._reject(400, "Malformed JSON body")
            return None
        if not isinstance(data, dict):
            self._reject(400, "JSON object expected")
            return None
        return data

    def do_POST(self):
        """Dispatch a POST request by path; unknown paths get a 404."""
        route = urllib.parse.urlparse(self.path).path
        if route == '/api/pair':
            self._handle_pair()
        elif route == '/api/sync':
            self._handle_sync()
        else:
            self._reject(404, "Unknown endpoint")

    # ── /api/pair — pairing PIN exchange ──────────────────────
    def _handle_pair(self):
        """Rate-limit pairing attempts per client IP, then run the pairing step."""
        client_ip = self.client_address[0]
        if not _rate_limiter.is_allowed(client_ip):
            self._send_response(429, {
                "status": "error",
                "message": "Too many pairing attempts. Try again later."
            })
            return

        # ... PIN verification and key exchange logic
        self._send_response(200, {"status": "paired"})

    # ── /api/sync — incoming E2EE clipboard item ───────────────
    def _handle_sync(self):
        """Decrypt an item from a trusted peer, store it and notify the UI."""
        data = self._read_json_object()
        if data is None:
            return  # error response already sent

        peer_node_id = data.get("node_id")
        peer = storage.get_trusted_peer(peer_node_id)
        if not peer:
            self._reject(403, "Untrusted peer")
            return

        try:
            plaintext = decrypt_from_peer(
                data.get("payload"),
                peer["shared_secret"]
            )
        except Exception:
            # Wrong key or tampered payload — refuse without revealing why.
            self._reject(403, "Decryption failed")
            return

        item_id = storage.add_item("text", plaintext)

        # Cross-thread UI update via Qt signal — safe from any thread
        self.server.qthread_parent.sync_received.emit(item_id, plaintext)
        self._send_response(201, {"status": "synced"})
Device A (sender)                          Device B (receiver)
─────────────────                          ────────────────────
1. User copies text
2. ClipboardMonitor detects change
3. Encrypt with shared_secret
   [ AES-256-GCM | random 12-byte nonce ]
4. POST /api/sync ──────────────────────►  5. GhostAPIHandler.do_POST()
   {                                       6. Lookup peer by node_id
     "node_id": "abc123",                  7. Decrypt with shared_secret
     "payload": "<base64 ciphertext>"      8. storage.add_item()
   }                                       9. sync_received.emit()
                                           10. UI updates in main thread
◄──────────────── 201 { "status": "synced" }
Device A (sender)                          Device B (receiver)
─────────────────                          ────────────────────
1. User copies text
2. ClipboardMonitor detects change
3. Encrypt with shared_secret
   [ AES-256-GCM | random 12-byte nonce ]
4. POST /api/sync ──────────────────────►  5. GhostAPIHandler.do_POST()
   {                                       6. Lookup peer by node_id
     "node_id": "abc123",                  7. Decrypt with shared_secret
     "payload": "<base64 ciphertext>"      8. storage.add_item()
   }                                       9. sync_received.emit()
                                           10. UI updates in main thread
◄──────────────── 201 { "status": "synced" }
Device A (sender)                          Device B (receiver)
─────────────────                          ────────────────────
1. User copies text
2. ClipboardMonitor detects change
3. Encrypt with shared_secret
   [ AES-256-GCM | random 12-byte nonce ]
4. POST /api/sync ──────────────────────►  5. GhostAPIHandler.do_POST()
   {                                       6. Lookup peer by node_id
     "node_id": "abc123",                  7. Decrypt with shared_secret
     "payload": "<base64 ciphertext>"      8. storage.add_item()
   }                                       9. sync_received.emit()
                                           10. UI updates in main thread
◄──────────────── 201 { "status": "synced" }
# .github/workflows/build-all.yml (signing steps)
#
# Secrets are handed to each step through `env:` and read as shell variables.
# Interpolating `${{ secrets.* }}` straight into the script text breaks on
# quotes/spaces in the value, and passing the passphrase as a command-line
# argument exposes it in the process list.

- name: Sign AppImage (GPG)
  env:
    GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }}
    GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
  run: |
    printf '%s\n' "$GPG_PRIVATE_KEY" | gpg --import --batch --yes
    # Passphrase is read from stdin (fd 0), never placed on the command line.
    printf '%s' "$GPG_PASSPHRASE" | gpg --batch --yes --pinentry-mode loopback \
      --passphrase-fd 0 \
      --detach-sign --armor \
      DotGhostBoard-*.AppImage

- name: Verify AppImage signature
  run: |
    gpg --verify DotGhostBoard-*.AppImage.asc DotGhostBoard-*.AppImage
    echo "✅ AppImage signature verified"

- name: Sign DEB Package (GPG)
  env:
    GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }}
    GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
    GPG_KEY_ID: ${{ secrets.GPG_KEY_ID }}
  run: |
    printf '%s\n' "$GPG_PRIVATE_KEY" | gpg --import --batch --yes
    # dpkg-sig runs gpg itself, so give gpg the passphrase through a private
    # temp file (mktemp creates it 0600) and remove it when the step exits.
    pass_file="$(mktemp)"
    trap 'rm -f "$pass_file"' EXIT
    printf '%s' "$GPG_PASSPHRASE" > "$pass_file"
    dpkg-sig --sign builder \
      -k "$GPG_KEY_ID" \
      --gpg-options "--passphrase-file $pass_file --pinentry-mode loopback --batch --yes" \
      dotghostboard_*.deb

- name: Generate SHA256 checksums
  run: |
    cd out && sha256sum * > SHA256SUMS.txt
# .github/workflows/build-all.yml (signing steps)
#
# Secrets are handed to each step through `env:` and read as shell variables.
# Interpolating `${{ secrets.* }}` straight into the script text breaks on
# quotes/spaces in the value, and passing the passphrase as a command-line
# argument exposes it in the process list.

- name: Sign AppImage (GPG)
  env:
    GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }}
    GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
  run: |
    printf '%s\n' "$GPG_PRIVATE_KEY" | gpg --import --batch --yes
    # Passphrase is read from stdin (fd 0), never placed on the command line.
    printf '%s' "$GPG_PASSPHRASE" | gpg --batch --yes --pinentry-mode loopback \
      --passphrase-fd 0 \
      --detach-sign --armor \
      DotGhostBoard-*.AppImage

- name: Verify AppImage signature
  run: |
    gpg --verify DotGhostBoard-*.AppImage.asc DotGhostBoard-*.AppImage
    echo "✅ AppImage signature verified"

- name: Sign DEB Package (GPG)
  env:
    GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }}
    GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
    GPG_KEY_ID: ${{ secrets.GPG_KEY_ID }}
  run: |
    printf '%s\n' "$GPG_PRIVATE_KEY" | gpg --import --batch --yes
    # dpkg-sig runs gpg itself, so give gpg the passphrase through a private
    # temp file (mktemp creates it 0600) and remove it when the step exits.
    pass_file="$(mktemp)"
    trap 'rm -f "$pass_file"' EXIT
    printf '%s' "$GPG_PASSPHRASE" > "$pass_file"
    dpkg-sig --sign builder \
      -k "$GPG_KEY_ID" \
      --gpg-options "--passphrase-file $pass_file --pinentry-mode loopback --batch --yes" \
      dotghostboard_*.deb

- name: Generate SHA256 checksums
  run: |
    cd out && sha256sum * > SHA256SUMS.txt
# .github/workflows/build-all.yml (signing steps)
#
# Secrets are handed to each step through `env:` and read as shell variables.
# Interpolating `${{ secrets.* }}` straight into the script text breaks on
# quotes/spaces in the value, and passing the passphrase as a command-line
# argument exposes it in the process list.

- name: Sign AppImage (GPG)
  env:
    GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }}
    GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
  run: |
    printf '%s\n' "$GPG_PRIVATE_KEY" | gpg --import --batch --yes
    # Passphrase is read from stdin (fd 0), never placed on the command line.
    printf '%s' "$GPG_PASSPHRASE" | gpg --batch --yes --pinentry-mode loopback \
      --passphrase-fd 0 \
      --detach-sign --armor \
      DotGhostBoard-*.AppImage

- name: Verify AppImage signature
  run: |
    gpg --verify DotGhostBoard-*.AppImage.asc DotGhostBoard-*.AppImage
    echo "✅ AppImage signature verified"

- name: Sign DEB Package (GPG)
  env:
    GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }}
    GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
    GPG_KEY_ID: ${{ secrets.GPG_KEY_ID }}
  run: |
    printf '%s\n' "$GPG_PRIVATE_KEY" | gpg --import --batch --yes
    # dpkg-sig runs gpg itself, so give gpg the passphrase through a private
    # temp file (mktemp creates it 0600) and remove it when the step exits.
    pass_file="$(mktemp)"
    trap 'rm -f "$pass_file"' EXIT
    printf '%s' "$GPG_PASSPHRASE" > "$pass_file"
    dpkg-sig --sign builder \
      -k "$GPG_KEY_ID" \
      --gpg-options "--passphrase-file $pass_file --pinentry-mode loopback --batch --yes" \
      dotghostboard_*.deb

- name: Generate SHA256 checksums
  run: |
    cd out && sha256sum * > SHA256SUMS.txt
# Verify AppImage
gpg --verify DotGhostBoard-1.5.1-x86_64.AppImage.asc \
    DotGhostBoard-1.5.1-x86_64.AppImage

# Verify DEB
dpkg-sig --verify dotghostboard_1.5.1_amd64.deb

# Verify checksum
sha256sum -c SHA256SUMS.txt
# Verify AppImage
gpg --verify DotGhostBoard-1.5.1-x86_64.AppImage.asc \
    DotGhostBoard-1.5.1-x86_64.AppImage

# Verify DEB
dpkg-sig --verify dotghostboard_1.5.1_amd64.deb

# Verify checksum
sha256sum -c SHA256SUMS.txt
# Verify AppImage
gpg --verify DotGhostBoard-1.5.1-x86_64.AppImage.asc \
    DotGhostBoard-1.5.1-x86_64.AppImage

# Verify DEB
dpkg-sig --verify dotghostboard_1.5.1_amd64.deb

# Verify checksum
sha256sum -c SHA256SUMS.txt

- A fully isolated vault.db (separate file, separate connection, locked when not in use)
- Pattern-based secret detection using Regex — JWT, AWS keys, GitHub tokens, high-entropy hex strings — not keyword matching
- Auto-clear: wipes the clipboard 30 seconds after a Vault paste
- Paranoia Mode: a toggle that suspends all DB writes temporarily